"""
Dual-Compatible API Endpoint (OpenAI + Anthropic) v4.0
llama.cpp powered with production-grade optimizations:
- ProcessPoolExecutor for CPU-bound inference (prevents event loop blocking)
- Continuous batching with priority queue
- Prefix caching for system prompts
- TTFT (Time to First Token) optimization
- Detailed metrics and monitoring
- Multi-Model Hot-Swap
"""
import os
import time
import uuid
import logging
import re
import json
import asyncio
import hashlib
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import List, Optional, Union, Dict, Any, Literal
from contextlib import asynccontextmanager
from threading import Lock
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache
import statistics
from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from llama_cpp import Llama
# ============== Logging Configuration ==============
LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")

# One formatter shared by the rotating file handler and the console handler.
log_format = logging.Formatter(
    fmt='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
file_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    encoding='utf-8',
)
console_handler = logging.StreamHandler()

# The file receives DEBUG and above; the console only INFO and above.
for _handler, _level in ((file_handler, logging.DEBUG), (console_handler, logging.INFO)):
    _handler.setFormatter(log_format)
    _handler.setLevel(_level)

logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("llama-api")

# Point the uvicorn loggers at the same two handlers.
for uvicorn_logger in ("uvicorn", "uvicorn.error", "uvicorn.access"):
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]

