#!/usr/bin/env python3 """ ═══════════════════════════════════════════════════════════════════ Proxy Bidireccional: OpenAI ↔ Anthropic ═══════════════════════════════════════════════════════════════════ Endpoints expuestos: POST /v1/chat/completions → Recibe formato OpenAI, convierte a Anthropic, reenvía a la API de Anthropic y devuelve la respuesta en formato OpenAI. Soporta streaming. POST /v1/messages → Recibe formato Anthropic, convierte a OpenAI, reenvía a la API de OpenAI y devuelve la respuesta en formato Anthropic. Soporta streaming. GET /v1/models → Lista de modelos disponibles GET /health → Estado del servidor Variables de entorno (ver .env.example): ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, OPENAI_API_KEY, OPENAI_BASE_URL, PROXY_HOST, PROXY_PORT, PROXY_API_KEY ═══════════════════════════════════════════════════════════════════ """ from __future__ import annotations import json import logging import os import re import time import uuid from typing import Any, AsyncGenerator, Dict, List, Optional import httpx import uvicorn from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse # ──────────────────────────────────────────────────────────────── # Configuración # ──────────────────────────────────────────────────────────────── load_dotenv() ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") ANTHROPIC_BASE_URL = os.getenv("ANTHROPIC_BASE_URL", "https://api.anthropic.com") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com") PROXY_API_KEY = os.getenv("PROXY_API_KEY", "") PROXY_HOST = os.getenv("PROXY_HOST", "0.0.0.0") PROXY_PORT = int(os.getenv("PROXY_PORT", "8000")) ANTHROPIC_VERSION = "2023-06-01" DEFAULT_MAX_TOKENS = 4096 HTTP_TIMEOUT = 300.0 # segundos logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) 
logger = logging.getLogger("proxy")

# ────────────────────────────────────────────────────────────────
# Model mapping
# ────────────────────────────────────────────────────────────────
MODEL_OAI_TO_ANTH: Dict[str, str] = {
    "claude-opus-4.6": "claude-opus-4-6",
}

MODEL_ANTH_TO_OAI: Dict[str, str] = {
    "claude-3-opus-20240229": "gpt-4",
}

# ────────────────────────────────────────────────────────────────
# FastAPI
# ────────────────────────────────────────────────────────────────
app = FastAPI(title="OpenAI ↔ Anthropic Proxy")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ────────────────────────────────────────────────────────────────
# Utilities
# ────────────────────────────────────────────────────────────────
def _gen_id(prefix: str = "chatcmpl") -> str:
    """Generate an OpenAI-style id, e.g. ``chatcmpl-<29 hex chars>``."""
    return f"{prefix}-{uuid.uuid4().hex[:29]}"


def _now() -> int:
    """Current Unix timestamp in whole seconds."""
    return int(time.time())


def _finish_to_openai(reason: Optional[str]) -> Optional[str]:
    """Map an Anthropic stop_reason to an OpenAI finish_reason.

    end_turn/stop_sequence → stop, max_tokens → length.
    Unknown values pass through unchanged.
    """
    return {
        "end_turn": "stop",
        "max_tokens": "length",
        "stop_sequence": "stop",
    }.get(reason or "", reason)


def _finish_to_anthropic(reason: Optional[str]) -> Optional[str]:
    """Map an OpenAI finish_reason to an Anthropic stop_reason.

    stop → end_turn, length → max_tokens.
    Unknown values pass through unchanged.
    """
    return {
        "stop": "end_turn",
        "length": "max_tokens",
    }.get(reason or "", reason)


