Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Anthropic → OpenAI API Translator Proxy | |
| Accepts Anthropic /v1/messages format, translates to OpenAI /v1/chat/completions, | |
| routes through Cloudflare proxy to NVIDIA API, translates response back. | |
| Handles: text, tool_use, tool_result, streaming SSE | |
| """ | |
| import os, json, logging, uuid, time, traceback | |
| from aiohttp import web, ClientSession, ClientTimeout | |
# Module-wide logger; basicConfig installs a timestamped INFO-level format on the root logger.
log = logging.getLogger("anthropic-proxy")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# Runtime configuration, read once from the environment at import time.
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")  # sent upstream as the Bearer token
MODEL = os.environ.get("MODEL", "z-ai/glm-5.1")  # upstream model used for every request
PROXY_URL = os.environ.get("PROXY_URL", "")  # optional relay base URL; empty means call NVIDIA directly
PROXY_SECRET = os.environ.get("PROXY_SECRET", "")  # sent to the relay as the x-proxy-key header

# Chat-completions endpoint: the relay when PROXY_URL is set, otherwise NVIDIA directly.
# Trailing slashes are stripped so "https://relay/" does not yield "https://relay//v1/...".
TARGET_URL = (
    f"{PROXY_URL.rstrip('/')}/v1/chat/completions"
    if PROXY_URL
    else "https://integrate.api.nvidia.com/v1/chat/completions"
)

# Number of POSTs received so far; doubles as the per-request id in log lines
# and is reported by the health endpoint.
REQUEST_COUNT = 0
def anthropic_tools_to_openai(tools: list) -> list:
    """Translate Anthropic tool definitions into OpenAI ``function`` tool entries.

    For each tool, ``name`` and ``description`` are copied (defaulting to an
    empty string) and the JSON schema is taken from ``input_schema``, falling
    back to ``parameters`` and finally to an empty dict.
    """

    def _as_function_tool(spec):
        # Anthropic calls the schema "input_schema"; OpenAI calls it "parameters".
        schema = spec.get("input_schema", spec.get("parameters", {}))
        return {
            "type": "function",
            "function": {
                "name": spec.get("name", ""),
                "description": spec.get("description", ""),
                "parameters": schema,
            },
        }

    return [_as_function_tool(spec) for spec in tools]
def _flatten_tool_result(content) -> str:
    """Collapse an Anthropic ``tool_result`` content value into a plain string."""
    if content is None:
        # An explicit null would otherwise be sent upstream as the string "None".
        return ""
    if isinstance(content, list):
        joined = "\n".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
        # No text parts (e.g. only images): fall back to the raw JSON of the list.
        return joined or json.dumps(content, ensure_ascii=False)
    return str(content)


def anthropic_messages_to_openai(messages: list) -> list:
    """Convert an Anthropic message array to an OpenAI chat message array.

    Args:
        messages: Anthropic messages; each has a ``role`` and a ``content``
            that is either a string or a list of content blocks.

    Returns:
        A list of OpenAI chat messages:
        * string (or non-list) content is passed through as a single message;
        * assistant ``tool_use`` blocks become ``tool_calls`` on one assistant
          message (``content`` is ``None`` when there is no text);
        * ``tool_result`` blocks become separate ``role: "tool"`` messages,
          followed by a ``user`` message holding any accompanying text;
        * block types other than text / tool_use / tool_result are ignored.
    """
    result = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if isinstance(content, str):
            result.append({"role": role, "content": content})
            continue
        if not isinstance(content, list):
            result.append({"role": role, "content": str(content)})
            continue

        # Content is a list of blocks: sort them into the three kinds we translate.
        text_parts = []
        tool_calls = []
        tool_results = []
        for block in content:
            if isinstance(block, str):
                # Tolerate bare strings inside the list by treating them as text.
                text_parts.append(block)
                continue
            if not isinstance(block, dict):
                continue
            btype = block.get("type", "")
            if btype == "text":
                text_parts.append(block.get("text", ""))
            elif btype == "tool_use":
                tool_calls.append({
                    "id": block.get("id", f"call_{uuid.uuid4().hex[:8]}"),
                    "type": "function",
                    "function": {
                        "name": block.get("name", ""),
                        # OpenAI carries arguments as a JSON-encoded string.
                        "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                    },
                })
            elif btype == "tool_result":
                tool_results.append({
                    "role": "tool",
                    "tool_call_id": block.get("tool_use_id", ""),
                    "content": _flatten_tool_result(block.get("content", "")),
                })

        joined_text = "\n".join(text_parts)
        if role == "assistant":
            assistant_msg = {"role": "assistant", "content": joined_text}
            if tool_calls:
                assistant_msg["tool_calls"] = tool_calls
                if not joined_text:
                    # A tool-call-only assistant turn carries null content.
                    assistant_msg["content"] = None
            result.append(assistant_msg)
        elif tool_results:
            # Tool messages must directly follow the assistant message that
            # issued the calls, so emit them BEFORE any accompanying user text.
            result.extend(tool_results)
            if text_parts:
                result.append({"role": "user", "content": joined_text})
        else:
            result.append({"role": role, "content": joined_text})
    return result
def _translate_tool_choice(choice):
    """Map an Anthropic ``tool_choice`` object to the OpenAI form ("auto" if absent or unrecognised)."""
    if not isinstance(choice, dict):
        return "auto"
    kind = choice.get("type")
    if kind == "any":
        # Anthropic "any" = must call some tool = OpenAI "required".
        return "required"
    if kind == "none":
        return "none"
    if kind == "tool" and choice.get("name"):
        return {"type": "function", "function": {"name": choice["name"]}}
    return "auto"


def anthropic_to_openai_request(body: dict) -> dict:
    """Convert a full Anthropic /v1/messages request into an OpenAI /v1/chat/completions request.

    Args:
        body: Parsed Anthropic request body.

    Returns:
        The OpenAI request dict. The model is always the configured ``MODEL``
        (the client's ``model`` field is not forwarded). ``system`` (string or
        list of text blocks) becomes a leading system message, ``max_tokens``
        defaults to 4096, ``temperature`` / ``top_p`` are forwarded when set,
        and tools are translated together with the client's ``tool_choice``.
    """
    messages = []

    # System prompt: either a plain string or a list of text blocks.
    system = body.get("system", "")
    if isinstance(system, str) and system:
        messages.append({"role": "system", "content": system})
    elif isinstance(system, list):
        text = "\n".join(
            b.get("text", "")
            for b in system
            if isinstance(b, dict) and b.get("type") == "text"
        )
        if text:
            messages.append({"role": "system", "content": text})

    # Conversation turns ("messages": null is treated like an empty list).
    messages.extend(anthropic_messages_to_openai(body.get("messages") or []))

    oai = {
        "model": MODEL,
        "messages": messages,
        "max_tokens": body.get("max_tokens", 4096),
        "stream": body.get("stream", False),
    }
    # Optional sampling parameters are forwarded only when the client set them.
    for key in ("temperature", "top_p"):
        if body.get(key) is not None:
            oai[key] = body[key]

    tools = body.get("tools", [])
    if tools:
        oai["tools"] = anthropic_tools_to_openai(tools)
        # Honour the client's tool_choice instead of always forcing "auto".
        oai["tool_choice"] = _translate_tool_choice(body.get("tool_choice"))
    return oai
def openai_to_anthropic_response(oai: dict, model_name: str) -> dict:
    """Convert an OpenAI chat completion into an Anthropic messages response.

    Args:
        oai: Parsed OpenAI chat-completion response.
        model_name: Model name to echo back in the Anthropic response.

    Returns:
        An Anthropic ``message`` dict. Only the first choice is used. Text
        becomes a ``text`` block and each tool call a ``tool_use`` block; when
        nothing was produced a single empty text block is returned.
        ``stop_reason`` is mapped from ``finish_reason`` and is ``tool_use``
        whenever tool calls are present and the backend did not report
        truncation. Fields that are missing or explicitly null upstream
        (``choices``, ``message``, ``tool_calls``, ``usage``, ``arguments``)
        are treated as empty.
    """
    choices = oai.get("choices") or []
    if not choices:
        return {
            "id": f"msg_{uuid.uuid4().hex[:24]}",
            "type": "message", "role": "assistant",
            "content": [{"type": "text", "text": ""}],
            "model": model_name,
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }

    choice = choices[0]
    message = choice.get("message") or {}
    content = []

    text = message.get("content") or ""
    if text:
        content.append({"type": "text", "text": text})

    # Backends commonly send "tool_calls": null, so `or []` rather than a .get default.
    tool_calls = message.get("tool_calls") or []
    for tc in tool_calls:
        fn = tc.get("function") or {}
        raw_args = fn.get("arguments")
        if raw_args is None or raw_args == "":
            # A call with no arguments is an empty object, not a parse failure.
            inp = {}
        elif isinstance(raw_args, dict):
            inp = raw_args
        else:
            try:
                inp = json.loads(raw_args)
            except (json.JSONDecodeError, TypeError):
                inp = {"raw": raw_args}
            else:
                # Anthropic tool input must be a JSON object.
                if not isinstance(inp, dict):
                    inp = {"raw": raw_args}
        content.append({
            "type": "tool_use",
            "id": tc.get("id") or f"toolu_{uuid.uuid4().hex[:12]}",
            "name": fn.get("name") or "",
            "input": inp
        })

    if not content:
        content.append({"type": "text", "text": ""})

    finish = choice.get("finish_reason") or "stop"
    stop_map = {"stop": "end_turn", "tool_calls": "tool_use", "length": "max_tokens"}
    stop_reason = stop_map.get(finish, "end_turn")
    if tool_calls and stop_reason == "end_turn":
        # Some backends report "stop" even when they emitted tool calls; the
        # streaming path already reports tool_use in that case.
        stop_reason = "tool_use"

    usage = oai.get("usage") or {}
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model_name,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens") or 0,
            "output_tokens": usage.get("completion_tokens") or 0,
        }
    }
async def sse_write(resp: web.StreamResponse, event: str, data: dict):
    """Send a single server-sent event on ``resp``.

    The frame is an ``event:`` line naming ``event``, a ``data:`` line holding
    ``data`` serialised as JSON (non-ASCII characters left unescaped), and a
    terminating blank line, all encoded as UTF-8.
    """
    encoded_data = json.dumps(data, ensure_ascii=False)
    frame = f"event: {event}\n" + f"data: {encoded_data}\n\n"
    await resp.write(frame.encode("utf-8"))
async def _relay_stream(upstream, response: web.StreamResponse, model_name: str, req_id: int) -> None:
    """Translate an upstream OpenAI SSE stream into Anthropic stream events written to ``response``."""
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"
    await sse_write(response, "message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id, "type": "message", "role": "assistant",
            "content": [], "model": model_name,
            "stop_reason": None, "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }
    })

    block_open = False       # whether a content block is currently open
    block_index = 0          # index of the open (or next) content block
    text_parts = []          # text deltas seen so far
    tool_buffers = {}        # tc_id -> {name, args}
    current_tool_id = None   # id of the open tool_use block; None while in a text block
    finish_reason = None     # upstream finish_reason, once reported

    async for raw_line in upstream.content:
        line = raw_line.decode("utf-8", errors="replace").strip()
        # SSE permits "data:" with or without a following space.
        if not line.startswith("data:"):
            continue
        data_str = line[5:].strip()
        if data_str == "[DONE]":
            break
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            continue
        if not isinstance(chunk, dict):
            continue
        choices = chunk.get("choices") or []
        if not choices:
            continue
        choice = choices[0]
        # Backends may send explicit nulls for delta / tool_calls / arguments.
        delta = choice.get("delta") or {}
        finish = choice.get("finish_reason")

        # ── Text delta ──
        text = delta.get("content")
        if text:
            if not block_open or current_tool_id is not None:
                # Close any tool block first
                if block_open:
                    await sse_write(response, "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index})
                    block_index += 1
                # Open text block
                await sse_write(response, "content_block_start", {
                    "type": "content_block_start", "index": block_index,
                    "content_block": {"type": "text", "text": ""}
                })
                block_open = True
                current_tool_id = None
            await sse_write(response, "content_block_delta", {
                "type": "content_block_delta", "index": block_index,
                "delta": {"type": "text_delta", "text": text}
            })
            text_parts.append(text)

        # ── Tool call delta ──
        for tc in delta.get("tool_calls") or []:
            tc_id = tc.get("id")
            fn = tc.get("function") or {}
            fn_name = fn.get("name")
            fn_args = fn.get("arguments") or ""
            if tc_id and tc_id not in tool_buffers:
                # New tool call — close previous block
                if block_open:
                    await sse_write(response, "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index})
                    block_index += 1
                tool_buffers[tc_id] = {"name": fn_name or "", "args": fn_args}
                current_tool_id = tc_id
                await sse_write(response, "content_block_start", {
                    "type": "content_block_start", "index": block_index,
                    "content_block": {
                        "type": "tool_use",
                        "id": tc_id,
                        "name": fn_name or "",
                        # The input starts empty and is filled by input_json_delta events.
                        "input": {}
                    }
                })
                block_open = True
                if fn_args:
                    await sse_write(response, "content_block_delta", {
                        "type": "content_block_delta", "index": block_index,
                        "delta": {"type": "input_json_delta", "partial_json": fn_args}
                    })
            elif fn_args:
                # Continuing an existing tool call; fragments are attributed to
                # the most recently opened tool block.
                if current_tool_id and current_tool_id in tool_buffers:
                    tool_buffers[current_tool_id]["args"] += fn_args
                    await sse_write(response, "content_block_delta", {
                        "type": "content_block_delta", "index": block_index,
                        "delta": {"type": "input_json_delta", "partial_json": fn_args}
                    })

        if finish:
            finish_reason = finish
            break

    # Close last block
    if block_open:
        await sse_write(response, "content_block_stop",
                        {"type": "content_block_stop", "index": block_index})

    # Determine stop reason; truncation takes precedence, matching the
    # non-streaming mapping of "length" to "max_tokens".
    if finish_reason == "length":
        stop_reason = "max_tokens"
    elif tool_buffers:
        stop_reason = "tool_use"
    else:
        stop_reason = "end_turn"

    full_text = "".join(text_parts)
    await sse_write(response, "message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": stop_reason, "stop_sequence": None},
        # Upstream usage is not read from the stream, so this is only a rough
        # whitespace-separated word count of the text output (minimum 1).
        "usage": {"output_tokens": max(1, len(full_text.split()))}
    })
    await sse_write(response, "message_stop", {"type": "message_stop"})
    log.info(f"[req#{req_id}] Stream done: {len(full_text)} chars text, {len(tool_buffers)} tool calls, stop={stop_reason}")


async def handle_messages(request: web.Request) -> web.StreamResponse:
    """Handle POST /v1/messages — Anthropic API format.

    Parses the Anthropic request, translates it to an OpenAI chat-completions
    request, posts it to ``TARGET_URL`` and translates the reply back.

    Returns:
        * 400 with an Anthropic-style error body when the request body is not
          a JSON object;
        * 500 with an error body when translation or the upstream call fails
          before any stream output has started;
        * the upstream status with an error body when upstream is not 200;
        * a JSON Anthropic message for non-streaming requests;
        * an SSE stream of Anthropic events for streaming requests. If a
          failure happens after the stream has started, an ``error`` event is
          written to that same stream, because the status line and headers
          have already been sent and cannot be replaced.
    """
    global REQUEST_COUNT
    REQUEST_COUNT += 1
    req_id = REQUEST_COUNT

    try:
        body = await request.json()
    except Exception as e:
        log.error(f"[req#{req_id}] Invalid JSON: {e}")
        return web.json_response({"type": "error", "error": {"type": "invalid_request_error", "message": str(e)}}, status=400)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) cannot be a request.
        log.error(f"[req#{req_id}] Invalid JSON: body is not an object")
        return web.json_response(
            {"type": "error", "error": {"type": "invalid_request_error", "message": "Request body must be a JSON object"}},
            status=400
        )

    model_name = body.get("model", "claude-sonnet-4-20250514")
    is_stream = body.get("stream", False)
    n_tools = len(body.get("tools") or [])
    n_msgs = len(body.get("messages") or [])
    log.info(f"[req#{req_id}] {model_name} → {MODEL} | msgs={n_msgs} tools={n_tools} stream={is_stream}")

    # Translate
    try:
        oai_body = anthropic_to_openai_request(body)
    except Exception as e:
        log.error(f"[req#{req_id}] Translation error: {e}\n{traceback.format_exc()}")
        return web.json_response(
            {"type": "error", "error": {"type": "api_error", "message": f"Translation error: {e}"}},
            status=500
        )
    log.info(f"[req#{req_id}] OpenAI request: {len(oai_body['messages'])} messages, {len(oai_body.get('tools', []))} tools")

    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Content-Type": "application/json",
    }
    if PROXY_URL and PROXY_SECRET:
        # Extra headers for the relay: which host to forward to, and the shared secret.
        headers["x-target-host"] = "integrate.api.nvidia.com"
        headers["x-proxy-key"] = PROXY_SECRET

    stream_response = None  # set once we commit to an SSE reply
    timeout = ClientTimeout(total=300, connect=30)
    async with ClientSession(timeout=timeout) as session:
        try:
            async with session.post(TARGET_URL, json=oai_body, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    log.error(f"[req#{req_id}] NVIDIA API {resp.status}: {error_text[:500]}")
                    return web.json_response(
                        {"type": "error", "error": {"type": "api_error", "message": f"Backend error {resp.status}: {error_text[:300]}"}},
                        status=resp.status
                    )

                # ─── Non-streaming ───
                if not is_stream:
                    oai_resp = await resp.json()
                    anthropic_resp = openai_to_anthropic_response(oai_resp, model_name)
                    log.info(f"[req#{req_id}] Response: {len(anthropic_resp['content'])} blocks, stop={anthropic_resp['stop_reason']}")
                    return web.json_response(anthropic_resp)

                # ─── Streaming ───
                stream_response = web.StreamResponse(
                    status=200, reason="OK",
                    headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
                )
                await stream_response.prepare(request)
                await _relay_stream(resp, stream_response, model_name, req_id)
                return stream_response
        except Exception as e:
            log.error(f"[req#{req_id}] Error: {e}\n{traceback.format_exc()}")
            error_body = {"type": "error", "error": {"type": "api_error", "message": str(e)}}
            if stream_response is not None and stream_response.prepared:
                # Headers are already on the wire; report the failure in-band.
                try:
                    await sse_write(stream_response, "error", error_body)
                except Exception as write_err:
                    # Best effort only: the client may already have disconnected.
                    log.warning(f"[req#{req_id}] Could not deliver stream error event: {write_err}")
                return stream_response
            return web.json_response(error_body, status=500)
async def handle_health(request: web.Request):
    """Return a JSON status document for the proxy.

    The document reports a fixed "ok" status, the proxy kind, the configured
    upstream model and URL, and the number of requests counted so far.
    """
    status_report = {
        "status": "ok",
        "proxy": "anthropic-to-openai",
        "target_model": MODEL,
        "target_url": TARGET_URL,
        "requests_served": REQUEST_COUNT,
    }
    return web.json_response(status_report)
def create_app():
    """Build the aiohttp application and register its routes.

    POST on ``/v1/messages`` and ``/messages`` goes to the translation handler;
    GET on ``/v1/messages``, ``/health`` and ``/`` goes to the health handler.
    """
    application = web.Application()
    router = application.router
    for post_path in ("/v1/messages", "/messages"):
        router.add_post(post_path, handle_messages)
    for get_path in ("/v1/messages", "/health", "/"):
        router.add_get(get_path, handle_health)
    return application
if __name__ == "__main__":
    # Script entry point: announce the configuration, then serve on all
    # interfaces at port 4000 until the process is stopped.
    log.info(f"Anthropic→OpenAI proxy starting on :4000 → {MODEL} via {TARGET_URL}")
    web.run_app(create_app(), host="0.0.0.0", port=4000)