_banner = "=" * 60
logger.info(_banner)
logger.info(f"llama.cpp API v4.0 Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info(_banner)
# ============== Performance Metrics Collector ==============
class MetricsCollector:
    """Thread-safe collector of request, latency, throughput and cache metrics.

    Latency and throughput samples are kept in bounded deques holding the
    most recent ``window_size`` requests; the counters are cumulative since
    the collector was created. Every method takes ``self.lock``.
    """

    def __init__(self, window_size: int = 100):
        """Create an empty collector keeping ``window_size`` recent samples."""
        self.window_size = window_size
        self.lock = Lock()
        # Latency tracking (seconds)
        self.ttft_times: deque = deque(maxlen=window_size)  # Time to first token
        self.total_times: deque = deque(maxlen=window_size)  # Total response time
        self.tokens_per_sec: deque = deque(maxlen=window_size)
        # Request tracking
        self.request_count = 0
        self.error_count = 0
        self.cache_hits = 0
        self.cache_misses = 0
        # Model-specific metrics: model id -> number of recorded requests
        self.model_requests: Dict[str, int] = {}
        self.startup_time = time.time()

    def record_request(self, model: str, ttft: float, total_time: float, tokens: int):
        """Record one completed request.

        Args:
            model: Model identifier the request was served by.
            ttft: Time to first token, in seconds.
            total_time: Total response time, in seconds.
            tokens: Number of generated tokens.
        """
        with self.lock:
            self.request_count += 1
            self.ttft_times.append(ttft)
            self.total_times.append(total_time)
            # A zero duration would divide by zero; skip the throughput sample.
            if total_time > 0:
                self.tokens_per_sec.append(tokens / total_time)
            self.model_requests[model] = self.model_requests.get(model, 0) + 1

    def record_error(self):
        """Increment the error counter."""
        with self.lock:
            self.error_count += 1

    def record_cache_hit(self):
        """Increment the cache-hit counter."""
        with self.lock:
            self.cache_hits += 1

    def record_cache_miss(self):
        """Increment the cache-miss counter."""
        with self.lock:
            self.cache_misses += 1

    def get_stats(self) -> Dict:
        """Return a snapshot of all metrics as a plain dict.

        The returned structure shares no mutable state with the collector,
        so callers may modify or serialize it freely.
        """
        with self.lock:
            uptime = time.time() - self.startup_time
            cache_total = self.cache_hits + self.cache_misses

            ttft_sorted = sorted(self.ttft_times)
            if ttft_sorted:
                ttft_avg_ms = round(statistics.mean(ttft_sorted) * 1000, 2)
                # int(n * 0.95) is always < n, so the index is in range; the
                # min() only makes that explicit. A single sample is its own p95.
                p95_index = min(len(ttft_sorted) - 1, int(len(ttft_sorted) * 0.95))
                ttft_p95_ms = round(ttft_sorted[p95_index] * 1000, 2)
            else:
                ttft_avg_ms = 0
                ttft_p95_ms = 0

            return {
                "uptime_seconds": round(uptime, 2),
                "total_requests": self.request_count,
                "error_count": self.error_count,
                "error_rate": f"{(self.error_count / max(1, self.request_count) * 100):.2f}%",
                "latency": {
                    "ttft_avg_ms": ttft_avg_ms,
                    "ttft_p95_ms": ttft_p95_ms,
                    "total_avg_ms": round(statistics.mean(self.total_times) * 1000, 2) if self.total_times else 0,
                },
                "throughput": {
                    "tokens_per_sec_avg": round(statistics.mean(self.tokens_per_sec), 2) if self.tokens_per_sec else 0,
                    # Uptime is floored at one minute so early readings are not inflated.
                    "requests_per_min": round(self.request_count / max(1, uptime / 60), 2),
                },
                "cache": {
                    "hits": self.cache_hits,
                    "misses": self.cache_misses,
                    "hit_rate": f"{(self.cache_hits / max(1, cache_total) * 100):.1f}%",
                },
                # Copy so the caller never holds the live dict outside the lock.
                "models": dict(self.model_requests),
            }


metrics = MetricsCollector()
# ============== Configuration ==============
MODELS_DIR = "/app/models"


def _env_int(name, default):
    """Read environment variable ``name`` as an int, using ``default`` when unset."""
    return int(os.environ.get(name, default))


def _env_flag(name):
    """Return True when ``name`` is unset or equals "true" (case-insensitive)."""
    return os.environ.get(name, "true").lower() == "true"


def _qwen_coder_entry(size, default, speed, description):
    """Build the config entry for a Qwen2.5-Coder Q4_K_M GGUF of the given size."""
    filename = f"qwen2.5-coder-{size.lower()}-instruct-q4_k_m.gguf"
    return {
        "path": f"{MODELS_DIR}/{filename}",
        "url": f"https://huggingface.co/Qwen/Qwen2.5-Coder-{size}-Instruct-GGUF/resolve/main/{filename}",
        "size": size,
        "quantization": "Q4_K_M",
        "default": default,
        "speed": speed,
        "description": description,
    }


# Performance tuning, each value overridable through the environment
N_CTX = _env_int("N_CTX", 4096)  # Context window size
N_THREADS = _env_int("N_THREADS", 4)  # Inference threads
N_BATCH = _env_int("N_BATCH", 512)  # Prompt-processing batch size
N_GPU_LAYERS = _env_int("N_GPU_LAYERS", 0)  # Layers offloaded to the GPU
USE_MLOCK = _env_flag("USE_MLOCK")  # Lock model in RAM
USE_MMAP = _env_flag("USE_MMAP")  # Memory-mapped loading

# Model configurations with speed ratings
MODEL_CONFIGS = {
    "qwen2.5-coder-7b": _qwen_coder_entry(
        "7B", True, "standard", "Best quality, tool use, complex reasoning"
    ),
    "qwen2.5-coder-1.5b": _qwen_coder_entry(
        "1.5B", False, "fast", "3x faster, good for simple tasks"
    ),
}

logger.info(f"Performance settings: ctx={N_CTX}, threads={N_THREADS}, batch={N_BATCH}, mlock={USE_MLOCK}")
# ============== Feature 1: Advanced Request Queue ==============
@dataclass
class QueuedRequest:
    """One entry in the ``RequestQueue`` waiting list."""
    # Identifier compared against the id passed to RequestQueue.wait_for_turn
    id: str
    priority: int = 0  # Higher = more priority (shorter requests get higher priority)
    estimated_tokens: int = 256  # Estimated output tokens for prioritization
    # Creation timestamp (time.time()); used as the tie-breaker when the queue is sorted
    created_at: float = field(default_factory=time.time)
    # NOTE(review): not read or written anywhere in the visible code
    future: Optional[asyncio.Future] = None
class RequestQueue:
    """Priority waiting list that limits the number of concurrent requests.

    A request calls ``acquire``; a return value of 0 means it holds a slot
    already, any other value is its 1-based queue position and the caller
    must then ``await wait_for_turn``. Every slot holder calls ``release``
    exactly once when done. Waiting entries are ordered by priority
    (descending), then arrival time (ascending).
    """

    # (max_tokens upper bound, priority), checked in order
    _PRIORITY_TIERS = ((128, 100), (256, 80), (512, 60), (1024, 40))
    _LOWEST_PRIORITY = 20

    def __init__(self, max_concurrent: int = 1, max_queue_size: int = 100):
        """Create a queue with ``max_concurrent`` slots and a bounded waiting list."""
        self.max_concurrent = max_concurrent
        self.max_queue_size = max_queue_size
        self.queue: List[QueuedRequest] = []
        self.active_requests = 0
        self.lock = asyncio.Lock()
        self.stats = {
            "total_requests": 0,
            "completed_requests": 0,
            "rejected_requests": 0,
            "avg_wait_time": 0.0,
            "max_wait_time": 0.0
        }

    def estimate_priority(self, max_tokens: int, message_length: int) -> int:
        """Map the requested output length to a priority (higher runs first).

        Shorter requests get higher priority. ``message_length`` is accepted
        for interface compatibility and is not used.
        """
        for upper_bound, priority in self._PRIORITY_TIERS:
            if max_tokens <= upper_bound:
                return priority
        return self._LOWEST_PRIORITY

    async def acquire(self, request_id: str, max_tokens: int = 256, message_length: int = 0) -> int:
        """Take a slot or join the waiting list.

        Returns:
            0 when a slot was taken immediately, otherwise the 1-based queue
            position (the caller must then await ``wait_for_turn``).

        Raises:
            HTTPException: 503 with a ``Retry-After`` header when the waiting
                list is full.
        """
        async with self.lock:
            if len(self.queue) >= self.max_queue_size:
                self.stats["rejected_requests"] += 1
                raise HTTPException(
                    status_code=503,
                    detail=f"Queue full ({self.max_queue_size} requests). Retry after {self.stats['avg_wait_time']:.1f}s",
                    headers={"Retry-After": str(int(self.stats['avg_wait_time']) + 1)}
                )
            self.stats["total_requests"] += 1
            # Only take a free slot directly when nobody is waiting. Otherwise
            # a new arrival could grab the slot between release() and the
            # queue head's next poll, jumping ahead of every queued request.
            if not self.queue and self.active_requests < self.max_concurrent:
                self.active_requests += 1
                return 0  # Immediate processing
            priority = self.estimate_priority(max_tokens, message_length)
            req = QueuedRequest(id=request_id, priority=priority, estimated_tokens=max_tokens)
            self.queue.append(req)
            # Sort by priority (desc) then by arrival time (asc) - FCFS within same priority
            self.queue.sort(key=lambda x: (-x.priority, x.created_at))
            position = self.queue.index(req) + 1
            logger.info(f"[{request_id}] Queued at position {position} (priority={priority})")
            return position

    async def wait_for_turn(self, request_id: str) -> float:
        """Poll until ``request_id`` is at the queue head and a slot is free.

        On success the entry is removed, a slot is taken and the wait-time
        statistics are updated.

        Returns:
            Seconds spent waiting.
        """
        start = time.time()
        try:
            while True:
                async with self.lock:
                    is_head = bool(self.queue) and self.queue[0].id == request_id
                    if is_head and self.active_requests < self.max_concurrent:
                        self.queue.pop(0)
                        self.active_requests += 1
                        wait_time = time.time() - start
                        # Exponential moving average of the wait time
                        self.stats["avg_wait_time"] = (
                            self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1
                        )
                        self.stats["max_wait_time"] = max(self.stats["max_wait_time"], wait_time)
                        return wait_time
                await asyncio.sleep(0.05)  # Faster polling
        except asyncio.CancelledError:
            # An abandoned entry left at the head would block every request
            # behind it forever. This removal contains no await, so it cannot
            # interleave with other coroutines.
            self.queue[:] = [entry for entry in self.queue if entry.id != request_id]
            raise

    async def release(self):
        """Release a slot when a request completes."""
        async with self.lock:
            self.active_requests = max(0, self.active_requests - 1)
            self.stats["completed_requests"] += 1

    def get_status(self) -> Dict:
        """Return queue length, active slot count and a copy of the statistics."""
        return {
            "queue_length": len(self.queue),
            "active_requests": self.active_requests,
            "max_concurrent": self.max_concurrent,
            # Copy so callers cannot mutate the live counters.
            "stats": dict(self.stats)
        }

    def get_position(self, request_id: str) -> Optional[int]:
        """Return the 1-based queue position of ``request_id``, or None if not queued."""
        for position, entry in enumerate(self.queue, start=1):
            if entry.id == request_id:
                return position
        return None


request_queue = RequestQueue(max_concurrent=1, max_queue_size=100)
# ============== Feature 2: Advanced Prompt Cache with Prefix Caching ==============
class PromptCache:
    """Cache keyed by (system prompt, tools) with LRU eviction and TTL expiry.

    Two stores share the same key: ``cache`` holds arbitrary prompt data
    dicts (TTL-checked on read), and ``prefix_cache`` holds formatted prompt
    prefixes. Both are bounded by ``max_size``.
    """

    def __init__(self, max_size: int = 50, ttl_seconds: int = 3600):
        """Create an empty cache holding at most ``max_size`` entries per store."""
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.cache: OrderedDict[str, Dict] = OrderedDict()
        # Formatted prompt prefixes; ordered so the oldest can be evicted
        self.prefix_cache: Dict[str, str] = OrderedDict()
        self.lock = Lock()
        self.stats = {"hits": 0, "misses": 0, "prefix_hits": 0}

    @staticmethod
    def _json_default(obj):
        """Serialize objects exposing ``model_dump()``; reject anything else."""
        dump = getattr(obj, "model_dump", None)
        if callable(dump):
            return dump()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str:
        """Generate hash for system prompt + tools combination."""
        content = system or ""
        if tools:
            # sort_keys makes the hash independent of dict key order.
            content += json.dumps(tools, sort_keys=True, default=self._json_default)
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]:
        """Return the cached entry, or None when absent or older than the TTL.

        Updates the hit/miss counters here and on the global ``metrics``.
        """
        with self.lock:
            key = self._hash_prompt(system, tools)
            entry = self.cache.get(key)
            if entry is not None:
                if time.time() - entry.get("created", 0) < self.ttl_seconds:
                    self.stats["hits"] += 1
                    self.cache.move_to_end(key)
                    metrics.record_cache_hit()
                    return entry
                # Expired, remove it
                del self.cache[key]
            self.stats["misses"] += 1
            metrics.record_cache_miss()
            return None

    def get_prefix(self, system: str, tools: Optional[List] = None) -> Optional[str]:
        """Get cached formatted prompt prefix, or None when absent."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.prefix_cache:
                self.stats["prefix_hits"] += 1
                return self.prefix_cache[key]
            return None

    def set_prefix(self, system: str, tools: Optional[List], formatted_prefix: str):
        """Cache the formatted prompt prefix, evicting the oldest beyond ``max_size``."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            self.prefix_cache[key] = formatted_prefix
            self.prefix_cache.move_to_end(key)
            # Without a bound this store grows for every distinct prompt seen.
            while len(self.prefix_cache) > max(1, self.max_size):
                self.prefix_cache.popitem(last=False)

    def set(self, system: str, tools: Optional[List], data: Dict):
        """Cache prompt data; stamps ``data["created"]`` with the current time."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            # Evict only when inserting a new key: overwriting an existing
            # entry does not grow the cache.
            if key not in self.cache and len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)
            data["created"] = time.time()
            self.cache[key] = data
            # Reassignment keeps the old position, so refresh it explicitly.
            self.cache.move_to_end(key)

    def get_stats(self) -> Dict:
        """Return sizes, counters and the formatted hit rate."""
        with self.lock:
            hits = self.stats["hits"]
            misses = self.stats["misses"]
            total = hits + misses
            hit_rate = (hits / total * 100) if total > 0 else 0
            return {
                "size": len(self.cache),
                "prefix_cache_size": len(self.prefix_cache),
                "max_size": self.max_size,
                "hits": hits,
                "misses": misses,
                "prefix_hits": self.stats["prefix_hits"],
                "hit_rate": f"{hit_rate:.1f}%",
                "ttl_seconds": self.ttl_seconds
            }


prompt_cache = PromptCache(max_size=50, ttl_seconds=3600)
# ============== Feature 3: Multi-Model Manager ==============
class ModelManager:
    """Loads llama.cpp models on demand and keeps the loaded instances.

    ``models`` maps model id to the loaded ``Llama`` object, ``current_model``
    is the id most recently loaded or selected through ``load_model``, and
    ``load_stats`` records when each model was loaded and how long it took.
    """

    def __init__(self):
        self.models: Dict[str, Llama] = {}
        self.current_model: Optional[str] = None
        self.lock = Lock()
        self.load_stats: Dict[str, Dict] = {}

    def load_model(self, model_id: str) -> Llama:
        """Return the model for ``model_id``, loading it from disk if needed.

        ``model_id`` must be an exact ``MODEL_CONFIGS`` key; no normalization
        is applied here.

        Raises:
            HTTPException: 400 for an unknown id, 503 when the model file is
                missing, 500 when llama.cpp fails to load it.
        """
        with self.lock:
            if model_id in self.models:
                self.current_model = model_id
                return self.models[model_id]
            if model_id not in MODEL_CONFIGS:
                raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}")
            config = MODEL_CONFIGS[model_id]
            # Check if model file exists
            if not os.path.exists(config["path"]):
                # Report the models whose files are actually on disk, not the
                # ones that happen to be loaded already.
                available = [
                    other_id
                    for other_id, other_config in MODEL_CONFIGS.items()
                    if os.path.exists(other_config["path"])
                ]
                raise HTTPException(
                    status_code=503,
                    detail=f"Model file not found: {model_id}. Available: {available}"
                )
            logger.info(f"Loading model: {model_id}")
            start = time.time()
            try:
                llm = Llama(
                    model_path=config["path"],
                    n_ctx=N_CTX,
                    n_threads=N_THREADS,
                    n_batch=N_BATCH,
                    n_gpu_layers=N_GPU_LAYERS,
                    use_mlock=USE_MLOCK,
                    use_mmap=USE_MMAP,
                    verbose=False
                )
            except Exception as e:
                logger.error(f"Failed to load model {model_id}: {e}")
                raise HTTPException(status_code=500, detail=f"Failed to load model: {e}") from e
            load_time = time.time() - start
            self.models[model_id] = llm
            self.current_model = model_id
            self.load_stats[model_id] = {
                "loaded_at": datetime.now().isoformat(),
                "load_time": f"{load_time:.2f}s"
            }
            logger.info(f"Model {model_id} loaded in {load_time:.2f}s")
            return llm

    def get_model(self, model_id: Optional[str] = None) -> Llama:
        """Get a model, loading if necessary.

        ``None`` selects the current model, or the configured default when
        nothing is loaded. The id is normalized first, so unrecognized names
        resolve to the 7B model instead of failing.
        """
        if model_id is None:
            # Use default or current model
            model_id = self.current_model or self._get_default_model()
        # Normalize model name
        model_id = self._normalize_model_id(model_id)
        if model_id in self.models:
            return self.models[model_id]
        return self.load_model(model_id)

    def _normalize_model_id(self, model_id: str) -> str:
        """Normalize model ID to match config keys."""
        model_id = model_id.lower().strip()
        # Handle common variations
        if "7b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-7b"
        if "1.5b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-1.5b"
        # Check if exact match
        if model_id in MODEL_CONFIGS:
            return model_id
        # Default to 7B
        return "qwen2.5-coder-7b"

    def _get_default_model(self) -> str:
        """Return the id flagged ``default`` in MODEL_CONFIGS, else the first id."""
        for model_id, config in MODEL_CONFIGS.items():
            if config.get("default"):
                return model_id
        return next(iter(MODEL_CONFIGS))

    def list_models(self) -> List[Dict]:
        """List all configured models with their loaded/available flags."""
        return [
            {
                "id": model_id,
                "size": config["size"],
                "quantization": config["quantization"],
                "loaded": model_id in self.models,
                "available": os.path.exists(config["path"]),
                "default": config.get("default", False)
            }
            for model_id, config in MODEL_CONFIGS.items()
        ]

    def get_stats(self) -> Dict:
        """Return the current model id, the loaded model ids and the load statistics."""
        return {
            "current_model": self.current_model,
            "loaded_models": list(self.models.keys()),
            "load_stats": self.load_stats
        }

    def unload_model(self, model_id: str):
        """Unload a model to free memory; a no-op when it is not loaded."""
        with self.lock:
            if model_id in self.models:
                del self.models[model_id]
                if self.current_model == model_id:
                    self.current_model = None
                logger.info(f"Model {model_id} unloaded")


model_manager = ModelManager()
# ============== App Initialization ==============
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: try to preload the default model, then serve.

    A failed preload is only logged; startup continues either way.
    """
    # First configured model that is flagged default and whose file exists.
    default_model = next(
        (
            candidate_id
            for candidate_id, candidate in MODEL_CONFIGS.items()
            if candidate.get("default") and os.path.exists(candidate["path"])
        ),
        None,
    )
    if not default_model:
        logger.warning("No default model found, will load on first request")
    else:
        try:
            model_manager.load_model(default_model)
        except Exception as e:
            logger.error(f"Failed to load default model: {e}")
    yield
    logger.info("Shutting down...")