# ────────────────────────────────────────────────────────────────
# Content conversion (text + images)
# ────────────────────────────────────────────────────────────────
def _oai_content_to_anth(content: Any) -> Any:
    """OpenAI content (str | list[parts]) → Anthropic content (str | list[blocks]).

    ``image_url`` parts become Anthropic image blocks: data-URLs are decoded
    into base64 sources, anything else is passed as a URL source.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    blocks: list = []
    for part in content:
        ptype = part.get("type", "text")
        if ptype == "text":
            blocks.append({"type": "text", "text": part.get("text", "")})
        elif ptype == "image_url":
            url: str = part.get("image_url", {}).get("url", "")
            # re.DOTALL: base64 payloads may contain embedded newlines
            match = re.match(
                r"data:(image/[a-zA-Z+]+);base64,(.+)",
                url,
                re.DOTALL,
            )
            if match:
                blocks.append({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": match.group(1),
                        "data": match.group(2),
                    },
                })
            else:
                # Plain URL — Anthropic supports URL image sources
                blocks.append({
                    "type": "image",
                    "source": {"type": "url", "url": url},
                })
    return blocks or ""


def _anth_content_to_text(content: Any) -> str:
    """Anthropic content (str | list[blocks]) → plain text (text blocks only)."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            b.get("text", "")
            for b in content
            if isinstance(b, dict) and b.get("type") == "text"
        )
    return str(content) if content else ""


def _anth_content_to_oai_parts(content: Any) -> Any:
    """Anthropic content → OpenAI content (str, or list of parts if images present)."""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content) if content else ""
    has_images = any(
        isinstance(b, dict) and b.get("type") == "image" for b in content
    )
    if not has_images:
        # Text-only content collapses to a single string
        return _anth_content_to_text(content)
    parts: list = []
    for b in content:
        if not isinstance(b, dict):
            continue
        if b.get("type") == "text":
            parts.append({"type": "text", "text": b.get("text", "")})
        elif b.get("type") == "image":
            src = b.get("source", {})
            if src.get("type") == "base64":
                data_url = (
                    f"data:{src.get('media_type','image/png')};"
                    f"base64,{src.get('data','')}"
                )
                parts.append({"type": "image_url", "image_url": {"url": data_url}})
            elif src.get("type") == "url":
                parts.append({"type": "image_url",
                              "image_url": {"url": src.get("url", "")}})
    return parts or ""


# ────────────────────────────────────────────────────────────────
# Request conversion
# ────────────────────────────────────────────────────────────────
def _merge_consecutive(msgs: List[Dict]) -> List[Dict]:
    """Merge messages with identical consecutive roles.

    Anthropic rejects two consecutive messages with the same role, so
    adjacent same-role messages are fused: str+str via newline join,
    otherwise by concatenating content-block lists.
    """
    merged: List[Dict] = []
    for m in msgs:
        if not merged or merged[-1]["role"] != m["role"]:
            merged.append(m)
            continue
        # Fuse content with the previous message of the same role
        prev, curr = merged[-1]["content"], m["content"]
        if isinstance(prev, str) and isinstance(curr, str):
            merged[-1]["content"] = prev + "\n" + curr
        else:
            if isinstance(prev, str):
                prev = [{"type": "text", "text": prev}] if prev else []
            if isinstance(curr, str):
                curr = [{"type": "text", "text": curr}] if curr else []
            merged[-1]["content"] = prev + curr
    return merged


