# NaiRouter / proxy.py
# (Repository page residue preserved as comments — not program source:
#  "justadri23's picture", "Rename app.py to proxy.py", commit 5a7415c verified)
#!/usr/bin/env python3
"""
═══════════════════════════════════════════════════════════════════
Proxy Bidireccional: OpenAI ↔ Anthropic
═══════════════════════════════════════════════════════════════════
Endpoints expuestos:
POST /v1/chat/completions
β†’ Recibe formato OpenAI, convierte a Anthropic, reenvΓ­a
a la API de Anthropic y devuelve la respuesta en formato
OpenAI. Soporta streaming.
POST /v1/messages
β†’ Recibe formato Anthropic, convierte a OpenAI, reenvΓ­a
a la API de OpenAI y devuelve la respuesta en formato
Anthropic. Soporta streaming.
GET /v1/models β†’ Lista de modelos disponibles
GET /health β†’ Estado del servidor
Variables de entorno (ver .env.example):
ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL,
OPENAI_API_KEY, OPENAI_BASE_URL,
PROXY_HOST, PROXY_PORT, PROXY_API_KEY
═══════════════════════════════════════════════════════════════════
"""
from __future__ import annotations

import json
import logging
import os
import re
import secrets
import time
import uuid
from typing import Any, AsyncGenerator, Dict, List, Optional

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
# ────────────────────────────────────────────────────────────────
# Configuration
# ────────────────────────────────────────────────────────────────
load_dotenv()  # pull settings from a local .env file, if present

# Upstream credentials and base URLs. An empty key disables the
# corresponding proxy direction (the endpoint returns HTTP 500).
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ANTHROPIC_BASE_URL = os.getenv("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com")
# Optional key clients must present to THIS proxy (empty = auth disabled).
PROXY_API_KEY = os.getenv("PROXY_API_KEY", "")
PROXY_HOST = os.getenv("PROXY_HOST", "0.0.0.0")
PROXY_PORT = int(os.getenv("PROXY_PORT", "8000"))

ANTHROPIC_VERSION = "2023-06-01"  # value for the required anthropic-version header
DEFAULT_MAX_TOKENS = 4096  # used when the client omits max_tokens
HTTP_TIMEOUT = 300.0  # seconds — generous, to allow long generations

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("proxy")

