Matrix Agent
v4.0: Production-grade optimizations - priority queue, prefix caching, TTL, metrics, TTFT tracking
c9737d6
| """ | |
| Dual-Compatible API Endpoint (OpenAI + Anthropic) v4.0 | |
| llama.cpp powered with production-grade optimizations: | |
| - ProcessPoolExecutor for CPU-bound inference (prevents event loop blocking) | |
| - Continuous batching with priority queue | |
| - Prefix caching for system prompts | |
| - TTFT (Time to First Token) optimization | |
| - Detailed metrics and monitoring | |
| - Multi-Model Hot-Swap | |
| """ | |
| import os | |
| import time | |
| import uuid | |
| import logging | |
| import re | |
| import json | |
| import asyncio | |
| import hashlib | |
| from datetime import datetime | |
| from logging.handlers import RotatingFileHandler | |
| from typing import List, Optional, Union, Dict, Any, Literal | |
| from contextlib import asynccontextmanager | |
| from threading import Lock | |
| from collections import OrderedDict, deque | |
| from dataclasses import dataclass, field | |
| from concurrent.futures import ProcessPoolExecutor | |
| from functools import lru_cache | |
| import statistics | |
| from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks | |
| from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse, FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, Field | |
| from llama_cpp import Llama | |
# ============== Logging Configuration ==============
# Two sinks: a rotating file (captures DEBUG+) and the console (INFO+ only).
LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")
log_format = logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Rotate at 10 MB, keep 5 backups so the log can't fill the disk.
file_handler = RotatingFileHandler(
    LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)
logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("llama-api")
# Route uvicorn's own loggers through the same handlers so all output is unified.
for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]
logger.info("=" * 60)
logger.info(f"llama.cpp API v4.0 Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("=" * 60)
# ============== Performance Metrics Collector ==============
class MetricsCollector:
    """Thread-safe collector of latency, throughput and cache metrics.

    Latency samples live in bounded deques so reported stats reflect the
    most recent `window_size` requests; counters are monotonic.
    """

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.lock = Lock()
        # Rolling windows of raw samples (seconds / tokens-per-second).
        self.ttft_times: deque = deque(maxlen=window_size)   # time to first token
        self.total_times: deque = deque(maxlen=window_size)  # total response time
        self.tokens_per_sec: deque = deque(maxlen=window_size)
        # Monotonic counters.
        self.request_count = 0
        self.error_count = 0
        self.cache_hits = 0
        self.cache_misses = 0
        # Per-model request counts.
        self.model_requests: Dict[str, int] = {}
        self.startup_time = time.time()

    def record_request(self, model: str, ttft: float, total_time: float, tokens: int):
        """Record one completed request's timing and token throughput."""
        with self.lock:
            self.request_count += 1
            self.ttft_times.append(ttft)
            self.total_times.append(total_time)
            if total_time > 0:
                self.tokens_per_sec.append(tokens / total_time)
            self.model_requests[model] = self.model_requests.get(model, 0) + 1

    def record_error(self):
        with self.lock:
            self.error_count += 1

    def record_cache_hit(self):
        with self.lock:
            self.cache_hits += 1

    def record_cache_miss(self):
        with self.lock:
            self.cache_misses += 1

    def get_stats(self) -> Dict:
        """Return a JSON-serializable snapshot of every metric."""
        with self.lock:
            uptime = time.time() - self.startup_time
            cache_total = self.cache_hits + self.cache_misses
            ttft_avg = round(statistics.mean(self.ttft_times) * 1000, 2) if self.ttft_times else 0
            if len(self.ttft_times) > 1:
                ordered = sorted(self.ttft_times)
                ttft_p95 = round(ordered[int(len(ordered) * 0.95)] * 1000, 2)
            else:
                ttft_p95 = 0
            total_avg = round(statistics.mean(self.total_times) * 1000, 2) if self.total_times else 0
            tps_avg = round(statistics.mean(self.tokens_per_sec), 2) if self.tokens_per_sec else 0
            return {
                "uptime_seconds": round(uptime, 2),
                "total_requests": self.request_count,
                "error_count": self.error_count,
                "error_rate": f"{(self.error_count / max(1, self.request_count) * 100):.2f}%",
                "latency": {
                    "ttft_avg_ms": ttft_avg,
                    "ttft_p95_ms": ttft_p95,
                    "total_avg_ms": total_avg,
                },
                "throughput": {
                    "tokens_per_sec_avg": tps_avg,
                    "requests_per_min": round(self.request_count / max(1, uptime / 60), 2),
                },
                "cache": {
                    "hits": self.cache_hits,
                    "misses": self.cache_misses,
                    "hit_rate": f"{(self.cache_hits / max(1, cache_total) * 100):.1f}%"
                },
                "models": self.model_requests
            }


metrics = MetricsCollector()
# ============== Configuration ==============
MODELS_DIR = "/app/models"
# Performance tuning - optimized for speed; every knob is overridable via env var.
N_CTX = int(os.environ.get("N_CTX", 4096))  # Context window (reduced for faster processing)
N_THREADS = int(os.environ.get("N_THREADS", 4))  # More threads for parallelism
N_BATCH = int(os.environ.get("N_BATCH", 512))  # Larger batch for faster prompt processing
N_GPU_LAYERS = int(os.environ.get("N_GPU_LAYERS", 0))  # GPU acceleration if available (0 = CPU only)
USE_MLOCK = os.environ.get("USE_MLOCK", "true").lower() == "true"  # Lock model in RAM
USE_MMAP = os.environ.get("USE_MMAP", "true").lower() == "true"  # Memory-mapped loading
# Model configurations with speed ratings; "default": True marks the startup model.
MODEL_CONFIGS = {
    "qwen2.5-coder-7b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "size": "7B",
        "quantization": "Q4_K_M",
        "default": True,
        "speed": "standard",
        "description": "Best quality, tool use, complex reasoning"
    },
    "qwen2.5-coder-1.5b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-1.5b-instruct-q4_k_m.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q4_k_m.gguf",
        "size": "1.5B",
        "quantization": "Q4_K_M",
        "default": False,
        "speed": "fast",
        "description": "3x faster, good for simple tasks"
    }
}
logger.info(f"Performance settings: ctx={N_CTX}, threads={N_THREADS}, batch={N_BATCH}, mlock={USE_MLOCK}")
# ============== Feature 1: Advanced Request Queue ==============
@dataclass
class QueuedRequest:
    """A single inference request waiting in the priority queue.

    BUGFIX: the @dataclass decorator was missing — without it,
    field(default_factory=time.time) is a bare Field object and the
    keyword construction used by RequestQueue.acquire() raises TypeError.
    """
    id: str
    priority: int = 0  # Higher = more priority (shorter requests get higher priority)
    estimated_tokens: int = 256  # Estimated output tokens for prioritization
    created_at: float = field(default_factory=time.time)  # arrival timestamp (FCFS tiebreaker)
    future: Optional[asyncio.Future] = None