def openai_req_to_anthropic(oai: dict) -> dict:
    """POST /v1/chat/completions body → body for POST Anthropic /v1/messages.

    System messages are hoisted into Anthropic's top-level ``system`` field;
    unknown roles ('tool'/'function') are coerced to 'user'; consecutive
    same-role messages are merged; the conversation is forced to start with
    a 'user' message as Anthropic requires.
    """
    system_parts: List[str] = []
    conv_msgs: List[Dict] = []

    for msg in oai.get("messages", []):
        role = msg.get("role", "user")
        raw_content = msg.get("content", "")
        if role == "system":
            text = (
                raw_content
                if isinstance(raw_content, str)
                else _anth_content_to_text(raw_content)
            )
            system_parts.append(text)
        else:
            if role not in ("user", "assistant"):
                role = "user"  # 'tool' / 'function' → user
            conv_msgs.append({
                "role": role,
                "content": _oai_content_to_anth(raw_content),
            })

    conv_msgs = _merge_consecutive(conv_msgs)

    # Anthropic requires the first message to be 'user'
    if conv_msgs and conv_msgs[0]["role"] != "user":
        conv_msgs.insert(0, {"role": "user", "content": "[inicio de conversación]"})
    if not conv_msgs:
        conv_msgs = [{"role": "user", "content": "Hola"}]

    anth: Dict[str, Any] = {
        "model": MODEL_OAI_TO_ANTH.get(
            oai.get("model", ""), oai.get("model", "claude-sonnet-4-20250514")
        ),
        "messages": conv_msgs,
        # Anthropic requires max_tokens; honor either OpenAI spelling
        "max_tokens": oai.get("max_tokens")
        or oai.get("max_completion_tokens")
        or DEFAULT_MAX_TOKENS,
    }
    if system_parts:
        anth["system"] = "\n\n".join(system_parts)
    for key in ("temperature", "top_p"):
        if key in oai and oai[key] is not None:
            anth[key] = oai[key]
    if oai.get("stop") is not None:
        s = oai["stop"]
        anth["stop_sequences"] = [s] if isinstance(s, str) else s
    if oai.get("stream"):
        anth["stream"] = True
    return anth


def anthropic_req_to_openai(anth: dict) -> dict:
    """POST /v1/messages body → body for POST OpenAI /v1/chat/completions.

    The Anthropic ``system`` field (string or list of text blocks) becomes a
    leading OpenAI system message; stop_sequences → stop; streaming requests
    also ask OpenAI for usage in the final chunk.
    """
    oai_msgs: List[Dict] = []

    # system
    system = anth.get("system", "")
    if system:
        if isinstance(system, list):
            # Guard against non-dict entries, consistent with _anth_content_to_text
            system = " ".join(
                b.get("text", "")
                for b in system
                if isinstance(b, dict) and b.get("type") == "text"
            )
        oai_msgs.append({"role": "system", "content": system})

    for msg in anth.get("messages", []):
        oai_msgs.append({
            "role": msg.get("role", "user"),
            "content": _anth_content_to_oai_parts(msg.get("content", "")),
        })

    oai: Dict[str, Any] = {
        "model": MODEL_ANTH_TO_OAI.get(
            anth.get("model", ""), anth.get("model", "gpt-4o")
        ),
        "messages": oai_msgs,
    }
    if "max_tokens" in anth:
        oai["max_tokens"] = anth["max_tokens"]
    for key in ("temperature", "top_p"):
        if key in anth and anth[key] is not None:
            oai[key] = anth[key]
    if "stop_sequences" in anth:
        oai["stop"] = anth["stop_sequences"]
    if anth.get("stream"):
        oai["stream"] = True
        # Ask OpenAI to report token usage in the final stream chunk
        oai["stream_options"] = {"include_usage": True}
    return oai


# ────────────────────────────────────────────────────────────────
# Response conversion (non-stream)
# ────────────────────────────────────────────────────────────────
def anthropic_resp_to_openai(anth: dict, model: str) -> dict:
    """Anthropic /v1/messages response → OpenAI chat.completion response.

    Only text blocks are carried over; image blocks in the reply are dropped.
    """
    usage = anth.get("usage", {})
    inp = usage.get("input_tokens", 0)
    out = usage.get("output_tokens", 0)
    return {
        "id": _gen_id(),
        "object": "chat.completion",
        "created": _now(),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": _anth_content_to_text(anth.get("content", [])),
            },
            "finish_reason": _finish_to_openai(anth.get("stop_reason")),
        }],
        "usage": {
            "prompt_tokens": inp,
            "completion_tokens": out,
            "total_tokens": inp + out,
        },
    }


def openai_resp_to_anthropic(oai: dict, model: str) -> dict:
    """OpenAI chat.completion response → Anthropic /v1/messages response."""
    choice = (oai.get("choices") or [{}])[0]
    usage = oai.get("usage", {})
    content_text = choice.get("message", {}).get("content", "")
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "model": model,
        # Empty assistant content maps to an empty block list
        "content": [{"type": "text", "text": content_text}] if content_text else [],
        "stop_reason": _finish_to_anthropic(choice.get("finish_reason")),
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
    }