# ────────────────────────────────────────────────────────────────
# Model name mapping
# ────────────────────────────────────────────────────────────────
# OpenAI-style id → Anthropic id, used by /v1/chat/completions.
# Ids not listed here are passed through to the upstream unchanged.
MODEL_OAI_TO_ANTH: Dict[str, str] = {
    "claude-opus-4.6": "claude-opus-4-6",
}
# Anthropic id → OpenAI id, used by /v1/messages.
# Ids not listed here are passed through to the upstream unchanged.
MODEL_ANTH_TO_OAI: Dict[str, str] = {
    "claude-3-opus-20240229": "gpt-4",
}
# ────────────────────────────────────────────────────────────────
# FastAPI
# ────────────────────────────────────────────────────────────────
app = FastAPI(title="OpenAI ↔ Anthropic Proxy")
# NOTE(review): CORS is wide open (any origin/method/header). Acceptable for
# a local/private proxy; tighten allow_origins before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ────────────────────────────────────────────────────────────────
# Utilidades
# ────────────────────────────────────────────────────────────────
def _gen_id(prefix: str = "chatcmpl") -> str:
return f"{prefix}-{uuid.uuid4().hex[:29]}"
def _now() -> int:
return int(time.time())
def _finish_to_openai(reason: Optional[str]) -> Optional[str]:
"""end_turn β†’ stop, max_tokens β†’ length"""
return {
"end_turn": "stop",
"max_tokens": "length",
"stop_sequence": "stop",
}.get(reason or "", reason)
def _finish_to_anthropic(reason: Optional[str]) -> Optional[str]:
"""stop β†’ end_turn, length β†’ max_tokens"""
return {
"stop": "end_turn",
"length": "max_tokens",
}.get(reason or "", reason)
# ────────────────────────────────────────────────────────────────
# ConversiΓ³n de contenido (texto + imΓ‘genes)
# ────────────────────────────────────────────────────────────────
def _oai_content_to_anth(content: Any) -> Any:
"""
OpenAI content (str | list[parts]) β†’ Anthropic content (str | list[blocks])
"""
if content is None:
return ""
if isinstance(content, str):
return content
blocks: list = []
for part in content:
ptype = part.get("type", "text")
if ptype == "text":
blocks.append({"type": "text", "text": part.get("text", "")})
elif ptype == "image_url":
url: str = part.get("image_url", {}).get("url", "")
match = re.match(
r"data:(image/[a-zA-Z+]+);base64,(.+)", url, re.DOTALL
)
if match:
blocks.append({
"type": "image",
"source": {
"type": "base64",
"media_type": match.group(1),
"data": match.group(2),
},
})
else:
# URL directa β€” Anthropic soporta URLs desde 2024
blocks.append({
"type": "image",
"source": {"type": "url", "url": url},
})
return blocks or ""
def _anth_content_to_text(content: Any) -> str:
"""Anthropic content (str | list[blocks]) β†’ texto plano."""
if isinstance(content, str):
return content
if isinstance(content, list):
return "".join(
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
)
return str(content) if content else ""
def _anth_content_to_oai_parts(content: Any) -> Any:
"""Anthropic content β†’ OpenAI content (str o list de parts)."""
if isinstance(content, str):
return content
if not isinstance(content, list):
return str(content) if content else ""
has_images = any(
isinstance(b, dict) and b.get("type") == "image" for b in content
)
if not has_images:
return _anth_content_to_text(content)
parts: list = []
for b in content:
if not isinstance(b, dict):
continue
if b.get("type") == "text":
parts.append({"type": "text", "text": b.get("text", "")})
elif b.get("type") == "image":
src = b.get("source", {})
if src.get("type") == "base64":
data_url = (
f"data:{src.get('media_type','image/png')};"
f"base64,{src.get('data','')}"
)
parts.append({"type": "image_url", "image_url": {"url": data_url}})
elif src.get("type") == "url":
parts.append({"type": "image_url", "image_url": {"url": src.get("url", "")}})
return parts or ""
# ────────────────────────────────────────────────────────────────
# ConversiΓ³n de requests
# ────────────────────────────────────────────────────────────────
def _merge_consecutive(msgs: List[Dict]) -> List[Dict]:
"""Anthropic no permite roles consecutivos iguales."""
merged: List[Dict] = []
for m in msgs:
if not merged or merged[-1]["role"] != m["role"]:
merged.append(m)
continue
# fusionar contenido
prev, curr = merged[-1]["content"], m["content"]
if isinstance(prev, str) and isinstance(curr, str):
merged[-1]["content"] = prev + "\n" + curr
else:
if isinstance(prev, str):
prev = [{"type": "text", "text": prev}] if prev else []
if isinstance(curr, str):
curr = [{"type": "text", "text": curr}] if curr else []
merged[-1]["content"] = prev + curr
return merged
def openai_req_to_anthropic(oai: dict) -> dict:
    """
    Translate an OpenAI /v1/chat/completions request body into an
    Anthropic /v1/messages request body.

    System messages are extracted into Anthropic's top-level ``system``
    field; every other role becomes a user/assistant turn (tool/function
    roles are folded into user turns).
    """
    system_parts: List[str] = []
    conv_msgs: List[Dict] = []
    for msg in oai.get("messages", []):
        role = msg.get("role", "user")
        raw_content = msg.get("content", "")
        if role == "system":
            if isinstance(raw_content, str):
                system_parts.append(raw_content)
            else:
                system_parts.append(_anth_content_to_text(raw_content))
            continue
        if role not in ("user", "assistant"):
            role = "user"  # 'tool' / 'function' → user
        conv_msgs.append({
            "role": role,
            "content": _oai_content_to_anth(raw_content),
        })
    conv_msgs = _merge_consecutive(conv_msgs)
    # Anthropic requires the conversation to open with a user turn.
    if conv_msgs and conv_msgs[0]["role"] != "user":
        conv_msgs.insert(0, {"role": "user", "content": "[inicio de conversaciΓ³n]"})
    if not conv_msgs:
        conv_msgs = [{"role": "user", "content": "Hola"}]

    anth: Dict[str, Any] = {
        "model": MODEL_OAI_TO_ANTH.get(
            oai.get("model", ""), oai.get("model", "claude-sonnet-4-20250514")
        ),
        "messages": conv_msgs,
        "max_tokens": oai.get("max_tokens")
        or oai.get("max_completion_tokens")
        or DEFAULT_MAX_TOKENS,
    }
    if system_parts:
        anth["system"] = "\n\n".join(system_parts)
    for key in ("temperature", "top_p"):
        if oai.get(key) is not None:
            anth[key] = oai[key]
    stop = oai.get("stop")
    if stop is not None:
        anth["stop_sequences"] = [stop] if isinstance(stop, str) else stop
    if oai.get("stream"):
        anth["stream"] = True
    return anth
