Spaces:
Running
Running
| """ | |
| HuggingFace Spaces - OpenAI & Anthropic Compatible Coding API | |
| A free, skills-only API endpoint for coding tasks (like Codex/Claude Code) | |
| Author: Matrix Agent | |
| Features: | |
| - Full OpenAI API compatibility (/v1/chat/completions) | |
| - Full Anthropic API compatibility (/v1/messages) | |
| - Computer Use Agent (CUA) endpoint (/v1/cua) | |
| - Prefill Response Support (assistant message prefix for output control) | |
| - Thinking/Reasoning Content Block Support | |
| - Optimized for coding tasks | |
| - Runs on free HF Spaces (2 vCPU, 16GB RAM) | |
| API Specifications verified against: | |
| - OpenAI: https://platform.openai.com/docs/api-reference/chat/create | |
| - Anthropic: https://docs.anthropic.com/en/api/messages | |
| - Anthropic Computer Use: https://docs.anthropic.com/en/docs/agents-and-tools/computer-use | |
| - Prefill: https://platform.claude.com/docs/en/build-with-claude/prompt-engineering/prefill-claudes-response | |
| - MiniMax Anthropic: https://platform.minimax.io/docs/api-reference/text-anthropic-api | |
| """ | |
| import os | |
| import time | |
| import uuid | |
| import json | |
| import asyncio | |
| from typing import List, Optional, Union, Dict, Any, AsyncGenerator | |
| from contextlib import asynccontextmanager | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
| from threading import Thread | |
| from fastapi import FastAPI, HTTPException, Header, Request, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| from pydantic import BaseModel, Field | |
| # ============================================================================ | |
| # Configuration | |
| # ============================================================================ | |
| MODEL_ID = os.getenv("MODEL_ID", "Qwen/Qwen2.5-Coder-1.5B-Instruct") | |
| ANTHROPIC_VERSION = "2023-06-01" | |
| MODEL_ALIASES = { | |
| # OpenAI-style model names | |
| "gpt-4": MODEL_ID, | |
| "gpt-4-turbo": MODEL_ID, | |
| "gpt-4o": MODEL_ID, | |
| "gpt-4o-mini": MODEL_ID, | |
| "gpt-3.5-turbo": MODEL_ID, | |
| "codex": MODEL_ID, | |
| "code-davinci-002": MODEL_ID, | |
| "o1": MODEL_ID, | |
| "o1-mini": MODEL_ID, | |
| # Anthropic-style model names | |
| "claude-3-opus-20240229": MODEL_ID, | |
| "claude-3-sonnet-20240229": MODEL_ID, | |
| "claude-3-haiku-20240307": MODEL_ID, | |
| "claude-3-5-sonnet-20241022": MODEL_ID, | |
| "claude-3-5-haiku-20241022": MODEL_ID, | |
| "claude-3-opus": MODEL_ID, | |
| "claude-3-sonnet": MODEL_ID, | |
| "claude-3-haiku": MODEL_ID, | |
| "claude-3-5-sonnet": MODEL_ID, | |
| "claude-code": MODEL_ID, | |
| # Computer Use Agent (CUA) model | |
| "sheikh-computer-use-preview": MODEL_ID, | |
| "computer-use-preview": MODEL_ID, | |
| } | |
| API_KEY = os.getenv("API_KEY", "sk-free-coding-api") | |
| MAX_TOKENS_DEFAULT = 2048 | |
| TEMPERATURE_DEFAULT = 0.7 | |
| # ============================================================================ | |
| # Global Model Instance | |
| # ============================================================================ | |
# Populated once by load_model() at startup; None until then.
model = None
tokenizer = None
def load_model():
    """Load the tokenizer and model into the module globals (CPU, fp32).

    Returns the (model, tokenizer) pair in addition to setting the
    globals, so callers can use either access path.
    NOTE(review): the log prefixes look mojibake-garbled (likely emoji
    originally) — confirm source encoding.
    """
    global model, tokenizer
    print(f"π Loading model: {MODEL_ID}")
    print(f"π Device: CPU (Free HF Spaces)")
    # Left padding so generation appends after the prompt when batched.
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_ID,
        trust_remote_code=True,
        padding_side="left"
    )
    # Some models ship without a pad token; reuse EOS in that case.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.float32,   # fp32: CPU-only target, no half-precision kernels
        device_map="cpu",
        trust_remote_code=True,
        low_cpu_mem_usage=True,
    )
    model.eval()  # inference only — disables dropout etc.
    print("β Model loaded successfully!")
    return model, tokenizer
| # ============================================================================ | |
| # Pydantic Models - OpenAI Compatible | |
| # ============================================================================ | |
class OpenAIContentPart(BaseModel):
    """One element of a multimodal OpenAI message content list."""
    type: str  # "text" or "image_url"
    text: Optional[str] = None
    image_url: Optional[Dict[str, str]] = None
class OpenAIMessage(BaseModel):
    """A single chat message in OpenAI request format."""
    role: str  # "system" | "user" | "assistant" | "tool"
    content: Optional[Union[str, List[OpenAIContentPart]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict]] = None
    tool_call_id: Optional[str] = None
class OpenAIResponseFormat(BaseModel):
    """response_format option ("text", "json_object", or "json_schema")."""
    type: str = "text"
    json_schema: Optional[Dict] = None
class OpenAIChatRequest(BaseModel):
    """Request body for /v1/chat/completions (OpenAI-compatible).

    Field names and validation ranges mirror the OpenAI API; several
    fields (logit_bias, seed, tools, ...) are accepted but not all are
    acted upon by the generation code.
    """
    model: str
    messages: List[OpenAIMessage]
    temperature: Optional[float] = Field(default=1.0, ge=0, le=2)
    top_p: Optional[float] = Field(default=1.0, ge=0, le=1)
    n: Optional[int] = Field(default=1, ge=1, le=10)
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    max_tokens: Optional[int] = None
    # Newer OpenAI name; takes precedence over max_tokens when both set.
    max_completion_tokens: Optional[int] = None
    presence_penalty: Optional[float] = Field(default=0, ge=-2, le=2)
    frequency_penalty: Optional[float] = Field(default=0, ge=-2, le=2)
    logit_bias: Optional[Dict[str, float]] = None
    logprobs: Optional[bool] = False
    top_logprobs: Optional[int] = None
    user: Optional[str] = None
    seed: Optional[int] = None
    tools: Optional[List[Dict]] = None
    tool_choice: Optional[Union[str, Dict]] = None
    response_format: Optional[OpenAIResponseFormat] = None
    # e.g. {"include_usage": true} to request a usage chunk when streaming
    stream_options: Optional[Dict] = None
class OpenAIChoiceMessage(BaseModel):
    """Assistant message carried inside a completion choice."""
    role: str = "assistant"
    content: Optional[str] = None
    tool_calls: Optional[List[Dict]] = None
class OpenAIChoice(BaseModel):
    """One completion choice in a non-streaming response."""
    index: int
    message: OpenAIChoiceMessage
    finish_reason: Optional[str] = None  # "stop" or "length"
    logprobs: Optional[Dict] = None
class OpenAIStreamChoice(BaseModel):
    """One choice inside a chat.completion.chunk (streaming) payload."""
    index: int
    delta: Dict  # incremental {"role": ...} / {"content": ...} fragment
    finish_reason: Optional[str] = None
    logprobs: Optional[Dict] = None