class RequestQueue:
    """
    Advanced request queue with:
    - Priority scheduling (shorter requests first)
    - Backpressure handling (503 + Retry-After when full)
    - Continuous batching support
    """

    def __init__(self, max_concurrent: int = 1, max_queue_size: int = 100):
        self.max_concurrent = max_concurrent
        self.max_queue_size = max_queue_size
        self.queue: List[QueuedRequest] = []  # kept sorted: priority desc, arrival asc
        self.active_requests = 0
        self.lock = asyncio.Lock()
        self.stats = {
            "total_requests": 0,
            "completed_requests": 0,
            "rejected_requests": 0,
            "avg_wait_time": 0.0,
            "max_wait_time": 0.0
        }

    def estimate_priority(self, max_tokens: int, message_length: int) -> int:
        """
        Estimate priority based on expected response length.
        Shorter requests get higher priority (reduces avg wait time).
        """
        # Lower max_tokens = higher priority
        if max_tokens <= 128:
            return 100  # Very short - highest priority
        elif max_tokens <= 256:
            return 80
        elif max_tokens <= 512:
            return 60
        elif max_tokens <= 1024:
            return 40
        else:
            return 20  # Long requests - lower priority

    async def acquire(self, request_id: str, max_tokens: int = 256, message_length: int = 0) -> int:
        """Add request to queue with smart prioritization.

        Returns the queue position (0 = start immediately).
        Raises HTTPException(503) with a Retry-After header when the queue is full.
        """
        async with self.lock:
            if len(self.queue) >= self.max_queue_size:
                self.stats["rejected_requests"] += 1
                raise HTTPException(
                    status_code=503,
                    detail=f"Queue full ({self.max_queue_size} requests). Retry after {self.stats['avg_wait_time']:.1f}s",
                    headers={"Retry-After": str(int(self.stats['avg_wait_time']) + 1)}
                )
            self.stats["total_requests"] += 1
            # BUGFIX: only start immediately when a slot is free AND nobody is
            # already waiting; previously a new arrival could jump ahead of
            # queued requests in the window between release() and the front
            # request's next poll tick.
            if self.active_requests < self.max_concurrent and not self.queue:
                self.active_requests += 1
                return 0  # Immediate processing
            priority = self.estimate_priority(max_tokens, message_length)
            req = QueuedRequest(id=request_id, priority=priority, estimated_tokens=max_tokens)
            self.queue.append(req)
            # Sort by priority (desc) then by arrival time (asc) - FCFS within same priority
            self.queue.sort(key=lambda x: (-x.priority, x.created_at))
            position = self.queue.index(req) + 1
            logger.info(f"[{request_id}] Queued at position {position} (priority={priority})")
            return position

    async def wait_for_turn(self, request_id: str) -> float:
        """Poll until this request is at the queue front and a slot is free.

        Returns the number of seconds waited.
        """
        start = time.time()
        while True:
            async with self.lock:
                if self.queue and self.queue[0].id == request_id:
                    if self.active_requests < self.max_concurrent:
                        self.queue.pop(0)
                        self.active_requests += 1
                        wait_time = time.time() - start
                        # Exponentially-weighted moving average keeps the
                        # Retry-After estimate responsive to recent load.
                        self.stats["avg_wait_time"] = (
                            self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1
                        )
                        self.stats["max_wait_time"] = max(self.stats["max_wait_time"], wait_time)
                        return wait_time
            await asyncio.sleep(0.05)  # Faster polling

    async def release(self):
        """Release a slot when request completes."""
        async with self.lock:
            self.active_requests = max(0, self.active_requests - 1)
            self.stats["completed_requests"] += 1

    def get_status(self) -> Dict:
        """Snapshot of queue depth, active slots, and cumulative stats."""
        return {
            "queue_length": len(self.queue),
            "active_requests": self.active_requests,
            "max_concurrent": self.max_concurrent,
            "stats": self.stats
        }

    def get_position(self, request_id: str) -> Optional[int]:
        """1-based position of a queued request, or None if not queued."""
        for i, req in enumerate(self.queue):
            if req.id == request_id:
                return i + 1
        return None


