import json
import uuid
from typing import AsyncIterator

import litellm
from fastapi.responses import StreamingResponse

litellm.set_verbose = False


def anthropic_to_openai_messages(messages: list, system: str | None) -> list:
    """Convert Anthropic-style messages (plus optional system prompt) to OpenAI chat format."""
    openai_msgs = []
    if system:
        openai_msgs.append({"role": "system", "content": system})
    for msg in messages:
        role = msg["role"]
        content = msg["content"]
        if isinstance(content, str):
            openai_msgs.append({"role": role, "content": content})
        elif isinstance(content, list):
            parts = []
            for block in content:
                btype = block.get("type")
                if btype == "text":
                    parts.append({"type": "text", "text": block["text"]})
                elif btype == "image":
                    src = block.get("source", {})
                    if src.get("type") == "base64":
                        url = f"data:{src['media_type']};base64,{src['data']}"
                    else:
                        url = src.get("url", "")
                    parts.append({"type": "image_url", "image_url": {"url": url}})
                elif btype in ("tool_use", "tool_result"):
                    # Lossy passthrough: tool blocks are serialized as JSON text.
                    parts.append({"type": "text", "text": json.dumps(block)})
            # Collapse a lone text part to a plain string; otherwise keep the list.
            # (A lone image part must stay a list -- indexing parts[0]["text"] on it
            # would raise KeyError.)
            if len(parts) == 1 and parts[0]["type"] == "text":
                openai_msgs.append({"role": role, "content": parts[0]["text"]})
            else:
                openai_msgs.append({"role": role, "content": parts or ""})
    return openai_msgs


# OpenAI finish_reason -> Anthropic stop_reason
_STOP_REASON_MAP = {
    "stop": "end_turn",
    "length": "max_tokens",
    "content_filter": "stop_sequence",
    "tool_calls": "tool_use",
}


def openai_response_to_anthropic(oai_resp, original_model: str) -> dict:
    """Wrap a non-streaming OpenAI completion in an Anthropic Messages response body."""
    choice = oai_resp.choices[0]
    usage = oai_resp.usage
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": choice.message.content or ""}],
        "model": original_model,
        "stop_reason": _STOP_REASON_MAP.get(choice.finish_reason or "stop", "end_turn"),
        "stop_sequence": None,
        "usage": {
            "input_tokens": getattr(usage, "prompt_tokens", 0) if usage else 0,
            "output_tokens": getattr(usage, "completion_tokens", 0) if usage else 0,
        },
    }


async def stream_anthropic_sse(params: dict, original_model: str) -> AsyncIterator[str]:
    """Stream an OpenAI completion, re-emitting it as Anthropic-style SSE events."""
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"

    def _sse(event: str, data: dict) -> str:
        return f"event: {event}\ndata: {json.dumps(data)}\n\n"

    yield _sse("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": original_model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    })
    yield _sse("content_block_start", {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield _sse("ping", {"type": "ping"})

    output_tokens = 0
    stop_reason = "end_turn"
    input_tokens = 0
    try:
        response = await litellm.acompletion(**params)
        async for chunk in response:
            delta_content = None
            if chunk.choices:
                delta_content = chunk.choices[0].delta.content
                finish = chunk.choices[0].finish_reason
                if finish:
                    stop_reason = _STOP_REASON_MAP.get(finish, "end_turn")
            if delta_content:
                # Rough per-chunk count; replaced by real usage below if the
                # backend reports it (OpenAI-style backends usually only attach
                # usage to a stream when it is explicitly requested).
                output_tokens += 1
                yield _sse("content_block_delta", {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {"type": "text_delta", "text": delta_content},
                })
            if hasattr(chunk, "usage") and chunk.usage:
                # input_tokens arrives after message_start (already sent with
                # zeros), so it is tracked here but not re-emitted.
                input_tokens = getattr(chunk.usage, "prompt_tokens", input_tokens)
                output_tokens = getattr(chunk.usage, "completion_tokens", output_tokens)
    except Exception as exc:
        yield _sse("error", {"type": "error", "error": {"type": "api_error", "message": str(exc)}})
        return

    yield _sse("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield _sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": stop_reason, "stop_sequence": None},
        "usage": {"output_tokens": output_tokens},
    })
    yield _sse("message_stop", {"type": "message_stop"})
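# For reference, a successful stream from stream_anthropic_sse emits events in
# this order (bodies abridged; the ordering follows directly from the code above):
#
#   event: message_start         {"type": "message_start", "message": {...}}
#   event: content_block_start   {"type": "content_block_start", "index": 0, ...}
#   event: ping                  {"type": "ping"}
#   event: content_block_delta   {"type": "content_block_delta", "delta": {"type": "text_delta", ...}}   (repeated)
#   event: content_block_stop    {"type": "content_block_stop", "index": 0}
#   event: message_delta         {"type": "message_delta", "delta": {"stop_reason": ...}, "usage": {...}}
#   event: message_stop          {"type": "message_stop"}
#
# On failure, a single `error` event is emitted and the stream ends.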
async def handle_messages_request(body: dict, proxy_config):
    """Handle an Anthropic /v1/messages request by proxying it to an OpenAI-compatible backend."""
    from .auth import decrypt_api_key

    anthropic_model = body.get("model", "claude-3-opus-20240229")
    messages = body.get("messages", [])
    system = body.get("system")
    max_tokens = body.get("max_tokens", 1024)
    temperature = body.get("temperature", 1.0)
    stream = body.get("stream", False)
    top_p = body.get("top_p")
    stop_seqs = body.get("stop_sequences")

    # model_mapping is a JSON object mapping Anthropic model names to backend
    # model names; fall back to the requested name if unmapped or unparsable.
    try:
        model_mapping = json.loads(proxy_config.model_mapping or "{}")
    except Exception:
        model_mapping = {}
    openai_model = model_mapping.get(anthropic_model, anthropic_model)

    openai_msgs = anthropic_to_openai_messages(messages, system)
    api_key = decrypt_api_key(proxy_config.encrypted_api_key)

    params: dict = {
        "model": f"openai/{openai_model}",
        "messages": openai_msgs,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": stream,
        "api_key": api_key,
        "api_base": proxy_config.openai_base_url.rstrip("/"),
    }
    if top_p is not None:
        params["top_p"] = top_p
    if stop_seqs:
        params["stop"] = stop_seqs

    if stream:
        return StreamingResponse(
            stream_anthropic_sse(params, anthropic_model),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    response = await litellm.acompletion(**params)
    return openai_response_to_anthropic(response, anthropic_model)
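# Usage sketch (not part of the original module): one way to expose
# handle_messages_request through a FastAPI route. `get_proxy_config` is a
# hypothetical dependency standing in for however this project loads its
# proxy configuration.
#
#   from fastapi import Depends, FastAPI, Request
#
#   app = FastAPI()
#
#   @app.post("/v1/messages")
#   async def messages_endpoint(request: Request, proxy_config=Depends(get_proxy_config)):
#       body = await request.json()
#       return await handle_messages_request(body, proxy_config)
#
# Returning the dict directly lets FastAPI serialize non-streaming responses as
# JSON, while the streaming branch already returns a StreamingResponse.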