ClaudeCode / proxy.py
ayoub5550's picture
Upload proxy.py with huggingface_hub
39ecbbc verified
Raw
History Blame Contribute Delete
16.8 kB
#!/usr/bin/env python3
"""
Anthropic → OpenAI API Translator Proxy
Accepts Anthropic /v1/messages format, translates to OpenAI /v1/chat/completions,
routes through Cloudflare proxy to NVIDIA API, translates response back.
Handles: text, tool_use, tool_result, streaming SSE
"""
import os, json, logging, uuid, time, traceback
from aiohttp import web, ClientSession, ClientTimeout
# Module-wide logger; basicConfig sets up root logging at INFO level with a
# timestamped "[LEVEL] message" format.
log = logging.getLogger("anthropic-proxy")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# --- Configuration, read once from the environment at import time ---
# Bearer token sent upstream in the Authorization header (empty string when unset).
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")
# Model name placed in every upstream request body.
MODEL = os.environ.get("MODEL", "z-ai/glm-5.1")
# Optional relay base URL; when set, TARGET_URL points at it instead of NVIDIA.
PROXY_URL = os.environ.get("PROXY_URL", "")
# Secret sent as the x-proxy-key header when both PROXY_URL and PROXY_SECRET are set.
PROXY_SECRET = os.environ.get("PROXY_SECRET", "")
# Upstream chat-completions endpoint: the relay if configured, otherwise NVIDIA directly.
TARGET_URL = f"{PROXY_URL}/v1/chat/completions" if PROXY_URL else "https://integrate.api.nvidia.com/v1/chat/completions"
# Running count of requests that have entered handle_messages; also used as the log request id.
REQUEST_COUNT = 0
def anthropic_tools_to_openai(tools: list) -> list:
    """Translate Anthropic tool definitions into OpenAI function-tool entries.

    Each input tool is a dict. ``name`` and ``description`` default to an
    empty string when absent; the JSON schema is taken from ``input_schema``,
    falling back to ``parameters`` and finally to ``{}``.
    """
    def _as_function_tool(spec):
        # Resolve the fallback first so the lookup order matches the schema rule above.
        fallback_schema = spec.get("parameters", {})
        function_def = {
            "name": spec.get("name", ""),
            "description": spec.get("description", ""),
            "parameters": spec.get("input_schema", fallback_schema),
        }
        return {"type": "function", "function": function_def}

    return [_as_function_tool(spec) for spec in tools]
def _flatten_tool_result(payload) -> str:
    """Reduce an Anthropic ``tool_result`` content value to a plain string."""
    if payload is None:
        # A null result must not reach the model as the literal text "None".
        return ""
    if isinstance(payload, list):
        joined = "\n".join(
            part.get("text", "")
            for part in payload
            if isinstance(part, dict) and part.get("type") == "text"
        )
        # No text parts (e.g. only images): fall back to the raw JSON.
        return joined or json.dumps(payload, ensure_ascii=False)
    return str(payload)