request_queue = RequestQueue(max_concurrent=1, max_queue_size=100)
# ============== Feature 2: Advanced Prompt Cache with Prefix Caching ==============
class PromptCache:
    """
    Enhanced prompt cache with:
    - Prefix caching for system prompts (reduces prompt processing time)
    - Semantic similarity matching (future)
    - TTL-based expiration

    BUGFIX: prefix_cache was an unbounded plain dict — every distinct
    system prompt + tools combination stayed resident forever (memory leak).
    It is now an OrderedDict bounded to max_size with LRU eviction.
    """

    def __init__(self, max_size: int = 50, ttl_seconds: int = 3600):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.cache: OrderedDict[str, Dict] = OrderedDict()
        # Formatted prompt prefixes, LRU-bounded to max_size.
        self.prefix_cache: OrderedDict[str, str] = OrderedDict()
        self.lock = Lock()
        self.stats = {"hits": 0, "misses": 0, "prefix_hits": 0}

    def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str:
        """Stable 16-hex-char cache key for a system prompt + tools combination."""
        content = system or ""
        if tools:
            content += json.dumps(tools, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]:
        """Get cached prompt data with TTL check; records hit/miss metrics."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.cache:
                entry = self.cache[key]
                # Check TTL
                if time.time() - entry.get("created", 0) < self.ttl_seconds:
                    self.stats["hits"] += 1
                    self.cache.move_to_end(key)
                    metrics.record_cache_hit()
                    return entry
                else:
                    # Expired, remove it
                    del self.cache[key]
            self.stats["misses"] += 1
            metrics.record_cache_miss()
            return None

    def get_prefix(self, system: str, tools: Optional[List] = None) -> Optional[str]:
        """Get cached formatted prompt prefix."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.prefix_cache:
                self.stats["prefix_hits"] += 1
                self.prefix_cache.move_to_end(key)  # keep hot prefixes resident
                return self.prefix_cache[key]
            return None

    def set_prefix(self, system: str, tools: Optional[List], formatted_prefix: str):
        """Cache the formatted prompt prefix, evicting the LRU entry when full."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key not in self.prefix_cache and len(self.prefix_cache) >= self.max_size:
                self.prefix_cache.popitem(last=False)  # least-recently-used
            self.prefix_cache[key] = formatted_prefix

    def set(self, system: str, tools: Optional[List], data: Dict):
        """Cache prompt data with timestamp."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if len(self.cache) >= self.max_size:
                oldest = next(iter(self.cache))
                del self.cache[oldest]
            data["created"] = time.time()
            self.cache[key] = data

    def get_stats(self) -> Dict:
        """Snapshot of cache sizes and hit-rate."""
        total = self.stats["hits"] + self.stats["misses"]
        hit_rate = (self.stats["hits"] / total * 100) if total > 0 else 0
        return {
            "size": len(self.cache),
            "prefix_cache_size": len(self.prefix_cache),
            "max_size": self.max_size,
            "hits": self.stats["hits"],
            "misses": self.stats["misses"],
            "prefix_hits": self.stats["prefix_hits"],
            "hit_rate": f"{hit_rate:.1f}%",
            "ttl_seconds": self.ttl_seconds
        }


prompt_cache = PromptCache(max_size=50, ttl_seconds=3600)
# ============== Feature 3: Multi-Model Manager ==============
class ModelManager:
    """Lazy-loading registry of llama.cpp models with hot-swap support."""

    def __init__(self):
        self.models: Dict[str, Llama] = {}        # model_id -> resident Llama instance
        self.current_model: Optional[str] = None  # id of the most recently used model
        self.lock = Lock()
        self.load_stats: Dict[str, Dict] = {}     # model_id -> load timing info

    def load_model(self, model_id: str) -> Llama:
        """Load a model (lazy loading with hot-swap).

        Raises HTTPException 400 for unknown ids, 503 when the weights file
        is absent, 500 when llama.cpp fails to initialize.
        """
        with self.lock:
            if model_id in self.models:
                self.current_model = model_id
                return self.models[model_id]
            if model_id not in MODEL_CONFIGS:
                raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}")
            config = MODEL_CONFIGS[model_id]
            # Check if model file exists
            if not os.path.exists(config["path"]):
                # BUGFIX: report the configured model ids, not the loaded ones —
                # self.models is typically empty exactly when this error fires.
                raise HTTPException(
                    status_code=503,
                    detail=f"Model file not found: {model_id}. Configured: {list(MODEL_CONFIGS.keys())}"
                )
            logger.info(f"Loading model: {model_id}")
            start = time.time()
            try:
                llm = Llama(
                    model_path=config["path"],
                    n_ctx=N_CTX,
                    n_threads=N_THREADS,
                    n_batch=N_BATCH,
                    n_gpu_layers=N_GPU_LAYERS,
                    use_mlock=USE_MLOCK,
                    use_mmap=USE_MMAP,
                    verbose=False
                )
                load_time = time.time() - start
                self.models[model_id] = llm
                self.current_model = model_id
                self.load_stats[model_id] = {
                    "loaded_at": datetime.now().isoformat(),
                    "load_time": f"{load_time:.2f}s"
                }
                logger.info(f"Model {model_id} loaded in {load_time:.2f}s")
                return llm
            except Exception as e:
                logger.error(f"Failed to load model {model_id}: {e}")
                raise HTTPException(status_code=500, detail=f"Failed to load model: {e}")

    def get_model(self, model_id: Optional[str] = None) -> Llama:
        """Get a model, loading if necessary.

        NOTE(review): the membership check below runs outside self.lock;
        load_model re-checks under the lock, so the worst case is a redundant call.
        """
        if model_id is None:
            # Use default or current model
            model_id = self.current_model or self._get_default_model()
        # Normalize model name
        model_id = self._normalize_model_id(model_id)
        if model_id in self.models:
            return self.models[model_id]
        return self.load_model(model_id)

    def _normalize_model_id(self, model_id: str) -> str:
        """Normalize model ID to match config keys (case-insensitive, fuzzy)."""
        model_id = model_id.lower().strip()
        # Handle common variations
        if "7b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-7b"
        if "1.5b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-1.5b"
        # Check if exact match
        if model_id in MODEL_CONFIGS:
            return model_id
        # Default to 7B
        return "qwen2.5-coder-7b"

    def _get_default_model(self) -> str:
        """Config marked "default", falling back to the first configured entry."""
        for model_id, config in MODEL_CONFIGS.items():
            if config.get("default"):
                return model_id
        return list(MODEL_CONFIGS.keys())[0]

    def list_models(self) -> List[Dict]:
        """List all configured models with their load/availability state."""
        models = []
        for model_id, config in MODEL_CONFIGS.items():
            models.append({
                "id": model_id,
                "size": config["size"],
                "quantization": config["quantization"],
                "loaded": model_id in self.models,
                "available": os.path.exists(config["path"]),
                "default": config.get("default", False)
            })
        return models

    def get_stats(self) -> Dict:
        """Which models are resident and how long they took to load."""
        return {
            "current_model": self.current_model,
            "loaded_models": list(self.models.keys()),
            "load_stats": self.load_stats
        }

    def unload_model(self, model_id: str):
        """Unload a model to free memory."""
        with self.lock:
            if model_id in self.models:
                del self.models[model_id]
                if self.current_model == model_id:
                    self.current_model = None
                logger.info(f"Model {model_id} unloaded")