# ────────────────────────────────────────────────────────────────
# Streaming: Anthropic SSE → OpenAI SSE
# ────────────────────────────────────────────────────────────────
async def _stream_anth_as_oai(
    resp: httpx.Response,
    model: str,
) -> AsyncGenerator[str, None]:
    """Translate an Anthropic SSE stream into OpenAI chat.completion.chunk SSE.

    Emits an initial role chunk, one content chunk per text delta, a final
    chunk carrying finish_reason + usage, then ``data: [DONE]``.
    """
    rid = _gen_id()
    created = _now()
    inp_tok = 0

    # First chunk: announce the assistant role
    yield _sse({
        "id": rid,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{"index": 0,
                     "delta": {"role": "assistant", "content": ""},
                     "finish_reason": None}],
    })

    async for line in resp.aiter_lines():
        line = line.strip()
        # Anthropic interleaves "event:" lines; the JSON payload is in "data:"
        if not line or line.startswith("event:"):
            continue
        if not line.startswith("data:"):
            continue
        raw = line[5:].strip()
        if not raw:
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue

        evt = data.get("type", "")
        if evt == "message_start":
            inp_tok = (
                data.get("message", {})
                .get("usage", {})
                .get("input_tokens", 0)
            )
        elif evt == "content_block_delta":
            text = data.get("delta", {}).get("text", "")
            if text:
                yield _sse({
                    "id": rid,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": model,
                    "choices": [{"index": 0,
                                 "delta": {"content": text},
                                 "finish_reason": None}],
                })
        elif evt == "message_delta":
            sr = data.get("delta", {}).get("stop_reason")
            out = data.get("usage", {}).get("output_tokens", 0)
            yield _sse({
                "id": rid,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [{"index": 0,
                             "delta": {},
                             "finish_reason": _finish_to_openai(sr)}],
                "usage": {
                    "prompt_tokens": inp_tok,
                    "completion_tokens": out,
                    "total_tokens": inp_tok + out,
                },
            })

    yield "data: [DONE]\n\n"