def anthropic_req_to_openai(anth: dict) -> dict:
    """
    Translate an Anthropic /v1/messages request body into an OpenAI
    /v1/chat/completions request body.

    Anthropic's top-level system prompt becomes a leading system message;
    conversation turns are converted content-wise but keep their roles.
    """
    oai_msgs: List[Dict] = []
    system = anth.get("system", "")
    if system:
        if isinstance(system, list):
            # System given as blocks: keep only the text ones.
            texts = [b.get("text", "") for b in system if b.get("type") == "text"]
            system = " ".join(texts)
        oai_msgs.append({"role": "system", "content": system})
    for msg in anth.get("messages", []):
        oai_msgs.append({
            "role": msg.get("role", "user"),
            "content": _anth_content_to_oai_parts(msg.get("content", "")),
        })

    oai: Dict[str, Any] = {
        "model": MODEL_ANTH_TO_OAI.get(
            anth.get("model", ""), anth.get("model", "gpt-4o")
        ),
        "messages": oai_msgs,
    }
    if "max_tokens" in anth:
        oai["max_tokens"] = anth["max_tokens"]
    for key in ("temperature", "top_p"):
        if anth.get(key) is not None:
            oai[key] = anth[key]
    if "stop_sequences" in anth:
        oai["stop"] = anth["stop_sequences"]
    if anth.get("stream"):
        oai["stream"] = True
        # Ask OpenAI to attach token usage to the final stream chunk.
        oai["stream_options"] = {"include_usage": True}
    return oai
# ────────────────────────────────────────────────────────────────
# ConversiΓ³n de responses (no-stream)
# ────────────────────────────────────────────────────────────────
def anthropic_resp_to_openai(anth: dict, model: str) -> dict:
    """Translate a non-streaming Anthropic /v1/messages response into the
    OpenAI chat-completion shape, labelled with *model* (the id the client
    originally asked for)."""
    usage = anth.get("usage", {})
    prompt_tokens = usage.get("input_tokens", 0)
    completion_tokens = usage.get("output_tokens", 0)
    choice = {
        "index": 0,
        "message": {
            "role": "assistant",
            "content": _anth_content_to_text(anth.get("content", [])),
        },
        "finish_reason": _finish_to_openai(anth.get("stop_reason")),
    }
    return {
        "id": _gen_id(),
        "object": "chat.completion",
        "created": _now(),
        "model": model,
        "choices": [choice],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }
def openai_resp_to_anthropic(oai: dict, model: str) -> dict:
    """Translate a non-streaming OpenAI chat-completion response into the
    Anthropic message shape, labelled with *model* (the id the client
    originally asked for)."""
    choices = oai.get("choices") or [{}]
    first = choices[0]
    text = first.get("message", {}).get("content", "")
    usage = oai.get("usage", {})
    blocks = [{"type": "text", "text": text}] if text else []
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "model": model,
        "content": blocks,
        "stop_reason": _finish_to_anthropic(first.get("finish_reason")),
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
    }
