"""
Dual-Compatible API Endpoint (OpenAI + Anthropic)
llama.cpp powered with advanced features:
- Request Queue & Rate Limiting
- Prompt Caching (KV Cache)
- Multi-Model Hot-Swap
"""

import os
import time
import uuid
import logging
import re
import json
import asyncio
import hashlib
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import List, Optional, Union, Dict, Any, Literal
from contextlib import asynccontextmanager
from threading import Thread, Lock
from collections import OrderedDict
from dataclasses import dataclass, field

from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from llama_cpp import Llama

# ============== Logging Configuration ==============

LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")

log_format = logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Rotating file log keeps everything (DEBUG and up), capped at 5 x 10 MiB.
file_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=10*1024*1024,
    backupCount=5,
    encoding='utf-8'
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.DEBUG)

# Console only shows INFO and up.
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("llama-api")

for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]
    # These loggers now own the same handlers as the root logger. Without
    # this, each record would also bubble up through "uvicorn" and the root
    # logger and be written two or three times.
    uv_log.propagate = False

logger.info("=" * 60)
logger.info(f"llama.cpp API v3.0 Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("=" * 60)

# ============== Configuration ==============

MODELS_DIR = "/app/models"
N_CTX = 8192
N_THREADS = 2
N_BATCH = 128

# Model configurations
MODEL_CONFIGS = {
    "qwen2.5-coder-7b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "size": "7B",
        "quantization": "Q4_K_M",
        "default": True
    },
    "qwen2.5-coder-1.5b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
        "size": "1.5B",
        "quantization": "Q8_0",
        "default": False
    }
}


# ============== Feature 1: Request Queue ==============

@dataclass
class QueuedRequest:
    """A request waiting for an inference slot."""
    id: str
    priority: int = 0  # Higher = more priority
    created_at: float = field(default_factory=time.time)
    # Note: Future is created at runtime, not at class definition
    future: Optional[asyncio.Future] = None


class RequestQueue:
    """Bounded priority queue that limits how many requests run at once.

    Callers use it as: ``acquire()`` -> (if position > 0) ``wait_for_turn()``
    -> do the work -> ``release()``.
    """

    def __init__(self, max_concurrent: int = 1, max_queue_size: int = 50):
        self.max_concurrent = max_concurrent
        self.max_queue_size = max_queue_size
        self.queue: List[QueuedRequest] = []
        self.active_requests = 0
        self.lock = asyncio.Lock()
        self.stats = {
            "total_requests": 0,
            "completed_requests": 0,
            "rejected_requests": 0,
            "avg_wait_time": 0.0
        }

    async def acquire(self, request_id: str, priority: int = 0) -> int:
        """Reserve a slot or enqueue the request.

        Returns 0 when the request may run immediately (a slot has been
        taken for it), otherwise its 1-based position in the queue; the
        caller must then await ``wait_for_turn``.

        Raises:
            HTTPException: 503 when the queue already holds
                ``max_queue_size`` waiting requests.
        """
        async with self.lock:
            if len(self.queue) >= self.max_queue_size:
                self.stats["rejected_requests"] += 1
                raise HTTPException(status_code=503, detail="Queue full, try again later")

            self.stats["total_requests"] += 1

            # Only skip the queue when nobody is waiting. Waiters poll, so
            # without the emptiness check a newcomer arriving right after a
            # release would overtake requests that have been queued longer.
            if not self.queue and self.active_requests < self.max_concurrent:
                self.active_requests += 1
                return 0  # Immediate processing

            req = QueuedRequest(id=request_id, priority=priority)
            self.queue.append(req)
            self.queue.sort(key=lambda x: (-x.priority, x.created_at))
            position = self.queue.index(req) + 1
            logger.info(f"[{request_id}] Queued at position {position}")
            return position

    async def wait_for_turn(self, request_id: str) -> float:
        """Wait until it's this request's turn. Returns wait time in seconds.

        Polls every 100 ms until the request is at the head of the queue and
        a slot is free, then takes the slot. If the wait is aborted (for
        example the task is cancelled), the request is removed from the queue
        so it cannot block the requests behind it.
        """
        start = time.time()
        try:
            while True:
                async with self.lock:
                    is_head = bool(self.queue) and self.queue[0].id == request_id
                    if is_head and self.active_requests < self.max_concurrent:
                        self.queue.pop(0)
                        self.active_requests += 1
                        wait_time = time.time() - start
                        # Update rolling average
                        self.stats["avg_wait_time"] = (
                            self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1
                        )
                        return wait_time
                await asyncio.sleep(0.1)
        except BaseException:
            # No await here: the in-place filter runs atomically with respect
            # to the other coroutines on the event loop.
            self.queue[:] = [req for req in self.queue if req.id != request_id]
            raise

    async def release(self):
        """Release a slot when request completes."""
        async with self.lock:
            self.active_requests = max(0, self.active_requests - 1)
            self.stats["completed_requests"] += 1

    def get_status(self) -> Dict:
        """Return a snapshot of queue length, active slots and counters."""
        return {
            "queue_length": len(self.queue),
            "active_requests": self.active_requests,
            "max_concurrent": self.max_concurrent,
            # Copy so callers cannot mutate the live counters.
            "stats": dict(self.stats)
        }

    def get_position(self, request_id: str) -> Optional[int]:
        """Return the 1-based queue position of *request_id*, or None."""
        for i, req in enumerate(self.queue):
            if req.id == request_id:
                return i + 1
        return None