# ────────────────────────────────────────────────────────────────
# Streaming: OpenAI SSE → Anthropic SSE
# ────────────────────────────────────────────────────────────────
async def _stream_oai_as_anth(
    resp: httpx.Response,
    model: str,
) -> AsyncGenerator[str, None]:
    """Translate an OpenAI chat.completion.chunk SSE stream into Anthropic SSE.

    Emits the Anthropic event sequence: message_start, content_block_start,
    content_block_delta per text delta, content_block_stop, message_delta
    (stop_reason + output usage), message_stop.
    """
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"

    # message_start
    yield _anth_sse("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    })
    # content_block_start
    yield _anth_sse("content_block_start", {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    })

    finish_reason = None
    p_tok = 0
    c_tok = 0

    async for line in resp.aiter_lines():
        line = line.strip()
        if not line.startswith("data:"):
            continue
        raw = line[5:].strip()
        if raw == "[DONE]":
            break
        if not raw:
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue

        # Cumulative usage (present on the final chunk with include_usage)
        if "usage" in data:
            p_tok = data["usage"].get("prompt_tokens", p_tok)
            c_tok = data["usage"].get("completion_tokens", c_tok)

        choices = data.get("choices", [])
        if not choices:
            continue
        choice = choices[0]
        delta = choice.get("delta", {})
        fr = choice.get("finish_reason")
        if fr:
            finish_reason = fr
        text = delta.get("content", "")
        if text:
            yield _anth_sse("content_block_delta", {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": text},
            })

    # content_block_stop
    yield _anth_sse("content_block_stop", {
        "type": "content_block_stop",
        "index": 0,
    })
    # message_delta
    yield _anth_sse("message_delta", {
        "type": "message_delta",
        "delta": {
            "stop_reason": _finish_to_anthropic(finish_reason),
            "stop_sequence": None,
        },
        "usage": {"output_tokens": c_tok},
    })
    # message_stop
    yield _anth_sse("message_stop", {"type": "message_stop"})


# SSE helpers
def _sse(obj: dict) -> str:
    """Serialize *obj* as a bare OpenAI-style SSE data frame."""
    return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"


def _anth_sse(event: str, obj: dict) -> str:
    """Serialize *obj* as an Anthropic-style SSE frame with an event name."""
    return f"event: {event}\ndata: {json.dumps(obj, ensure_ascii=False)}\n\n"


# ────────────────────────────────────────────────────────────────
# Proxy authentication (optional)
# ────────────────────────────────────────────────────────────────
def _check_auth(request: Request) -> None:
    """Validate the client key when PROXY_API_KEY is set (no-op otherwise).

    Accepts either ``Authorization: Bearer <key>`` or ``x-api-key: <key>``.
    Raises HTTPException(401) on mismatch.
    """
    import hmac  # stdlib; local import keeps this block self-contained

    if not PROXY_API_KEY:
        return
    auth = request.headers.get("authorization", "")
    x_key = request.headers.get("x-api-key", "")
    token = ""
    if auth.lower().startswith("bearer "):
        token = auth[7:]
    elif x_key:
        token = x_key
    # Constant-time comparison to avoid leaking the key via timing
    if not hmac.compare_digest(token, PROXY_API_KEY):
        raise HTTPException(status_code=401, detail="API key inválida")


def _require_key(name: str, value: str) -> None:
    """Raise HTTPException(500) when the named upstream key is not configured."""
    if not value:
        raise HTTPException(
            status_code=500,
            detail=f"Variable de entorno {name} no configurada en el servidor",
        )


# ────────────────────────────────────────────────────────────────
# ENDPOINT: /v1/chat/completions
# Accepts OpenAI → forwards to Anthropic → returns OpenAI
# ────────────────────────────────────────────────────────────────
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def ep_chat_completions(request: Request):
    """OpenAI-compatible endpoint backed by the Anthropic Messages API."""
    _check_auth(request)
    _require_key("ANTHROPIC_API_KEY", ANTHROPIC_API_KEY)

    body = await request.json()
    is_stream = body.get("stream", False)
    model_in = body.get("model", "")
    anth_body = openai_req_to_anthropic(body)

    url = f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages"
    headers = {
        "x-api-key": ANTHROPIC_API_KEY,
        "anthropic-version": ANTHROPIC_VERSION,
        "content-type": "application/json",
        "accept": "application/json",
    }
    logger.info(
        "[OAI→ANTH] %s → %s stream=%s",
        model_in, anth_body["model"], is_stream,
    )

    if is_stream:
        return await _proxy_stream(
            url, headers, anth_body,
            converter=_stream_anth_as_oai,
            model_label=model_in,
        )

    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        resp = await client.post(url, json=anth_body, headers=headers)
    if resp.status_code != 200:
        raise HTTPException(status_code=resp.status_code, detail=resp.text)
    return JSONResponse(anthropic_resp_to_openai(resp.json(), model_in))


# ────────────────────────────────────────────────────────────────
# ENDPOINT: /v1/messages
# Accepts Anthropic → forwards to OpenAI → returns Anthropic
# ────────────────────────────────────────────────────────────────
@app.post("/v1/messages")
@app.post("/messages")
async def ep_messages(request: Request):
    """Anthropic-compatible endpoint backed by the OpenAI Chat Completions API."""
    _check_auth(request)
    _require_key("OPENAI_API_KEY", OPENAI_API_KEY)

    body = await request.json()
    is_stream = body.get("stream", False)
    model_in = body.get("model", "")
    oai_body = anthropic_req_to_openai(body)

    url = f"{OPENAI_BASE_URL.rstrip('/')}/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    logger.info(
        "[ANTH→OAI] %s → %s stream=%s",
        model_in, oai_body["model"], is_stream,
    )

    if is_stream:
        return await _proxy_stream(
            url, headers, oai_body,
            converter=_stream_oai_as_anth,
            model_label=model_in,
        )

    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
        resp = await client.post(url, json=oai_body, headers=headers)
    if resp.status_code != 200:
        raise HTTPException(status_code=resp.status_code, detail=resp.text)
    return JSONResponse(openai_resp_to_anthropic(resp.json(), model_in))