def anthropic_messages_to_openai(messages: list) -> list:
    """Convert an Anthropic message array to an OpenAI chat message array.

    Args:
        messages: Anthropic messages; each is a dict with ``role`` and a
            ``content`` that is either a string or a list of content blocks.

    Returns:
        A list of OpenAI chat messages:

        * string content is passed through unchanged;
        * assistant ``text`` blocks are joined with newlines and ``tool_use``
          blocks become ``tool_calls`` (``content`` is ``None`` when an
          assistant turn carries only tool calls);
        * ``tool_result`` blocks become separate ``role="tool"`` messages,
          emitted BEFORE any text of the same turn;
        * block types other than text/tool_use/tool_result are dropped.
    """
    result = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if content is None:
            content = ""
        if isinstance(content, str):
            result.append({"role": role, "content": content})
            continue
        if not isinstance(content, list):
            result.append({"role": role, "content": str(content)})
            continue

        # Content is a list of blocks: sort them into the three kinds we translate.
        text_parts = []
        tool_calls = []
        tool_results = []
        for block in content:
            if not isinstance(block, dict):
                # Malformed block; skip it instead of failing the whole request.
                continue
            btype = block.get("type", "")
            if btype == "text":
                text_parts.append(block.get("text", ""))
            elif btype == "tool_use":
                tool_calls.append({
                    "id": block.get("id", f"call_{uuid.uuid4().hex[:8]}"),
                    "type": "function",
                    "function": {
                        "name": block.get("name", ""),
                        "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                    },
                })
            elif btype == "tool_result":
                tool_results.append({
                    "role": "tool",
                    "tool_call_id": block.get("tool_use_id", ""),
                    "content": _flatten_tool_result(block.get("content", "")),
                })

        joined_text = "\n".join(text_parts)
        if role == "assistant":
            m = {"role": "assistant", "content": joined_text}
            if tool_calls:
                m["tool_calls"] = tool_calls
                if not m["content"]:
                    m["content"] = None
            result.append(m)
        elif tool_results:
            # Tool replies must directly follow the assistant message that
            # issued the calls, so they go first; any accompanying user text
            # follows as its own message.
            result.extend(tool_results)
            if text_parts:
                result.append({"role": "user", "content": joined_text})
        else:
            result.append({"role": role, "content": joined_text})
    return result
def _anthropic_tool_choice_to_openai(choice):
    """Map an Anthropic ``tool_choice`` object to the OpenAI ``tool_choice`` value."""
    if not isinstance(choice, dict):
        return "auto"
    kind = choice.get("type")
    if kind == "any":
        # Anthropic "any" = must call some tool; OpenAI spells that "required".
        return "required"
    if kind == "none":
        return "none"
    if kind == "tool" and choice.get("name"):
        return {"type": "function", "function": {"name": choice["name"]}}
    return "auto"


def anthropic_to_openai_request(body: dict) -> dict:
    """Convert a full Anthropic /v1/messages request to an OpenAI /v1/chat/completions body.

    Args:
        body: Parsed Anthropic request. Fields read: ``system``, ``messages``,
            ``max_tokens``, ``stream``, ``temperature``, ``top_p``,
            ``stop_sequences``, ``tools`` and ``tool_choice``.

    Returns:
        The OpenAI request dict. ``model`` is always the configured ``MODEL``,
        not the model named by the client. ``max_tokens`` defaults to 4096 and
        ``stream`` to ``False``. ``tool_choice`` is only set when tools are
        present and defaults to ``"auto"``.
    """
    messages = []

    # System prompt: either a plain string or a list of text blocks.
    system = body.get("system", "")
    if isinstance(system, str) and system:
        messages.append({"role": "system", "content": system})
    elif isinstance(system, list):
        text = "\n".join(
            b.get("text", "")
            for b in system
            if isinstance(b, dict) and b.get("type") == "text"
        )
        if text:
            messages.append({"role": "system", "content": text})

    messages.extend(anthropic_messages_to_openai(body.get("messages", [])))

    oai = {
        "model": MODEL,
        "messages": messages,
        "max_tokens": body.get("max_tokens", 4096),
        "stream": body.get("stream", False),
    }
    if body.get("temperature") is not None:
        oai["temperature"] = body["temperature"]
    if body.get("top_p") is not None:
        oai["top_p"] = body["top_p"]

    # Forward custom stop sequences instead of silently discarding them.
    stop_sequences = body.get("stop_sequences")
    if isinstance(stop_sequences, list) and stop_sequences:
        oai["stop"] = stop_sequences

    tools = body.get("tools", [])
    if tools:
        oai["tools"] = anthropic_tools_to_openai(tools)
        # Honour the client's tool_choice; absent or unrecognised means "auto".
        oai["tool_choice"] = _anthropic_tool_choice_to_openai(body.get("tool_choice"))
    return oai
def openai_to_anthropic_response(oai: dict, model_name: str) -> dict:
    """Convert an OpenAI chat completion to an Anthropic messages response.

    Args:
        oai: Parsed OpenAI chat-completion JSON. Only the first choice is used.
            Fields the backend sends as ``null`` (``tool_calls``, ``usage``,
            ``content``, tool ``id``/``arguments``) are treated as absent.
        model_name: Model name to echo back in the ``model`` field.

    Returns:
        An Anthropic ``message`` dict with a fresh ``msg_`` id. ``content``
        always holds at least one block (an empty text block when the
        completion had neither text nor tool calls). ``stop_reason`` is mapped
        from ``finish_reason`` (unknown values become ``"end_turn"``) and is
        ``"tool_use"`` whenever tool calls are returned on a normal stop.
    """
    choices = oai.get("choices") or []
    if not choices:
        return {
            "id": f"msg_{uuid.uuid4().hex[:24]}",
            "type": "message", "role": "assistant",
            "content": [{"type": "text", "text": ""}],
            "model": model_name,
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }

    first = choices[0]
    message = first.get("message") or {}
    content = []

    text = message.get("content")
    if text:
        content.append({"type": "text", "text": text})

    # `or []` because backends commonly send "tool_calls": null.
    tool_calls = message.get("tool_calls") or []
    for tc in tool_calls:
        fn = tc.get("function") or {}
        raw_args = fn.get("arguments")
        if raw_args is None or raw_args == "":
            # A call without arguments is an empty object, not a parse failure.
            inp = {}
        elif isinstance(raw_args, dict):
            inp = raw_args
        else:
            try:
                parsed = json.loads(raw_args)
            except (TypeError, ValueError):
                parsed = None
            # Anthropic tool input must be an object; keep anything else verbatim under "raw".
            inp = parsed if isinstance(parsed, dict) else {"raw": raw_args}
        content.append({
            "type": "tool_use",
            "id": tc.get("id") or f"toolu_{uuid.uuid4().hex[:12]}",
            "name": fn.get("name") or "",
            "input": inp
        })

    if not content:
        content.append({"type": "text", "text": ""})

    finish = first.get("finish_reason", "stop")
    stop_map = {"stop": "end_turn", "tool_calls": "tool_use", "length": "max_tokens"}
    stop_reason = stop_map.get(finish, "end_turn")
    if tool_calls and stop_reason == "end_turn":
        # Some backends finish with "stop" even though they returned tool
        # calls; report tool_use, as the streaming path does.
        stop_reason = "tool_use"

    usage = oai.get("usage") or {}
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model_name,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens") or 0,
            "output_tokens": usage.get("completion_tokens") or 0,
        }
    }
