#!/usr/bin/env python3
"""
Anthropic → OpenAI API Translator Proxy

Accepts Anthropic /v1/messages format, translates to OpenAI /v1/chat/completions,
routes through Cloudflare proxy to NVIDIA API, translates response back.

Handles: text, tool_use, tool_result, streaming SSE
"""

import json
import logging
import os
import time
import traceback
import uuid

from aiohttp import web, ClientSession, ClientTimeout

log = logging.getLogger("anthropic-proxy")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")
MODEL = os.environ.get("MODEL", "z-ai/glm-5.1")
PROXY_URL = os.environ.get("PROXY_URL", "")
PROXY_SECRET = os.environ.get("PROXY_SECRET", "")
# Strip a trailing slash so PROXY_URL="https://host/" does not yield "//v1/...".
TARGET_URL = (
    f"{PROXY_URL.rstrip('/')}/v1/chat/completions"
    if PROXY_URL
    else "https://integrate.api.nvidia.com/v1/chat/completions"
)

REQUEST_COUNT = 0

# OpenAI finish_reason -> Anthropic stop_reason (anything else maps to "end_turn").
_STOP_REASON_MAP = {"stop": "end_turn", "tool_calls": "tool_use", "length": "max_tokens"}


def anthropic_tools_to_openai(tools: list) -> list:
    """Convert Anthropic tool definitions to OpenAI function tools.

    The JSON schema is read from ``input_schema`` and falls back to
    ``parameters`` (then ``{}``) when that key is absent.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": t.get("name", ""),
                "description": t.get("description", ""),
                "parameters": t.get("input_schema", t.get("parameters", {})),
            },
        }
        for t in tools
    ]


def _flatten_tool_result(tc_content) -> str:
    """Reduce an Anthropic tool_result ``content`` value to a plain string."""
    if tc_content is None:
        # An explicit null must not become the literal string "None".
        return ""
    if isinstance(tc_content, list):
        joined = "\n".join(
            b.get("text", "")
            for b in tc_content
            if isinstance(b, dict) and b.get("type") == "text"
        )
        # No text blocks at all: keep the information by serialising the list.
        return joined or json.dumps(tc_content, ensure_ascii=False)
    return str(tc_content)


def anthropic_messages_to_openai(messages: list) -> list:
    """Convert Anthropic message array to OpenAI message array.

    String content is passed through unchanged. Block lists are split into
    text, ``tool_use`` (assistant ``tool_calls``) and ``tool_result``
    (separate ``role: "tool"`` messages). Other block types are ignored.
    """
    result = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")

        if isinstance(content, str):
            result.append({"role": role, "content": content})
            continue
        if not isinstance(content, list):
            result.append({"role": role, "content": str(content)})
            continue

        # Content is a list of blocks
        text_parts = []
        tool_calls = []
        tool_results = []
        for block in content:
            if isinstance(block, str):
                # Tolerate bare strings inside a block list by treating them as text.
                text_parts.append(block)
                continue
            if not isinstance(block, dict):
                continue
            btype = block.get("type", "")
            if btype == "text":
                text_parts.append(block.get("text", ""))
            elif btype == "tool_use":
                tool_calls.append({
                    "id": block.get("id", f"call_{uuid.uuid4().hex[:8]}"),
                    "type": "function",
                    "function": {
                        "name": block.get("name", ""),
                        "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                    },
                })
            elif btype == "tool_result":
                tool_results.append({
                    "role": "tool",
                    "tool_call_id": block.get("tool_use_id", ""),
                    "content": _flatten_tool_result(block.get("content", "")),
                })

        # Build messages from blocks
        joined_text = "\n".join(text_parts)
        if role == "assistant":
            m = {"role": "assistant", "content": joined_text}
            if tool_calls:
                m["tool_calls"] = tool_calls
                if not m["content"]:
                    m["content"] = None
            result.append(m)
        elif tool_results:
            # Tool messages must directly follow the assistant message that
            # issued the tool_calls, so they are emitted BEFORE any user text.
            result.extend(tool_results)
            if text_parts:
                result.append({"role": "user", "content": joined_text})
        else:
            result.append({"role": role, "content": joined_text})

    return result


def anthropic_to_openai_request(body: dict) -> dict:
    """Convert full Anthropic /v1/messages request to OpenAI /v1/chat/completions.

    The outgoing model is always the configured ``MODEL``; the model named in
    the incoming request is not forwarded.
    """
    messages = []

    # System message (either a plain string or a list of text blocks)
    system = body.get("system", "")
    if isinstance(system, str) and system:
        messages.append({"role": "system", "content": system})
    elif isinstance(system, list):
        text = "\n".join(
            b.get("text", "")
            for b in system
            if isinstance(b, dict) and b.get("type") == "text"
        )
        if text:
            messages.append({"role": "system", "content": text})

    # Convert messages ("messages": null is treated like an empty list)
    messages.extend(anthropic_messages_to_openai(body.get("messages") or []))

    oai = {
        "model": MODEL,
        "messages": messages,
        "max_tokens": body.get("max_tokens", 4096),
        "stream": body.get("stream", False),
    }
    if body.get("temperature") is not None:
        oai["temperature"] = body["temperature"]
    if body.get("top_p") is not None:
        oai["top_p"] = body["top_p"]

    # Tools
    tools = body.get("tools") or []
    if tools:
        oai["tools"] = anthropic_tools_to_openai(tools)
        oai["tool_choice"] = "auto"

    return oai


def _parse_tool_arguments(raw) -> dict:
    """Turn an OpenAI ``function.arguments`` value into a tool ``input`` dict.

    Missing or empty arguments become ``{}``. Anything that is not a JSON
    object (invalid JSON, or JSON that decodes to a non-dict) is preserved
    under the ``"raw"`` key so that ``input`` is always an object.
    """
    if isinstance(raw, dict):
        # Some backends hand back already-decoded arguments.
        return raw
    if raw is None or raw == "":
        return {}
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {"raw": raw}
    return parsed if isinstance(parsed, dict) else {"raw": raw}


def openai_to_anthropic_response(oai: dict, model_name: str) -> dict:
    """Convert OpenAI chat completion to Anthropic messages response.

    Only the first choice is used. ``null`` values for ``message``,
    ``tool_calls`` and ``usage`` are treated like missing keys. The result
    always contains at least one content block.
    """
    choices = oai.get("choices") or []
    if not choices:
        return {
            "id": f"msg_{uuid.uuid4().hex[:24]}",
            "type": "message",
            "role": "assistant",
            "content": [{"type": "text", "text": ""}],
            "model": model_name,
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        }

    choice = choices[0]
    message = choice.get("message") or {}
    content = []

    text = message.get("content", "")
    if text:
        content.append({"type": "text", "text": text})

    for tc in message.get("tool_calls") or []:
        fn = tc.get("function") or {}
        content.append({
            "type": "tool_use",
            "id": tc.get("id") or f"toolu_{uuid.uuid4().hex[:12]}",
            "name": fn.get("name", ""),
            "input": _parse_tool_arguments(fn.get("arguments", "{}")),
        })

    if not content:
        content.append({"type": "text", "text": ""})

    finish = choice.get("finish_reason", "stop")
    usage = oai.get("usage") or {}

    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model_name,
        "stop_reason": _STOP_REASON_MAP.get(finish, "end_turn"),
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
    }


async def sse_write(resp: web.StreamResponse, event: str, data: dict):
    """Write one SSE event."""
    payload = f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
    await resp.write(payload.encode("utf-8"))


def _error_response(err_type: str, message: str, status: int) -> web.Response:
    """Build an Anthropic-style JSON error response."""
    return web.json_response(
        {"type": "error", "error": {"type": err_type, "message": message}},
        status=status,
    )


async def _relay_stream(request: web.Request, upstream, model_name: str,
                        req_id: int) -> web.StreamResponse:
    """Re-emit an OpenAI SSE completion stream as Anthropic SSE events.

    ``upstream`` is the open aiohttp client response from the backend. Once
    the response is prepared its headers are on the wire, so later failures
    are reported in-band as an SSE ``error`` event, not a second HTTP response.
    """
    response = web.StreamResponse(
        status=200,
        reason="OK",
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await response.prepare(request)

    try:
        msg_id = f"msg_{uuid.uuid4().hex[:24]}"

        # message_start
        await sse_write(response, "message_start", {
            "type": "message_start",
            "message": {
                "id": msg_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model_name,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": 0, "output_tokens": 0},
            },
        })

        block_open = False
        block_index = 0
        text_parts = []          # joined once at the end (avoids quadratic +=)
        tool_buffers = {}        # tc_id -> {name, args}
        current_tool_id = None   # id of the open tool_use block, None for text
        finish_reason = None
        usage = {}

        async for raw_line in upstream.content:
            line = raw_line.decode("utf-8", errors="replace").strip()
            # The space after "data:" is optional in SSE.
            if not line.startswith("data:"):
                continue
            data_str = line[5:].strip()
            if data_str == "[DONE]":
                break

            try:
                chunk = json.loads(data_str)
            except json.JSONDecodeError:
                continue
            if not isinstance(chunk, dict):
                continue

            # Remember real token usage if the backend reports it on a chunk.
            if isinstance(chunk.get("usage"), dict):
                usage = chunk["usage"]

            choices = chunk.get("choices") or []
            if not choices:
                continue

            delta = choices[0].get("delta") or {}
            finish = choices[0].get("finish_reason")

            # ── Text delta ──
            text = delta.get("content")
            if text:
                if not block_open or current_tool_id is not None:
                    # Close any tool block first
                    if block_open:
                        await sse_write(response, "content_block_stop",
                                        {"type": "content_block_stop", "index": block_index})
                        block_index += 1
                    # Open text block
                    await sse_write(response, "content_block_start", {
                        "type": "content_block_start",
                        "index": block_index,
                        "content_block": {"type": "text", "text": ""},
                    })
                    block_open = True
                    current_tool_id = None

                await sse_write(response, "content_block_delta", {
                    "type": "content_block_delta",
                    "index": block_index,
                    "delta": {"type": "text_delta", "text": text},
                })
                text_parts.append(text)

            # ── Tool call delta ──
            for tc in delta.get("tool_calls") or []:
                tc_id = tc.get("id")
                fn = tc.get("function") or {}
                fn_name = fn.get("name")
                fn_args = fn.get("arguments") or ""

                if tc_id and tc_id not in tool_buffers:
                    # New tool call — close previous block
                    if block_open:
                        await sse_write(response, "content_block_stop",
                                        {"type": "content_block_stop", "index": block_index})
                        block_index += 1

                    tool_buffers[tc_id] = {"name": fn_name or "", "args": fn_args}
                    current_tool_id = tc_id

                    await sse_write(response, "content_block_start", {
                        "type": "content_block_start",
                        "index": block_index,
                        "content_block": {
                            "type": "tool_use",
                            "id": tc_id,
                            "name": fn_name or "",
                            # Arguments arrive via input_json_delta events.
                            "input": {},
                        },
                    })
                    block_open = True

                    if fn_args:
                        await sse_write(response, "content_block_delta", {
                            "type": "content_block_delta",
                            "index": block_index,
                            "delta": {"type": "input_json_delta", "partial_json": fn_args},
                        })
                elif fn_args:
                    # Continuing an existing tool call; fragments that arrive
                    # while no tool block is open are dropped.
                    if current_tool_id and current_tool_id in tool_buffers:
                        tool_buffers[current_tool_id]["args"] += fn_args
                        await sse_write(response, "content_block_delta", {
                            "type": "content_block_delta",
                            "index": block_index,
                            "delta": {"type": "input_json_delta", "partial_json": fn_args},
                        })

            if finish:
                finish_reason = finish
                break

        # Close last block
        if block_open:
            await sse_write(response, "content_block_stop",
                            {"type": "content_block_stop", "index": block_index})

        # Determine stop reason: truncation wins, then tool use, else end_turn.
        if finish_reason == "length":
            stop_reason = "max_tokens"
        elif tool_buffers:
            stop_reason = "tool_use"
        else:
            stop_reason = "end_turn"

        full_text = "".join(text_parts)
        output_tokens = usage.get("completion_tokens")
        if not isinstance(output_tokens, int):
            # Backend sent no usage: fall back to a rough word-count estimate.
            output_tokens = max(1, len(full_text.split()))

        await sse_write(response, "message_delta", {
            "type": "message_delta",
            "delta": {"stop_reason": stop_reason, "stop_sequence": None},
            "usage": {"output_tokens": output_tokens},
        })
        await sse_write(response, "message_stop", {"type": "message_stop"})

        log.info(f"[req#{req_id}] Stream done: {len(full_text)} chars text, "
                 f"{len(tool_buffers)} tool calls, stop={stop_reason}")
    except Exception as e:
        log.error(f"[req#{req_id}] Error: {e}\n{traceback.format_exc()}")
        try:
            await sse_write(response, "error", {
                "type": "error",
                "error": {"type": "api_error", "message": str(e)},
            })
        except Exception as write_err:
            # The client is most likely gone; nothing more can be delivered.
            log.warning(f"[req#{req_id}] Could not deliver stream error to client: {write_err}")

    return response


async def handle_messages(request: web.Request) -> web.StreamResponse:
    """Handle POST /v1/messages — Anthropic API format.

    Translates the request, forwards it to ``TARGET_URL`` and returns either
    a JSON Anthropic message or, when ``stream`` is set, an SSE stream.
    Failures are returned as Anthropic-style ``{"type": "error", ...}`` bodies.
    """
    global REQUEST_COUNT
    REQUEST_COUNT += 1
    req_id = REQUEST_COUNT

    try:
        body = await request.json()
    except Exception as e:
        log.error(f"[req#{req_id}] Invalid JSON: {e}")
        return _error_response("invalid_request_error", str(e), 400)

    if not isinstance(body, dict):
        log.error(f"[req#{req_id}] Invalid JSON: body is not an object")
        return _error_response("invalid_request_error",
                               "Request body must be a JSON object", 400)

    model_name = body.get("model", "claude-sonnet-4-20250514")
    is_stream = body.get("stream", False)
    n_tools = len(body.get("tools") or [])
    n_msgs = len(body.get("messages") or [])

    log.info(f"[req#{req_id}] {model_name} → {MODEL} | msgs={n_msgs} tools={n_tools} stream={is_stream}")

    # Translate
    try:
        oai_body = anthropic_to_openai_request(body)
    except Exception as e:
        log.error(f"[req#{req_id}] Translation error: {e}\n{traceback.format_exc()}")
        return _error_response("api_error", f"Translation error: {e}", 500)

    log.info(f"[req#{req_id}] OpenAI request: {len(oai_body['messages'])} messages, "
             f"{len(oai_body.get('tools', []))} tools")

    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Content-Type": "application/json",
    }
    if PROXY_URL and PROXY_SECRET:
        headers["x-target-host"] = "integrate.api.nvidia.com"
        headers["x-proxy-key"] = PROXY_SECRET

    timeout = ClientTimeout(total=300, connect=30)

    async with ClientSession(timeout=timeout) as session:
        try:
            async with session.post(TARGET_URL, json=oai_body, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    log.error(f"[req#{req_id}] NVIDIA API {resp.status}: {error_text[:500]}")
                    return _error_response(
                        "api_error",
                        f"Backend error {resp.status}: {error_text[:300]}",
                        resp.status,
                    )

                # ─── Non-streaming ───
                if not is_stream:
                    # content_type=None: accept JSON served with a non-JSON
                    # Content-Type header.
                    oai_resp = await resp.json(content_type=None)
                    anthropic_resp = openai_to_anthropic_response(oai_resp, model_name)
                    log.info(f"[req#{req_id}] Response: {len(anthropic_resp['content'])} blocks, "
                             f"stop={anthropic_resp['stop_reason']}")
                    return web.json_response(anthropic_resp)

                # ─── Streaming ───
                return await _relay_stream(request, resp, model_name, req_id)

        except Exception as e:
            # Only reached before any streaming response has been prepared.
            log.error(f"[req#{req_id}] Error: {e}\n{traceback.format_exc()}")
            return _error_response("api_error", str(e), 500)


async def handle_health(request: web.Request):
    """Return a small JSON status document describing this proxy."""
    return web.json_response({
        "status": "ok",
        "proxy": "anthropic-to-openai",
        "target_model": MODEL,
        "target_url": TARGET_URL,
        "requests_served": REQUEST_COUNT,
    })


def create_app():
    """Create the aiohttp application and register all routes."""
    app = web.Application()
    app.router.add_post("/v1/messages", handle_messages)
    app.router.add_post("/messages", handle_messages)
    app.router.add_get("/v1/messages", handle_health)
    app.router.add_get("/health", handle_health)
    app.router.add_get("/", handle_health)
    return app


if __name__ == "__main__":
    app = create_app()
    log.info(f"Anthropic→OpenAI proxy starting on :4000 → {MODEL} via {TARGET_URL}")
    web.run_app(app, host="0.0.0.0", port=4000)