# Title and version match the 4.0 release reported by the startup log and by
# the status endpoints.
app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic) v4.0",
    description="llama.cpp API with Queue, Caching, and Multi-Model support",
    version="4.0.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins if this service is
# ever exposed beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each request and tag the response with an id and its duration.

    Adds ``X-Request-ID`` (first 8 characters of a UUID4) and
    ``X-Processing-Time`` headers to the response.
    """
    started = time.time()
    request_id = str(uuid.uuid4())[:8]
    response = await call_next(request)
    elapsed_ms = (time.time() - started) * 1000
    logger.info(
        f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({elapsed_ms:.2f}ms)"
    )
    response.headers.update({
        "X-Request-ID": request_id,
        "X-Processing-Time": f"{elapsed_ms:.2f}ms",
    })
    return response
# ============================================================
# ANTHROPIC-COMPATIBLE MODELS
# ============================================================
class AnthropicTextBlock(BaseModel):
    """Text content block of a request message (``type`` fixed to "text")."""
    type: Literal["text"] = "text"
    text: str


class AnthropicImageSource(BaseModel):
    """Image payload descriptor: either "base64" or "url" typed."""
    type: Literal["base64", "url"] = "base64"
    media_type: Optional[str] = None
    data: Optional[str] = None
    url: Optional[str] = None


class AnthropicImageBlock(BaseModel):
    """Image content block wrapping an ``AnthropicImageSource``."""
    type: Literal["image"] = "image"
    source: AnthropicImageSource


class AnthropicToolUseBlock(BaseModel):
    """Tool invocation content block: call id, tool name and input arguments."""
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


class AnthropicToolResultBlock(BaseModel):
    """Tool result content block referring back to a tool call via ``tool_use_id``."""
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False


# Any content block accepted inside a request message
AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock]
class AnthropicMessage(BaseModel):
    """One conversation turn: a role plus a plain string or a list of content blocks."""
    role: Literal["user", "assistant"]
    content: Union[str, List[AnthropicContentBlock]]
class AnthropicToolInputSchema(BaseModel):
    """Schema of a tool's input; ``type`` is fixed to "object"."""
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None