class OpenAIUsage(BaseModel):
    """Token accounting block of an OpenAI response."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    prompt_tokens_details: Optional[Dict] = None
    completion_tokens_details: Optional[Dict] = None
class OpenAIChatResponse(BaseModel):
    """Top-level non-streaming response for /v1/chat/completions."""
    id: str  # "chatcmpl-..." identifier
    object: str = "chat.completion"
    created: int  # unix timestamp
    model: str
    choices: List[OpenAIChoice]
    usage: Optional[OpenAIUsage] = None
    system_fingerprint: Optional[str] = None
    service_tier: Optional[str] = None
class OpenAIModelInfo(BaseModel):
    """One entry of the /v1/models listing."""
    id: str
    object: str = "model"
    created: int
    owned_by: str = "hf-spaces"
class OpenAIModelsResponse(BaseModel):
    """Envelope for the /v1/models listing."""
    object: str = "list"
    data: List[OpenAIModelInfo]
| # ============================================================================ | |
| # Pydantic Models - Anthropic Compatible (with Thinking & Prefill support) | |
| # ============================================================================ | |
class AnthropicTextBlock(BaseModel):
    """Plain-text content block."""
    type: str = "text"
    text: str
class AnthropicImageSource(BaseModel):
    """Base64-encoded image payload for an image block."""
    type: str = "base64"
    media_type: str  # e.g. "image/png"
    data: str  # base64 string
class AnthropicImageBlock(BaseModel):
    """Image content block."""
    type: str = "image"
    source: AnthropicImageSource
class AnthropicThinkingBlock(BaseModel):
    """Thinking/reasoning content block"""
    type: str = "thinking"
    thinking: str
| AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicThinkingBlock, Dict] | |
class AnthropicMessage(BaseModel):
    """A single turn in an Anthropic conversation."""
    role: str  # "user", "assistant"
    content: Union[str, List[AnthropicContentBlock]]
class AnthropicTool(BaseModel):
    """Tool definition (JSON-schema input) in Anthropic format."""
    name: str
    description: Optional[str] = None
    input_schema: Dict
class AnthropicToolChoice(BaseModel):
    """tool_choice option ("auto", "any", or a named tool)."""
    type: str
    name: Optional[str] = None
class AnthropicThinkingConfig(BaseModel):
    """Configuration for thinking/reasoning mode"""
    type: str = "enabled"  # "enabled" or "disabled"
    budget_tokens: Optional[int] = None  # Token budget for thinking
class AnthropicRequest(BaseModel):
    """Full Anthropic Messages API request with thinking & prefill support.

    max_tokens is required by the Anthropic API, hence no default here.
    """
    model: str
    messages: List[AnthropicMessage]
    max_tokens: int
    # Optional parameters
    system: Optional[Union[str, List[Dict]]] = None
    temperature: Optional[float] = Field(default=1.0, ge=0, le=1)
    top_p: Optional[float] = Field(default=0.999, ge=0, le=1)
    top_k: Optional[int] = None
    stream: Optional[bool] = False
    stop_sequences: Optional[List[str]] = None
    # Tool use
    tools: Optional[List[AnthropicTool]] = None
    tool_choice: Optional[AnthropicToolChoice] = None
    # Thinking/reasoning support
    thinking: Optional[AnthropicThinkingConfig] = None
    # Metadata
    metadata: Optional[Dict] = None
class AnthropicResponseContent(BaseModel):
    """Response content block; a union-by-optional-fields of the
    "text", "thinking", and "tool_use" block shapes."""
    type: str = "text"
    text: Optional[str] = None
    # For thinking blocks
    thinking: Optional[str] = None
    # For tool_use
    id: Optional[str] = None
    name: Optional[str] = None
    input: Optional[Dict] = None
class AnthropicUsage(BaseModel):
    """Token accounting block of an Anthropic response."""
    input_tokens: int
    output_tokens: int
class AnthropicResponse(BaseModel):
    """Top-level non-streaming response for /v1/messages."""
    id: str  # "msg_..." identifier
    type: str = "message"
    role: str = "assistant"
    model: str
    content: List[AnthropicResponseContent]
    stop_reason: Optional[str] = None  # "end_turn" | "max_tokens" | "stop_sequence"
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage
| # ============================================================================ | |
| # Content Parsing Utilities | |
| # ============================================================================ | |
def extract_text_from_openai_content(content: Union[str, List, None]) -> str:
    """Flatten OpenAI message content into plain text.

    Strings pass through, content-part lists are reduced to their text
    parts joined by newlines, None becomes "", anything else is str()'d.
    Accepts both plain dicts and objects with a .type attribute.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)
    pieces = []
    for part in content:
        if isinstance(part, dict):
            if part.get("type") == "text":
                pieces.append(part.get("text", ""))
        elif getattr(part, "type", None) == "text":
            pieces.append(part.text or "")
    return "\n".join(pieces)