# ────────────────────────────────────────────────────────────────
# Streaming: Anthropic SSE β†’ OpenAI SSE
# ────────────────────────────────────────────────────────────────
async def _stream_anth_as_oai(
    resp: httpx.Response,
    model: str,
) -> AsyncGenerator[str, None]:
    """
    Re-emit an Anthropic SSE stream as OpenAI chat-completion chunks.

    Reads the Anthropic /v1/messages event stream from *resp* and yields
    OpenAI-format ``data:`` SSE lines: an initial role chunk, one chunk per
    text delta, a final chunk carrying finish_reason plus usage, and the
    closing ``data: [DONE]`` sentinel.
    """
    rid = _gen_id()
    created = _now()
    inp_tok = 0  # prompt token count, learned from the message_start event
    # First chunk: announce the assistant role (OpenAI streaming convention).
    yield _sse({
        "id": rid, "object": "chat.completion.chunk",
        "created": created, "model": model,
        "choices": [{"index": 0,
                     "delta": {"role": "assistant", "content": ""},
                     "finish_reason": None}],
    })
    async for line in resp.aiter_lines():
        line = line.strip()
        # Anthropic SSE frames come as "event: ..." / "data: ..." pairs;
        # only the JSON data payload is needed here.
        if not line or line.startswith("event:"):
            continue
        if not line.startswith("data:"):
            continue
        raw = line[5:].strip()
        if not raw:
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip malformed frames rather than aborting the stream
        evt = data.get("type", "")
        if evt == "message_start":
            # Capture the prompt token count for the final usage report.
            inp_tok = (
                data.get("message", {})
                .get("usage", {})
                .get("input_tokens", 0)
            )
        elif evt == "content_block_delta":
            text = data.get("delta", {}).get("text", "")
            if text:
                yield _sse({
                    "id": rid, "object": "chat.completion.chunk",
                    "created": created, "model": model,
                    "choices": [{"index": 0,
                                 "delta": {"content": text},
                                 "finish_reason": None}],
                })
        elif evt == "message_delta":
            # Final event: carries stop_reason and the output token count.
            sr = data.get("delta", {}).get("stop_reason")
            out = data.get("usage", {}).get("output_tokens", 0)
            yield _sse({
                "id": rid, "object": "chat.completion.chunk",
                "created": created, "model": model,
                "choices": [{"index": 0,
                             "delta": {},
                             "finish_reason": _finish_to_openai(sr)}],
                "usage": {
                    "prompt_tokens": inp_tok,
                    "completion_tokens": out,
                    "total_tokens": inp_tok + out,
                },
            })
    yield "data: [DONE]\n\n"