model_manager = ModelManager()
# ============== App Initialization ==============
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan context: preload the default model before serving traffic.

    BUGFIX: the @asynccontextmanager decorator was missing (the import at the
    top of the file was unused); FastAPI's `lifespan=` argument requires an
    async context manager, not a bare async generator function.
    """
    # Load default model on startup
    default_model = None
    for model_id, config in MODEL_CONFIGS.items():
        if config.get("default") and os.path.exists(config["path"]):
            default_model = model_id
            break
    if default_model:
        try:
            model_manager.load_model(default_model)
        except Exception as e:
            # Best-effort preload: the server still starts; first request retries.
            logger.error(f"Failed to load default model: {e}")
    else:
        logger.warning("No default model found, will load on first request")
    yield
    logger.info("Shutting down...")
# FIX: title/version said 3.0 while the module header, startup log and the
# JSON payloads served by the endpoints all report v4.0.
app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic) v4.0",
    description="llama.cpp API with Queue, Caching, and Multi-Model support",
    version="4.0.0",
    lifespan=lifespan
)
# Wide-open CORS (all origins/methods/headers).
# NOTE(review): tighten allow_origins before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every HTTP request and attach tracing headers.

    BUGFIX: the @app.middleware("http") registration was missing, so this
    function was never invoked and X-Request-ID / X-Processing-Time headers
    were never emitted.
    """
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    response = await call_next(request)
    duration = (time.time() - start_time) * 1000
    logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")
    # Add request ID to headers for tracking
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Processing-Time"] = f"{duration:.2f}ms"
    return response
# ============================================================
# ANTHROPIC-COMPATIBLE MODELS
# (request/response schemas mirroring the Anthropic Messages API)
# ============================================================

class AnthropicTextBlock(BaseModel):
    """Plain-text content block."""
    type: Literal["text"] = "text"
    text: str


class AnthropicImageSource(BaseModel):
    """Image payload: either inline base64 data or a URL."""
    type: Literal["base64", "url"] = "base64"
    media_type: Optional[str] = None
    data: Optional[str] = None
    url: Optional[str] = None


class AnthropicImageBlock(BaseModel):
    """Image content block."""
    type: Literal["image"] = "image"
    source: AnthropicImageSource


class AnthropicToolUseBlock(BaseModel):
    """Assistant-emitted tool invocation."""
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


class AnthropicToolResultBlock(BaseModel):
    """User-supplied result of a prior tool invocation."""
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False


# Any block that may appear in a message's content list.
AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock]


class AnthropicMessage(BaseModel):
    """One conversation turn; content may be a bare string or a list of blocks."""
    role: Literal["user", "assistant"]
    content: Union[str, List[AnthropicContentBlock]]
class AnthropicToolInputSchema(BaseModel):
    """JSON-Schema-style description of a tool's arguments."""
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None


class AnthropicTool(BaseModel):
    """A tool the model may call."""
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema


class AnthropicToolChoiceAuto(BaseModel):
    """Model decides whether to call a tool."""
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceAny(BaseModel):
    """Model must call some tool."""
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceTool(BaseModel):
    """Model must call the named tool."""
    type: Literal["tool"] = "tool"
    name: str
    disable_parallel_tool_use: Optional[bool] = None


AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool]


class AnthropicMetadata(BaseModel):
    """Opaque per-request metadata."""
    user_id: Optional[str] = None


class AnthropicCacheControl(BaseModel):
    """Prompt-caching hint attached to a system content block."""
    type: Literal["ephemeral"] = "ephemeral"


class AnthropicSystemContent(BaseModel):
    """System prompt block, optionally marked cacheable."""
    type: Literal["text"] = "text"
    text: str
    cache_control: Optional[AnthropicCacheControl] = None


class AnthropicThinkingConfig(BaseModel):
    """Extended-thinking toggle and token budget."""
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000)
class AnthropicMessageRequest(BaseModel):
    """Request body for the Anthropic-compatible messages endpoint."""
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicUsage(BaseModel):
    """Token accounting for a response."""
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None


class AnthropicResponseTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str


class AnthropicResponseThinkingBlock(BaseModel):
    type: Literal["thinking"] = "thinking"
    thinking: str


class AnthropicResponseToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


# Any block that may appear in a response's content list.
AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock]


class AnthropicMessageResponse(BaseModel):
    """Response body mirroring Anthropic's message object."""
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage


class AnthropicTokenCountRequest(BaseModel):
    """Request body for token counting."""
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicTokenCountResponse(BaseModel):
    """Token-count result."""
    input_tokens: int
# ============================================================
# OPENAI-COMPATIBLE MODELS
# (request/response schemas mirroring the OpenAI Chat Completions API)
# ============================================================

class OpenAIMessage(BaseModel):
    """One chat turn; content may be a string or a list of typed parts."""
    role: Literal["system", "user", "assistant", "tool"]
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None


class OpenAITool(BaseModel):
    """Function-style tool definition."""
    type: Literal["function"] = "function"
    function: Dict[str, Any]


class OpenAIToolChoice(BaseModel):
    """Tool-choice directive."""
    type: str
    function: Optional[Dict[str, str]] = None


class OpenAIChatRequest(BaseModel):
    """Request body for the OpenAI-compatible chat completions endpoint."""
    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None


class OpenAIUsage(BaseModel):
    """Token accounting."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class OpenAIChoice(BaseModel):
    """One completion choice."""
    index: int
    message: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIChatResponse(BaseModel):
    """Response body for chat completions."""
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None


class OpenAIModel(BaseModel):
    """One entry in the /v1/models listing."""
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str


class OpenAIModelList(BaseModel):
    """Envelope for the /v1/models listing."""
    object: Literal["list"] = "list"
    data: List[OpenAIModel]
# ============== Helper Functions ==============
def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str:
    """Flatten Anthropic message content (string or block list) into plain text.

    Only "text"-typed blocks contribute; other block types are skipped.
    Accepts both raw dicts and parsed pydantic objects.
    """
    if isinstance(content, str):
        return content
    pieces = []
    for part in content:
        if isinstance(part, dict):
            if part.get("type") == "text":
                pieces.append(part.get("text", ""))
        elif getattr(part, "type", None) == "text":
            pieces.append(part.text)
    return " ".join(pieces)
def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]:
    """Normalize the `system` field (None, string, or content-block list) to a string.

    Returns None only when the input is None; a block list becomes a
    space-joined concatenation of block texts.
    """
    if system is None:
        return None
    if isinstance(system, str):
        return system
    collected = []
    for entry in system:
        if isinstance(entry, dict):
            collected.append(entry.get("text", ""))
        elif hasattr(entry, "text"):
            collected.append(entry.text)
    return " ".join(collected)
def check_cache_control(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> bool:
    """Return True when any system content block requests ephemeral prompt caching."""
    if system is None or isinstance(system, str):
        return False
    for entry in system:
        if isinstance(entry, dict):
            if entry.get("cache_control", {}).get("type") == "ephemeral":
                return True
        elif getattr(entry, "cache_control", None) and entry.cache_control.type == "ephemeral":
            return True
    return False
def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Collapse OpenAI message content (None, string, or part list) into plain text.

    Only "text"-typed parts contribute; None becomes the empty string.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return " ".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )
def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str:
    """Render messages into the Qwen2.5 ChatML template.

    The result always ends with an open assistant turn so the model
    continues from there.
    """
    segments = []
    if system:
        segments.append(f"<|im_start|>system\n{system}<|im_end|>\n")
    for message in messages:
        segments.append(f"<|im_start|>{message['role']}\n{message['content']}<|im_end|>\n")
    segments.append("<|im_start|>assistant\n")
    return "".join(segments)
def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    tools: Optional[List[AnthropicTool]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Build the full ChatML prompt for an Anthropic-style request.

    Tool definitions and (optionally) extended-thinking instructions are
    prepended to the system prompt, then every turn is flattened to text
    and rendered via format_chat_prompt.
    """
    formatted_messages = []
    system_text = extract_anthropic_system(system) or ""
    # Add tool definitions to system prompt if provided
    if tools:
        tool_defs = []
        for tool in tools:
            tool_def = {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema.model_dump()
            }
            tool_defs.append(tool_def)
        # The doubled braces below render literal {} in the f-string output.
        tool_instruction = f"""You have access to the following tools:
{json.dumps(tool_defs, indent=2)}
To use a tool, respond with a JSON object in this exact format:
{{"tool": "tool_name", "arguments": {{"arg1": "value1"}}}}
Only use tools when necessary. If you don't need a tool, respond normally."""
        system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction
    if thinking_enabled:
        thinking_instruction = f"""When solving complex problems:
1. Think through the problem step by step inside <thinking>...</thinking> tags
2. After thinking, provide your final answer outside the thinking tags
Budget for thinking: up to {budget_tokens} tokens."""
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction
    for msg in messages:
        content = extract_anthropic_text(msg.content)
        formatted_messages.append({"role": msg.role, "content": content})
    return format_chat_prompt(formatted_messages, system_text if system_text else None)
def format_openai_messages(messages: List[OpenAIMessage]) -> str:
    """Split out the system message and render the rest via the Qwen ChatML template.

    When several system messages are present, the last one wins (matching
    the original behavior).
    """
    system_text = None
    chat_turns = []
    for message in messages:
        text = extract_openai_content(message.content)
        if message.role == "system":
            system_text = text
        else:
            chat_turns.append({"role": message.role, "content": text})
    return format_chat_prompt(chat_turns, system_text)
def parse_thinking_response(text: str) -> tuple:
    """Split <thinking>...</thinking> sections out of a model response.

    Returns (thinking_text or None, answer_text); multiple thinking
    sections are joined with newlines.
    """
    pattern = r'<thinking>(.*?)</thinking>'
    sections = re.findall(pattern, text, re.DOTALL)
    if not sections:
        return None, text.strip()
    thinking_text = "\n".join(sections).strip()
    answer_text = re.sub(pattern, '', text, flags=re.DOTALL).strip()
    return thinking_text, answer_text
