"""Translate Anthropic Messages API requests into OpenAI-compatible calls via LiteLLM."""

import json
import uuid
from typing import AsyncIterator

import litellm
from fastapi.responses import StreamingResponse

# Keep LiteLLM quiet; flip to True when debugging upstream requests.
litellm.set_verbose = False
def anthropic_to_openai_messages(messages: list, system: str | None) -> list:
    """Convert Anthropic-style messages (plus optional system prompt) to OpenAI chat messages."""
    openai_msgs = []
    if system:
        # Anthropic carries the system prompt separately; OpenAI expects it as a message.
        openai_msgs.append({"role": "system", "content": system})
    for msg in messages:
        role = msg["role"]
        content = msg["content"]
        if isinstance(content, str):
            openai_msgs.append({"role": role, "content": content})
        elif isinstance(content, list):
            parts = []
            for block in content:
                btype = block.get("type")
                if btype == "text":
                    parts.append({"type": "text", "text": block["text"]})
                elif btype == "image":
                    src = block.get("source", {})
                    if src.get("type") == "base64":
                        # Inline base64 images become data URLs in the OpenAI image_url format.
                        url = f"data:{src['media_type']};base64,{src['data']}"
                    else:
                        url = src.get("url", "")
                    parts.append({"type": "image_url", "image_url": {"url": url}})
                elif btype in ("tool_use", "tool_result"):
                    # Tool blocks are passed through as serialized JSON text.
                    parts.append({"type": "text", "text": json.dumps(block)})
            if len(parts) == 1 and parts[0].get("type") == "text":
                # Flatten a lone text part to a plain string; keep lists (e.g. images) intact.
                openai_msgs.append({"role": role, "content": parts[0]["text"]})
            else:
                openai_msgs.append({"role": role, "content": parts or ""})
    return openai_msgs
# Map OpenAI finish_reason values onto Anthropic stop_reason values.
_STOP_REASON_MAP = {
    "stop": "end_turn",
    "length": "max_tokens",
    "content_filter": "stop_sequence",
    "tool_calls": "tool_use",
}
def openai_response_to_anthropic(oai_resp, original_model: str) -> dict:
    """Shape a non-streaming OpenAI completion into an Anthropic Messages API response."""
    choice = oai_resp.choices[0]
    usage = oai_resp.usage
    return {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": choice.message.content or ""}],
        "model": original_model,
        "stop_reason": _STOP_REASON_MAP.get(choice.finish_reason or "stop", "end_turn"),
        "stop_sequence": None,
        "usage": {
            "input_tokens": getattr(usage, "prompt_tokens", 0) if usage else 0,
            "output_tokens": getattr(usage, "completion_tokens", 0) if usage else 0,
        },
    }
async def stream_anthropic_sse(params: dict, original_model: str) -> AsyncIterator[str]:
    """Stream an OpenAI-compatible completion as Anthropic-style server-sent events."""
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"

    def _sse(event: str, data: dict) -> str:
        return f"event: {event}\ndata: {json.dumps(data)}\n\n"

    # Emit the fixed Anthropic preamble: message_start, an empty text block, and a ping.
    yield _sse("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id, "type": "message", "role": "assistant",
            "content": [], "model": original_model,
            "stop_reason": None, "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0},
        },
    })
    yield _sse("content_block_start", {
        "type": "content_block_start", "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield _sse("ping", {"type": "ping"})

    output_tokens = 0
    stop_reason = "end_turn"
    input_tokens = 0
    try:
        response = await litellm.acompletion(**params)
        async for chunk in response:
            delta_content = None
            if chunk.choices:
                delta_content = chunk.choices[0].delta.content
                finish = chunk.choices[0].finish_reason
                if finish:
                    stop_reason = _STOP_REASON_MAP.get(finish, "end_turn")
            if delta_content:
                # Rough per-chunk count; overwritten by real usage if the provider reports it.
                output_tokens += 1
                yield _sse("content_block_delta", {
                    "type": "content_block_delta", "index": 0,
                    "delta": {"type": "text_delta", "text": delta_content},
                })
            if hasattr(chunk, "usage") and chunk.usage:
                input_tokens = getattr(chunk.usage, "prompt_tokens", input_tokens)
                output_tokens = getattr(chunk.usage, "completion_tokens", output_tokens)
    except Exception as exc:
        # Surface upstream failures as an Anthropic-style error event, then end the stream.
        yield _sse("error", {"type": "error", "error": {"type": "api_error", "message": str(exc)}})
        return

    yield _sse("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield _sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": stop_reason, "stop_sequence": None},
        "usage": {"output_tokens": output_tokens},
    })
    yield _sse("message_stop", {"type": "message_stop"})
async def handle_messages_request(body: dict, proxy_config):
    """Handle an Anthropic /v1/messages request body by proxying it to an OpenAI-compatible backend."""
    from .auth import decrypt_api_key

    anthropic_model = body.get("model", "claude-3-opus-20240229")
    messages = body.get("messages", [])
    system = body.get("system")
    max_tokens = body.get("max_tokens", 1024)
    temperature = body.get("temperature", 1.0)
    stream = body.get("stream", False)
    top_p = body.get("top_p")
    stop_seqs = body.get("stop_sequences")

    # model_mapping is stored as a JSON object mapping Anthropic model names to backend model names.
    try:
        model_mapping = json.loads(proxy_config.model_mapping or "{}")
    except Exception:
        model_mapping = {}
    openai_model = model_mapping.get(anthropic_model, anthropic_model)

    openai_msgs = anthropic_to_openai_messages(messages, system)
    api_key = decrypt_api_key(proxy_config.encrypted_api_key)

    params: dict = {
        "model": f"openai/{openai_model}",
        "messages": openai_msgs,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": stream,
        "api_key": api_key,
        "api_base": proxy_config.openai_base_url.rstrip("/"),
    }
    if top_p is not None:
        params["top_p"] = top_p
    if stop_seqs:
        params["stop"] = stop_seqs

    if stream:
        return StreamingResponse(
            stream_anthropic_sse(params, anthropic_model),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    response = await litellm.acompletion(**params)
    return openai_response_to_anthropic(response, anthropic_model)
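
# --- Usage sketch (illustration only, not part of this module) ---
# A minimal sketch of how handle_messages_request might be wired into a FastAPI route.
# The route path, app object, and get_proxy_config dependency below are assumptions made
# for illustration; the real application may register the handler differently.
#
# from fastapi import Depends, FastAPI, Request
#
# app = FastAPI()
#
# def get_proxy_config():
#     """Hypothetical dependency that loads the stored proxy configuration."""
#     ...
#
# @app.post("/v1/messages")
# async def messages_endpoint(request: Request, proxy_config=Depends(get_proxy_config)):
#     body = await request.json()
#     # Returns either a StreamingResponse (SSE) or a plain dict; FastAPI can serve both.
#     return await handle_messages_request(body, proxy_config)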