class AnthropicTool(BaseModel):
    """Tool definition: name, optional description and input schema."""
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema


class AnthropicToolChoiceAuto(BaseModel):
    """Tool choice variant with ``type`` "auto"."""
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceAny(BaseModel):
    """Tool choice variant with ``type`` "any"."""
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceTool(BaseModel):
    """Tool choice variant with ``type`` "tool", naming one specific tool."""
    type: Literal["tool"] = "tool"
    name: str
    disable_parallel_tool_use: Optional[bool] = None


# Any of the three tool-choice variants
AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool]
class AnthropicMetadata(BaseModel):
    """Optional request metadata (only ``user_id``)."""
    user_id: Optional[str] = None


class AnthropicCacheControl(BaseModel):
    """Cache-control marker; the only accepted ``type`` is "ephemeral"."""
    type: Literal["ephemeral"] = "ephemeral"


class AnthropicSystemContent(BaseModel):
    """Text block of a structured system prompt, optionally carrying cache control."""
    type: Literal["text"] = "text"
    text: str
    cache_control: Optional[AnthropicCacheControl] = None


class AnthropicThinkingConfig(BaseModel):
    """Extended-thinking switch with a token budget between 1 and 128000 (default 1024)."""
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000)
class AnthropicMessageRequest(BaseModel):
    """Request body model for the Anthropic-compatible messages API.

    ``model``, ``max_tokens`` and ``messages`` are required; every other
    field is optional.
    """
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    # Either a plain string or a list of text blocks
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    # Validated to the range 0.0-1.0
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None
class AnthropicUsage(BaseModel):
    """Token accounting for a response; the cache counters are optional."""
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None


class AnthropicResponseTextBlock(BaseModel):
    """Text block of a response."""
    type: Literal["text"] = "text"
    text: str


class AnthropicResponseThinkingBlock(BaseModel):
    """Thinking block of a response, carrying the thinking text."""
    type: Literal["thinking"] = "thinking"
    thinking: str


class AnthropicResponseToolUseBlock(BaseModel):
    """Tool-use block of a response: call id, tool name and input arguments."""
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


# Any content block that may appear in a response
AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock]


class AnthropicMessageResponse(BaseModel):
    """Response model for the Anthropic-compatible messages API (non-streaming shape)."""
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage
class AnthropicTokenCountRequest(BaseModel):
    """Token-count request: the same prompt-shaping fields as a message request."""
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicTokenCountResponse(BaseModel):
    """Token-count response carrying only ``input_tokens``."""
    input_tokens: int
# ============================================================
# OPENAI-COMPATIBLE MODELS
# ============================================================
class OpenAIMessage(BaseModel):
    """One chat message; ``content`` is a string, a list of part dicts, or None."""
    role: Literal["system", "user", "assistant", "tool"]
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None


class OpenAITool(BaseModel):
    """Tool definition; ``type`` is fixed to "function" and ``function`` is a free-form dict."""
    type: Literal["function"] = "function"
    function: Dict[str, Any]


class OpenAIToolChoice(BaseModel):
    """Object form of ``tool_choice``."""
    type: str
    function: Optional[Dict[str, str]] = None