def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
    """Parse a tool invocation from a model response.

    The model is instructed to emit {"tool": ..., "arguments": ...}; this
    accepts either a bare JSON object or one embedded in surrounding prose.
    Returns the parsed dict, or None when no tool call is found.

    FIXES: the two bare `except:` clauses (which swallowed even
    KeyboardInterrupt) are narrowed to json.JSONDecodeError, and
    `"tool" in parsed` is guarded with isinstance(dict) so top-level
    non-dict JSON (e.g. "42") can't raise TypeError.
    """
    text_stripped = text.strip()
    # Fast path: the whole response is one JSON object.
    if text_stripped.startswith("{") and text_stripped.endswith("}"):
        try:
            parsed = json.loads(text_stripped)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict) and "tool" in parsed:
            return parsed
    # Slow path: scan for balanced {...} spans embedded in other text.
    brace_count = 0
    start_idx = None
    for i, char in enumerate(text):
        if char == '{':
            if brace_count == 0:
                start_idx = i
            brace_count += 1
        elif char == '}':
            brace_count -= 1
            if brace_count == 0 and start_idx is not None:
                candidate = text[start_idx:i + 1]
                try:
                    parsed = json.loads(candidate)
                except json.JSONDecodeError:
                    parsed = None
                if isinstance(parsed, dict) and "tool" in parsed:
                    return parsed
                start_idx = None
    return None
def generate_id(prefix: str = "msg") -> str:
    """Return a unique identifier of the form '<prefix>_<24 hex chars>'."""
    token = uuid.uuid4().hex[:24]
    return "_".join((prefix, token))
# ============== STATIC FILES ==============
# Serve the bundled dashboard assets under /static when the directory was
# shipped alongside this source file; otherwise skip mounting silently so
# the API still works without a dashboard.
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
if os.path.exists(STATIC_DIR):
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
    logger.info(f"Static files mounted from {STATIC_DIR}")
| # ============== ROOT ENDPOINTS ============== | |
async def root():
    """Serve the dashboard index.html when bundled, else a JSON status page."""
    index_path = os.path.join(STATIC_DIR, "index.html")
    if os.path.exists(index_path):
        return FileResponse(index_path, media_type="text/html")
    # No dashboard shipped: fall back to a JSON status payload.
    payload = {
        "status": "healthy",
        "version": "4.0.0",
        "backend": "llama.cpp + OpenBLAS",
    }
    payload["features"] = [
        "priority-queue",
        "prefix-caching",
        "ttl-cache",
        "multi-model",
        "extended-thinking",
        "streaming",
        "tool-use",
        "dual-compatibility",
        "metrics",
    ]
    payload["endpoints"] = {
        "openai": "/v1/chat/completions",
        "anthropic": "/anthropic/v1/messages",
        "metrics": "/metrics",
    }
    payload["models"] = model_manager.list_models()
    payload["queue"] = request_queue.get_status()
    payload["cache"] = prompt_cache.get_stats()
    payload["performance"] = metrics.get_stats()
    return JSONResponse(payload)
async def api_status():
    """Return API status as plain JSON (used by the dashboard's AJAX polling)."""
    feature_list = [
        "priority-queue",
        "prefix-caching",
        "ttl-cache",
        "multi-model",
        "extended-thinking",
        "streaming",
        "tool-use",
        "dual-compatibility",
        "metrics",
    ]
    endpoint_map = {
        "openai": "/v1/chat/completions",
        "anthropic": "/anthropic/v1/messages",
        "metrics": "/metrics",
    }
    return {
        "status": "healthy",
        "version": "4.0.0",
        "backend": "llama.cpp",
        "features": feature_list,
        "endpoints": endpoint_map,
        "models": model_manager.list_models(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats(),
    }
async def get_metrics():
    """Expose detailed performance metrics for external monitoring."""
    report = {}
    report["api"] = metrics.get_stats()
    report["queue"] = request_queue.get_status()
    report["cache"] = prompt_cache.get_stats()
    report["models"] = model_manager.get_stats()
    return report
async def get_logs(lines: int = 100):
    """Return the last *lines* lines of the rotating API log file.

    Streams the file once, counting total lines while keeping only the
    requested tail in a bounded deque, instead of materializing the whole
    file in memory.  A non-positive *lines* now yields an empty tail — the
    old slice-based code returned the ENTIRE file for lines=0, because
    ``all_lines[-0:]`` is ``all_lines[0:]``.
    """
    tail_size = max(lines, 0)
    try:
        total = 0
        tail = deque(maxlen=tail_size)
        # utf-8 matches the encoding used by the RotatingFileHandler.
        with open(LOG_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                total += 1
                tail.append(line)
        return {"log_file": LOG_FILE, "total_lines": total, "logs": "".join(tail)}
    except FileNotFoundError:
        return {"error": "Log file not found"}
async def health():
    """Lightweight health probe with model/queue/cache summaries."""
    snapshot = {"status": "ok"}
    snapshot["models"] = model_manager.get_stats()
    snapshot["queue"] = request_queue.get_status()
    snapshot["cache"] = prompt_cache.get_stats()
    return snapshot
async def queue_status():
    """Report the current request-queue status."""
    status = request_queue.get_status()
    return status
async def models_status():
    """List configured models together with their runtime statistics."""
    return {
        "models": model_manager.list_models(),
        "stats": model_manager.get_stats(),
    }
async def load_model(model_id: str):
    """Manually load a model into memory by its configured id."""
    model_manager.load_model(model_id)
    result = {"status": "loaded", "model": model_id}
    return result
async def unload_model(model_id: str):
    """Unload a model by id to free its memory."""
    model_manager.unload_model(model_id)
    result = {"status": "unloaded", "model": model_id}
    return result
| # ============================================================ | |
| # OPENAI-COMPATIBLE ENDPOINTS (/v1) | |
| # ============================================================ | |
async def openai_list_models():
    """OpenAI-compatible model listing (shape of GET /v1/models)."""
    entries = [
        OpenAIModel(id=model_id, created=int(time.time()), owned_by="qwen")
        for model_id in MODEL_CONFIGS
    ]
    return OpenAIModelList(data=entries)
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """OpenAI-compatible chat completion endpoint (POST /v1/chat/completions).

    Waits for a queue slot, resolves the requested model, and either returns
    a full completion or hands off to the SSE streaming path.

    Fixes: the streaming generator owns the queue slot once we hand off to
    it (its own finally-block calls release()); the old unconditional
    release in our finally freed the slot before generation even started
    and then released it a second time when the stream finished.  Sampling
    defaults now test for None so an explicit temperature of 0.0 (greedy)
    is no longer silently replaced by 0.7.
    """
    chat_id = generate_id("chatcmpl")
    # Queue management
    position = await request_queue.acquire(chat_id)
    if position > 0:
        await request_queue.wait_for_turn(chat_id)
    handed_off_to_stream = False
    try:
        llm = model_manager.get_model(request.model)
        prompt = format_openai_messages(request.messages)
        if request.stream:
            response = await openai_stream_response(request, prompt, chat_id, llm)
            handed_off_to_stream = True
            return response
        stop_tokens = ["<|im_end|>", "<|endoftext|>"]
        if request.stop:
            if isinstance(request.stop, str):
                stop_tokens.append(request.stop)
            else:
                stop_tokens.extend(request.stop)
        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=request.max_tokens or 1024,
            # "x or default" would clobber a legitimate 0.0; test for None.
            temperature=request.temperature if request.temperature is not None else 0.7,
            top_p=request.top_p if request.top_p is not None else 0.95,
            stop=stop_tokens,
            echo=False
        )
        gen_time = time.time() - gen_start
        generated_text = output["choices"][0]["text"].strip()
        usage = output["usage"]
        logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")
        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text},
                finish_reason="stop"
            )],
            usage=OpenAIUsage(
                prompt_tokens=usage["prompt_tokens"],
                completion_tokens=usage["completion_tokens"],
                total_tokens=usage["total_tokens"]
            )
        )
    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if not handed_off_to_stream:
            await request_queue.release()