# ────────────────────────────────────────────────────────────────
# Streaming: OpenAI SSE β†’ Anthropic SSE
# ────────────────────────────────────────────────────────────────
async def _stream_oai_as_anth(
    resp: httpx.Response,
    model: str,
) -> AsyncGenerator[str, None]:
    """
    Re-emit an OpenAI chat-completion SSE stream as Anthropic SSE events.

    Produces the Anthropic event sequence: message_start,
    content_block_start, one content_block_delta per text delta,
    content_block_stop, message_delta (stop_reason + usage), message_stop.

    Fix over the original: with ``stream_options.include_usage`` OpenAI
    sends ``"usage": null`` on every chunk except the last, so the usage
    field must be None-guarded before calling ``.get`` on it.
    """
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"
    # message_start: skeleton message with zeroed usage.
    yield _anth_sse("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id, "type": "message", "role": "assistant",
            "content": [], "model": model,
            "stop_reason": None, "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    })
    # content_block_start: a single text block at index 0.
    yield _anth_sse("content_block_start", {
        "type": "content_block_start", "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    finish_reason = None
    p_tok = 0  # prompt tokens (from the final include_usage chunk)
    c_tok = 0  # completion tokens
    async for line in resp.aiter_lines():
        line = line.strip()
        if not line.startswith("data:"):
            continue
        raw = line[5:].strip()
        if raw == "[DONE]":
            break
        if not raw:
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip malformed frames rather than aborting the stream
        # Accumulated usage. Guard against "usage": null on non-final chunks
        # (sent by OpenAI when stream_options.include_usage is enabled).
        usage = data.get("usage") or {}
        if usage:
            p_tok = usage.get("prompt_tokens", p_tok)
            c_tok = usage.get("completion_tokens", c_tok)
        choices = data.get("choices", [])
        if not choices:
            continue
        choice = choices[0]
        delta = choice.get("delta", {})
        fr = choice.get("finish_reason")
        if fr:
            finish_reason = fr
        text = delta.get("content", "")
        if text:
            yield _anth_sse("content_block_delta", {
                "type": "content_block_delta", "index": 0,
                "delta": {"type": "text_delta", "text": text},
            })
    # content_block_stop
    yield _anth_sse("content_block_stop", {
        "type": "content_block_stop", "index": 0,
    })
    # message_delta: final stop reason plus output-token usage.
    yield _anth_sse("message_delta", {
        "type": "message_delta",
        "delta": {
            "stop_reason": _finish_to_anthropic(finish_reason),
            "stop_sequence": None,
        },
        "usage": {"output_tokens": c_tok},
    })
    # message_stop
    yield _anth_sse("message_stop", {"type": "message_stop"})
# helpers SSE
def _sse(obj: dict) -> str:
return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"
def _anth_sse(event: str, obj: dict) -> str:
return f"event: {event}\ndata: {json.dumps(obj, ensure_ascii=False)}\n\n"
# ────────────────────────────────────────────────────────────────
# AutenticaciΓ³n del proxy (opcional)
# ────────────────────────────────────────────────────────────────
def _check_auth(request: Request) -> None:
    """
    Enforce the optional proxy-level API key.

    Accepts the key either as ``Authorization: Bearer <key>`` or as an
    ``x-api-key`` header. A no-op when PROXY_API_KEY is unset.

    Fix over the original: the key comparison uses
    ``secrets.compare_digest`` on encoded bytes, so it runs in constant
    time and cannot leak key bytes through response timing.

    Raises:
        HTTPException: 401 when the presented key does not match.
    """
    if not PROXY_API_KEY:
        return  # auth disabled
    auth = request.headers.get("authorization", "")
    x_key = request.headers.get("x-api-key", "")
    token = ""
    if auth.lower().startswith("bearer "):
        token = auth[7:]
    elif x_key:
        token = x_key
    # Constant-time comparison; bytes avoid compare_digest's ASCII-only
    # restriction on str arguments.
    if not secrets.compare_digest(token.encode("utf-8"), PROXY_API_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="API key inválida")
def _require_key(name: str, value: str) -> None:
    """Abort with HTTP 500 when the upstream credential *value* is empty.

    *name* is the environment-variable name reported in the error detail.
    """
    if value:
        return
    raise HTTPException(
        status_code=500,
        detail=f"Variable de entorno {name} no configurada en el servidor",
    )
# ────────────────────────────────────────────────────────────────
# ENDPOINT: /v1/chat/completions
# Recibe OpenAI β†’ reenvΓ­a a Anthropic β†’ devuelve OpenAI
# ────────────────────────────────────────────────────────────────
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def ep_chat_completions(request: Request):
_check_auth(request)
_require_key("ANTHROPIC_API_KEY", ANTHROPIC_API_KEY)
body = await request.json()
is_stream = body.get("stream", False)
model_in = body.get("model", "")
anth_body = openai_req_to_anthropic(body)
url = f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages"
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": ANTHROPIC_VERSION,
"content-type": "application/json",
"accept": "application/json",
}
logger.info(
"[OAI→ANTH] %s → %s stream=%s",
model_in, anth_body["model"], is_stream,
)
if is_stream:
return await _proxy_stream(
url, headers, anth_body,
converter=_stream_anth_as_oai,
model_label=model_in,
)
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.post(url, json=anth_body, headers=headers)
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)
return JSONResponse(anthropic_resp_to_openai(resp.json(), model_in))
# ────────────────────────────────────────────────────────────────
# ENDPOINT: /v1/messages
# Recibe Anthropic β†’ reenvΓ­a a OpenAI β†’ devuelve Anthropic
# ────────────────────────────────────────────────────────────────
@app.post("/v1/messages")
@app.post("/messages")
async def ep_messages(request: Request):
_check_auth(request)
_require_key("OPENAI_API_KEY", OPENAI_API_KEY)
body = await request.json()
is_stream = body.get("stream", False)
model_in = body.get("model", "")
oai_body = anthropic_req_to_openai(body)
url = f"{OPENAI_BASE_URL.rstrip('/')}/v1/chat/completions"
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
logger.info(
"[ANTH→OAI] %s → %s stream=%s",
model_in, oai_body["model"], is_stream,
)
if is_stream:
return await _proxy_stream(
url, headers, oai_body,
converter=_stream_oai_as_anth,
model_label=model_in,
)
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.post(url, json=oai_body, headers=headers)
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)
return JSONResponse(openai_resp_to_anthropic(resp.json(), model_in))
# ────────────────────────────────────────────────────────────────
# Utilidad genΓ©rica para streaming
# ────────────────────────────────────────────────────────────────
async def _proxy_stream(
    url: str,
    headers: dict,
    body: dict,
    converter,
    model_label: str,
) -> StreamingResponse:
    """
    POST *body* to *url* and stream the translated upstream response back.

    *converter* is one of the SSE translators above; it receives the open
    httpx response and the model label to stamp on outgoing chunks.

    The httpx client is deliberately created without ``async with``: it must
    outlive this function for the whole duration of the StreamingResponse,
    so it is closed in the generator's ``finally`` block instead — after the
    upstream stream ends or the downstream client disconnects.
    """
    client = httpx.AsyncClient(timeout=httpx.Timeout(timeout=HTTP_TIMEOUT))
    try:
        req = client.build_request("POST", url, json=body, headers=headers)
        resp = await client.send(req, stream=True)
        if resp.status_code != 200:
            # Upstream refused: read the error body, release resources,
            # and surface the upstream status as-is.
            err = await resp.aread()
            await resp.aclose()
            await client.aclose()
            raise HTTPException(status_code=resp.status_code, detail=err.decode())
    except HTTPException:
        raise
    except Exception as exc:
        # Connect/transport failure: make sure the client is not leaked.
        await client.aclose()
        raise HTTPException(status_code=502, detail=str(exc))
    async def _generate():
        try:
            async for chunk in converter(resp, model_label):
                yield chunk
        finally:
            # Always release the upstream response and client, even when the
            # consumer disconnects mid-stream.
            await resp.aclose()
            await client.aclose()
    return StreamingResponse(
        _generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx proxy buffering
            "Connection": "keep-alive",
        },
    )