class OpenAIChatRequest(BaseModel):
    """Request body of ``POST /v1/chat/completions``.

    Only ``model`` and ``messages`` are required.
    """
    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = 1024
    # Validated to the range 0.0-2.0
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    # A single stop string or a list of them
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None
class OpenAIUsage(BaseModel):
    """Prompt, completion and total token counts."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class OpenAIChoice(BaseModel):
    """One completion choice: index, message dict and optional finish reason."""
    index: int
    message: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIChatResponse(BaseModel):
    """Non-streaming chat completion response (``object`` fixed to "chat.completion")."""
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    # Integer timestamp
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None


class OpenAIModel(BaseModel):
    """One entry of the model list."""
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str


class OpenAIModelList(BaseModel):
    """Model list envelope (``object`` fixed to "list")."""
    object: Literal["list"] = "list"
    data: List[OpenAIModel]
# ============== Helper Functions ==============
def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str:
    """Return the text of a message's content.

    A string is returned unchanged. For a list, the text of every block whose
    type is "text" (dict or object form) is joined with single spaces; all
    other blocks are ignored.
    """
    if isinstance(content, str):
        return content
    pieces = []
    for item in content:
        if isinstance(item, dict):
            if item.get("type") == "text":
                pieces.append(item.get("text", ""))
            continue
        if getattr(item, "type", None) == "text":
            pieces.append(item.text)
    return " ".join(pieces)
def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]:
    """Flatten a system prompt to a single string.

    None and plain strings pass through unchanged. For a list, the ``text``
    of each dict or object block is joined with single spaces.
    """
    if system is None or isinstance(system, str):
        return system
    pieces = []
    for item in system:
        if isinstance(item, dict):
            pieces.append(item.get("text", ""))
            continue
        if hasattr(item, "text"):
            pieces.append(item.text)
    return " ".join(pieces)
def check_cache_control(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> bool:
    """Check if any system block has cache_control set to ephemeral.

    Returns False for None and for plain-string system prompts. Blocks may be
    dicts or objects; a missing or null ``cache_control`` counts as not set.
    """
    if system is None or isinstance(system, str):
        return False
    for block in system:
        if isinstance(block, dict):
            # `or {}` covers an explicit "cache_control": None, which the
            # .get() default alone does not.
            cache_control = block.get("cache_control") or {}
            if isinstance(cache_control, dict) and cache_control.get("type") == "ephemeral":
                return True
            continue
        cache_control = getattr(block, "cache_control", None)
        if cache_control and cache_control.type == "ephemeral":
            return True
    return False
def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Return the text of an OpenAI message's content.

    None becomes "", a string is returned unchanged, and for a list the
    ``text`` of every dict part typed "text" is joined with single spaces.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return " ".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )
def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str:
    """Format messages for Qwen2.5 chat template.

    Emits an optional system turn, one turn per message, and an opened
    assistant turn for the model to complete.
    """
    turns = []
    if system:
        turns.append(f"<|im_start|>system\n{system}<|im_end|>\n")
    for message in messages:
        speaker = message["role"]
        body = message["content"]
        turns.append(f"<|im_start|>{speaker}\n{body}<|im_end|>\n")
    turns.append("<|im_start|>assistant\n")
    return "".join(turns)
def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    tools: Optional[List[AnthropicTool]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Build the chat-template prompt for an Anthropic-style request.

    Tool definitions and the thinking instruction, when requested, are
    prepended to the system prompt (thinking first, then tools, then the
    caller's system text). Only the text blocks of each message are used.

    Args:
        messages: Conversation turns.
        system: Optional system prompt (string or list of text blocks).
        tools: Optional tool definitions to advertise to the model.
        thinking_enabled: Whether to ask the model to think inside
            ``<thinking>`` tags before answering.
        budget_tokens: Thinking budget quoted in the instruction.

    Returns:
        The prompt string ending with an opened assistant turn.
    """
    system_text = extract_anthropic_system(system) or ""

    # Add tool definitions to system prompt if provided
    if tools:
        tool_defs = [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema.model_dump()
            }
            for tool in tools
        ]
        tool_instruction = (
            "You have access to the following tools:\n"
            f"{json.dumps(tool_defs, indent=2)}\n"
            "To use a tool, respond with a JSON object in this exact format:\n"
            '{"tool": "tool_name", "arguments": {"arg1": "value1"}}\n'
            "Only use tools when necessary. If you don't need a tool, respond normally."
        )
        system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction

    if thinking_enabled:
        # The tag must be named explicitly; otherwise the model cannot know
        # which delimiters the response parser will look for.
        thinking_instruction = (
            "When solving complex problems:\n"
            "1. Think through the problem step by step inside <thinking>...</thinking> tags\n"
            "2. After thinking, provide your final answer outside the thinking tags\n"
            f"Budget for thinking: up to {budget_tokens} tokens."
        )
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction

    formatted_messages = [
        {"role": msg.role, "content": extract_anthropic_text(msg.content)}
        for msg in messages
    ]
    return format_chat_prompt(formatted_messages, system_text if system_text else None)
def format_openai_messages(messages: List[OpenAIMessage]) -> str:
    """Build the chat-template prompt for an OpenAI-style message list.

    System messages supply the system prompt (the last one wins); every other
    message becomes a turn with its extracted text content.
    """
    system_prompt = None
    turns = []
    for message in messages:
        text = extract_openai_content(message.content)
        if message.role == "system":
            system_prompt = text
            continue
        turns.append({"role": message.role, "content": text})
    return format_chat_prompt(turns, system_prompt)
def parse_thinking_response(text: str) -> tuple:
    """Split a model response into its thinking part and its answer part.

    Thinking is whatever appears inside ``<thinking>...</thinking>`` (or
    ``<think>...</think>``) blocks. Multiple blocks are joined with newlines.

    Returns:
        ``(thinking_text, answer_text)``; ``thinking_text`` is None when the
        response contains no thinking block. Both parts are stripped.
    """
    # The backreference forces the closing tag to match the opening one.
    thinking_pattern = r'<(think(?:ing)?)>(.*?)</\1>'
    thinking_blocks = [
        match.group(2) for match in re.finditer(thinking_pattern, text, re.DOTALL)
    ]
    if thinking_blocks:
        thinking_text = "\n".join(thinking_blocks).strip()
        answer_text = re.sub(thinking_pattern, '', text, flags=re.DOTALL).strip()
        return thinking_text, answer_text
    return None, text.strip()
def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
    """Parse tool use from model response.

    Scans ``text`` for the first JSON object that has a top-level "tool" key
    and returns it. Surrounding prose, stray braces and braces inside JSON
    strings are tolerated. Parsing is best-effort: anything that cannot be
    decoded yields None instead of raising.

    Returns:
        The decoded object, or None when no tool call is found.
    """
    if not isinstance(text, str):
        return None
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            parsed, end = decoder.raw_decode(text, start)
        except (ValueError, RecursionError):
            # Not valid JSON from this brace; try the next opening brace.
            start = text.find("{", start + 1)
            continue
        if isinstance(parsed, dict) and "tool" in parsed:
            return parsed
        # Valid JSON but not a tool call: resume after it so objects nested
        # inside it are not mistaken for top-level tool calls.
        start = text.find("{", end)
    return None