async def sse_write(resp: web.StreamResponse, event: str, data: dict):
    """Send one Server-Sent Event: an ``event:`` line, a JSON ``data:`` line, and a blank line."""
    # Non-ASCII characters are kept as-is in the JSON and encoded as UTF-8 on the wire.
    encoded = json.dumps(data, ensure_ascii=False)
    frame = "event: {}\ndata: {}\n\n".format(event, encoded)
    await resp.write(frame.encode("utf-8"))
async def _relay_openai_stream(request: web.Request, upstream, model_name: str, req_id: int) -> web.StreamResponse:
    """Relay an upstream OpenAI SSE completion to the client as Anthropic SSE events.

    Args:
        request: The incoming client request the stream response is bound to.
        upstream: The open upstream response; its ``content`` is read line by line.
        model_name: Model name echoed in the ``message_start`` event.
        req_id: Request number used as a log prefix.

    Returns:
        The prepared ``web.StreamResponse``. Because the 200 status and headers
        are sent before the first event, a failure while relaying is reported
        in-band as an SSE ``error`` event rather than as a new HTTP response.
    """
    response = web.StreamResponse(
        status=200, reason="OK",
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
    )
    await response.prepare(request)
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"

    block_open = False        # is a content block currently open?
    block_index = 0           # index of the current (or next) content block
    text_chunks = []          # all text deltas, for the token estimate and the log
    tool_buffers = {}         # tc_id -> {name, args}
    current_tool_id = None    # id of the tool block being streamed; None while in text
    finish_reason = None

    try:
        await sse_write(response, "message_start", {
            "type": "message_start",
            "message": {
                "id": msg_id, "type": "message", "role": "assistant",
                "content": [], "model": model_name,
                "stop_reason": None, "stop_sequence": None,
                "usage": {"input_tokens": 0, "output_tokens": 0}
            }
        })

        async for raw_line in upstream.content:
            line = raw_line.decode("utf-8", errors="replace").strip()
            # SSE permits "data:" with or without a following space.
            if not line.startswith("data:"):
                continue
            data_str = line[5:].strip()
            if data_str == "[DONE]":
                break
            try:
                chunk = json.loads(data_str)
            except json.JSONDecodeError:
                continue
            if not isinstance(chunk, dict):
                continue
            choices = chunk.get("choices") or []
            if not choices:
                continue
            # Backends may send explicit nulls for delta / tool_calls.
            delta = choices[0].get("delta") or {}
            finish = choices[0].get("finish_reason")

            # ── Text delta ──
            text = delta.get("content")
            if text:
                if not block_open or current_tool_id is not None:
                    # Close any tool block first
                    if block_open:
                        await sse_write(response, "content_block_stop",
                                        {"type": "content_block_stop", "index": block_index})
                        block_index += 1
                    # Open text block
                    await sse_write(response, "content_block_start", {
                        "type": "content_block_start", "index": block_index,
                        "content_block": {"type": "text", "text": ""}
                    })
                    block_open = True
                    current_tool_id = None
                await sse_write(response, "content_block_delta", {
                    "type": "content_block_delta", "index": block_index,
                    "delta": {"type": "text_delta", "text": text}
                })
                text_chunks.append(text)

            # ── Tool call delta ──
            for tc in delta.get("tool_calls") or []:
                tc_id = tc.get("id")
                fn = tc.get("function") or {}
                fn_name = fn.get("name")
                fn_args = fn.get("arguments") or ""
                if tc_id and tc_id not in tool_buffers:
                    # New tool call — close previous block
                    if block_open:
                        await sse_write(response, "content_block_stop",
                                        {"type": "content_block_stop", "index": block_index})
                        block_index += 1
                    tool_buffers[tc_id] = {"name": fn_name or "", "args": fn_args}
                    current_tool_id = tc_id
                    await sse_write(response, "content_block_start", {
                        "type": "content_block_start", "index": block_index,
                        "content_block": {
                            "type": "tool_use",
                            "id": tc_id,
                            "name": fn_name or "",
                            "input": {}
                        }
                    })
                    block_open = True
                    if fn_args:
                        await sse_write(response, "content_block_delta", {
                            "type": "content_block_delta", "index": block_index,
                            "delta": {"type": "input_json_delta", "partial_json": fn_args}
                        })
                elif fn_args:
                    # Continuing an existing tool call
                    if current_tool_id and current_tool_id in tool_buffers:
                        tool_buffers[current_tool_id]["args"] += fn_args
                        await sse_write(response, "content_block_delta", {
                            "type": "content_block_delta", "index": block_index,
                            "delta": {"type": "input_json_delta", "partial_json": fn_args}
                        })

            if finish:
                finish_reason = finish
                break

        # Close last block
        if block_open:
            await sse_write(response, "content_block_stop",
                            {"type": "content_block_stop", "index": block_index})

        # Determine stop reason: truncation wins, then tool use, else a normal end.
        if finish_reason == "length":
            stop_reason = "max_tokens"
        elif tool_buffers:
            stop_reason = "tool_use"
        else:
            stop_reason = "end_turn"

        full_text = "".join(text_chunks)
        await sse_write(response, "message_delta", {
            "type": "message_delta",
            "delta": {"stop_reason": stop_reason, "stop_sequence": None},
            # Rough estimate (whitespace-separated words); not the backend's token count.
            "usage": {"output_tokens": max(1, len(full_text.split()))}
        })
        await sse_write(response, "message_stop", {"type": "message_stop"})
        log.info(f"[req#{req_id}] Stream done: {len(full_text)} chars text, {len(tool_buffers)} tool calls, stop={stop_reason}")
    except Exception as e:
        log.error(f"[req#{req_id}] Stream error: {e}\n{traceback.format_exc()}")
        # Headers are already on the wire, so the only valid way to report the
        # failure is an in-band SSE error event.
        try:
            await sse_write(response, "error", {
                "type": "error",
                "error": {"type": "api_error", "message": str(e)}
            })
        except Exception as write_err:
            # Typically the client has disconnected; nothing more can be sent.
            log.warning(f"[req#{req_id}] Could not deliver stream error to client: {write_err}")
    return response