def extract_text_from_anthropic_content(content: Union[str, List]) -> str:
    """Collapse Anthropic message content into newline-joined plain text.

    Only "text" blocks contribute; "thinking" blocks (and any other
    block types) are deliberately excluded. Handles both plain dicts
    and objects with a .type attribute.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)
    collected = []
    for block in content:
        if isinstance(block, dict):
            # thinking blocks and unknown types are skipped on purpose
            if block.get("type") == "text":
                collected.append(block.get("text", ""))
        elif getattr(block, "type", None) == "text":
            collected.append(block.text or "")
    return "\n".join(collected)
def extract_system_prompt_anthropic(system: Union[str, List[Dict], None]) -> str:
    """Normalize the Anthropic ``system`` field to a single string.

    Accepts a plain string, a list of {"type": "text", ...} blocks
    (joined by newlines), or None/anything else (returns "").
    """
    if system is None:
        return ""
    if isinstance(system, str):
        return system
    if isinstance(system, list):
        return "\n".join(
            block.get("text", "")
            for block in system
            if isinstance(block, dict) and block.get("type") == "text"
        )
    return ""
def extract_prefill_from_messages(messages: List[Dict]) -> tuple[List[Dict], str]:
    """Split off a trailing assistant message as prefill text.

    Returns (messages_without_prefill, prefill_text). When the final
    message is not from the assistant, the list is returned untouched
    with an empty prefill. String prefills are right-stripped because
    a prefill must not end in trailing whitespace.
    See: https://platform.claude.com/docs/en/build-with-claude/prompt-engineering/prefill-claudes-response
    """
    if not messages or messages[-1].get("role") != "assistant":
        return messages, ""
    prefill = messages[-1].get("content", "")
    if isinstance(prefill, str):
        prefill = prefill.rstrip()
    return messages[:-1], prefill
| # ============================================================================ | |
| # Message Formatting with Prefill Support | |
| # ============================================================================ | |
def format_messages_for_model(
    messages: List[Dict],
    system_prompt: Optional[str] = None,
    prefill: str = ""
) -> str:
    """Render a chat transcript into a single prompt string.

    Prefers the global tokenizer's chat template; falls back to a
    simple tagged format if no template exists or rendering fails.
    Any prefill text is appended verbatim so the model continues it.
    """
    chat = []
    if system_prompt:
        chat.append({"role": "system", "content": system_prompt})
    for msg in messages:
        role = msg.get("role", "user")
        # Tool results have no dedicated role here; fold them into "user".
        chat.append({
            "role": "user" if role == "tool" else role,
            "content": msg.get("content", ""),
        })
    # Use tokenizer's chat template if available
    if hasattr(tokenizer, 'apply_chat_template') and tokenizer.chat_template:
        try:
            rendered = tokenizer.apply_chat_template(
                chat,
                tokenize=False,
                add_generation_prompt=True
            )
            return rendered + prefill if prefill else rendered
        except Exception:
            pass  # fall through to the manual format below
    # Fallback: simple role-tagged format
    tags = {"system": "<|system|>", "user": "<|user|>", "assistant": "<|assistant|>"}
    parts = [
        f"{tags[m['role']]}\n{m['content']}\n"
        for m in chat
        if m["role"] in tags
    ]
    parts.append("<|assistant|>\n")
    if prefill:
        parts.append(prefill)
    return "".join(parts)
| # ============================================================================ | |
| # Generation Logic with Thinking Support | |
| # ============================================================================ | |
def generate_response(
    prompt: str,
    max_tokens: int = MAX_TOKENS_DEFAULT,
    temperature: float = TEMPERATURE_DEFAULT,
    top_p: float = 0.95,
    top_k: Optional[int] = None,
    stop: Optional[List[str]] = None,
    enable_thinking: bool = False,
    thinking_budget: int = 512,
) -> tuple[str, str, int, int, str]:
    """Generate a completion for *prompt* with the global model.

    Returns (response_text, thinking_text, input_tokens, output_tokens,
    stop_reason) where stop_reason is "stop" or "length".

    Fixes vs. previous version:
    - passes the attention mask explicitly to generate() instead of
      letting it be inferred from pad tokens;
    - a matched stop sequence now reports "stop" even when the token
      budget was also exhausted (length no longer overrides it).
    """
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
    input_length = inputs.input_ids.shape[1]
    gen_kwargs = {
        "max_new_tokens": max_tokens,
        # generate() rejects temperature == 0 when sampling; clamp it.
        "temperature": max(temperature, 0.01),
        "top_p": top_p,
        "do_sample": temperature > 0,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if top_k is not None and top_k > 0:
        gen_kwargs["top_k"] = top_k
    with torch.no_grad():
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            **gen_kwargs,
        )
    generated_tokens = outputs[0][input_length:]
    response_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
    output_length = len(generated_tokens)
    thinking_text = ""
    # Extract <think>...</think> reasoning emitted inline by the model.
    if enable_thinking and "<think>" in response_text:
        import re
        think_match = re.search(r"<think>(.*?)</think>", response_text, re.DOTALL)
        if think_match:
            thinking_text = think_match.group(1).strip()
            response_text = re.sub(r"<think>.*?</think>", "", response_text, flags=re.DOTALL).strip()
    # "length" when the budget was exhausted; a matched stop sequence
    # below overrides it because the text was truncated at the sequence.
    stop_reason = "length" if output_length >= max_tokens else "stop"
    if stop:
        for stop_seq in stop:
            if stop_seq in response_text:
                response_text = response_text.split(stop_seq)[0]
                stop_reason = "stop"
                break
    return response_text.strip(), thinking_text, input_length, output_length, stop_reason
async def generate_stream(
    prompt: str,
    max_tokens: int = MAX_TOKENS_DEFAULT,
    temperature: float = TEMPERATURE_DEFAULT,
    top_p: float = 0.95,
    top_k: Optional[int] = None,
) -> AsyncGenerator[str, None]:
    """Stream generated text chunks without blocking the event loop.

    Generation runs in a worker thread that feeds a TextIteratorStreamer.
    Fix: the previous version iterated the blocking streamer (and called
    thread.join()) directly inside the async generator, stalling the
    whole asyncio loop for the duration of generation; each blocking
    next() now runs via asyncio.to_thread so other requests stay live.
    """
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
    streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True, skip_prompt=True)
    gen_kwargs = {
        "max_new_tokens": max_tokens,
        "temperature": max(temperature, 0.01),
        "top_p": top_p,
        "do_sample": temperature > 0,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "streamer": streamer,
    }
    if top_k is not None and top_k > 0:
        gen_kwargs["top_k"] = top_k
    thread = Thread(
        target=lambda: model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            **gen_kwargs,
        )
    )
    thread.start()
    sentinel = object()
    iterator = iter(streamer)
    while True:
        # next() blocks until the generation thread produces a chunk;
        # off-load it so the asyncio loop keeps serving other work.
        chunk = await asyncio.to_thread(next, iterator, sentinel)
        if chunk is sentinel:
            break
        yield chunk
    await asyncio.to_thread(thread.join)
| # ============================================================================ | |
| # FastAPI Application | |
| # ============================================================================ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: load the model once at startup.

    Fix: FastAPI requires the lifespan callable to be an async context
    manager; the plain async generator was missing the
    @asynccontextmanager decorator (imported at the top but unused),
    which makes app startup fail.
    """
    load_model()
    yield
# Application object; the lifespan hook loads the model at startup.
app = FastAPI(
    title="Free Coding API",
    description="OpenAI & Anthropic compatible API with Files, Skills, Batches, CUA, Prefill & Thinking support",
    version="1.3.0",
    lifespan=lifespan
)
# Wide-open CORS so browser clients on any origin can call the API.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# a permissive setting — confirm it is intended for this public demo.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ============================================================================ | |
| # Authentication | |
| # ============================================================================ | |
def verify_api_key(authorization: Optional[str] = None) -> bool:
    """Check a bearer token or raw key against the configured API_KEY.

    Accepts either "Bearer <key>" or the bare key. An empty/unset
    API_KEY disables authentication entirely (always returns True).

    Fixes vs. previous version:
    - collapsed the redundant `API_KEY == ""` test into `not API_KEY`;
    - constant-time comparison (hmac.compare_digest) so key bytes do
      not leak through response-timing differences.
    """
    import hmac
    if not API_KEY:
        return True
    if not authorization:
        return False
    token = authorization.removeprefix("Bearer ")
    return hmac.compare_digest(token, API_KEY)
| # ============================================================================ | |
| # OpenAI Compatible Endpoints | |
| # ============================================================================ | |
async def list_models():
    """Return every advertised model alias in OpenAI /v1/models format.

    NOTE(review): no route decorator is visible here — presumably
    registered as GET /v1/models; confirm.
    """
    entries = [
        OpenAIModelInfo(id=name, created=int(time.time()))
        for name in MODEL_ALIASES
    ]
    return OpenAIModelsResponse(data=entries)