def generate_id(prefix: str = "msg") -> str:
    """Return ``prefix`` + "_" + the first 24 hex characters of a fresh UUID4."""
    suffix = uuid.uuid4().hex[:24]
    return f"{prefix}_{suffix}"
# ============== STATIC FILES ==============
# Directory named "static" next to this file
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
# The mount is optional: it is skipped when the directory does not exist
if os.path.exists(STATIC_DIR):
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
    logger.info(f"Static files mounted from {STATIC_DIR}")
# ============== ROOT ENDPOINTS ==============
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve static/index.html when it exists, otherwise a JSON status document."""
    dashboard = os.path.join(STATIC_DIR, "index.html")
    if os.path.exists(dashboard):
        return FileResponse(dashboard, media_type="text/html")

    # Fallback to JSON if no static file
    feature_flags = [
        "priority-queue",
        "prefix-caching",
        "ttl-cache",
        "multi-model",
        "extended-thinking",
        "streaming",
        "tool-use",
        "dual-compatibility",
        "metrics"
    ]
    endpoint_map = {
        "openai": "/v1/chat/completions",
        "anthropic": "/anthropic/v1/messages",
        "metrics": "/metrics"
    }
    status_payload = {
        "status": "healthy",
        "version": "4.0.0",
        "backend": "llama.cpp + OpenBLAS",
        "features": feature_flags,
        "endpoints": endpoint_map,
        "models": model_manager.list_models(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats(),
        "performance": metrics.get_stats()
    }
    return JSONResponse(status_payload)
@app.get("/api/status")
async def api_status():
    """API status as JSON (for dashboard AJAX calls)"""
    feature_flags = [
        "priority-queue",
        "prefix-caching",
        "ttl-cache",
        "multi-model",
        "extended-thinking",
        "streaming",
        "tool-use",
        "dual-compatibility",
        "metrics"
    ]
    endpoint_map = {
        "openai": "/v1/chat/completions",
        "anthropic": "/anthropic/v1/messages",
        "metrics": "/metrics"
    }
    status_payload = {
        "status": "healthy",
        "version": "4.0.0",
        "backend": "llama.cpp",
        "features": feature_flags,
        "endpoints": endpoint_map,
    }
    # Live sections, gathered in the same order as they appear in the payload
    status_payload["models"] = model_manager.list_models()
    status_payload["queue"] = request_queue.get_status()
    status_payload["cache"] = prompt_cache.get_stats()
    return status_payload
@app.get("/metrics")
async def get_metrics():
    """Return API, queue, cache and model statistics in one document."""
    report = {}
    report["api"] = metrics.get_stats()
    report["queue"] = request_queue.get_status()
    report["cache"] = prompt_cache.get_stats()
    report["models"] = model_manager.get_stats()
    return report
@app.get("/logs")
async def get_logs(lines: int = 100):
    """Return the last ``lines`` lines of the API log file.

    Args:
        lines: Number of trailing lines to return; zero or negative values
            return no log lines.

    Returns:
        A dict with the log path, the total line count and the requested
        tail joined into one string, or ``{"error": ...}`` when the log file
        does not exist.
    """
    # A bounded deque keeps only the requested tail, so the whole file (up to
    # 10 MB per rotation) never has to be held in memory.
    tail = deque(maxlen=max(0, lines))
    total_lines = 0
    try:
        # The file handler writes UTF-8; errors="replace" keeps a damaged
        # byte from failing the whole request.
        with open(LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            for total_lines, line in enumerate(f, start=1):
                tail.append(line)
    except FileNotFoundError:
        return {"error": "Log file not found"}
    return {"log_file": LOG_FILE, "total_lines": total_lines, "logs": "".join(tail)}
@app.get("/health")
async def health():
    """Health probe: a fixed "ok" status plus model, queue and cache statistics."""
    report = {"status": "ok"}
    report["models"] = model_manager.get_stats()
    report["queue"] = request_queue.get_status()
    report["cache"] = prompt_cache.get_stats()
    return report
@app.get("/queue/status")
async def queue_status():
    """Return the request queue's current status."""
    status = request_queue.get_status()
    return status
@app.get("/models/status")
async def models_status():
    """Return the configured model list together with the manager statistics."""
    configured = model_manager.list_models()
    manager_stats = model_manager.get_stats()
    return {"models": configured, "stats": manager_stats}
@app.post("/models/{model_id}/load")
async def load_model(model_id: str):
    """Load the model named in the path; errors from the manager propagate."""
    model_manager.load_model(model_id)
    result = {"status": "loaded", "model": model_id}
    return result
@app.post("/models/{model_id}/unload")
async def unload_model(model_id: str):
    """Unload the model named in the path; always reports "unloaded"."""
    model_manager.unload_model(model_id)
    result = {"status": "unloaded", "model": model_id}
    return result
# ============================================================
# OPENAI-COMPATIBLE ENDPOINTS (/v1)
# ============================================================
@app.get("/v1/models")
async def openai_list_models():
    """List every configured model in OpenAI's model-list format."""
    entries = [
        OpenAIModel(id=model_id, created=int(time.time()), owned_by="qwen")
        for model_id in MODEL_CONFIGS
    ]
    return OpenAIModelList(data=entries)
@app.post("/v1/chat/completions")
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """OpenAI-compatible chat completion endpoint.

    Waits for a queue slot, runs the prompt through the selected model and
    returns either a complete response or, when ``request.stream`` is set, a
    server-sent-event stream.

    Raises:
        HTTPException: passed through unchanged from the queue and the model
            manager (for example 503 queue full, 503 model file missing);
            any other failure becomes a 500.
    """
    chat_id = generate_id("chatcmpl")
    max_tokens = request.max_tokens or 1024
    # Explicit zeros are valid sampling settings (temperature 0 is greedy
    # decoding), so fall back to the defaults only when the value is missing.
    temperature = 0.7 if request.temperature is None else request.temperature
    top_p = 0.95 if request.top_p is None else request.top_p

    # Queue management; passing max_tokens lets short requests be prioritised
    position = await request_queue.acquire(chat_id, max_tokens=max_tokens)
    if position > 0:
        await request_queue.wait_for_turn(chat_id)

    # Once the streaming response exists, its generator releases the slot
    # when the stream ends. Releasing here as well would free the slot before
    # generation starts and count the request as completed twice.
    slot_handed_to_stream = False
    try:
        llm = model_manager.get_model(request.model)
        prompt = format_openai_messages(request.messages)
        if request.stream:
            response = await openai_stream_response(request, prompt, chat_id, llm)
            slot_handed_to_stream = True
            return response

        stop_tokens = ["<|im_end|>", "<|endoftext|>"]
        if request.stop:
            if isinstance(request.stop, str):
                stop_tokens.append(request.stop)
            else:
                stop_tokens.extend(request.stop)

        gen_start = time.time()
        # NOTE(review): this call is synchronous and blocks the event loop
        # for the whole generation.
        output = llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop_tokens,
            echo=False
        )
        gen_time = time.time() - gen_start

        choice = output["choices"][0]
        generated_text = choice["text"].strip()
        usage = output["usage"]
        logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")
        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text},
                # Report "length" when the model ran out of tokens; fall back
                # to "stop" if the backend gives no reason.
                finish_reason=choice.get("finish_reason") or "stop"
            )],
            usage=OpenAIUsage(
                prompt_tokens=usage["prompt_tokens"],
                completion_tokens=usage["completion_tokens"],
                total_tokens=usage["total_tokens"]
            )
        )
    except HTTPException:
        # Keep the original status code instead of rewriting it to 500.
        raise
    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if not slot_handed_to_stream:
            await request_queue.release()