async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama):
    """Build the SSE response for a streamed OpenAI-style chat completion.

    The inner generator owns the request's queue slot: it releases the slot
    in its finally-block when the stream finishes or the client disconnects.
    Fixes: an explicit temperature of 0.0 is no longer replaced by the 0.7
    default, and mid-stream failures are logged instead of vanishing into
    the SSE transport.
    """
    async def generate():
        try:
            created = int(time.time())
            # First chunk announces the assistant role with empty content,
            # matching the OpenAI streaming protocol.
            initial_chunk = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}]
            }
            yield f"data: {json.dumps(initial_chunk)}\n\n"
            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop:
                if isinstance(request.stop, str):
                    stop_tokens.append(request.stop)
                else:
                    stop_tokens.extend(request.stop)
            for output in llm(
                prompt,
                max_tokens=request.max_tokens or 1024,
                # Test for None: "x or default" clobbers a legitimate 0.0.
                temperature=request.temperature if request.temperature is not None else 0.7,
                top_p=request.top_p if request.top_p is not None else 0.95,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                text = output["choices"][0]["text"]
                if text:
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": request.model,
                        "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}]
                    }
                    yield f"data: {json.dumps(chunk)}\n\n"
            final_chunk = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
            }
            yield f"data: {json.dumps(final_chunk)}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            # Previously a mid-stream failure left no trace in the logs.
            logger.error(f"[{chat_id}] Streaming error: {e}", exc_info=True)
            raise
        finally:
            await request_queue.release()
    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})
| # ============================================================ | |
| # ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic) | |
| # ============================================================ | |
async def anthropic_list_models():
    """Anthropic-compatible model listing, including load/availability state."""
    def describe(model_id, config):
        # One catalogue entry per configured model, flagging whether the
        # weights file exists on disk and whether it is resident in memory.
        return {
            "id": model_id,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "qwen",
            "display_name": f"Qwen2.5 Coder {config['size']} ({config['quantization']})",
            "supports_thinking": True,
            "supports_tools": True,
            "loaded": model_id in model_manager.models,
            "available": os.path.exists(config["path"]),
        }
    data = [describe(mid, cfg) for mid, cfg in MODEL_CONFIGS.items()]
    return {"object": "list", "data": data}