async def get_model(model_id: str):
    """Return metadata for a known model alias; 404 for unknown ids."""
    if model_id not in MODEL_ALIASES and model_id != MODEL_ID:
        raise HTTPException(status_code=404, detail="Model not found")
    return OpenAIModelInfo(id=model_id, created=int(time.time()))
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None),
):
    """OpenAI-compatible chat completions with prefill support.

    Flattens incoming messages to plain text, treats a trailing
    assistant message as a response prefill, then either streams SSE
    chunks or returns a single OpenAIChatResponse.
    NOTE(review): no route decorator is visible here — presumably
    registered as POST /v1/chat/completions; confirm.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    # Extract messages (multimodal parts reduced to their text pieces)
    messages = []
    for m in request.messages:
        content = extract_text_from_openai_content(m.content)
        messages.append({"role": m.role, "content": content})
    # Check for prefill (last assistant message)
    messages, prefill = extract_prefill_from_messages(messages)
    # Extract system message (if several are present, the last one wins)
    system_prompt = None
    filtered_messages = []
    for msg in messages:
        if msg["role"] == "system":
            system_prompt = msg["content"]
        else:
            filtered_messages.append(msg)
    prompt = format_messages_for_model(filtered_messages, system_prompt=system_prompt, prefill=prefill)
    # max_completion_tokens (newer OpenAI name) takes precedence
    max_tokens = request.max_completion_tokens or request.max_tokens or MAX_TOKENS_DEFAULT
    stop_sequences = None
    if request.stop:
        stop_sequences = [request.stop] if isinstance(request.stop, str) else request.stop
    request_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
    system_fingerprint = f"fp_{uuid.uuid4().hex[:10]}"
    created_time = int(time.time())
    if request.stream:
        async def stream_generator():
            # First chunk carries the assistant role (and any prefill)
            first_chunk = {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created_time,
                "model": request.model,
                "system_fingerprint": system_fingerprint,
                "choices": [{
                    "index": 0,
                    "delta": {"role": "assistant", "content": prefill},  # Include prefill in first chunk
                    "logprobs": None,
                    "finish_reason": None
                }]
            }
            yield f"data: {json.dumps(first_chunk)}\n\n"
            # NOTE(review): stop sequences are not applied on this
            # streaming path — confirm that is intended.
            async for token in generate_stream(
                prompt,
                max_tokens=max_tokens,
                temperature=request.temperature or 1.0,
                top_p=request.top_p or 1.0,
            ):
                chunk = {
                    "id": request_id,
                    "object": "chat.completion.chunk",
                    "created": created_time,
                    "model": request.model,
                    "system_fingerprint": system_fingerprint,
                    "choices": [{
                        "index": 0,
                        "delta": {"content": token},
                        "logprobs": None,
                        "finish_reason": None
                    }]
                }
                yield f"data: {json.dumps(chunk)}\n\n"
            # Terminal chunk; finish_reason is always "stop" when streaming
            final_chunk = {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": created_time,
                "model": request.model,
                "system_fingerprint": system_fingerprint,
                "choices": [{
                    "index": 0,
                    "delta": {},
                    "logprobs": None,
                    "finish_reason": "stop"
                }]
            }
            yield f"data: {json.dumps(final_chunk)}\n\n"
            # Optional usage chunk (stream_options.include_usage).
            # NOTE(review): token counts are hard-coded to 0 here —
            # confirm this placeholder is acceptable to clients.
            if request.stream_options and request.stream_options.get("include_usage"):
                usage_chunk = {
                    "id": request_id,
                    "object": "chat.completion.chunk",
                    "created": created_time,
                    "model": request.model,
                    "choices": [],
                    "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
                }
                yield f"data: {json.dumps(usage_chunk)}\n\n"
            yield "data: [DONE]\n\n"
        return StreamingResponse(
            stream_generator(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
        )
    # Non-streaming
    response_text, thinking_text, input_tokens, output_tokens, stop_reason = generate_response(
        prompt,
        max_tokens=max_tokens,
        temperature=request.temperature or 1.0,
        top_p=request.top_p or 1.0,
        stop=stop_sequences,
    )
    # Prepend prefill so clients receive the full assistant text back
    full_response = prefill + response_text if prefill else response_text
    openai_finish_reason = "stop" if stop_reason == "stop" else "length"
    return OpenAIChatResponse(
        id=request_id,
        created=created_time,
        model=request.model,
        system_fingerprint=system_fingerprint,
        choices=[
            OpenAIChoice(
                index=0,
                message=OpenAIChoiceMessage(role="assistant", content=full_response),
                finish_reason=openai_finish_reason,
                logprobs=None
            )
        ],
        usage=OpenAIUsage(
            prompt_tokens=input_tokens,
            completion_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens
        )
    )
| # ============================================================================ | |
| # Anthropic Compatible Endpoints with Prefill & Thinking | |
| # ============================================================================ | |
async def anthropic_messages(
    request: AnthropicRequest,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
):
    """Anthropic-compatible messages endpoint with prefill & thinking support.

    Accepts the Anthropic Messages API shape (x-api-key auth, separate
    system field, optional thinking config) and either streams Anthropic
    SSE events or returns a single AnthropicResponse.
    NOTE(review): no route decorator is visible here — presumably
    registered as POST /v1/messages; confirm.
    """
    # Anthropic clients send x-api-key; fall back to Authorization.
    auth_key = x_api_key or authorization
    if not verify_api_key(auth_key):
        raise HTTPException(status_code=401, detail="Invalid API key")
    # Extract messages (blocks reduced to text; thinking blocks dropped)
    messages = []
    for m in request.messages:
        content = extract_text_from_anthropic_content(m.content)
        messages.append({"role": m.role, "content": content})
    # Check for prefill (last assistant message)
    messages, prefill = extract_prefill_from_messages(messages)
    # Extract system prompt
    system_prompt = extract_system_prompt_anthropic(request.system)
    prompt = format_messages_for_model(messages, system_prompt=system_prompt, prefill=prefill)
    # Check thinking configuration
    enable_thinking = False
    thinking_budget = 512
    if request.thinking:
        if request.thinking.type == "enabled":
            enable_thinking = True
        if request.thinking.budget_tokens:
            thinking_budget = request.thinking.budget_tokens
    request_id = f"msg_{uuid.uuid4().hex[:24]}"
    if request.stream:
        async def stream_generator():
            # NOTE(review): input token count is reported as 0 on the
            # streaming path — confirm this placeholder is acceptable.
            input_tokens = 0
            # message_start
            message_start = {
                "type": "message_start",
                "message": {
                    "id": request_id,
                    "type": "message",
                    "role": "assistant",
                    "model": request.model,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": input_tokens, "output_tokens": 0}
                }
            }
            yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n"
            content_index = 0
            # If thinking is enabled, add thinking block first (simulated)
            if enable_thinking:
                # thinking block start
                thinking_block_start = {
                    "type": "content_block_start",
                    "index": content_index,
                    "content_block": {"type": "thinking", "thinking": ""}
                }
                yield f"event: content_block_start\ndata: {json.dumps(thinking_block_start)}\n\n"
                # Canned thinking content — not produced by the model
                thinking_text = "Analyzing the request and formulating a response..."
                thinking_delta = {
                    "type": "content_block_delta",
                    "index": content_index,
                    "delta": {"type": "thinking_delta", "thinking": thinking_text}
                }
                yield f"event: content_block_delta\ndata: {json.dumps(thinking_delta)}\n\n"
                thinking_block_stop = {"type": "content_block_stop", "index": content_index}
                yield f"event: content_block_stop\ndata: {json.dumps(thinking_block_stop)}\n\n"
                content_index += 1
            # text content block start
            content_block_start = {
                "type": "content_block_start",
                "index": content_index,
                "content_block": {"type": "text", "text": ""}
            }
            yield f"event: content_block_start\ndata: {json.dumps(content_block_start)}\n\n"
            # Include prefill in first delta if present
            if prefill:
                prefill_delta = {
                    "type": "content_block_delta",
                    "index": content_index,
                    "delta": {"type": "text_delta", "text": prefill}
                }
                yield f"event: content_block_delta\ndata: {json.dumps(prefill_delta)}\n\n"
            # Stream content; output_tokens counts streamer chunks, which
            # may not equal true token counts.
            output_tokens = 0
            async for token in generate_stream(
                prompt,
                max_tokens=request.max_tokens,
                temperature=request.temperature or 1.0,
                top_p=request.top_p or 0.999,
                top_k=request.top_k,
            ):
                output_tokens += 1
                delta = {
                    "type": "content_block_delta",
                    "index": content_index,
                    "delta": {"type": "text_delta", "text": token}
                }
                yield f"event: content_block_delta\ndata: {json.dumps(delta)}\n\n"
            # content_block_stop
            content_block_stop = {"type": "content_block_stop", "index": content_index}
            yield f"event: content_block_stop\ndata: {json.dumps(content_block_stop)}\n\n"
            # message_delta (stop_reason is always end_turn when streaming)
            message_delta = {
                "type": "message_delta",
                "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                "usage": {"output_tokens": output_tokens}
            }
            yield f"event: message_delta\ndata: {json.dumps(message_delta)}\n\n"
            # message_stop
            message_stop = {"type": "message_stop"}
            yield f"event: message_stop\ndata: {json.dumps(message_stop)}\n\n"
        return StreamingResponse(
            stream_generator(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
        )
    # Non-streaming response
    response_text, thinking_text, input_tokens, output_tokens, stop_reason = generate_response(
        prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature or 1.0,
        top_p=request.top_p or 0.999,
        top_k=request.top_k,
        stop=request.stop_sequences,
        enable_thinking=enable_thinking,
        thinking_budget=thinking_budget,
    )
    # Prepend prefill to response
    full_response = prefill + response_text if prefill else response_text
    # Build content blocks
    content_blocks = []
    # Add thinking block if enabled and we have thinking content
    if enable_thinking:
        if not thinking_text:
            thinking_text = "Analyzing the request and formulating a response."
        content_blocks.append(AnthropicResponseContent(type="thinking", thinking=thinking_text))
    # Add text block
    content_blocks.append(AnthropicResponseContent(type="text", text=full_response))
    # Determine stop reason
    anthropic_stop_reason = "end_turn"
    stop_sequence_used = None
    if stop_reason == "length":
        anthropic_stop_reason = "max_tokens"
    elif stop_reason == "stop" and request.stop_sequences:
        # NOTE(review): generate_response strips the matched stop
        # sequence from response_text, so this containment check may
        # never succeed and "stop_sequence" may never be reported —
        # confirm intended behavior.
        for seq in request.stop_sequences:
            if seq in response_text:
                anthropic_stop_reason = "stop_sequence"
                stop_sequence_used = seq
                break
    return AnthropicResponse(
        id=request_id,
        model=request.model,
        content=content_blocks,
        stop_reason=anthropic_stop_reason,
        stop_sequence=stop_sequence_used,
        usage=AnthropicUsage(
            input_tokens=input_tokens,
            output_tokens=output_tokens
        )
    )
| # ============================================================================ | |
| # Files API (Beta) | |
| # ============================================================================ | |
# In-memory file storage (for demo - in production use persistent storage)
# Maps file id -> metadata dict plus the raw "content" bytes; lost on restart.
files_storage: Dict[str, Dict] = {}
class FileUploadResponse(BaseModel):
    """OpenAI-style file object returned after upload."""
    id: str  # "file-..." identifier
    object: str = "file"
    bytes: int  # payload size
    created_at: int  # unix timestamp
    filename: str
    purpose: str
async def upload_file(
    request: Request,
    authorization: Optional[str] = Header(None),
):
    """Upload a file for use across multiple API calls.

    Expects multipart/form-data with a "file" part and an optional
    "purpose" field (default "assistants"). The payload is held in the
    in-memory files_storage dict and does not survive restarts.

    Fixes vs. previous version:
    - a "file" field that is a plain string (no .read) now yields a
      400 instead of an unhandled 500;
    - a missing filename falls back to "upload" instead of failing
      FileUploadResponse validation (filename is a required str).
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    form = await request.form()
    upload = form.get("file")
    purpose = form.get("purpose", "assistants")
    # A missing part or a plain text field both count as "no file".
    if upload is None or not hasattr(upload, "read"):
        raise HTTPException(status_code=400, detail="No file provided")
    content = await upload.read()
    file_id = f"file-{uuid.uuid4().hex[:24]}"
    created_at = int(time.time())
    filename = upload.filename or "upload"
    files_storage[file_id] = {
        "id": file_id,
        "object": "file",
        "bytes": len(content),
        "created_at": created_at,
        "filename": filename,
        "purpose": purpose,
        "content": content,  # raw bytes kept in memory (demo storage)
    }
    return FileUploadResponse(
        id=file_id,
        bytes=len(content),
        created_at=created_at,
        filename=filename,
        purpose=purpose
    )