async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama):
    """Build a server-sent-events response that streams an OpenAI chat completion.

    The stream consists of an initial chunk carrying the assistant role, one
    ``chat.completion.chunk`` per non-empty piece of generated text, a final
    chunk carrying the finish reason, and the ``[DONE]`` sentinel.

    Args:
        request: The originating chat request (model name, sampling settings,
            optional stop sequences).
        prompt: Fully rendered prompt text to complete.
        chat_id: Identifier reused as ``id`` in every emitted chunk.
        llm: Loaded model used to generate the completion.

    Returns:
        StreamingResponse: ``text/event-stream`` response wrapping the generator.
        The generator releases the request queue slot when it finishes or is
        closed.
    """
    async def generate():
        try:
            created = int(time.time())

            def as_event(delta, finish_reason=None):
                # Every chunk shares the same envelope; only delta/finish differ.
                payload = {
                    "id": chat_id,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": request.model,
                    "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
                }
                return f"data: {json.dumps(payload)}\n\n"

            yield as_event({"role": "assistant", "content": ""})
            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop:
                if isinstance(request.stop, str):
                    stop_tokens.append(request.stop)
                else:
                    stop_tokens.extend(request.stop)
            # Fall back to the defaults only when the field is absent: an
            # explicit 0 (e.g. temperature=0) must reach the sampler.
            temperature = 0.7 if request.temperature is None else request.temperature
            top_p = 0.95 if request.top_p is None else request.top_p
            finish_reason = "stop"
            # NOTE(review): the model iterator is synchronous, so each step
            # blocks the event loop while the next piece is generated.
            for output in llm(
                prompt,
                max_tokens=request.max_tokens or 1024,
                temperature=temperature,
                top_p=top_p,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                choice = output["choices"][0]
                text = choice["text"]
                if text:
                    yield as_event({"content": text})
                # Remember a token-limit stop so the final chunk can report it.
                if choice.get("finish_reason") == "length":
                    finish_reason = "length"
            yield as_event({}, finish_reason)
            yield "data: [DONE]\n\n"
        finally:
            # The streaming path owns the queue slot until the stream is over.
            await request_queue.release()
    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})
# ============================================================
# ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic)
# ============================================================
@app.get("/anthropic/v1/models")
async def anthropic_list_models():
    """Describe every configured model for the Anthropic-compatible API.

    Returns:
        dict: ``{"object": "list", "data": [...]}`` with one entry per item of
        ``MODEL_CONFIGS``. Each entry reports whether the model is currently
        held by ``model_manager`` and whether its configured path exists.
    """
    def describe(name, cfg):
        # Build the listing entry for a single configured model.
        return {
            "id": name,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "qwen",
            "display_name": f"Qwen2.5 Coder {cfg['size']} ({cfg['quantization']})",
            "supports_thinking": True,
            "supports_tools": True,
            "loaded": name in model_manager.models,
            "available": os.path.exists(cfg["path"])
        }

    entries = [describe(name, cfg) for name, cfg in MODEL_CONFIGS.items()]
    return {"object": "list", "data": entries}