async def anthropic_create_message(
    request: AnthropicMessageRequest,
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
    anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta")
):
    """Anthropic-compatible message creation (POST /anthropic/v1/messages).

    Handles extended thinking, tool use, prompt-prefix caching and SSE
    streaming.  Non-streaming requests release their queue slot in the
    finally-block below; a streaming request hands the slot to the SSE
    generator, which releases it in its own finally-block.

    Fixes: the old unconditional release in finally freed the slot before
    streamed generation started and then double-released at stream end;
    sampling defaults now test for None so legitimate zero values
    (temperature=0.0 greedy, top_k=0 disabled) are no longer clobbered.
    """
    message_id = generate_id("msg")
    request_start = time.time()
    ttft = 0  # Time to first token
    # Estimate message length for priority queue
    msg_length = sum(len(str(m.content)) for m in request.messages)
    # Queue management with priority based on expected response length
    position = await request_queue.acquire(message_id, max_tokens=request.max_tokens, message_length=msg_length)
    if position > 0:
        await request_queue.wait_for_turn(message_id)
    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024
    # Check for cache control
    use_cache = check_cache_control(request.system)
    cache_hit = False
    cache_tokens = 0
    # Set once the SSE generator owns the queue slot; see docstring.
    handed_off_to_stream = False
    try:
        llm = model_manager.get_model(request.model)
        # Check prompt cache
        system_text = extract_anthropic_system(request.system)
        tools_list = [t.model_dump() for t in request.tools] if request.tools else None
        if use_cache:
            cached = prompt_cache.get(system_text or "", tools_list)
            if cached:
                cache_hit = True
                cache_tokens = cached.get("tokens", 0)
                logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens")
        prompt = format_anthropic_messages(
            request.messages,
            request.system,
            request.tools,
            thinking_enabled,
            budget_tokens
        )
        # Cache the prompt prefix if cache_control is set
        if use_cache and not cache_hit:
            prompt_cache.set(system_text or "", tools_list, {
                "tokens": len(llm.tokenize(prompt.encode())) // 2,  # Estimate prefix tokens
                "created": time.time()
            })
        if request.stream:
            response = await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm)
            handed_off_to_stream = True
            return response
        # Thinking output is budgeted on top of the caller's max_tokens.
        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)
        stop_tokens = ["<|im_end|>", "<|endoftext|>"]
        if request.stop_sequences:
            stop_tokens.extend(request.stop_sequences)
        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=total_max_tokens,
            # Explicit None tests so zero values are respected (see docstring).
            temperature=request.temperature if request.temperature is not None else 0.7,
            top_p=request.top_p if request.top_p is not None else 0.95,
            top_k=request.top_k if request.top_k is not None else 40,
            stop=stop_tokens,
            echo=False
        )
        gen_time = time.time() - gen_start
        generated_text = output["choices"][0]["text"].strip()
        usage = output["usage"]
        # Parse response for tool use, thinking, etc.
        content_blocks = []
        stop_reason = "end_turn"
        # Check for tool use
        tool_call = parse_tool_use(generated_text)
        if tool_call and request.tools:
            tool_id = f"toolu_{uuid.uuid4().hex[:24]}"
            content_blocks.append(AnthropicResponseToolUseBlock(
                type="tool_use",
                id=tool_id,
                name=tool_call["tool"],
                input=tool_call.get("arguments", {})
            ))
            stop_reason = "tool_use"
        elif thinking_enabled:
            thinking_text, answer_text = parse_thinking_response(generated_text)
            if thinking_text:
                content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text))
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text))
        else:
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text))
        if usage["completion_tokens"] >= total_max_tokens:
            stop_reason = "max_tokens"
        total_time = time.time() - request_start
        ttft = gen_time  # For non-streaming, TTFT ~ generation time
        # Record metrics
        metrics.record_request(
            model=request.model,
            ttft=ttft,
            total_time=total_time,
            tokens=usage["completion_tokens"]
        )
        logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}, total: {total_time:.2f}s")
        return AnthropicMessageResponse(
            id=message_id,
            content=content_blocks,
            model=request.model,
            stop_reason=stop_reason,
            usage=AnthropicUsage(
                input_tokens=usage["prompt_tokens"],
                output_tokens=usage["completion_tokens"],
                # NOTE(review): cache_tokens is only populated on a cache
                # hit, so cache_creation_input_tokens reports 0 on the
                # creation path — same as before; confirm intended.
                cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None,
                cache_read_input_tokens=cache_tokens if cache_hit else None
            )
        )
    except Exception as e:
        logger.error(f"[{message_id}] Error: {e}", exc_info=True)
        metrics.record_error()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if not handed_off_to_stream:
            await request_queue.release()
async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama):
    """Build the SSE response for a streamed Anthropic-style message.

    The inner generator owns the request's queue slot and releases it in its
    finally-block.  Fixes: an explicit temperature of 0.0 is no longer
    replaced by the 0.7 default, and mid-stream failures are logged.

    NOTE(review): thinking_enabled is accepted for interface parity but the
    streaming path neither emits thinking blocks nor extends max_tokens by
    the thinking budget — confirm whether that is intended.
    """
    async def generate():
        try:
            start_event = {
                "type": "message_start",
                "message": {
                    "id": message_id, "type": "message", "role": "assistant", "content": [],
                    "model": request.model, "stop_reason": None, "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0}
                }
            }
            yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n"
            yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n"
            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop_sequences:
                stop_tokens.extend(request.stop_sequences)
            # Counts streamed chunks — an approximation of output tokens.
            total_tokens = 0
            for output in llm(
                prompt,
                max_tokens=request.max_tokens,
                # Test for None: "x or default" clobbers a legitimate 0.0.
                temperature=request.temperature if request.temperature is not None else 0.7,
                top_p=request.top_p if request.top_p is not None else 0.95,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                text = output["choices"][0]["text"]
                if text:
                    total_tokens += 1
                    yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n"
            yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n"
            yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn'}, 'usage': {'output_tokens': total_tokens}})}\n\n"
            yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n"
        except Exception as e:
            # Log mid-stream failures instead of losing them in the transport.
            logger.error(f"[{message_id}] Streaming error: {e}", exc_info=True)
            raise
        finally:
            await request_queue.release()
    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
    """Count input tokens for a prospective Anthropic message request."""
    llm = model_manager.get_model(request.model)
    rendered = format_anthropic_messages(request.messages, request.system)
    token_ids = llm.tokenize(rendered.encode())
    return AnthropicTokenCountResponse(input_tokens=len(token_ids))
if __name__ == "__main__":
    import uvicorn
    # log_config=None stops uvicorn from installing its own logging config,
    # preserving the RotatingFileHandler set up at the top of this file.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None)