# ────────────────────────────────────────────────────────────────
# Listado de modelos & Health
# ────────────────────────────────────────────────────────────────
@app.get("/v1/models")
@app.get("/models")
async def ep_list_models():
seen: set = set()
data: list = []
for mid in list(MODEL_OAI_TO_ANTH) + list(MODEL_ANTH_TO_OAI):
if mid not in seen:
seen.add(mid)
data.append({
"id": mid,
"object": "model",
"created": _now(),
"owned_by": "proxy",
})
return {"object": "list", "data": data}
@app.get("/health")
@app.get("/")
async def ep_health():
return {
"status": "ok",
"anthropic_configured": bool(ANTHROPIC_API_KEY),
"openai_configured": bool(OPENAI_API_KEY),
"endpoints": {
"/v1/chat/completions": "OpenAI fmt β†’ Anthropic API β†’ OpenAI fmt",
"/v1/messages": "Anthropic fmt β†’ OpenAI API β†’ Anthropic fmt",
},
}
# ────────────────────────────────────────────────────────────────
# Arranque
# ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Startup banner (runtime string — left byte-identical).
    banner = """
╔═══════════════════════════════════════════════╗
β•‘ Proxy Bidireccional OpenAI ↔ Anthropic β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
"""
    print(banner)
    # Missing upstream keys are warnings, not fatal: the proxy still starts
    # and only the affected direction fails (HTTP 500) at request time.
    warns = []
    if not ANTHROPIC_API_KEY:
        warns.append("ANTHROPIC_API_KEY β†’ /v1/chat/completions NO funcionarΓ‘")
    if not OPENAI_API_KEY:
        warns.append("OPENAI_API_KEY β†’ /v1/messages NO funcionarΓ‘")
    for w in warns:
        logger.warning("⚠ %s", w)
    logger.info("Anthropic URL : %s", ANTHROPIC_BASE_URL)
    logger.info("OpenAI URL : %s", OPENAI_BASE_URL)
    logger.info("Escuchando en : http://%s:%s", PROXY_HOST, PROXY_PORT)
    uvicorn.run(
        app,
        host=PROXY_HOST,
        port=int(PROXY_PORT),
        log_level="info",
    )