@app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
async def anthropic_create_message(
    request: AnthropicMessageRequest,
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
    anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta")
):
    """Serve an Anthropic-style ``messages`` request, streaming or non-streaming.

    The request takes a slot from ``request_queue`` (passing ``max_tokens`` and
    the summed message length so the queue can prioritise), renders the prompt,
    optionally consults/populates ``prompt_cache`` and then either delegates to
    ``anthropic_stream_response`` or generates one completion and converts it
    into tool-use, thinking and/or text content blocks.

    Args:
        request: Parsed message request body.
        x_api_key: ``x-api-key`` header; accepted but not inspected here.
        anthropic_version: ``anthropic-version`` header; not inspected here.
        anthropic_beta: ``anthropic-beta`` header; not inspected here.

    Returns:
        A ``StreamingResponse`` when ``request.stream`` is set, otherwise an
        ``AnthropicMessageResponse`` with content blocks, stop reason and usage.

    Raises:
        HTTPException: Re-raised unchanged when raised while serving the
            request; any other exception is logged, counted in ``metrics`` and
            reported as a 500.
    """
    message_id = generate_id("msg")
    request_start = time.time()
    # Estimate message length for priority queue
    msg_length = sum(len(str(m.content)) for m in request.messages)
    # Queue management with priority based on expected response length
    position = await request_queue.acquire(message_id, max_tokens=request.max_tokens, message_length=msg_length)
    if position > 0:
        await request_queue.wait_for_turn(message_id)
    # True once the streaming generator owns the queue slot. That generator
    # releases the slot in its own ``finally`` when the stream ends, so
    # releasing here as well would free the slot twice, and too early.
    slot_handed_off = False
    try:
        # Everything after acquisition runs under try/finally so a failure in
        # request preparation cannot leave the slot held.
        thinking_enabled = False
        budget_tokens = 1024
        if request.thinking:
            thinking_enabled = request.thinking.type == "enabled"
            budget_tokens = request.thinking.budget_tokens or 1024
        # Check for cache control
        use_cache = check_cache_control(request.system)
        cache_hit = False
        cache_tokens = 0
        llm = model_manager.get_model(request.model)
        # Check prompt cache
        system_text = extract_anthropic_system(request.system)
        tools_list = [t.model_dump() for t in request.tools] if request.tools else None
        if use_cache:
            cached = prompt_cache.get(system_text or "", tools_list)
            if cached:
                cache_hit = True
                cache_tokens = cached.get("tokens", 0)
                logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens")
        prompt = format_anthropic_messages(
            request.messages,
            request.system,
            request.tools,
            thinking_enabled,
            budget_tokens
        )
        # Cache the prompt prefix if cache_control is set
        if use_cache and not cache_hit:
            # Keep the estimate in cache_tokens as well, so the usage block
            # below reports what was written instead of always reporting 0.
            cache_tokens = len(llm.tokenize(prompt.encode())) // 2  # Estimate prefix tokens
            prompt_cache.set(system_text or "", tools_list, {
                "tokens": cache_tokens,
                "created": time.time()
            })
        if request.stream:
            response = await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm)
            slot_handed_off = True
            return response
        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)
        stop_tokens = ["<|im_end|>", "<|endoftext|>"]
        if request.stop_sequences:
            stop_tokens.extend(request.stop_sequences)
        # Fall back to the defaults only when the field is absent: an explicit
        # 0 (e.g. temperature=0 for greedy decoding) must reach the sampler.
        temperature = 0.7 if request.temperature is None else request.temperature
        top_p = 0.95 if request.top_p is None else request.top_p
        gen_start = time.time()
        # NOTE(review): this is a synchronous call made from a coroutine, so
        # the event loop is blocked until generation finishes.
        output = llm(
            prompt,
            max_tokens=total_max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=request.top_k or 40,
            stop=stop_tokens,
            echo=False
        )
        gen_time = time.time() - gen_start
        generated_text = output["choices"][0]["text"].strip()
        usage = output["usage"]
        # Parse response for tool use, thinking, etc.
        content_blocks = []
        stop_reason = "end_turn"
        # Check for tool use
        tool_call = parse_tool_use(generated_text)
        if tool_call and request.tools:
            tool_id = f"toolu_{uuid.uuid4().hex[:24]}"
            content_blocks.append(AnthropicResponseToolUseBlock(
                type="tool_use",
                id=tool_id,
                name=tool_call["tool"],
                input=tool_call.get("arguments", {})
            ))
            stop_reason = "tool_use"
        elif thinking_enabled:
            thinking_text, answer_text = parse_thinking_response(generated_text)
            if thinking_text:
                content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text))
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text))
        else:
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text))
        if usage["completion_tokens"] >= total_max_tokens:
            stop_reason = "max_tokens"
        total_time = time.time() - request_start
        ttft = gen_time  # For non-streaming, TTFT ~ generation time
        # Record metrics
        metrics.record_request(
            model=request.model,
            ttft=ttft,
            total_time=total_time,
            tokens=usage["completion_tokens"]
        )
        logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}, total: {total_time:.2f}s")
        return AnthropicMessageResponse(
            id=message_id,
            content=content_blocks,
            model=request.model,
            stop_reason=stop_reason,
            usage=AnthropicUsage(
                input_tokens=usage["prompt_tokens"],
                output_tokens=usage["completion_tokens"],
                cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None,
                cache_read_input_tokens=cache_tokens if cache_hit else None
            )
        )
    except HTTPException:
        # Keep deliberate HTTP errors (status code and detail) intact rather
        # than collapsing them into a generic 500 below.
        raise
    except Exception as e:
        logger.error(f"[{message_id}] Error: {e}", exc_info=True)
        metrics.record_error()
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if not slot_handed_off:
            await request_queue.release()
async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama):
    """Build a server-sent-events response that streams an Anthropic message.

    Emits, in order: ``message_start``, ``content_block_start`` for a single
    text block at index 0, one ``content_block_delta`` per non-empty piece of
    generated text, ``content_block_stop``, ``message_delta`` (stop reason and
    output token count) and ``message_stop``.

    Args:
        request: The originating message request (model name, sampling
            settings, optional stop sequences and thinking configuration).
        prompt: Fully rendered prompt text to complete.
        message_id: Identifier reported in the ``message_start`` event.
        thinking_enabled: Whether extended thinking was requested; when true
            the thinking budget is added to the generation limit, matching the
            non-streaming handler.
        llm: Loaded model used to generate the completion.

    Returns:
        StreamingResponse: ``text/event-stream`` response wrapping the generator.
        The generator releases the request queue slot when it finishes or is
        closed.
    """
    def sse(event, payload):
        # Format one named server-sent event with a JSON payload.
        return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

    async def generate():
        try:
            start_event = {
                "type": "message_start",
                "message": {
                    "id": message_id, "type": "message", "role": "assistant", "content": [],
                    "model": request.model, "stop_reason": None, "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0}
                }
            }
            yield sse("message_start", start_event)
            yield sse("content_block_start", {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})
            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop_sequences:
                stop_tokens.extend(request.stop_sequences)
            # Leave room for the thinking text on top of the answer, using the
            # same 1024-token fallback budget as the non-streaming handler.
            thinking_budget = 0
            if thinking_enabled:
                thinking_budget = getattr(request.thinking, "budget_tokens", None) or 1024
            # Fall back to the defaults only when the field is absent: an
            # explicit 0 (e.g. temperature=0) must reach the sampler.
            temperature = 0.7 if request.temperature is None else request.temperature
            top_p = 0.95 if request.top_p is None else request.top_p
            stop_reason = "end_turn"
            # Counts streamed pieces with text; used as the reported output
            # token count.
            output_tokens = 0
            # NOTE(review): the model iterator is synchronous, so each step
            # blocks the event loop while the next piece is generated.
            for output in llm(
                prompt,
                max_tokens=request.max_tokens + thinking_budget,
                temperature=temperature,
                top_p=top_p,
                top_k=request.top_k or 40,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                choice = output["choices"][0]
                text = choice["text"]
                if text:
                    output_tokens += 1
                    yield sse("content_block_delta", {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": text}})
                # Surface a token-limit stop instead of always claiming end_turn.
                if choice.get("finish_reason") == "length":
                    stop_reason = "max_tokens"
            yield sse("content_block_stop", {"type": "content_block_stop", "index": 0})
            yield sse("message_delta", {"type": "message_delta", "delta": {"stop_reason": stop_reason}, "usage": {"output_tokens": output_tokens}})
            yield sse("message_stop", {"type": "message_stop"})
        finally:
            # The streaming path owns the queue slot until the stream is over.
            await request_queue.release()
    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
@app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
    """Report how many tokens the rendered prompt for ``request`` occupies.

    The messages and system prompt are rendered with
    ``format_anthropic_messages`` and tokenized by the requested model.

    Returns:
        AnthropicTokenCountResponse: ``input_tokens`` set to the token count.
    """
    model = model_manager.get_model(request.model)
    rendered = format_anthropic_messages(request.messages, request.system)
    token_ids = model.tokenize(rendered.encode())
    return AnthropicTokenCountResponse(input_tokens=len(token_ids))
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    import uvicorn

    # log_config=None is passed through to uvicorn; presumably this keeps the
    # logging handlers configured at the top of this file in effect.
    server_options = {"host": "0.0.0.0", "port": 7860, "log_config": None}
    uvicorn.run(app, **server_options)