request_queue = RequestQueue(max_concurrent=1, max_queue_size=50)


# ============== Feature 2: Prompt Cache ==============

class PromptCache:
    """Small LRU cache of metadata keyed by (system prompt, tools)."""

    def __init__(self, max_size: int = 10):
        self.max_size = max_size
        self.cache: OrderedDict[str, Dict] = OrderedDict()
        self.lock = Lock()
        self.stats = {"hits": 0, "misses": 0}

    def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str:
        """Generate hash for system prompt + tools combination."""
        content = system or ""
        if tools:
            content += json.dumps(tools, sort_keys=True)
        # MD5 is used only as a cache key, not for anything security related.
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]:
        """Get cached prompt prefix data, or None on a miss."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.cache:
                self.stats["hits"] += 1
                self.cache.move_to_end(key)
                logger.debug(f"Prompt cache HIT: {key}")
                return self.cache[key]
            self.stats["misses"] += 1
            return None

    def set(self, system: str, tools: Optional[List], data: Dict):
        """Cache prompt prefix data, evicting the least recently used entry if full."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.cache:
                # Overwriting an existing key needs no eviction; just mark it
                # as most recently used.
                self.cache.move_to_end(key)
            elif self.cache and len(self.cache) >= self.max_size:
                oldest, _ = self.cache.popitem(last=False)
                logger.debug(f"Prompt cache evicted: {oldest}")
            self.cache[key] = data
            logger.debug(f"Prompt cache SET: {key}")

    def get_stats(self) -> Dict:
        """Return size, hit/miss counters and the hit rate as a percentage string."""
        total = self.stats["hits"] + self.stats["misses"]
        hit_rate = (self.stats["hits"] / total * 100) if total > 0 else 0
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "hits": self.stats["hits"],
            "misses": self.stats["misses"],
            "hit_rate": f"{hit_rate:.1f}%"
        }


prompt_cache = PromptCache(max_size=10)


# ============== Feature 3: Multi-Model Manager ==============

class ModelManager:
    """Lazily loads the models listed in MODEL_CONFIGS and keeps them in memory."""

    def __init__(self):
        self.models: Dict[str, Llama] = {}
        self.current_model: Optional[str] = None
        self.lock = Lock()
        self.load_stats: Dict[str, Dict] = {}

    def load_model(self, model_id: str) -> Llama:
        """Load a model (lazy loading with hot-swap).

        Returns the already-loaded instance when there is one.

        Raises:
            HTTPException: 400 for an id not in MODEL_CONFIGS, 503 when the
                model file is missing on disk, 500 when llama.cpp fails to
                load it.
        """
        with self.lock:
            if model_id in self.models:
                self.current_model = model_id
                return self.models[model_id]

            if model_id not in MODEL_CONFIGS:
                raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}")

            config = MODEL_CONFIGS[model_id]

            # Check if model file exists
            if not os.path.exists(config["path"]):
                raise HTTPException(
                    status_code=503,
                    detail=f"Model file not found: {model_id}. Available: {list(self.models.keys())}"
                )

            logger.info(f"Loading model: {model_id}")
            start = time.time()

            try:
                llm = Llama(
                    model_path=config["path"],
                    n_ctx=N_CTX,
                    n_threads=N_THREADS,
                    n_batch=N_BATCH,
                    verbose=False
                )
            except Exception as e:
                logger.error(f"Failed to load model {model_id}: {e}")
                raise HTTPException(status_code=500, detail=f"Failed to load model: {e}") from e

            load_time = time.time() - start
            self.models[model_id] = llm
            self.current_model = model_id
            self.load_stats[model_id] = {
                "loaded_at": datetime.now().isoformat(),
                "load_time": f"{load_time:.2f}s"
            }
            logger.info(f"Model {model_id} loaded in {load_time:.2f}s")
            return llm

    def get_model(self, model_id: Optional[str] = None) -> Llama:
        """Get a model, loading if necessary.

        ``None`` selects the current model, or the configured default when
        nothing is loaded yet. The id is normalized first, so unrecognized
        names resolve to the 7B model rather than failing.
        """
        if model_id is None:
            # Use default or current model
            model_id = self.current_model or self._get_default_model()

        # Normalize model name
        model_id = self._normalize_model_id(model_id)

        if model_id in self.models:
            return self.models[model_id]

        return self.load_model(model_id)

    def _normalize_model_id(self, model_id: str) -> str:
        """Normalize model ID to match config keys."""
        model_id = model_id.lower().strip()

        # Handle common variations
        if "7b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-7b"
        if "1.5b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-1.5b"

        # Check if exact match
        if model_id in MODEL_CONFIGS:
            return model_id

        # Default to 7B
        return "qwen2.5-coder-7b"

    def _get_default_model(self) -> str:
        """Return the id flagged ``default`` in MODEL_CONFIGS, else the first id."""
        for model_id, config in MODEL_CONFIGS.items():
            if config.get("default"):
                return model_id
        return list(MODEL_CONFIGS.keys())[0]

    def list_models(self) -> List[Dict]:
        """List all configured models with their loaded/available state."""
        return [
            {
                "id": model_id,
                "size": config["size"],
                "quantization": config["quantization"],
                "loaded": model_id in self.models,
                "available": os.path.exists(config["path"]),
                "default": config.get("default", False)
            }
            for model_id, config in MODEL_CONFIGS.items()
        ]

    def get_stats(self) -> Dict:
        """Return the current model id, loaded model ids and load timings."""
        return {
            "current_model": self.current_model,
            "loaded_models": list(self.models.keys()),
            "load_stats": self.load_stats
        }

    def unload_model(self, model_id: str):
        """Unload a model to free memory. Unknown or unloaded ids are ignored."""
        with self.lock:
            if model_id in self.models:
                del self.models[model_id]
                if self.current_model == model_id:
                    self.current_model = None
                logger.info(f"Model {model_id} unloaded")


model_manager = ModelManager()


# ============== App Initialization ==============

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the default model at startup when its file is present."""
    # Load default model on startup
    default_model = None
    for model_id, config in MODEL_CONFIGS.items():
        if config.get("default") and os.path.exists(config["path"]):
            default_model = model_id
            break

    if default_model:
        try:
            model_manager.load_model(default_model)
        except Exception as e:
            # Best effort: the model is loaded lazily on the first request.
            logger.error(f"Failed to load default model: {e}")
    else:
        logger.warning("No default model found, will load on first request")

    yield
    logger.info("Shutting down...")


app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic) v3.0",
    description="llama.cpp API with Queue, Caching, and Multi-Model support",
    version="3.0.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins if this is ever exposed beyond a sandbox.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each request and add X-Request-ID / X-Processing-Time headers."""
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    response = await call_next(request)

    duration = (time.time() - start_time) * 1000
    logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")

    # Add request ID to headers for tracking
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Processing-Time"] = f"{duration:.2f}ms"
    return response


# ============================================================
# ANTHROPIC-COMPATIBLE MODELS
# ============================================================

class AnthropicTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str


class AnthropicImageSource(BaseModel):
    type: Literal["base64", "url"] = "base64"
    media_type: Optional[str] = None
    data: Optional[str] = None
    url: Optional[str] = None


class AnthropicImageBlock(BaseModel):
    type: Literal["image"] = "image"
    source: AnthropicImageSource


class AnthropicToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


class AnthropicToolResultBlock(BaseModel):
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False


AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock]


class AnthropicMessage(BaseModel):
    role: Literal["user", "assistant"]
    content: Union[str, List[AnthropicContentBlock]]


class AnthropicToolInputSchema(BaseModel):
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None


class AnthropicTool(BaseModel):
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema


class AnthropicToolChoiceAuto(BaseModel):
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceAny(BaseModel):
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceTool(BaseModel):
    type: Literal["tool"] = "tool"
    name: str
    disable_parallel_tool_use: Optional[bool] = None


AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool]


class AnthropicMetadata(BaseModel):
    user_id: Optional[str] = None


class AnthropicCacheControl(BaseModel):
    type: Literal["ephemeral"] = "ephemeral"


class AnthropicSystemContent(BaseModel):
    type: Literal["text"] = "text"
    text: str
    cache_control: Optional[AnthropicCacheControl] = None


class AnthropicThinkingConfig(BaseModel):
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000)


class AnthropicMessageRequest(BaseModel):
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicUsage(BaseModel):
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None


class AnthropicResponseTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str


class AnthropicResponseThinkingBlock(BaseModel):
    type: Literal["thinking"] = "thinking"
    thinking: str


class AnthropicResponseToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock]


class AnthropicMessageResponse(BaseModel):
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage


class AnthropicTokenCountRequest(BaseModel):
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicTokenCountResponse(BaseModel):
    input_tokens: int


# ============================================================
# OPENAI-COMPATIBLE MODELS
# ============================================================

class OpenAIMessage(BaseModel):
    role: Literal["system", "user", "assistant", "tool"]
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None


class OpenAITool(BaseModel):
    type: Literal["function"] = "function"
    function: Dict[str, Any]


class OpenAIToolChoice(BaseModel):
    type: str
    function: Optional[Dict[str, str]] = None


class OpenAIChatRequest(BaseModel):
    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None


class OpenAIUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class OpenAIChoice(BaseModel):
    index: int
    message: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIChatResponse(BaseModel):
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None


class OpenAIModel(BaseModel):
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str


class OpenAIModelList(BaseModel):
    object: Literal["list"] = "list"
    data: List[OpenAIModel]


# ============== Helper Functions ==============

# The tag markup was missing from this pattern, leaving r'(.*?)', which only
# ever matches the empty string, so thinking text was never extracted.
# NOTE(review): the <thinking> tag name is inferred from the instruction text
# in format_anthropic_messages - confirm against the original source.
_THINKING_PATTERN = re.compile(r'<thinking>(.*?)</thinking>', re.DOTALL)

# Stop strings for the Qwen2.5 chat template; always applied.
_DEFAULT_STOP_TOKENS = ("<|im_end|>", "<|endoftext|>")


def _value_or(value, default):
    """Return *value* unless it is None.

    Unlike ``value or default`` this keeps explicit zeros such as
    ``temperature=0.0``.
    """
    return default if value is None else value


def _build_stop_tokens(extra: Optional[Union[str, List[str]]] = None) -> List[str]:
    """Return the template stop tokens plus any caller-supplied stop strings."""
    stop_tokens = list(_DEFAULT_STOP_TOKENS)
    if extra:
        if isinstance(extra, str):
            stop_tokens.append(extra)
        else:
            stop_tokens.extend(extra)
    return stop_tokens


def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str:
    """Join the text blocks of an Anthropic message with spaces.

    Non-text blocks (image, tool_use, tool_result) are ignored.
    """
    if isinstance(content, str):
        return content
    texts = []
    for block in content:
        if isinstance(block, dict):
            if block.get("type") == "text":
                texts.append(block.get("text", ""))
        elif getattr(block, "type", None) == "text":
            texts.append(block.text)
    return " ".join(texts)


def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]:
    """Flatten an Anthropic ``system`` value (string or block list) to text."""
    if system is None:
        return None
    if isinstance(system, str):
        return system
    texts = []
    for block in system:
        if isinstance(block, dict):
            texts.append(block.get("text", ""))
        elif hasattr(block, "text"):
            texts.append(block.text)
    return " ".join(texts)


def check_cache_control(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> bool:
    """Check if cache_control is set to ephemeral on any system block."""
    if system is None or isinstance(system, str):
        return False
    for block in system:
        if isinstance(block, dict):
            # "cache_control": None must not crash the lookup.
            cache_control = block.get("cache_control") or {}
            if isinstance(cache_control, dict) and cache_control.get("type") == "ephemeral":
                return True
        else:
            cache_control = getattr(block, "cache_control", None)
            if cache_control and cache_control.type == "ephemeral":
                return True
    return False


def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Flatten OpenAI message content (string or content parts) to text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return " ".join(
        item.get("text", "")
        for item in content
        if isinstance(item, dict) and item.get("type") == "text"
    )


def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str:
    """Format messages for Qwen2.5 chat template"""
    parts = []
    if system:
        parts.append(f"<|im_start|>system\n{system}<|im_end|>\n")
    for msg in messages:
        parts.append(f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n")
    # Leave the assistant turn open so the model continues from here.
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    tools: Optional[List[AnthropicTool]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Build the chat prompt for an Anthropic-style request.

    Tool definitions and, when enabled, the thinking instruction are
    prepended to the system prompt. Only the text blocks of each message are
    included.
    """
    system_text = extract_anthropic_system(system) or ""

    # Add tool definitions to system prompt if provided
    if tools:
        tool_defs = [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema.model_dump()
            }
            for tool in tools
        ]
        tool_instruction = f"""You have access to the following tools:

{json.dumps(tool_defs, indent=2)}

To use a tool, respond with a JSON object in this exact format:
{{"tool": "tool_name", "arguments": {{"arg1": "value1"}}}}

Only use tools when necessary. If you don't need a tool, respond normally."""
        system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction

    if thinking_enabled:
        # The tag names must agree with _THINKING_PATTERN.
        thinking_instruction = f"""When solving complex problems:
1. Think through the problem step by step inside <thinking>...</thinking> tags
2. After thinking, provide your final answer outside the thinking tags

Budget for thinking: up to {budget_tokens} tokens."""
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction

    formatted_messages = [
        {"role": msg.role, "content": extract_anthropic_text(msg.content)}
        for msg in messages
    ]
    return format_chat_prompt(formatted_messages, system_text if system_text else None)


def format_openai_messages(messages: List[OpenAIMessage]) -> str:
    """Build the chat prompt for an OpenAI-style request.

    All system messages are merged, in order, into one system prompt, so an
    earlier system message is not discarded when a later one follows.
    """
    system_parts = []
    formatted_messages = []
    for msg in messages:
        content = extract_openai_content(msg.content)
        if msg.role == "system":
            system_parts.append(content)
        else:
            formatted_messages.append({"role": msg.role, "content": content})
    system_text = "\n\n".join(system_parts) if system_parts else None
    return format_chat_prompt(formatted_messages, system_text)


def parse_thinking_response(text: str) -> tuple:
    """Split a response into (thinking_text, answer_text).

    Returns ``(None, stripped_text)`` when there is no thinking section.
    """
    thinking_matches = _THINKING_PATTERN.findall(text)
    if thinking_matches:
        thinking_text = "\n".join(thinking_matches).strip()
        answer_text = _THINKING_PATTERN.sub('', text).strip()
        return thinking_text, answer_text
    return None, text.strip()


def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
    """Parse tool use from model response.

    Returns the first JSON object in *text* that has a ``"tool"`` key, or
    None. A real JSON decoder does the scanning, so braces inside string
    values do not confuse it and one malformed object does not stop the
    search.
    """
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            parsed, end = decoder.raw_decode(text, pos)
        except (ValueError, RecursionError):
            # Not valid JSON from this brace; try the next one.
            pos = text.find("{", pos + 1)
            continue
        if isinstance(parsed, dict) and "tool" in parsed:
            return parsed
        # Skip past the object that was just parsed.
        pos = text.find("{", end)
    return None


def generate_id(prefix: str = "msg") -> str:
    """Return ``<prefix>_`` followed by 24 random hex characters."""
    return f"{prefix}_{uuid.uuid4().hex[:24]}"


# ============== ROOT ENDPOINTS ==============

@app.get("/")
async def root():
    """Service overview: features, endpoints and live model/queue/cache state."""
    return {
        "status": "healthy",
        "version": "3.0.0",
        "backend": "llama.cpp",
        "features": [
            "request-queue",
            "prompt-caching",
            "multi-model",
            "extended-thinking",
            "streaming",
            "tool-use",
            "dual-compatibility"
        ],
        "endpoints": {
            "openai": "/v1/chat/completions",
            "anthropic": "/anthropic/v1/messages"
        },
        "models": model_manager.list_models(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats()
    }


@app.get("/logs")
async def get_logs(lines: int = 100):
    """Return the last *lines* lines of the API log file.

    NOTE(review): this endpoint is unauthenticated and exposes server logs.
    """
    # Zero or negative values return no lines; unguarded, -0 and negative
    # slice indexes would return the whole file or drop the head of it.
    lines = max(0, lines)
    try:
        with open(LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            all_lines = f.readlines()
    except FileNotFoundError:
        return {"error": "Log file not found"}
    recent_lines = all_lines[-lines:] if lines else []
    return {"log_file": LOG_FILE, "total_lines": len(all_lines), "logs": "".join(recent_lines)}


@app.get("/health")
async def health():
    """Liveness probe with model, queue and cache statistics."""
    return {
        "status": "ok",
        "models": model_manager.get_stats(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats()
    }


@app.get("/queue/status")
async def queue_status():
    """Current request-queue status."""
    return request_queue.get_status()


@app.get("/models/status")
async def models_status():
    """Configured models and load statistics."""
    return {
        "models": model_manager.list_models(),
        "stats": model_manager.get_stats()
    }


@app.post("/models/{model_id}/load")
async def load_model(model_id: str):
    """Manually load a model."""
    model_manager.load_model(model_id)
    return {"status": "loaded", "model": model_id}


@app.post("/models/{model_id}/unload")
async def unload_model(model_id: str):
    """Unload a model to free memory."""
    model_manager.unload_model(model_id)
    return {"status": "unloaded", "model": model_id}


# ============================================================
# OPENAI-COMPATIBLE ENDPOINTS (/v1)
# ============================================================

@app.get("/v1/models")
async def openai_list_models():
    """List configured models in OpenAI format."""
    created = int(time.time())
    return OpenAIModelList(data=[
        OpenAIModel(id=model_id, created=created, owned_by="qwen")
        for model_id in MODEL_CONFIGS
    ])


@app.post("/v1/chat/completions")
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """OpenAI-compatible chat completion (streaming and non-streaming).

    NOTE(review): the llm(...) call is synchronous and blocks the event loop
    for the whole generation; consider moving it to a worker thread.
    """
    chat_id = generate_id("chatcmpl")

    # Queue management
    position = await request_queue.acquire(chat_id)
    if position > 0:
        await request_queue.wait_for_turn(chat_id)

    # True once a streaming generator has taken responsibility for releasing
    # the slot; releasing here as well would free it before generation starts
    # and then release it a second time.
    slot_handed_off = False
    try:
        llm = model_manager.get_model(request.model)
        prompt = format_openai_messages(request.messages)

        if request.stream:
            response = await openai_stream_response(request, prompt, chat_id, llm)
            slot_handed_off = True
            return response

        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=request.max_tokens or 1024,
            temperature=_value_or(request.temperature, 0.7),
            top_p=_value_or(request.top_p, 0.95),
            stop=_build_stop_tokens(request.stop),
            echo=False
        )
        gen_time = time.time() - gen_start

        choice = output["choices"][0]
        generated_text = choice["text"].strip()
        usage = output["usage"]

        logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")

        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text},
                # Report "length" when the token limit cut the answer short.
                finish_reason=choice.get("finish_reason") or "stop"
            )],
            usage=OpenAIUsage(
                prompt_tokens=usage["prompt_tokens"],
                completion_tokens=usage["completion_tokens"],
                total_tokens=usage["total_tokens"]
            )
        )
    except HTTPException:
        # Keep deliberate status codes (400 unknown model, 503 missing file).
        raise
    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if not slot_handed_off:
            await request_queue.release()


async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama):
    """Build the SSE response for a streaming OpenAI chat completion.

    The returned generator owns the caller's queue slot and releases it when
    the stream ends or is closed.
    """
    def make_chunk(created: int, delta: Dict[str, Any], finish_reason: Optional[str]) -> str:
        payload = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": request.model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
        }
        return f"data: {json.dumps(payload)}\n\n"

    async def generate():
        try:
            created = int(time.time())
            yield make_chunk(created, {"role": "assistant", "content": ""}, None)

            finish_reason = "stop"
            for output in llm(
                prompt,
                max_tokens=request.max_tokens or 1024,
                temperature=_value_or(request.temperature, 0.7),
                top_p=_value_or(request.top_p, 0.95),
                stop=_build_stop_tokens(request.stop),
                stream=True,
                echo=False
            ):
                choice = output["choices"][0]
                finish_reason = choice.get("finish_reason") or finish_reason
                text = choice["text"]
                if text:
                    yield make_chunk(created, {"content": text}, None)

            yield make_chunk(created, {}, finish_reason)
            yield "data: [DONE]\n\n"
        finally:
            await request_queue.release()

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})


# ============================================================
# ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic)
# ============================================================

@app.get("/anthropic/v1/models")
async def anthropic_list_models():
    """List configured models with capability and availability flags."""
    created = int(time.time())
    models = [
        {
            "id": model_id,
            "object": "model",
            "created": created,
            "owned_by": "qwen",
            "display_name": f"Qwen2.5 Coder {config['size']} ({config['quantization']})",
            "supports_thinking": True,
            "supports_tools": True,
            "loaded": model_id in model_manager.models,
            "available": os.path.exists(config["path"])
        }
        for model_id, config in MODEL_CONFIGS.items()
    ]
    return {"object": "list", "data": models}


@app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
async def anthropic_create_message(
    request: AnthropicMessageRequest,
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
    anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta")
):
    """Anthropic-compatible message creation (streaming and non-streaming).

    NOTE(review): the llm(...) call is synchronous and blocks the event loop
    for the whole generation; consider moving it to a worker thread.
    """
    message_id = generate_id("msg")

    # Queue management
    position = await request_queue.acquire(message_id)
    if position > 0:
        await request_queue.wait_for_turn(message_id)

    # See openai_chat_completions: streaming hands the slot to the generator.
    slot_handed_off = False
    try:
        thinking_enabled = False
        budget_tokens = 1024
        if request.thinking:
            thinking_enabled = request.thinking.type == "enabled"
            budget_tokens = request.thinking.budget_tokens or 1024

        # Check for cache control
        use_cache = check_cache_control(request.system)
        cache_hit = False
        cache_tokens = 0

        llm = model_manager.get_model(request.model)

        # Check prompt cache
        system_text = extract_anthropic_system(request.system)
        tools_list = [t.model_dump() for t in request.tools] if request.tools else None

        if use_cache:
            cached = prompt_cache.get(system_text or "", tools_list)
            if cached:
                cache_hit = True
                cache_tokens = cached.get("tokens", 0)
                logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens")

        prompt = format_anthropic_messages(
            request.messages,
            request.system,
            request.tools,
            thinking_enabled,
            budget_tokens
        )

        # Cache the prompt prefix if cache_control is set
        if use_cache and not cache_hit:
            # Estimate prefix tokens. Kept in cache_tokens so the usage block
            # reports the created amount instead of always 0.
            cache_tokens = len(llm.tokenize(prompt.encode())) // 2
            prompt_cache.set(system_text or "", tools_list, {
                "tokens": cache_tokens,
                "created": time.time()
            })

        if request.stream:
            response = await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm)
            slot_handed_off = True
            return response

        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)

        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=total_max_tokens,
            temperature=_value_or(request.temperature, 0.7),
            top_p=_value_or(request.top_p, 0.95),
            top_k=_value_or(request.top_k, 40),
            stop=_build_stop_tokens(request.stop_sequences),
            echo=False
        )
        gen_time = time.time() - gen_start

        choice = output["choices"][0]
        generated_text = choice["text"].strip()
        usage = output["usage"]

        # Parse response for tool use, thinking, etc.
        content_blocks = []
        stop_reason = "end_turn"

        # Check for tool use
        tool_call = parse_tool_use(generated_text) if request.tools else None
        if tool_call:
            arguments = tool_call.get("arguments", {})
            if not isinstance(arguments, dict):
                # The response model requires a JSON object for ``input``.
                arguments = {}
            content_blocks.append(AnthropicResponseToolUseBlock(
                type="tool_use",
                id=f"toolu_{uuid.uuid4().hex[:24]}",
                name=str(tool_call["tool"]),
                input=arguments
            ))
            stop_reason = "tool_use"
        elif thinking_enabled:
            thinking_text, answer_text = parse_thinking_response(generated_text)
            if thinking_text:
                content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text))
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text))
        else:
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text))

        if choice.get("finish_reason") == "length" or usage["completion_tokens"] >= total_max_tokens:
            stop_reason = "max_tokens"

        logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}")

        return AnthropicMessageResponse(
            id=message_id,
            content=content_blocks,
            model=request.model,
            stop_reason=stop_reason,
            usage=AnthropicUsage(
                input_tokens=usage["prompt_tokens"],
                output_tokens=usage["completion_tokens"],
                cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None,
                cache_read_input_tokens=cache_tokens if cache_hit else None
            )
        )
    except HTTPException:
        # Keep deliberate status codes (400 unknown model, 503 missing file).
        raise
    except Exception as e:
        logger.error(f"[{message_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if not slot_handed_off:
            await request_queue.release()


async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama):
    """Build the SSE response for a streaming Anthropic message.

    The returned generator owns the caller's queue slot and releases it when
    the stream ends or is closed. Thinking output is not split out of the
    stream; it arrives as part of the text deltas.
    """
    def sse(event: str, payload: Dict[str, Any]) -> str:
        return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

    # Match the non-streaming path: the thinking budget is granted on top of
    # max_tokens because the prompt asks the model to think first.
    budget_tokens = 0
    if thinking_enabled:
        budget_tokens = (request.thinking.budget_tokens if request.thinking else None) or 1024

    async def generate():
        try:
            yield sse("message_start", {
                "type": "message_start",
                "message": {
                    "id": message_id,
                    "type": "message",
                    "role": "assistant",
                    "content": [],
                    "model": request.model,
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0}
                }
            })
            yield sse("content_block_start", {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})

            total_tokens = 0  # counts streamed chunks, used as the output token count
            finish_reason = None
            for output in llm(
                prompt,
                max_tokens=request.max_tokens + budget_tokens,
                temperature=_value_or(request.temperature, 0.7),
                top_p=_value_or(request.top_p, 0.95),
                top_k=_value_or(request.top_k, 40),
                stop=_build_stop_tokens(request.stop_sequences),
                stream=True,
                echo=False
            ):
                choice = output["choices"][0]
                finish_reason = choice.get("finish_reason") or finish_reason
                text = choice["text"]
                if text:
                    total_tokens += 1
                    yield sse("content_block_delta", {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})

            stop_reason = "max_tokens" if finish_reason == "length" else "end_turn"
            yield sse("content_block_stop", {'type': 'content_block_stop', 'index': 0})
            yield sse("message_delta", {'type': 'message_delta', 'delta': {'stop_reason': stop_reason}, 'usage': {'output_tokens': total_tokens}})
            yield sse("message_stop", {'type': 'message_stop'})
        finally:
            await request_queue.release()

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


@app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
    """Count the prompt tokens a message request would use.

    The prompt is built the same way as for message creation, including tool
    definitions and the thinking instruction, so the count matches what is
    actually sent to the model.
    """
    llm = model_manager.get_model(request.model)

    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024

    prompt = format_anthropic_messages(
        request.messages,
        request.system,
        request.tools,
        thinking_enabled,
        budget_tokens
    )
    tokens = llm.tokenize(prompt.encode())
    return AnthropicTokenCountResponse(input_tokens=len(tokens))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None)