async def handle_messages(request: web.Request) -> web.StreamResponse:
    """Handle POST /v1/messages — Anthropic API format.

    Parses the Anthropic request, translates it to an OpenAI chat-completions
    request, posts it to ``TARGET_URL`` and translates the answer back.

    Returns:
        * 400 with an Anthropic error body when the request is not a JSON object;
        * 500 with an Anthropic error body when translation or the upstream
          call fails before any response has been started;
        * the upstream status with an Anthropic error body when the backend
          answers with a non-200 status;
        * otherwise a JSON message (non-streaming) or an SSE stream (streaming).
    """
    global REQUEST_COUNT
    REQUEST_COUNT += 1
    req_id = REQUEST_COUNT

    try:
        body = await request.json()
    except Exception as e:
        log.error(f"[req#{req_id}] Invalid JSON: {e}")
        return web.json_response({"type": "error", "error": {"type": "invalid_request_error", "message": str(e)}}, status=400)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, ...) cannot be a messages request.
        log.error(f"[req#{req_id}] Request body is not a JSON object")
        return web.json_response(
            {"type": "error", "error": {"type": "invalid_request_error", "message": "Request body must be a JSON object"}},
            status=400
        )

    model_name = body.get("model", "claude-sonnet-4-20250514")
    is_stream = body.get("stream", False)
    n_tools = len(body.get("tools") or [])
    n_msgs = len(body.get("messages") or [])
    log.info(f"[req#{req_id}] {model_name} → {MODEL} | msgs={n_msgs} tools={n_tools} stream={is_stream}")

    # Translate
    try:
        oai_body = anthropic_to_openai_request(body)
    except Exception as e:
        log.error(f"[req#{req_id}] Translation error: {e}\n{traceback.format_exc()}")
        return web.json_response(
            {"type": "error", "error": {"type": "api_error", "message": f"Translation error: {e}"}},
            status=500
        )
    log.info(f"[req#{req_id}] OpenAI request: {len(oai_body['messages'])} messages, {len(oai_body.get('tools', []))} tools")

    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Content-Type": "application/json",
    }
    if PROXY_URL and PROXY_SECRET:
        # Extra headers for the relay: the real upstream host and the shared secret.
        headers["x-target-host"] = "integrate.api.nvidia.com"
        headers["x-proxy-key"] = PROXY_SECRET

    timeout = ClientTimeout(total=300, connect=30)
    async with ClientSession(timeout=timeout) as session:
        try:
            async with session.post(TARGET_URL, json=oai_body, headers=headers) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    log.error(f"[req#{req_id}] NVIDIA API {resp.status}: {error_text[:500]}")
                    return web.json_response(
                        {"type": "error", "error": {"type": "api_error", "message": f"Backend error {resp.status}: {error_text[:300]}"}},
                        status=resp.status
                    )

                # ─── Non-streaming ───
                if not is_stream:
                    oai_resp = await resp.json()
                    anthropic_resp = openai_to_anthropic_response(oai_resp, model_name)
                    log.info(f"[req#{req_id}] Response: {len(anthropic_resp['content'])} blocks, stop={anthropic_resp['stop_reason']}")
                    return web.json_response(anthropic_resp)

                # ─── Streaming ───
                # The relay handles its own mid-stream failures, so the except
                # below only sees errors raised before a response was started.
                return await _relay_openai_stream(request, resp, model_name, req_id)
        except Exception as e:
            log.error(f"[req#{req_id}] Error: {e}\n{traceback.format_exc()}")
            return web.json_response(
                {"type": "error", "error": {"type": "api_error", "message": str(e)}},
                status=500
            )
async def handle_health(request: web.Request):
    """Return a JSON status report: fixed status/proxy labels, the target model and URL, and the request counter."""
    status_report = dict(
        status="ok",
        proxy="anthropic-to-openai",
        target_model=MODEL,
        target_url=TARGET_URL,
        requests_served=REQUEST_COUNT,
    )
    return web.json_response(status_report)
def create_app():
    """Build the aiohttp application and register its routes.

    POST on /v1/messages and /messages is served by ``handle_messages``;
    GET on /v1/messages, /health and / is served by ``handle_health``.
    """
    application = web.Application()
    router = application.router
    # POST routes are registered first, then GET routes, each in the listed order.
    for post_path in ("/v1/messages", "/messages"):
        router.add_post(post_path, handle_messages)
    for get_path in ("/v1/messages", "/health", "/"):
        router.add_get(get_path, handle_health)
    return application
if __name__ == "__main__":
    # Script entry point: build the app, announce the target, then serve on
    # all interfaces at port 4000.
    proxy_app = create_app()
    startup_banner = f"Anthropic→OpenAI proxy starting on :4000 → {MODEL} via {TARGET_URL}"
    log.info(startup_banner)
    web.run_app(proxy_app, host="0.0.0.0", port=4000)