# ────────────────────────────────────────────────────────────────
# Generic streaming helper
# ────────────────────────────────────────────────────────────────
async def _proxy_stream(
    url: str,
    headers: dict,
    body: dict,
    converter,
    model_label: str,
) -> StreamingResponse:
    """Open an upstream streaming POST and relay it through *converter*.

    The client/response pair outlives this function; both are closed in the
    generator's ``finally`` once the stream is drained or aborted.
    """
    client = httpx.AsyncClient(timeout=httpx.Timeout(timeout=HTTP_TIMEOUT))
    try:
        req = client.build_request("POST", url, json=body, headers=headers)
        resp = await client.send(req, stream=True)
        if resp.status_code != 200:
            err = await resp.aread()
            await resp.aclose()
            await client.aclose()
            # errors="replace": upstream error bodies may not be valid UTF-8
            raise HTTPException(status_code=resp.status_code,
                                detail=err.decode(errors="replace"))
    except HTTPException:
        raise
    except Exception as exc:
        await client.aclose()
        raise HTTPException(status_code=502, detail=str(exc))

    async def _generate():
        try:
            async for chunk in converter(resp, model_label):
                yield chunk
        finally:
            await resp.aclose()
            await client.aclose()

    return StreamingResponse(
        _generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
# ────────────────────────────────────────────────────────────────
# Model listing & Health
# ────────────────────────────────────────────────────────────────
@app.get("/v1/models")
@app.get("/models")
async def ep_list_models():
    """OpenAI-style model list built from both mapping tables (deduplicated).

    ``dict.fromkeys`` preserves insertion order while dropping duplicates.
    """
    data = [
        {
            "id": mid,
            "object": "model",
            "created": _now(),
            "owned_by": "proxy",
        }
        for mid in dict.fromkeys(list(MODEL_OAI_TO_ANTH) + list(MODEL_ANTH_TO_OAI))
    ]
    return {"object": "list", "data": data}


@app.get("/health")
@app.get("/")
async def ep_health():
    """Liveness/config probe: reports which upstream keys are configured."""
    return {
        "status": "ok",
        "anthropic_configured": bool(ANTHROPIC_API_KEY),
        "openai_configured": bool(OPENAI_API_KEY),
        "endpoints": {
            "/v1/chat/completions": "OpenAI fmt → Anthropic API → OpenAI fmt",
            "/v1/messages": "Anthropic fmt → OpenAI API → Anthropic fmt",
        },
    }


# ────────────────────────────────────────────────────────────────
# Startup
# ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    banner = """
╔═══════════════════════════════════════════════╗
║   Proxy Bidireccional OpenAI ↔ Anthropic      ║
╚═══════════════════════════════════════════════╝
"""
    print(banner)

    # Warn (don't abort): each missing key only disables one direction
    warns = []
    if not ANTHROPIC_API_KEY:
        warns.append("ANTHROPIC_API_KEY → /v1/chat/completions NO funcionará")
    if not OPENAI_API_KEY:
        warns.append("OPENAI_API_KEY → /v1/messages NO funcionará")
    for w in warns:
        logger.warning("⚠ %s", w)

    logger.info("Anthropic URL : %s", ANTHROPIC_BASE_URL)
    logger.info("OpenAI URL    : %s", OPENAI_BASE_URL)
    logger.info("Escuchando en : http://%s:%s", PROXY_HOST, PROXY_PORT)

    uvicorn.run(
        app,
        host=PROXY_HOST,
        port=PROXY_PORT,  # already an int from config; no re-conversion needed
        log_level="info",
    )