# app/proxy_handler.py
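"""Anthropic-to-OpenAI proxy layer.

Translates Anthropic Messages API requests into OpenAI-style chat
completions via LiteLLM, and converts the responses, including SSE
streams, back into Anthropic's wire format.
"""
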
import json
import uuid
from typing import AsyncIterator

import litellm
from fastapi.responses import StreamingResponse

litellm.set_verbose = False


def anthropic_to_openai_messages(messages: list, system: str | None) -> list:
    """Convert an Anthropic message list (plus optional system prompt)
    into OpenAI chat-completion messages."""
    openai_msgs = []
    if system:
        openai_msgs.append({"role": "system", "content": system})
    for msg in messages:
        role = msg["role"]
        content = msg["content"]
        if isinstance(content, str):
            openai_msgs.append({"role": role, "content": content})
        elif isinstance(content, list):
            parts = []
            for block in content:
                btype = block.get("type")
                if btype == "text":
                    parts.append({"type": "text", "text": block["text"]})
                elif btype == "image":
                    src = block.get("source", {})
                    if src.get("type") == "base64":
                        url = f"data:{src['media_type']};base64,{src['data']}"
                    else:
                        url = src.get("url", "")
                    parts.append({"type": "image_url", "image_url": {"url": url}})
                elif btype in ("tool_use", "tool_result"):
                    # Tool blocks have no direct OpenAI equivalent in this
                    # simple mapping, so they pass through as JSON text.
                    parts.append({"type": "text", "text": json.dumps(block)})
            # Collapse a lone text part to a plain string; multiple parts
            # (or a single non-text part) stay as a content list.
            if len(parts) == 1 and parts[0].get("type") == "text":
                openai_msgs.append({"role": role, "content": parts[0]["text"]})
            else:
                openai_msgs.append({"role": role, "content": parts or ""})
    return openai_msgs
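
# Illustrative example (not executed): a user turn carrying a single text
# block collapses to a plain-string OpenAI message.
#
#   anthropic_to_openai_messages(
#       [{"role": "user", "content": [{"type": "text", "text": "Hi"}]}],
#       system="Be terse",
#   )
#   # -> [{"role": "system", "content": "Be terse"},
#   #     {"role": "user", "content": "Hi"}]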

# Map OpenAI finish_reason values onto Anthropic stop_reason values;
# call sites fall back to "end_turn" for anything unmapped.
_STOP_REASON_MAP = {
    "stop": "end_turn",
    "length": "max_tokens",
    "content_filter": "stop_sequence",
    "tool_calls": "tool_use",
}


def openai_response_to_anthropic(oai_resp, original_model: str) -> dict:
    """Convert a non-streaming OpenAI-style completion into an Anthropic
    Messages API response body."""
    choice = oai_resp.choices[0]
    usage = oai_resp.usage
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": choice.message.content or ""}],
        "model": original_model,
        "stop_reason": _STOP_REASON_MAP.get(choice.finish_reason or "stop", "end_turn"),
        "stop_sequence": None,
        "usage": {
            "input_tokens": getattr(usage, "prompt_tokens", 0) if usage else 0,
            "output_tokens": getattr(usage, "completion_tokens", 0) if usage else 0,
        },
    }
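
# Illustrative use (not executed; model names are examples only): a backend
# completion truncated by its token limit maps to Anthropic's "max_tokens".
#
#   resp = await litellm.acompletion(model="openai/gpt-4o", messages=[...])
#   body = openai_response_to_anthropic(resp, "claude-3-opus-20240229")
#   body["stop_reason"]  # -> "max_tokens" if finish_reason was "length"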


async def stream_anthropic_sse(params: dict, original_model: str) -> AsyncIterator[str]:
    """Relay a streaming LiteLLM completion as Anthropic-style SSE events."""
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"

    def _sse(event: str, data: dict) -> str:
        return f"event: {event}\ndata: {json.dumps(data)}\n\n"

    yield _sse("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id, "type": "message", "role": "assistant",
            "content": [], "model": original_model,
            "stop_reason": None, "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    })
    yield _sse("content_block_start", {
        "type": "content_block_start", "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield _sse("ping", {"type": "ping"})

    output_tokens = 0
    input_tokens = 0
    stop_reason = "end_turn"
    try:
        response = await litellm.acompletion(**params)
        async for chunk in response:
            delta_content = None
            if chunk.choices:
                delta_content = chunk.choices[0].delta.content
                finish = chunk.choices[0].finish_reason
                if finish:
                    stop_reason = _STOP_REASON_MAP.get(finish, "end_turn")
            if delta_content:
                # Rough one-token-per-chunk estimate; replaced by real
                # numbers below if the upstream reports usage.
                output_tokens += 1
                yield _sse("content_block_delta", {
                    "type": "content_block_delta", "index": 0,
                    "delta": {"type": "text_delta", "text": delta_content},
                })
            if hasattr(chunk, "usage") and chunk.usage:
                input_tokens = getattr(chunk.usage, "prompt_tokens", input_tokens)
                output_tokens = getattr(chunk.usage, "completion_tokens", output_tokens)
    except Exception as exc:
        # Report upstream failures as an Anthropic-style error event
        # instead of tearing down the SSE connection abruptly.
        yield _sse("error", {"type": "error", "error": {"type": "api_error", "message": str(exc)}})
        return

    yield _sse("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield _sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": stop_reason, "stop_sequence": None},
        "usage": {"input_tokens": input_tokens, "output_tokens": output_tokens},
    })
    yield _sse("message_stop", {"type": "message_stop"})


async def handle_messages_request(body: dict, proxy_config):
    """Handle an Anthropic /v1/messages request body by forwarding it to an
    OpenAI-compatible backend through LiteLLM."""
    from .auth import decrypt_api_key  # imported lazily, e.g. to avoid an import cycle

    anthropic_model = body.get("model", "claude-3-opus-20240229")
    messages = body.get("messages", [])
    system = body.get("system")
    max_tokens = body.get("max_tokens", 1024)
    temperature = body.get("temperature", 1.0)
    stream = body.get("stream", False)
    top_p = body.get("top_p")
    stop_seqs = body.get("stop_sequences")

    # model_mapping is a JSON object mapping Anthropic model names to backend
    # model names; unknown or unmapped names pass through unchanged.
    try:
        model_mapping = json.loads(proxy_config.model_mapping or "{}")
    except Exception:
        model_mapping = {}
    openai_model = model_mapping.get(anthropic_model, anthropic_model)

    openai_msgs = anthropic_to_openai_messages(messages, system)
    api_key = decrypt_api_key(proxy_config.encrypted_api_key)

    params: dict = {
        "model": f"openai/{openai_model}",
        "messages": openai_msgs,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": stream,
        "api_key": api_key,
        "api_base": proxy_config.openai_base_url.rstrip("/"),
    }
    if top_p is not None:
        params["top_p"] = top_p
    if stop_seqs:
        params["stop"] = stop_seqs

    if stream:
        return StreamingResponse(
            stream_anthropic_sse(params, anthropic_model),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    response = await litellm.acompletion(**params)
    return openai_response_to_anthropic(response, anthropic_model)
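
# Minimal wiring sketch (assumptions: a FastAPI app elsewhere in the repo and
# a proxy_config object exposing model_mapping, encrypted_api_key, and
# openai_base_url; the route below and load_proxy_config are hypothetical):
#
#   from fastapi import FastAPI, Request
#
#   app = FastAPI()
#
#   @app.post("/v1/messages")
#   async def messages_endpoint(request: Request):
#       body = await request.json()
#       proxy_config = load_proxy_config()  # hypothetical config loader
#       return await handle_messages_request(body, proxy_config)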