async def list_files(
    authorization: Optional[str] = Header(None),
    purpose: Optional[str] = None,
):
    """List uploaded files, optionally filtered by purpose.

    The stored raw "content" bytes are never included in the listing.
    """
    if not verify_api_key(authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    data = [
        {
            "id": meta["id"],
            "object": "file",
            "bytes": meta["bytes"],
            "created_at": meta["created_at"],
            "filename": meta["filename"],
            "purpose": meta["purpose"],
        }
        for meta in files_storage.values()
        if not purpose or meta.get("purpose") == purpose
    ]
    return {"object": "list", "data": data}
async def get_file(
    file_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Get file metadata (raw content is never returned).

    Raises:
        HTTPException: 401 on bad credentials; 404 for an unknown file id.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    file_data = files_storage.get(file_id)
    if file_data is None:
        raise HTTPException(status_code=404, detail="File not found")
    return {k: v for k, v in file_data.items() if k != "content"}
async def delete_file(
    file_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Delete a file, freeing its in-memory content.

    Raises:
        HTTPException: 401 on bad credentials; 404 for an unknown file id.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if file_id not in files_storage:
        raise HTTPException(status_code=404, detail="File not found")
    del files_storage[file_id]
    return {"id": file_id, "object": "file", "deleted": True}
| # ============================================================================ | |
| # Skills API (Beta) | |
| # ============================================================================ | |
| skills_storage: Dict[str, Dict] = {} | |
class SkillCreate(BaseModel):
    """Request body for creating a custom agent skill."""
    name: str
    description: Optional[str] = None
    instructions: str  # the skill's prompt/behavioral instructions
    tools: Optional[List[Dict]] = None  # optional tool definitions attached to the skill
class SkillResponse(BaseModel):
    """Stored skill echoed back with its generated id and timestamp."""
    id: str  # "skill-" + 24 hex chars
    object: str = "skill"
    name: str
    description: Optional[str] = None
    instructions: str
    tools: Optional[List[Dict]] = None
    created_at: int  # unix timestamp (seconds)
async def create_skill(
    request: SkillCreate,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Create a custom agent skill.

    Stores the definition in the in-memory ``skills_storage`` dict and
    echoes it back with a generated id and creation timestamp.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    skill_id = f"skill-{uuid.uuid4().hex[:24]}"
    skill_data = {
        "id": skill_id,
        "object": "skill",
        "name": request.name,
        "description": request.description,
        "instructions": request.instructions,
        "tools": request.tools or [],
        "created_at": int(time.time())
    }
    skills_storage[skill_id] = skill_data
    return SkillResponse(**skill_data)
async def list_skills(
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """List all custom skills.

    Returns copies of the stored skill dicts so callers cannot mutate
    the in-memory storage through the response.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return {
        "object": "list",
        "data": [dict(skill) for skill in skills_storage.values()]
    }
async def get_skill(
    skill_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Get skill details.

    Returns a copy of the stored skill so callers cannot mutate storage.

    Raises:
        HTTPException: 401 on bad credentials; 404 for an unknown skill id.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    skill = skills_storage.get(skill_id)
    if skill is None:
        raise HTTPException(status_code=404, detail="Skill not found")
    return dict(skill)
async def delete_skill(
    skill_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Delete a skill.

    Raises:
        HTTPException: 401 on bad credentials; 404 for an unknown skill id.
    """
    # Accept x-api-key as well, for parity with the batch endpoints.
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    if skill_id not in skills_storage:
        raise HTTPException(status_code=404, detail="Skill not found")
    del skills_storage[skill_id]
    return {"id": skill_id, "object": "skill", "deleted": True}
| # ============================================================================ | |
| # Message Batches API (50% cost reduction for async processing) | |
| # ============================================================================ | |
| batches_storage: Dict[str, Dict] = {} | |
class BatchRequest(BaseModel):
    """One entry of a batch: caller-chosen id plus a Messages request body."""
    custom_id: str  # caller's correlation id, echoed in the result
    params: Dict  # Contains the message request parameters
class CreateBatchRequest(BaseModel):
    """Top-level body for POST /v1/messages/batches."""
    requests: List[BatchRequest]
class BatchResponse(BaseModel):
    """Anthropic-style message_batch status object."""
    id: str  # "batch_" + 24 hex chars
    type: str = "message_batch"
    processing_status: str  # "in_progress", "ended"
    request_counts: Dict  # processing/succeeded/errored/canceled/expired tallies
    ended_at: Optional[int] = None  # unix timestamp, set once processing ends
    created_at: int
    expires_at: int  # created_at + 24h
    results_url: Optional[str] = None  # where to fetch per-request results
async def create_message_batch(
    request: CreateBatchRequest,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """
    Create a Message Batch for async processing with 50% cost reduction.
    Process large volumes of Messages requests asynchronously.

    NOTE(review): despite the docstring, processing here is synchronous —
    every request is generated inline and the batch is stored/returned
    already in the "ended" state. The cost reduction is nominal.
    """
    # Anthropic clients send x-api-key; fall back to Authorization.
    auth_key = x_api_key or authorization
    if not verify_api_key(auth_key):
        raise HTTPException(status_code=401, detail="Invalid API key")
    batch_id = f"batch_{uuid.uuid4().hex[:24]}"
    created_at = int(time.time())
    # Process batch requests asynchronously (simulated)
    results = []
    succeeded = 0
    failed = 0
    for req in request.requests:
        try:
            # Extract message parameters
            params = req.params
            messages = params.get("messages", [])
            max_tokens = params.get("max_tokens", 1024)
            # Format and generate
            formatted_msgs = []
            for m in messages:
                content = m.get("content", "")
                if isinstance(content, list):
                    # Anthropic content may be a list of blocks; keep text only.
                    content = " ".join([b.get("text", "") for b in content if b.get("type") == "text"])
                formatted_msgs.append({"role": m.get("role"), "content": content})
            prompt = format_messages_for_model(formatted_msgs)
            response_text, _, input_tokens, output_tokens, _ = generate_response(
                prompt, max_tokens=max_tokens
            )
            # Anthropic-style per-request success record.
            results.append({
                "custom_id": req.custom_id,
                "result": {
                    "type": "succeeded",
                    "message": {
                        "id": f"msg_{uuid.uuid4().hex[:24]}",
                        "type": "message",
                        "role": "assistant",
                        "content": [{"type": "text", "text": response_text}],
                        "model": params.get("model", "claude-3-sonnet"),
                        "stop_reason": "end_turn",
                        "usage": {"input_tokens": input_tokens, "output_tokens": output_tokens}
                    }
                }
            })
            succeeded += 1
        except Exception as e:
            # One failing request must not abort the rest of the batch.
            results.append({
                "custom_id": req.custom_id,
                "result": {
                    "type": "errored",
                    "error": {"type": "server_error", "message": str(e)}
                }
            })
            failed += 1
    batch_data = {
        "id": batch_id,
        "type": "message_batch",
        "processing_status": "ended",
        "request_counts": {
            "processing": 0,
            "succeeded": succeeded,
            "errored": failed,
            "canceled": 0,
            "expired": 0
        },
        "ended_at": int(time.time()),
        "created_at": created_at,
        "expires_at": created_at + 86400,  # 24 hours
        "results": results
    }
    batches_storage[batch_id] = batch_data
    return BatchResponse(
        id=batch_id,
        processing_status="ended",
        request_counts=batch_data["request_counts"],
        ended_at=batch_data["ended_at"],
        created_at=created_at,
        expires_at=batch_data["expires_at"],
        results_url=f"/v1/messages/batches/{batch_id}/results"
    )
async def list_batches(
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """List all message batches (summaries only; results are excluded)."""
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    summaries = []
    for batch in batches_storage.values():
        # Strip the heavyweight "results" payload from each listing entry.
        summaries.append({key: value for key, value in batch.items() if key != "results"})
    return {"object": "list", "data": summaries}
async def get_batch(
    batch_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Get batch status and details (results excluded; see results endpoint)."""
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    batch = batches_storage.get(batch_id)
    if batch is None:
        raise HTTPException(status_code=404, detail="Batch not found")
    return {key: value for key, value in batch.items() if key != "results"}
async def get_batch_results(
    batch_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Get batch results once processing has ended."""
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    batch = batches_storage.get(batch_id)
    if batch is None:
        raise HTTPException(status_code=404, detail="Batch not found")
    if batch["processing_status"] != "ended":
        raise HTTPException(status_code=400, detail="Batch still processing")
    # The real Anthropic API serves JSONL here; this demo returns plain JSON.
    return {"results": batch.get("results", [])}
async def cancel_batch(
    batch_id: str,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Cancel an in-progress batch, moving pending requests to canceled."""
    if not verify_api_key(x_api_key or authorization):
        raise HTTPException(status_code=401, detail="Invalid API key")
    batch = batches_storage.get(batch_id)
    if batch is None:
        raise HTTPException(status_code=404, detail="Batch not found")
    if batch["processing_status"] == "ended":
        raise HTTPException(status_code=400, detail="Batch already ended")
    # Everything still pending is reclassified as canceled.
    counts = batch["request_counts"]
    counts["canceled"] = counts.get("processing", 0)
    counts["processing"] = 0
    batch["processing_status"] = "ended"
    return {key: value for key, value in batch.items() if key != "results"}
| # ============================================================================ | |
| # Anthropic Separate Base Path: /anthropic/v1/ | |
| # ============================================================================ | |
| async def anthropic_messages_separate( | |
| request: AnthropicRequest, | |
| authorization: Optional[str] = Header(None), | |
| x_api_key: Optional[str] = Header(None, alias="x-api-key"), | |
| anthropic_version: Optional[str] = Header(None, alias="anthropic-version"), | |
| ): | |
| """Anthropic endpoint with separate base path: /anthropic/v1/messages""" | |
| return await anthropic_messages(request, authorization, x_api_key, anthropic_version) | |
async def anthropic_list_models():
    """List Anthropic models.

    Returns a static catalog of Claude model ids in list format. The
    timestamp is taken once per request (the original called time.time()
    per entry, which could straddle a second boundary and yield mixed
    "created" values in a single response).
    """
    created = int(time.time())
    model_ids = [
        "claude-3-opus-20240229",
        "claude-3-sonnet-20240229",
        "claude-3-haiku-20240307",
        "claude-3-5-sonnet-20241022",
        "claude-3-5-haiku-20241022",
        "claude-3-opus",
        "claude-3-sonnet",
        "claude-3-haiku",
        "claude-3-5-sonnet",
        "claude-code",
    ]
    return {
        "object": "list",
        "data": [
            {"id": model_id, "object": "model", "created": created, "owned_by": "anthropic"}
            for model_id in model_ids
        ]
    }
async def anthropic_info():
    """Describe the Anthropic-compatible API surface mounted at /anthropic/v1."""
    endpoints = {
        "messages": "/anthropic/v1/messages",
        "models": "/anthropic/v1/models"
    }
    return {
        "name": "Anthropic Compatible API",
        "version": ANTHROPIC_VERSION,
        "base_url": "/anthropic/v1",
        "endpoints": endpoints,
        "features": ["prefill_response", "thinking", "streaming"]
    }
| # ============================================================================ | |
| # Computer Use Agent (CUA) - Pydantic Models | |
| # ============================================================================ | |
| class CUAToolAction(BaseModel): | |
| """Computer use tool action""" | |
| type: str # "click", "type", "scroll", "screenshot", "key", "move", "drag", "wait" | |
| # For click/move/drag | |
| x: Optional[int] = None | |
| y: Optional[int] = None | |
| button: Optional[str] = "left" # "left", "right", "middle" | |
| # For type | |
| text: Optional[str] = None | |
| # For key | |
| key: Optional[str] = None # "enter", "tab", "escape", "backspace", etc. | |
| modifiers: Optional[List[str]] = None # ["ctrl", "shift", "alt", "meta"] | |
| # For scroll | |
| direction: Optional[str] = None # "up", "down", "left", "right" | |
| amount: Optional[int] = None # pixels or lines | |
| # For drag | |
| start_x: Optional[int] = None | |
| start_y: Optional[int] = None | |
| end_x: Optional[int] = None | |
| end_y: Optional[int] = None | |
| # For wait | |
| duration: Optional[float] = None # seconds | |
class CUAToolResult(BaseModel):
    """Result of a computer use tool action, echoed back to the model."""
    type: str = "tool_result"
    tool_use_id: str  # id of the tool_use block this result answers
    content: Optional[Union[str, List[Dict]]] = None  # text or content blocks
    is_error: Optional[bool] = False
class CUAScreenInfo(BaseModel):
    """Screen configuration for CUA (pixel dimensions of the target display)."""
    width: int = 1920
    height: int = 1080
    display_number: Optional[int] = 0
class CUAComputerTool(BaseModel):
    """Computer use tool definition (Anthropic computer_20241022 schema)."""
    type: str = "computer_20241022"
    name: str = "computer"
    display_width_px: int = 1920
    display_height_px: int = 1080
    display_number: Optional[int] = 0
class CUAMessage(BaseModel):
    """CUA message: plain text or a list of content blocks (text/image/tool_result)."""
    role: str
    content: Union[str, List[Dict]]
class CUARequest(BaseModel):
    """Computer Use Agent request (Anthropic Messages-style payload)."""
    model: str = "sheikh-computer-use-preview"
    messages: List[CUAMessage]
    max_tokens: int = 4096
    # Computer use specific
    tools: Optional[List[Dict]] = None
    tool_choice: Optional[Dict] = None
    # Screen configuration (1920x1080 is assumed when absent)
    screen: Optional[CUAScreenInfo] = None
    # Standard params
    system: Optional[str] = None  # prepended to the built-in CUA system prompt
    temperature: Optional[float] = 0.7
    stream: Optional[bool] = False  # SSE streaming when True
    # Thinking mode
    thinking: Optional[AnthropicThinkingConfig] = None
class CUAToolUseBlock(BaseModel):
    """Tool use content block emitted when a computer action is detected."""
    id: str  # "toolu_" + 24 hex chars
    type: str = "tool_use"
    name: str  # tool name, e.g. "computer"
    input: Dict  # action payload, e.g. {"action": "click", "coordinate": [x, y]}
class CUAResponse(BaseModel):
    """CUA response format (Anthropic message shape)."""
    id: str  # "msg_" + 24 hex chars
    type: str = "message"
    role: str = "assistant"
    model: str
    content: List[Dict]  # text block plus optional tool_use block
    stop_reason: Optional[str] = None  # "end_turn" or "tool_use"
    usage: Dict  # {"input_tokens": ..., "output_tokens": ...}
| # ============================================================================ | |
| # CUA - Computer Action Parser | |
| # ============================================================================ | |
| def parse_computer_action_from_text(text: str, screen_width: int = 1920, screen_height: int = 1080) -> Optional[Dict]: | |
| """ | |
| Parse computer actions from model's text response. | |
| The model describes what actions it wants to take, and we parse them. | |
| """ | |
| import re | |
| text_lower = text.lower() | |
| # Click patterns | |
| click_match = re.search(r'click\s+(?:at\s+)?(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?', text_lower) | |
| if click_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "click", | |
| "coordinate": [int(click_match.group(1)), int(click_match.group(2))] | |
| } | |
| } | |
| # Type patterns | |
| type_match = re.search(r'type\s+["\']([^"\']+)["\']', text, re.IGNORECASE) | |
| if type_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "type", | |
| "text": type_match.group(1) | |
| } | |
| } | |
| # Key press patterns | |
| key_match = re.search(r'press\s+(?:the\s+)?(\w+)\s+key', text_lower) | |
| if key_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "key", | |
| "key": key_match.group(1) | |
| } | |
| } | |
| # Screenshot request | |
| if 'screenshot' in text_lower or 'screen capture' in text_lower or 'take a picture' in text_lower: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "screenshot" | |
| } | |
| } | |
| # Scroll patterns | |
| scroll_match = re.search(r'scroll\s+(up|down|left|right)(?:\s+(\d+))?', text_lower) | |
| if scroll_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "scroll", | |
| "coordinate": [screen_width // 2, screen_height // 2], | |
| "direction": scroll_match.group(1), | |
| "amount": int(scroll_match.group(2)) if scroll_match.group(2) else 3 | |
| } | |
| } | |
| # Move mouse | |
| move_match = re.search(r'move\s+(?:mouse\s+)?(?:to\s+)?(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?', text_lower) | |
| if move_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "mouse_move", | |
| "coordinate": [int(move_match.group(1)), int(move_match.group(2))] | |
| } | |
| } | |
| # Double click | |
| if 'double click' in text_lower or 'double-click' in text_lower: | |
| dbl_match = re.search(r'double[- ]click\s+(?:at\s+)?(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?', text_lower) | |
| if dbl_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "double_click", | |
| "coordinate": [int(dbl_match.group(1)), int(dbl_match.group(2))] | |
| } | |
| } | |
| # Right click | |
| if 'right click' in text_lower or 'right-click' in text_lower: | |
| right_match = re.search(r'right[- ]click\s+(?:at\s+)?(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?', text_lower) | |
| if right_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "right_click", | |
| "coordinate": [int(right_match.group(1)), int(right_match.group(2))] | |
| } | |
| } | |
| # Drag patterns | |
| drag_match = re.search(r'drag\s+from\s+(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?\s+to\s+(?:\()?(\d+)\s*[,\s]\s*(\d+)(?:\))?', text_lower) | |
| if drag_match: | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_{uuid.uuid4().hex[:24]}", | |
| "name": "computer", | |
| "input": { | |
| "action": "left_click_drag", | |
| "start_coordinate": [int(drag_match.group(1)), int(drag_match.group(2))], | |
| "coordinate": [int(drag_match.group(3)), int(drag_match.group(4))] | |
| } | |
| } | |
| return None | |
| # ============================================================================ | |
| # Computer Use Agent (CUA) Endpoint | |
| # ============================================================================ | |
| async def computer_use_agent( | |
| request: CUARequest, | |
| authorization: Optional[str] = Header(None), | |
| x_api_key: Optional[str] = Header(None, alias="x-api-key"), | |
| ): | |
| """ | |
| Computer Use Agent endpoint - sheikh-computer-use-preview | |
| This endpoint provides a computer control interface compatible with | |
| Anthropic's Computer Use API. It processes user requests and generates | |
| computer control actions (click, type, scroll, screenshot, etc.) | |
| The model analyzes the request and current state (via screenshots) and | |
| outputs structured tool calls for computer control actions. | |
| """ | |
| auth_key = x_api_key or authorization | |
| if not verify_api_key(auth_key): | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| # Get screen configuration | |
| screen_width = 1920 | |
| screen_height = 1080 | |
| if request.screen: | |
| screen_width = request.screen.width | |
| screen_height = request.screen.height | |
| # Build system prompt for computer use | |
| cua_system_prompt = f"""You are a Computer Use Agent (CUA) that helps users interact with computers. | |
| You can control the computer by describing actions you want to take. | |
| Available actions: | |
| - click at (x, y) - Click at screen coordinates | |
| - double click at (x, y) - Double click at coordinates | |
| - right click at (x, y) - Right click at coordinates | |
| - type "text" - Type the specified text | |
| - press [key] key - Press a key (enter, tab, escape, backspace, etc.) | |
| - scroll [up/down/left/right] [amount] - Scroll the screen | |
| - move mouse to (x, y) - Move cursor to coordinates | |
| - drag from (x1, y1) to (x2, y2) - Drag from one point to another | |
| - screenshot - Request a screenshot of the current screen | |
| Screen resolution: {screen_width}x{screen_height} | |
| When analyzing a screenshot or user request, describe the actions needed step by step. | |
| Always specify exact coordinates when performing click or move actions. | |
| Be precise and methodical in your approach.""" | |
| if request.system: | |
| cua_system_prompt = request.system + "\n\n" + cua_system_prompt | |
| # Extract messages | |
| messages = [] | |
| for m in request.messages: | |
| content = m.content | |
| if isinstance(content, str): | |
| messages.append({"role": m.role, "content": content}) | |
| elif isinstance(content, list): | |
| # Handle multimodal content (images, tool results) | |
| text_parts = [] | |
| for block in content: | |
| if isinstance(block, dict): | |
| if block.get("type") == "text": | |
| text_parts.append(block.get("text", "")) | |
| elif block.get("type") == "image": | |
| text_parts.append("[Screenshot provided - analyzing...]") | |
| elif block.get("type") == "tool_result": | |
| text_parts.append(f"[Tool result: {block.get('content', '')}]") | |
| messages.append({"role": m.role, "content": "\n".join(text_parts)}) | |
| # Check for prefill | |
| messages, prefill = extract_prefill_from_messages(messages) | |
| prompt = format_messages_for_model(messages, system_prompt=cua_system_prompt, prefill=prefill) | |
| request_id = f"msg_{uuid.uuid4().hex[:24]}" | |
| if request.stream: | |
| async def stream_generator(): | |
| # message_start | |
| message_start = { | |
| "type": "message_start", | |
| "message": { | |
| "id": request_id, | |
| "type": "message", | |
| "role": "assistant", | |
| "model": request.model, | |
| "content": [], | |
| "stop_reason": None, | |
| "usage": {"input_tokens": 0, "output_tokens": 0} | |
| } | |
| } | |
| yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" | |
| # content_block_start for text | |
| content_block_start = { | |
| "type": "content_block_start", | |
| "index": 0, | |
| "content_block": {"type": "text", "text": ""} | |
| } | |
| yield f"event: content_block_start\ndata: {json.dumps(content_block_start)}\n\n" | |
| full_text = "" | |
| output_tokens = 0 | |
| async for token in generate_stream( | |
| prompt, | |
| max_tokens=request.max_tokens, | |
| temperature=request.temperature or 0.7, | |
| ): | |
| full_text += token | |
| output_tokens += 1 | |
| delta = { | |
| "type": "content_block_delta", | |
| "index": 0, | |
| "delta": {"type": "text_delta", "text": token} | |
| } | |
| yield f"event: content_block_delta\ndata: {json.dumps(delta)}\n\n" | |
| # content_block_stop for text | |
| yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" | |
| # Check if we should emit a tool_use block | |
| tool_action = parse_computer_action_from_text(full_text, screen_width, screen_height) | |
| if tool_action: | |
| tool_block_start = { | |
| "type": "content_block_start", | |
| "index": 1, | |
| "content_block": { | |
| "type": "tool_use", | |
| "id": tool_action["id"], | |
| "name": tool_action["name"], | |
| "input": {} | |
| } | |
| } | |
| yield f"event: content_block_start\ndata: {json.dumps(tool_block_start)}\n\n" | |
| # Send input as delta | |
| input_delta = { | |
| "type": "content_block_delta", | |
| "index": 1, | |
| "delta": {"type": "input_json_delta", "partial_json": json.dumps(tool_action["input"])} | |
| } | |
| yield f"event: content_block_delta\ndata: {json.dumps(input_delta)}\n\n" | |
| yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 1})}\n\n" | |
| # message_delta | |
| stop_reason = "tool_use" if tool_action else "end_turn" | |
| message_delta = { | |
| "type": "message_delta", | |
| "delta": {"stop_reason": stop_reason}, | |
| "usage": {"output_tokens": output_tokens} | |
| } | |
| yield f"event: message_delta\ndata: {json.dumps(message_delta)}\n\n" | |
| yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" | |
| return StreamingResponse( | |
| stream_generator(), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} | |
| ) | |
| # Non-streaming response | |
| response_text, thinking_text, input_tokens, output_tokens, stop_reason = generate_response( | |
| prompt, | |
| max_tokens=request.max_tokens, | |
| temperature=request.temperature or 0.7, | |
| ) | |
| full_response = prefill + response_text if prefill else response_text | |
| # Build content blocks | |
| content_blocks = [] | |
| # Add text block | |
| content_blocks.append({"type": "text", "text": full_response}) | |
| # Parse and add tool use block if detected | |
| tool_action = parse_computer_action_from_text(full_response, screen_width, screen_height) | |
| if tool_action: | |
| content_blocks.append(tool_action) | |
| stop_reason = "tool_use" | |
| else: | |
| stop_reason = "end_turn" | |
| return CUAResponse( | |
| id=request_id, | |
| model=request.model, | |
| content=content_blocks, | |
| stop_reason=stop_reason, | |
| usage={ | |
| "input_tokens": input_tokens, | |
| "output_tokens": output_tokens | |
| } | |
| ) | |
# Alternative endpoint paths for compatibility
async def computer_use_alt(
    request: CUARequest,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
):
    """Alias route that forwards to the main computer-use handler."""
    return await computer_use_agent(
        request=request,
        authorization=authorization,
        x_api_key=x_api_key,
    )
| # ============================================================================ | |
| # CUA Separate Base Path: /cua/v1/ | |
| # ============================================================================ | |
| async def cua_messages( | |
| request: CUARequest, | |
| authorization: Optional[str] = Header(None), | |
| x_api_key: Optional[str] = Header(None, alias="x-api-key"), | |
| ): | |
| """CUA endpoint with separate base path: /cua/v1/messages""" | |
| return await computer_use_agent(request, authorization, x_api_key) | |
async def cua_list_models():
    """List CUA models.

    Both entries share identical capability flags and a single creation
    timestamp taken once per request (the original called time.time() per
    entry and duplicated the capabilities dict literal).
    """
    created = int(time.time())
    capabilities = {
        "computer_use": True,
        "vision": True,
        "tool_use": True
    }
    return {
        "object": "list",
        "data": [
            {
                "id": model_id,
                "object": "model",
                "created": created,
                "owned_by": "sheikh-ai",
                # Copy per entry so callers mutating one don't affect the other.
                "capabilities": dict(capabilities)
            }
            for model_id in ("sheikh-computer-use-preview", "computer-use-preview")
        ]
    }
async def cua_info():
    """Describe the Computer Use Agent API surface mounted at /cua/v1."""
    supported_actions = [
        "click", "double_click", "right_click",
        "type", "key", "scroll",
        "mouse_move", "left_click_drag",
        "screenshot"
    ]
    info = {
        "name": "Sheikh Computer Use Agent (CUA)",
        "version": "1.0.0",
        "model": "sheikh-computer-use-preview",
        "base_url": "/cua/v1",
        "endpoints": {
            "messages": "/cua/v1/messages",
            "models": "/cua/v1/models"
        },
        "supported_actions": supported_actions,
        "screen_default": {"width": 1920, "height": 1080}
    }
    return info
| # ============================================================================ | |
| # Health & Info Endpoints | |
| # ============================================================================ | |
| async def root(): | |
| return { | |
| "name": "Free Coding API", | |
| "version": "1.3.0", | |
| "model": MODEL_ID, | |
| "features": { | |
| "prefill_response": "Supported", | |
| "thinking": "Supported", | |
| "streaming": "Supported", | |
| "computer_use": "Supported", | |
| "files_api": "Beta", | |
| "skills_api": "Beta", | |
| "message_batches": "Supported (50% cost reduction)" | |
| }, | |
| "openai": { | |
| "base_url": "/v1", | |
| "chat": "/v1/chat/completions", | |
| "models": "/v1/models", | |
| "files": "/v1/files", | |
| "skills": "/v1/skills" | |
| }, | |
| "anthropic": { | |
| "base_url": "/anthropic/v1", | |
| "messages": "/anthropic/v1/messages", | |
| "batches": "/v1/messages/batches", | |
| "models": "/anthropic/v1/models" | |
| }, | |
| "cua": { | |
| "base_url": "/cua/v1", | |
| "messages": "/cua/v1/messages", | |
| "models": "/cua/v1/models", | |
| "model": "sheikh-computer-use-preview" | |
| }, | |
| "docs": "/docs" | |
| } | |
async def health():
    """Liveness probe reporting whether the model has been loaded."""
    payload = {
        "status": "healthy",
        "model_loaded": model is not None,
        "model_id": MODEL_ID
    }
    return payload
| # ============================================================================ | |
| # Main Entry Point | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |