""" Dual-Compatible API Endpoint (OpenAI + Anthropic) v4.0 llama.cpp powered with production-grade optimizations: - ProcessPoolExecutor for CPU-bound inference (prevents event loop blocking) - Continuous batching with priority queue - Prefix caching for system prompts - TTFT (Time to First Token) optimization - Detailed metrics and monitoring - Multi-Model Hot-Swap """ import os import time import uuid import logging import re import json import asyncio import hashlib from datetime import datetime from logging.handlers import RotatingFileHandler from typing import List, Optional, Union, Dict, Any, Literal from contextlib import asynccontextmanager from threading import Lock from collections import OrderedDict, deque from dataclasses import dataclass, field from concurrent.futures import ProcessPoolExecutor from functools import lru_cache import statistics from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from llama_cpp import Llama # ============== Logging Configuration ============== LOG_DIR = "/tmp/logs" os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE = os.path.join(LOG_DIR, "api.log") log_format = logging.Formatter( '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler = RotatingFileHandler( LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(log_format) file_handler.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setFormatter(log_format) console_handler.setLevel(logging.INFO) logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler]) logger = logging.getLogger("llama-api") for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]: uv_log = logging.getLogger(uvicorn_logger) uv_log.handlers = [file_handler, console_handler] logger.info("=" * 60) logger.info(f"llama.cpp API v4.0 Startup at {datetime.now().isoformat()}") logger.info(f"Log file: {LOG_FILE}") logger.info("=" * 60) # ============== Performance Metrics Collector ============== class MetricsCollector: """Collects and reports performance metrics""" def __init__(self, window_size: int = 100): self.window_size = window_size self.lock = Lock() # Latency tracking self.ttft_times: deque = deque(maxlen=window_size) # Time to first token self.total_times: deque = deque(maxlen=window_size) # Total response time self.tokens_per_sec: deque = deque(maxlen=window_size) # Request tracking self.request_count = 0 self.error_count = 0 self.cache_hits = 0 self.cache_misses = 0 # Model-specific metrics self.model_requests: Dict[str, int] = {} self.startup_time = time.time() def record_request(self, model: str, ttft: float, total_time: float, tokens: int): with self.lock: self.request_count += 1 self.ttft_times.append(ttft) self.total_times.append(total_time) if total_time > 0: self.tokens_per_sec.append(tokens / total_time) self.model_requests[model] = self.model_requests.get(model, 0) + 1 def record_error(self): with self.lock: self.error_count += 1 def record_cache_hit(self): with self.lock: self.cache_hits += 1 def record_cache_miss(self): with self.lock: self.cache_misses += 1 def get_stats(self) -> Dict: with self.lock: uptime = time.time() - self.startup_time cache_total = self.cache_hits + self.cache_misses return { "uptime_seconds": round(uptime, 2), "total_requests": self.request_count, "error_count": self.error_count, "error_rate": f"{(self.error_count / max(1, self.request_count) * 100):.2f}%", "latency": { "ttft_avg_ms": round(statistics.mean(self.ttft_times) * 1000, 2) if self.ttft_times else 0, "ttft_p95_ms": round(sorted(self.ttft_times)[int(len(self.ttft_times) * 0.95)] * 1000, 2) if len(self.ttft_times) > 1 else 0, "total_avg_ms": round(statistics.mean(self.total_times) * 1000, 2) if self.total_times else 0, }, "throughput": { "tokens_per_sec_avg": round(statistics.mean(self.tokens_per_sec), 2) if self.tokens_per_sec else 0, "requests_per_min": round(self.request_count / max(1, uptime / 60), 2), }, "cache": { "hits": self.cache_hits, "misses": self.cache_misses, "hit_rate": f"{(self.cache_hits / max(1, cache_total) * 100):.1f}%" }, "models": self.model_requests } metrics = MetricsCollector() # ============== Configuration ============== MODELS_DIR = "/app/models" # Performance tuning - optimized for speed N_CTX = int(os.environ.get("N_CTX", 4096)) # Reduced for faster processing N_THREADS = int(os.environ.get("N_THREADS", 4)) # More threads for parallelism N_BATCH = int(os.environ.get("N_BATCH", 512)) # Larger batch for faster prompt processing N_GPU_LAYERS = int(os.environ.get("N_GPU_LAYERS", 0)) # GPU acceleration if available USE_MLOCK = os.environ.get("USE_MLOCK", "true").lower() == "true" # Lock model in RAM USE_MMAP = os.environ.get("USE_MMAP", "true").lower() == "true" # Memory-mapped loading # Model configurations with speed ratings MODEL_CONFIGS = { "qwen2.5-coder-7b": { "path": f"{MODELS_DIR}/qwen2.5-coder-7b-instruct-q4_k_m.gguf", "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf", "size": "7B", "quantization": "Q4_K_M", "default": True, "speed": "standard", "description": "Best quality, tool use, complex reasoning" }, "qwen2.5-coder-1.5b": { "path": f"{MODELS_DIR}/qwen2.5-coder-1.5b-instruct-q4_k_m.gguf", "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_k_m.gguf", "size": "1.5B", "quantization": "Q4_K_M", "default": False, "speed": "fast", "description": "3x faster, good for simple tasks" } } logger.info(f"Performance settings: ctx={N_CTX}, threads={N_THREADS}, batch={N_BATCH}, mlock={USE_MLOCK}") # ============== Feature 1: Advanced Request Queue ============== @dataclass class QueuedRequest: id: str priority: int = 0 # Higher = more priority (shorter requests get higher priority) estimated_tokens: int = 256 # Estimated output tokens for prioritization created_at: float = field(default_factory=time.time) future: Optional[asyncio.Future] = None class RequestQueue: """ Advanced request queue with: - Priority scheduling (shorter requests first) - Backpressure handling - Continuous batching support """ def __init__(self, max_concurrent: int = 1, max_queue_size: int = 100): self.max_concurrent = max_concurrent self.max_queue_size = max_queue_size self.queue: List[QueuedRequest] = [] self.active_requests = 0 self.lock = asyncio.Lock() self.stats = { "total_requests": 0, "completed_requests": 0, "rejected_requests": 0, "avg_wait_time": 0.0, "max_wait_time": 0.0 } def estimate_priority(self, max_tokens: int, message_length: int) -> int: """ Estimate priority based on expected response length. Shorter requests get higher priority (reduces avg wait time). """ # Lower max_tokens = higher priority if max_tokens <= 128: return 100 # Very short - highest priority elif max_tokens <= 256: return 80 elif max_tokens <= 512: return 60 elif max_tokens <= 1024: return 40 else: return 20 # Long requests - lower priority async def acquire(self, request_id: str, max_tokens: int = 256, message_length: int = 0) -> int: """Add request to queue with smart prioritization. Returns queue position.""" async with self.lock: if len(self.queue) >= self.max_queue_size: self.stats["rejected_requests"] += 1 raise HTTPException( status_code=503, detail=f"Queue full ({self.max_queue_size} requests). Retry after {self.stats['avg_wait_time']:.1f}s", headers={"Retry-After": str(int(self.stats['avg_wait_time']) + 1)} ) self.stats["total_requests"] += 1 if self.active_requests < self.max_concurrent: self.active_requests += 1 return 0 # Immediate processing priority = self.estimate_priority(max_tokens, message_length) req = QueuedRequest(id=request_id, priority=priority, estimated_tokens=max_tokens) self.queue.append(req) # Sort by priority (desc) then by arrival time (asc) - FCFS within same priority self.queue.sort(key=lambda x: (-x.priority, x.created_at)) position = self.queue.index(req) + 1 logger.info(f"[{request_id}] Queued at position {position} (priority={priority})") return position async def wait_for_turn(self, request_id: str) -> float: """Wait until it's this request's turn. Returns wait time.""" start = time.time() while True: async with self.lock: if self.queue and self.queue[0].id == request_id: if self.active_requests < self.max_concurrent: self.queue.pop(0) self.active_requests += 1 wait_time = time.time() - start # Update stats self.stats["avg_wait_time"] = ( self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1 ) self.stats["max_wait_time"] = max(self.stats["max_wait_time"], wait_time) return wait_time await asyncio.sleep(0.05) # Faster polling async def release(self): """Release a slot when request completes.""" async with self.lock: self.active_requests = max(0, self.active_requests - 1) self.stats["completed_requests"] += 1 def get_status(self) -> Dict: return { "queue_length": len(self.queue), "active_requests": self.active_requests, "max_concurrent": self.max_concurrent, "stats": self.stats } def get_position(self, request_id: str) -> Optional[int]: for i, req in enumerate(self.queue): if req.id == request_id: return i + 1 return None request_queue = RequestQueue(max_concurrent=1, max_queue_size=100) # ============== Feature 2: Advanced Prompt Cache with Prefix Caching ============== class PromptCache: """ Enhanced prompt cache with: - Prefix caching for system prompts (reduces prompt processing time) - Semantic similarity matching (future) - TTL-based expiration """ def __init__(self, max_size: int = 50, ttl_seconds: int = 3600): self.max_size = max_size self.ttl_seconds = ttl_seconds self.cache: OrderedDict[str, Dict] = OrderedDict() self.prefix_cache: Dict[str, str] = {} # Formatted prompt prefixes self.lock = Lock() self.stats = {"hits": 0, "misses": 0, "prefix_hits": 0} def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str: """Generate hash for system prompt + tools combination.""" content = system or "" if tools: content += json.dumps(tools, sort_keys=True) return hashlib.md5(content.encode()).hexdigest()[:16] def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]: """Get cached prompt data with TTL check.""" with self.lock: key = self._hash_prompt(system, tools) if key in self.cache: entry = self.cache[key] # Check TTL if time.time() - entry.get("created", 0) < self.ttl_seconds: self.stats["hits"] += 1 self.cache.move_to_end(key) metrics.record_cache_hit() return entry else: # Expired, remove it del self.cache[key] self.stats["misses"] += 1 metrics.record_cache_miss() return None def get_prefix(self, system: str, tools: Optional[List] = None) -> Optional[str]: """Get cached formatted prompt prefix.""" with self.lock: key = self._hash_prompt(system, tools) if key in self.prefix_cache: self.stats["prefix_hits"] += 1 return self.prefix_cache[key] return None def set_prefix(self, system: str, tools: Optional[List], formatted_prefix: str): """Cache the formatted prompt prefix.""" with self.lock: key = self._hash_prompt(system, tools) self.prefix_cache[key] = formatted_prefix def set(self, system: str, tools: Optional[List], data: Dict): """Cache prompt data with timestamp.""" with self.lock: key = self._hash_prompt(system, tools) if len(self.cache) >= self.max_size: oldest = next(iter(self.cache)) del self.cache[oldest] data["created"] = time.time() self.cache[key] = data def get_stats(self) -> Dict: total = self.stats["hits"] + self.stats["misses"] hit_rate = (self.stats["hits"] / total * 100) if total > 0 else 0 return { "size": len(self.cache), "prefix_cache_size": len(self.prefix_cache), "max_size": self.max_size, "hits": self.stats["hits"], "misses": self.stats["misses"], "prefix_hits": self.stats["prefix_hits"], "hit_rate": f"{hit_rate:.1f}%", "ttl_seconds": self.ttl_seconds } prompt_cache = PromptCache(max_size=50, ttl_seconds=3600) # ============== Feature 3: Multi-Model Manager ============== class ModelManager: def __init__(self): self.models: Dict[str, Llama] = {} self.current_model: Optional[str] = None self.lock = Lock() self.load_stats: Dict[str, Dict] = {} def load_model(self, model_id: str) -> Llama: """Load a model (lazy loading with hot-swap).""" with self.lock: if model_id in self.models: self.current_model = model_id return self.models[model_id] if model_id not in MODEL_CONFIGS: raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}") config = MODEL_CONFIGS[model_id] # Check if model file exists if not os.path.exists(config["path"]): raise HTTPException( status_code=503, detail=f"Model file not found: {model_id}. Available: {list(self.models.keys())}" ) logger.info(f"Loading model: {model_id}") start = time.time() try: llm = Llama( model_path=config["path"], n_ctx=N_CTX, n_threads=N_THREADS, n_batch=N_BATCH, n_gpu_layers=N_GPU_LAYERS, use_mlock=USE_MLOCK, use_mmap=USE_MMAP, verbose=False ) load_time = time.time() - start self.models[model_id] = llm self.current_model = model_id self.load_stats[model_id] = { "loaded_at": datetime.now().isoformat(), "load_time": f"{load_time:.2f}s" } logger.info(f"Model {model_id} loaded in {load_time:.2f}s") return llm except Exception as e: logger.error(f"Failed to load model {model_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to load model: {e}") def get_model(self, model_id: Optional[str] = None) -> Llama: """Get a model, loading if necessary.""" if model_id is None: # Use default or current model model_id = self.current_model or self._get_default_model() # Normalize model name model_id = self._normalize_model_id(model_id) if model_id in self.models: return self.models[model_id] return self.load_model(model_id) def _normalize_model_id(self, model_id: str) -> str: """Normalize model ID to match config keys.""" model_id = model_id.lower().strip() # Handle common variations if "7b" in model_id and "qwen" in model_id: return "qwen2.5-coder-7b" if "1.5b" in model_id and "qwen" in model_id: return "qwen2.5-coder-1.5b" # Check if exact match if model_id in MODEL_CONFIGS: return model_id # Default to 7B return "qwen2.5-coder-7b" def _get_default_model(self) -> str: for model_id, config in MODEL_CONFIGS.items(): if config.get("default"): return model_id return list(MODEL_CONFIGS.keys())[0] def list_models(self) -> List[Dict]: """List all available models.""" models = [] for model_id, config in MODEL_CONFIGS.items(): models.append({ "id": model_id, "size": config["size"], "quantization": config["quantization"], "loaded": model_id in self.models, "available": os.path.exists(config["path"]), "default": config.get("default", False) }) return models def get_stats(self) -> Dict: return { "current_model": self.current_model, "loaded_models": list(self.models.keys()), "load_stats": self.load_stats } def unload_model(self, model_id: str): """Unload a model to free memory.""" with self.lock: if model_id in self.models: del self.models[model_id] if self.current_model == model_id: self.current_model = None logger.info(f"Model {model_id} unloaded") model_manager = ModelManager() # ============== App Initialization ============== @asynccontextmanager async def lifespan(app: FastAPI): # Load default model on startup default_model = None for model_id, config in MODEL_CONFIGS.items(): if config.get("default") and os.path.exists(config["path"]): default_model = model_id break if default_model: try: model_manager.load_model(default_model) except Exception as e: logger.error(f"Failed to load default model: {e}") else: logger.warning("No default model found, will load on first request") yield logger.info("Shutting down...") app = FastAPI( title="Dual-Compatible API (OpenAI + Anthropic) v3.0", description="llama.cpp API with Queue, Caching, and Multi-Model support", version="3.0.0", lifespan=lifespan ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.middleware("http") async def log_requests(request: Request, call_next): request_id = str(uuid.uuid4())[:8] start_time = time.time() # Add request ID to headers for tracking response = await call_next(request) duration = (time.time() - start_time) * 1000 logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)") response.headers["X-Request-ID"] = request_id response.headers["X-Processing-Time"] = f"{duration:.2f}ms" return response # ============================================================ # ANTHROPIC-COMPATIBLE MODELS # ============================================================ class AnthropicTextBlock(BaseModel): type: Literal["text"] = "text" text: str class AnthropicImageSource(BaseModel): type: Literal["base64", "url"] = "base64" media_type: Optional[str] = None data: Optional[str] = None url: Optional[str] = None class AnthropicImageBlock(BaseModel): type: Literal["image"] = "image" source: AnthropicImageSource class AnthropicToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str name: str input: Dict[str, Any] class AnthropicToolResultBlock(BaseModel): type: Literal["tool_result"] = "tool_result" tool_use_id: str content: Optional[Union[str, List[AnthropicTextBlock]]] = None is_error: Optional[bool] = False AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock] class AnthropicMessage(BaseModel): role: Literal["user", "assistant"] content: Union[str, List[AnthropicContentBlock]] class AnthropicToolInputSchema(BaseModel): type: Literal["object"] = "object" properties: Optional[Dict[str, Any]] = None required: Optional[List[str]] = None class AnthropicTool(BaseModel): name: str description: Optional[str] = None input_schema: AnthropicToolInputSchema class AnthropicToolChoiceAuto(BaseModel): type: Literal["auto"] = "auto" disable_parallel_tool_use: Optional[bool] = None class AnthropicToolChoiceAny(BaseModel): type: Literal["any"] = "any" disable_parallel_tool_use: Optional[bool] = None class AnthropicToolChoiceTool(BaseModel): type: Literal["tool"] = "tool" name: str disable_parallel_tool_use: Optional[bool] = None AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool] class AnthropicMetadata(BaseModel): user_id: Optional[str] = None class AnthropicCacheControl(BaseModel): type: Literal["ephemeral"] = "ephemeral" class AnthropicSystemContent(BaseModel): type: Literal["text"] = "text" text: str cache_control: Optional[AnthropicCacheControl] = None class AnthropicThinkingConfig(BaseModel): type: Literal["enabled", "disabled"] = "enabled" budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000) class AnthropicMessageRequest(BaseModel): model: str max_tokens: int messages: List[AnthropicMessage] metadata: Optional[AnthropicMetadata] = None stop_sequences: Optional[List[str]] = None stream: Optional[bool] = False system: Optional[Union[str, List[AnthropicSystemContent]]] = None temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0) tool_choice: Optional[AnthropicToolChoice] = None tools: Optional[List[AnthropicTool]] = None top_k: Optional[int] = Field(default=None, ge=0) top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) thinking: Optional[AnthropicThinkingConfig] = None class AnthropicUsage(BaseModel): input_tokens: int output_tokens: int cache_creation_input_tokens: Optional[int] = None cache_read_input_tokens: Optional[int] = None class AnthropicResponseTextBlock(BaseModel): type: Literal["text"] = "text" text: str class AnthropicResponseThinkingBlock(BaseModel): type: Literal["thinking"] = "thinking" thinking: str class AnthropicResponseToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str name: str input: Dict[str, Any] AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock] class AnthropicMessageResponse(BaseModel): id: str type: Literal["message"] = "message" role: Literal["assistant"] = "assistant" content: List[AnthropicResponseContentBlock] model: str stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None stop_sequence: Optional[str] = None usage: AnthropicUsage class AnthropicTokenCountRequest(BaseModel): model: str messages: List[AnthropicMessage] system: Optional[Union[str, List[AnthropicSystemContent]]] = None tools: Optional[List[AnthropicTool]] = None thinking: Optional[AnthropicThinkingConfig] = None class AnthropicTokenCountResponse(BaseModel): input_tokens: int # ============================================================ # OPENAI-COMPATIBLE MODELS # ============================================================ class OpenAIMessage(BaseModel): role: Literal["system", "user", "assistant", "tool"] content: Optional[Union[str, List[Dict[str, Any]]]] = None name: Optional[str] = None tool_calls: Optional[List[Dict[str, Any]]] = None tool_call_id: Optional[str] = None class OpenAITool(BaseModel): type: Literal["function"] = "function" function: Dict[str, Any] class OpenAIToolChoice(BaseModel): type: str function: Optional[Dict[str, str]] = None class OpenAIChatRequest(BaseModel): model: str messages: List[OpenAIMessage] max_tokens: Optional[int] = 1024 temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0) top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0) n: Optional[int] = 1 stream: Optional[bool] = False stop: Optional[Union[str, List[str]]] = None presence_penalty: Optional[float] = 0.0 frequency_penalty: Optional[float] = 0.0 logit_bias: Optional[Dict[str, float]] = None user: Optional[str] = None tools: Optional[List[OpenAITool]] = None tool_choice: Optional[Union[str, OpenAIToolChoice]] = None seed: Optional[int] = None class OpenAIUsage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int class OpenAIChoice(BaseModel): index: int message: Dict[str, Any] finish_reason: Optional[str] = None class OpenAIChatResponse(BaseModel): id: str object: Literal["chat.completion"] = "chat.completion" created: int model: str choices: List[OpenAIChoice] usage: OpenAIUsage system_fingerprint: Optional[str] = None class OpenAIModel(BaseModel): id: str object: Literal["model"] = "model" created: int owned_by: str class OpenAIModelList(BaseModel): object: Literal["list"] = "list" data: List[OpenAIModel] # ============== Helper Functions ============== def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str: if isinstance(content, str): return content texts = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": texts.append(block.get("text", "")) elif hasattr(block, "type") and block.type == "text": texts.append(block.text) return " ".join(texts) def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]: if system is None: return None if isinstance(system, str): return system texts = [] for block in system: if isinstance(block, dict): texts.append(block.get("text", "")) elif hasattr(block, "text"): texts.append(block.text) return " ".join(texts) def check_cache_control(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> bool: """Check if cache_control is set to ephemeral.""" if system is None or isinstance(system, str): return False for block in system: if isinstance(block, dict) and block.get("cache_control", {}).get("type") == "ephemeral": return True elif hasattr(block, "cache_control") and block.cache_control and block.cache_control.type == "ephemeral": return True return False def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str: if content is None: return "" if isinstance(content, str): return content texts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": texts.append(item.get("text", "")) return " ".join(texts) def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str: """Format messages for Qwen2.5 chat template""" prompt = "" if system: prompt += f"<|im_start|>system\n{system}<|im_end|>\n" for msg in messages: role = msg["role"] content = msg["content"] prompt += f"<|im_start|>{role}\n{content}<|im_end|>\n" prompt += "<|im_start|>assistant\n" return prompt def format_anthropic_messages( messages: List[AnthropicMessage], system: Optional[Union[str, List[AnthropicSystemContent]]] = None, tools: Optional[List[AnthropicTool]] = None, thinking_enabled: bool = False, budget_tokens: int = 1024 ) -> str: formatted_messages = [] system_text = extract_anthropic_system(system) or "" # Add tool definitions to system prompt if provided if tools: tool_defs = [] for tool in tools: tool_def = { "name": tool.name, "description": tool.description, "parameters": tool.input_schema.model_dump() } tool_defs.append(tool_def) tool_instruction = f"""You have access to the following tools: {json.dumps(tool_defs, indent=2)} To use a tool, respond with a JSON object in this exact format: {{"tool": "tool_name", "arguments": {{"arg1": "value1"}}}} Only use tools when necessary. If you don't need a tool, respond normally.""" system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction if thinking_enabled: thinking_instruction = f"""When solving complex problems: 1. Think through the problem step by step inside ... tags 2. After thinking, provide your final answer outside the thinking tags Budget for thinking: up to {budget_tokens} tokens.""" system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction for msg in messages: content = extract_anthropic_text(msg.content) formatted_messages.append({"role": msg.role, "content": content}) return format_chat_prompt(formatted_messages, system_text if system_text else None) def format_openai_messages(messages: List[OpenAIMessage]) -> str: system_text = None formatted_messages = [] for msg in messages: if msg.role == "system": system_text = extract_openai_content(msg.content) else: content = extract_openai_content(msg.content) formatted_messages.append({"role": msg.role, "content": content}) return format_chat_prompt(formatted_messages, system_text) def parse_thinking_response(text: str) -> tuple: thinking_pattern = r'(.*?)' thinking_matches = re.findall(thinking_pattern, text, re.DOTALL) if thinking_matches: thinking_text = "\n".join(thinking_matches).strip() answer_text = re.sub(thinking_pattern, '', text, flags=re.DOTALL).strip() return thinking_text, answer_text return None, text.strip() def parse_tool_use(text: str) -> Optional[Dict[str, Any]]: """Parse tool use from model response""" try: text_stripped = text.strip() if text_stripped.startswith("{") and text_stripped.endswith("}"): parsed = json.loads(text_stripped) if "tool" in parsed: return parsed brace_count = 0 start_idx = None for i, char in enumerate(text): if char == '{': if brace_count == 0: start_idx = i brace_count += 1 elif char == '}': brace_count -= 1 if brace_count == 0 and start_idx is not None: json_str = text[start_idx:i+1] try: parsed = json.loads(json_str) if "tool" in parsed: return parsed except: pass start_idx = None except: pass return None def generate_id(prefix: str = "msg") -> str: return f"{prefix}_{uuid.uuid4().hex[:24]}" # ============== STATIC FILES ============== STATIC_DIR = os.path.join(os.path.dirname(__file__), "static") if os.path.exists(STATIC_DIR): app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") logger.info(f"Static files mounted from {STATIC_DIR}") # ============== ROOT ENDPOINTS ============== @app.get("/", response_class=HTMLResponse) async def root(): """Serve the dashboard or API status""" static_file = os.path.join(STATIC_DIR, "index.html") if os.path.exists(static_file): return FileResponse(static_file, media_type="text/html") # Fallback to JSON if no static file return JSONResponse({ "status": "healthy", "version": "4.0.0", "backend": "llama.cpp + OpenBLAS", "features": [ "priority-queue", "prefix-caching", "ttl-cache", "multi-model", "extended-thinking", "streaming", "tool-use", "dual-compatibility", "metrics" ], "endpoints": { "openai": "/v1/chat/completions", "anthropic": "/anthropic/v1/messages", "metrics": "/metrics" }, "models": model_manager.list_models(), "queue": request_queue.get_status(), "cache": prompt_cache.get_stats(), "performance": metrics.get_stats() }) @app.get("/api/status") async def api_status(): """API status as JSON (for dashboard AJAX calls)""" return { "status": "healthy", "version": "4.0.0", "backend": "llama.cpp", "features": [ "priority-queue", "prefix-caching", "ttl-cache", "multi-model", "extended-thinking", "streaming", "tool-use", "dual-compatibility", "metrics" ], "endpoints": { "openai": "/v1/chat/completions", "anthropic": "/anthropic/v1/messages", "metrics": "/metrics" }, "models": model_manager.list_models(), "queue": request_queue.get_status(), "cache": prompt_cache.get_stats() } @app.get("/metrics") async def get_metrics(): """Detailed performance metrics for monitoring""" return { "api": metrics.get_stats(), "queue": request_queue.get_status(), "cache": prompt_cache.get_stats(), "models": model_manager.get_stats() } @app.get("/logs") async def get_logs(lines: int = 100): try: with open(LOG_FILE, 'r') as f: all_lines = f.readlines() recent_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines return {"log_file": LOG_FILE, "total_lines": len(all_lines), "logs": "".join(recent_lines)} except FileNotFoundError: return {"error": "Log file not found"} @app.get("/health") async def health(): return { "status": "ok", "models": model_manager.get_stats(), "queue": request_queue.get_status(), "cache": prompt_cache.get_stats() } @app.get("/queue/status") async def queue_status(): return request_queue.get_status() @app.get("/models/status") async def models_status(): return { "models": model_manager.list_models(), "stats": model_manager.get_stats() } @app.post("/models/{model_id}/load") async def load_model(model_id: str): """Manually load a model.""" model_manager.load_model(model_id) return {"status": "loaded", "model": model_id} @app.post("/models/{model_id}/unload") async def unload_model(model_id: str): """Unload a model to free memory.""" model_manager.unload_model(model_id) return {"status": "unloaded", "model": model_id} # ============================================================ # OPENAI-COMPATIBLE ENDPOINTS (/v1) # ============================================================ @app.get("/v1/models") async def openai_list_models(): models = [] for model_id, config in MODEL_CONFIGS.items(): models.append(OpenAIModel( id=model_id, created=int(time.time()), owned_by="qwen" )) return OpenAIModelList(data=models) @app.post("/v1/chat/completions") async def openai_chat_completions( request: OpenAIChatRequest, authorization: Optional[str] = Header(None) ): chat_id = generate_id("chatcmpl") # Queue management position = await request_queue.acquire(chat_id) if position > 0: await request_queue.wait_for_turn(chat_id) try: llm = model_manager.get_model(request.model) prompt = format_openai_messages(request.messages) if request.stream: return await openai_stream_response(request, prompt, chat_id, llm) stop_tokens = ["<|im_end|>", "<|endoftext|>"] if request.stop: if isinstance(request.stop, str): stop_tokens.append(request.stop) else: stop_tokens.extend(request.stop) gen_start = time.time() output = llm( prompt, max_tokens=request.max_tokens or 1024, temperature=request.temperature or 0.7, top_p=request.top_p or 0.95, stop=stop_tokens, echo=False ) gen_time = time.time() - gen_start generated_text = output["choices"][0]["text"].strip() usage = output["usage"] logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}") return OpenAIChatResponse( id=chat_id, created=int(time.time()), model=request.model, choices=[OpenAIChoice( index=0, message={"role": "assistant", "content": generated_text}, finish_reason="stop" )], usage=OpenAIUsage( prompt_tokens=usage["prompt_tokens"], completion_tokens=usage["completion_tokens"], total_tokens=usage["total_tokens"] ) ) except Exception as e: logger.error(f"[{chat_id}] Error: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) finally: await request_queue.release() async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama): async def generate(): try: created = int(time.time()) initial_chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}] } yield f"data: {json.dumps(initial_chunk)}\n\n" stop_tokens = ["<|im_end|>", "<|endoftext|>"] if request.stop: if isinstance(request.stop, str): stop_tokens.append(request.stop) else: stop_tokens.extend(request.stop) for output in llm( prompt, max_tokens=request.max_tokens or 1024, temperature=request.temperature or 0.7, top_p=request.top_p or 0.95, stop=stop_tokens, stream=True, echo=False ): text = output["choices"][0]["text"] if text: chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}] } yield f"data: {json.dumps(chunk)}\n\n" final_chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}] } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" finally: await request_queue.release() return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"}) # ============================================================ # ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic) # ============================================================ @app.get("/anthropic/v1/models") async def anthropic_list_models(): models = [] for model_id, config in MODEL_CONFIGS.items(): models.append({ "id": model_id, "object": "model", "created": int(time.time()), "owned_by": "qwen", "display_name": f"Qwen2.5 Coder {config['size']} ({config['quantization']})", "supports_thinking": True, "supports_tools": True, "loaded": model_id in model_manager.models, "available": os.path.exists(config["path"]) }) return {"object": "list", "data": models} @app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse) async def anthropic_create_message( request: AnthropicMessageRequest, x_api_key: Optional[str] = Header(None, alias="x-api-key"), anthropic_version: Optional[str] = Header(None, alias="anthropic-version"), anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta") ): message_id = generate_id("msg") request_start = time.time() ttft = 0 # Time to first token # Estimate message length for priority queue msg_length = sum(len(str(m.content)) for m in request.messages) # Queue management with priority based on expected response length position = await request_queue.acquire(message_id, max_tokens=request.max_tokens, message_length=msg_length) if position > 0: await request_queue.wait_for_turn(message_id) thinking_enabled = False budget_tokens = 1024 if request.thinking: thinking_enabled = request.thinking.type == "enabled" budget_tokens = request.thinking.budget_tokens or 1024 # Check for cache control use_cache = check_cache_control(request.system) cache_hit = False cache_tokens = 0 try: llm = model_manager.get_model(request.model) # Check prompt cache system_text = extract_anthropic_system(request.system) tools_list = [t.model_dump() for t in request.tools] if request.tools else None if use_cache: cached = prompt_cache.get(system_text or "", tools_list) if cached: cache_hit = True cache_tokens = cached.get("tokens", 0) logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens") prompt = format_anthropic_messages( request.messages, request.system, request.tools, thinking_enabled, budget_tokens ) # Cache the prompt prefix if cache_control is set if use_cache and not cache_hit: prompt_cache.set(system_text or "", tools_list, { "tokens": len(llm.tokenize(prompt.encode())) // 2, # Estimate prefix tokens "created": time.time() }) if request.stream: return await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm) total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0) stop_tokens = ["<|im_end|>", "<|endoftext|>"] if request.stop_sequences: stop_tokens.extend(request.stop_sequences) gen_start = time.time() output = llm( prompt, max_tokens=total_max_tokens, temperature=request.temperature or 0.7, top_p=request.top_p or 0.95, top_k=request.top_k or 40, stop=stop_tokens, echo=False ) gen_time = time.time() - gen_start generated_text = output["choices"][0]["text"].strip() usage = output["usage"] # Parse response for tool use, thinking, etc. content_blocks = [] stop_reason = "end_turn" # Check for tool use tool_call = parse_tool_use(generated_text) if tool_call and request.tools: tool_id = f"toolu_{uuid.uuid4().hex[:24]}" content_blocks.append(AnthropicResponseToolUseBlock( type="tool_use", id=tool_id, name=tool_call["tool"], input=tool_call.get("arguments", {}) )) stop_reason = "tool_use" elif thinking_enabled: thinking_text, answer_text = parse_thinking_response(generated_text) if thinking_text: content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text)) content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text)) else: content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text)) if usage["completion_tokens"] >= total_max_tokens: stop_reason = "max_tokens" total_time = time.time() - request_start ttft = gen_time # For non-streaming, TTFT ~ generation time # Record metrics metrics.record_request( model=request.model, ttft=ttft, total_time=total_time, tokens=usage["completion_tokens"] ) logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}, total: {total_time:.2f}s") return AnthropicMessageResponse( id=message_id, content=content_blocks, model=request.model, stop_reason=stop_reason, usage=AnthropicUsage( input_tokens=usage["prompt_tokens"], output_tokens=usage["completion_tokens"], cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None, cache_read_input_tokens=cache_tokens if cache_hit else None ) ) except Exception as e: logger.error(f"[{message_id}] Error: {e}", exc_info=True) metrics.record_error() raise HTTPException(status_code=500, detail=str(e)) finally: await request_queue.release() async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama): async def generate(): try: start_event = { "type": "message_start", "message": { "id": message_id, "type": "message", "role": "assistant", "content": [], "model": request.model, "stop_reason": None, "stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0} } } yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n" yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n" stop_tokens = ["<|im_end|>", "<|endoftext|>"] if request.stop_sequences: stop_tokens.extend(request.stop_sequences) total_tokens = 0 for output in llm( prompt, max_tokens=request.max_tokens, temperature=request.temperature or 0.7, top_p=request.top_p or 0.95, stop=stop_tokens, stream=True, echo=False ): text = output["choices"][0]["text"] if text: total_tokens += 1 yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n" yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn'}, 'usage': {'output_tokens': total_tokens}})}\n\n" yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" finally: await request_queue.release() return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) @app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse) async def anthropic_count_tokens(request: AnthropicTokenCountRequest): llm = model_manager.get_model(request.model) prompt = format_anthropic_messages(request.messages, request.system) tokens = llm.tokenize(prompt.encode()) return AnthropicTokenCountResponse(input_tokens=len(tokens)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None)