|
|
""" |
|
|
Dual-Compatible API Endpoint (OpenAI + Anthropic) |
|
|
llama.cpp powered with advanced features: |
|
|
- Request Queue & Rate Limiting |
|
|
- Prompt Caching (KV Cache) |
|
|
- Multi-Model Hot-Swap |
|
|
""" |
|
|
|
|
|
import os |
|
|
import time |
|
|
import uuid |
|
|
import logging |
|
|
import re |
|
|
import json |
|
|
import asyncio |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
from logging.handlers import RotatingFileHandler |
|
|
from typing import List, Optional, Union, Dict, Any, Literal |
|
|
from contextlib import asynccontextmanager |
|
|
from threading import Thread, Lock |
|
|
from collections import OrderedDict |
|
|
from dataclasses import dataclass, field |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks |
|
|
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse, FileResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from pydantic import BaseModel, Field |
|
|
from llama_cpp import Llama |
|
|
|
|
|
|
|
|
# --- Logging: rotating file log (DEBUG) + console (INFO), shared with uvicorn ---
LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")

log_format = logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# File log keeps everything (DEBUG) with 5 x 10 MiB rotation.
file_handler = RotatingFileHandler(
    LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.DEBUG)

# Console stays quieter (INFO and above).
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("llama-api")

# Route uvicorn's own loggers through the same handlers so all output
# (access log included) lands in one rotated file.
for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]

# Startup banner, written at import time.
logger.info("=" * 60)
logger.info(f"llama.cpp API v3.0 Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("=" * 60)
|
|
|
|
|
|
|
|
# --- llama.cpp runtime configuration ---
MODELS_DIR = "/app/models"   # directory where GGUF model files live
N_CTX = 8192                 # context window (tokens) passed to llama.cpp
N_THREADS = 2                # CPU threads used for inference
N_BATCH = 128                # prompt-processing batch size

# Catalog of models this server can serve. "default": True marks the model
# pre-loaded at startup (see lifespan). "url" records the download source for
# the GGUF file; the download itself is not performed in this module -- the
# file is expected to already exist at "path" (TODO confirm external fetcher).
MODEL_CONFIGS = {
    "qwen2.5-coder-7b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
        "size": "7B",
        "quantization": "Q4_K_M",
        "default": True
    },
    "qwen2.5-coder-1.5b": {
        "path": f"{MODELS_DIR}/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
        "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
        "size": "1.5B",
        "quantization": "Q8_0",
        "default": False
    }
}
|
|
|
|
|
|
|
|
@dataclass
class QueuedRequest:
    """A pending request waiting for an inference slot in RequestQueue."""
    id: str                    # caller-supplied request identifier
    priority: int = 0          # higher values are served first (see queue sort)
    created_at: float = field(default_factory=time.time)  # FIFO tie-breaker within a priority

    # NOTE(review): never assigned or awaited anywhere in this file --
    # appears to be an unused placeholder; confirm before removing.
    future: Optional[asyncio.Future] = None
|
|
|
|
|
class RequestQueue:
    """Async admission control: at most `max_concurrent` requests run at once.

    Callers use the acquire() / wait_for_turn() / release() protocol:
    acquire() either grants a slot immediately (returns 0) or enqueues the
    request and returns its 1-based queue position; callers with position > 0
    then await wait_for_turn(), and every caller must eventually call
    release() exactly once.
    """

    def __init__(self, max_concurrent: int = 1, max_queue_size: int = 50):
        self.max_concurrent = max_concurrent   # simultaneous inference slots
        self.max_queue_size = max_queue_size   # waiting requests before 503
        self.queue: List[QueuedRequest] = []   # sorted: priority desc, then FIFO
        self.active_requests = 0               # slots currently held
        self.lock = asyncio.Lock()             # guards queue/active/stats
        self.stats = {
            "total_requests": 0,
            "completed_requests": 0,
            "rejected_requests": 0,
            "avg_wait_time": 0.0
        }

    async def acquire(self, request_id: str, priority: int = 0) -> int:
        """Add request to queue, return position. Raises if queue full."""
        async with self.lock:
            if len(self.queue) >= self.max_queue_size:
                self.stats["rejected_requests"] += 1
                raise HTTPException(status_code=503, detail="Queue full, try again later")

            self.stats["total_requests"] += 1

            # Fast path: a slot is free, no queuing needed.
            if self.active_requests < self.max_concurrent:
                self.active_requests += 1
                return 0

            req = QueuedRequest(id=request_id, priority=priority)
            self.queue.append(req)
            # Highest priority first; created_at keeps FIFO order among equals.
            self.queue.sort(key=lambda x: (-x.priority, x.created_at))
            position = self.queue.index(req) + 1

            logger.info(f"[{request_id}] Queued at position {position}")
            return position

    async def wait_for_turn(self, request_id: str) -> float:
        """Wait until it's this request's turn. Returns wait time.

        Polls every 100 ms; the slot is taken only when this request is at
        the head of the queue AND a slot is free.
        NOTE(review): loops forever if request_id is not actually queued --
        callers must only invoke this after acquire() returned > 0.
        """
        start = time.time()
        while True:
            async with self.lock:
                if self.queue and self.queue[0].id == request_id:
                    if self.active_requests < self.max_concurrent:
                        self.queue.pop(0)
                        self.active_requests += 1
                        wait_time = time.time() - start
                        # Exponential moving average (alpha = 0.1).
                        self.stats["avg_wait_time"] = (
                            self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1
                        )
                        return wait_time
            await asyncio.sleep(0.1)

    async def release(self):
        """Release a slot when request completes."""
        async with self.lock:
            # max() guards against double-release driving the count negative.
            self.active_requests = max(0, self.active_requests - 1)
            self.stats["completed_requests"] += 1

    def get_status(self) -> Dict:
        """Snapshot of queue depth, active slots, and counters (not locked)."""
        return {
            "queue_length": len(self.queue),
            "active_requests": self.active_requests,
            "max_concurrent": self.max_concurrent,
            "stats": self.stats
        }

    def get_position(self, request_id: str) -> Optional[int]:
        """1-based queue position of a waiting request, or None if not queued."""
        for i, req in enumerate(self.queue):
            if req.id == request_id:
                return i + 1
        return None
|
|
|
|
|
# Single global queue: one request runs at a time, up to 50 may wait.
request_queue = RequestQueue(max_concurrent=1, max_queue_size=50)
|
|
|
|
|
|
|
|
class PromptCache:
    """Thread-safe LRU cache keyed by (system prompt, tools) combinations.

    Stores opaque prefix data dicts for reuse across requests and tracks
    hit/miss statistics. OrderedDict order is maintained as LRU: front is
    least recently used, back is most recently used.
    """

    def __init__(self, max_size: int = 10):
        self.max_size = max_size   # maximum number of cached prefixes
        self.cache: OrderedDict[str, Dict] = OrderedDict()
        self.lock = Lock()
        self.stats = {"hits": 0, "misses": 0}

    def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str:
        """Generate a short stable hash for a system prompt + tools combination."""
        content = system or ""
        if tools:
            content += json.dumps(tools, sort_keys=True)
        # md5 is acceptable here: the digest is a cache key, not a security boundary.
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]:
        """Return cached prefix data for this prompt, or None on a miss."""
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key in self.cache:
                self.stats["hits"] += 1
                self.cache.move_to_end(key)   # mark as most recently used
                logger.debug(f"Prompt cache HIT: {key}")
                return self.cache[key]
            self.stats["misses"] += 1
            return None

    def set(self, system: str, tools: Optional[List], data: Dict):
        """Cache prompt prefix data, evicting the LRU entry when full.

        Fix: only evict when inserting a genuinely NEW key -- overwriting an
        existing entry does not grow the cache, so it must not displace an
        unrelated entry. Updated entries are refreshed to MRU position.
        """
        with self.lock:
            key = self._hash_prompt(system, tools)
            if key not in self.cache and len(self.cache) >= self.max_size:
                # popitem(last=False) removes the least recently used entry.
                oldest, _ = self.cache.popitem(last=False)
                logger.debug(f"Prompt cache evicted: {oldest}")
            self.cache[key] = data
            self.cache.move_to_end(key)
            logger.debug(f"Prompt cache SET: {key}")

    def get_stats(self) -> Dict:
        """Return cache occupancy and hit-rate statistics."""
        total = self.stats["hits"] + self.stats["misses"]
        hit_rate = (self.stats["hits"] / total * 100) if total > 0 else 0
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "hits": self.stats["hits"],
            "misses": self.stats["misses"],
            "hit_rate": f"{hit_rate:.1f}%"
        }
|
|
|
|
|
# Global prompt-prefix cache shared by all requests.
prompt_cache = PromptCache(max_size=10)
|
|
|
|
|
|
|
|
class ModelManager:
    """Lazy loader and registry for llama.cpp models (hot-swap capable).

    Keeps every successfully loaded Llama instance in memory; `current_model`
    tracks the most recently requested one. load/unload mutate state under a
    threading.Lock.
    """

    def __init__(self):
        self.models: Dict[str, Llama] = {}         # model_id -> loaded Llama
        self.current_model: Optional[str] = None   # last model used/loaded
        self.lock = Lock()                         # guards load/unload
        self.load_stats: Dict[str, Dict] = {}      # model_id -> load timing info

    def load_model(self, model_id: str) -> Llama:
        """Load a model (lazy loading with hot-swap).

        Raises HTTPException: 400 for an unknown id, 503 when the GGUF file
        is missing on disk, 500 when llama.cpp fails to load it.
        NOTE(review): the (potentially long) Llama() construction happens
        while holding self.lock, blocking all other load/unload calls.
        """
        with self.lock:
            if model_id in self.models:
                self.current_model = model_id
                return self.models[model_id]

            if model_id not in MODEL_CONFIGS:
                raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}")

            config = MODEL_CONFIGS[model_id]

            if not os.path.exists(config["path"]):
                # NOTE(review): "Available" here lists *loaded* models, not
                # models whose files exist on disk -- possibly misleading.
                raise HTTPException(
                    status_code=503,
                    detail=f"Model file not found: {model_id}. Available: {list(self.models.keys())}"
                )

            logger.info(f"Loading model: {model_id}")
            start = time.time()

            try:
                llm = Llama(
                    model_path=config["path"],
                    n_ctx=N_CTX,
                    n_threads=N_THREADS,
                    n_batch=N_BATCH,
                    verbose=False
                )

                load_time = time.time() - start
                self.models[model_id] = llm
                self.current_model = model_id
                self.load_stats[model_id] = {
                    "loaded_at": datetime.now().isoformat(),
                    "load_time": f"{load_time:.2f}s"
                }

                logger.info(f"Model {model_id} loaded in {load_time:.2f}s")
                return llm

            except Exception as e:
                logger.error(f"Failed to load model {model_id}: {e}")
                raise HTTPException(status_code=500, detail=f"Failed to load model: {e}")

    def get_model(self, model_id: Optional[str] = None) -> Llama:
        """Get a model, loading if necessary.

        A None id falls back to the current model, then to the configured
        default. NOTE(review): reads self.models without the lock -- assumed
        safe because dict reads are atomic in CPython; confirm.
        """
        if model_id is None:
            model_id = self.current_model or self._get_default_model()

        model_id = self._normalize_model_id(model_id)

        if model_id in self.models:
            return self.models[model_id]

        return self.load_model(model_id)

    def _normalize_model_id(self, model_id: str) -> str:
        """Normalize model ID to match config keys.

        Maps fuzzy client names (e.g. "Qwen2.5-Coder-7B-Instruct") onto
        config keys; anything unrecognized silently falls back to the 7B model.
        """
        model_id = model_id.lower().strip()

        if "7b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-7b"
        if "1.5b" in model_id and "qwen" in model_id:
            return "qwen2.5-coder-1.5b"

        if model_id in MODEL_CONFIGS:
            return model_id

        return "qwen2.5-coder-7b"

    def _get_default_model(self) -> str:
        """Return the config entry marked default, else the first configured id."""
        for model_id, config in MODEL_CONFIGS.items():
            if config.get("default"):
                return model_id
        return list(MODEL_CONFIGS.keys())[0]

    def list_models(self) -> List[Dict]:
        """List all available models."""
        models = []
        for model_id, config in MODEL_CONFIGS.items():
            models.append({
                "id": model_id,
                "size": config["size"],
                "quantization": config["quantization"],
                "loaded": model_id in self.models,
                "available": os.path.exists(config["path"]),
                "default": config.get("default", False)
            })
        return models

    def get_stats(self) -> Dict:
        """Summary of which models are loaded and their load timings."""
        return {
            "current_model": self.current_model,
            "loaded_models": list(self.models.keys()),
            "load_stats": self.load_stats
        }

    def unload_model(self, model_id: str):
        """Unload a model to free memory."""
        with self.lock:
            if model_id in self.models:
                del self.models[model_id]
                if self.current_model == model_id:
                    self.current_model = None
                logger.info(f"Model {model_id} unloaded")
|
|
|
|
|
# Global registry of loaded llama.cpp models.
model_manager = ModelManager()
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: pre-load the default model, log on shutdown."""
    # Pick the first configured default whose GGUF file actually exists on disk.
    default_model = None
    for model_id, config in MODEL_CONFIGS.items():
        if config.get("default") and os.path.exists(config["path"]):
            default_model = model_id
            break

    if default_model:
        try:
            model_manager.load_model(default_model)
        except Exception as e:
            # Startup continues anyway; loading is retried on first request.
            logger.error(f"Failed to load default model: {e}")
    else:
        logger.warning("No default model found, will load on first request")

    yield
    logger.info("Shutting down...")
|
|
|
|
|
app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic) v3.0",
    description="llama.cpp API with Queue, Caching, and Multi-Model support",
    version="3.0.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject it) and would be unsafe on a
# public deployment -- confirm whether credentialed requests are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every HTTP request and attach trace/timing headers to the response."""
    trace_id = str(uuid.uuid4())[:8]
    started = time.time()

    response = await call_next(request)

    elapsed_ms = (time.time() - started) * 1000
    logger.info(f"[{trace_id}] {request.method} {request.url.path} - {response.status_code} ({elapsed_ms:.2f}ms)")

    response.headers["X-Request-ID"] = trace_id
    response.headers["X-Processing-Time"] = f"{elapsed_ms:.2f}ms"
    return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---- Anthropic API: request content blocks ----

# Plain text span inside a message.
class AnthropicTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str

# Image payload: inline base64 data or a URL, selected by `type`.
class AnthropicImageSource(BaseModel):
    type: Literal["base64", "url"] = "base64"
    media_type: Optional[str] = None   # e.g. "image/png" for base64 sources
    data: Optional[str] = None         # base64-encoded bytes when type == "base64"
    url: Optional[str] = None          # remote location when type == "url"

class AnthropicImageBlock(BaseModel):
    type: Literal["image"] = "image"
    source: AnthropicImageSource

# Assistant-initiated tool invocation.
class AnthropicToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str                    # correlated with a later tool_result block
    name: str
    input: Dict[str, Any]

# Caller-supplied result for an earlier tool_use block.
class AnthropicToolResultBlock(BaseModel):
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False

# Any block allowed in a message's content list.
AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock]
|
|
|
|
|
class AnthropicMessage(BaseModel):
    role: Literal["user", "assistant"]
    content: Union[str, List[AnthropicContentBlock]]

# JSON-Schema-style description of a tool's expected input.
class AnthropicToolInputSchema(BaseModel):
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None

class AnthropicTool(BaseModel):
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema

# tool_choice variants: model decides ("auto"), must use some tool ("any"),
# or must use one named tool ("tool").
class AnthropicToolChoiceAuto(BaseModel):
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None

class AnthropicToolChoiceAny(BaseModel):
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None

class AnthropicToolChoiceTool(BaseModel):
    type: Literal["tool"] = "tool"
    name: str
    disable_parallel_tool_use: Optional[bool] = None

AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool]
|
|
|
|
|
class AnthropicMetadata(BaseModel):
    user_id: Optional[str] = None

# Marker that a system block may be cached (consumed by check_cache_control).
class AnthropicCacheControl(BaseModel):
    type: Literal["ephemeral"] = "ephemeral"

class AnthropicSystemContent(BaseModel):
    type: Literal["text"] = "text"
    text: str
    cache_control: Optional[AnthropicCacheControl] = None

# Extended-thinking configuration (emulated downstream via <thinking> tags).
class AnthropicThinkingConfig(BaseModel):
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000)

# Request body for POST /anthropic/v1/messages.
class AnthropicMessageRequest(BaseModel):
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None
|
|
|
|
|
# ---- Anthropic API: response models ----

class AnthropicUsage(BaseModel):
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None   # tokens written to prompt cache
    cache_read_input_tokens: Optional[int] = None       # tokens served from prompt cache

class AnthropicResponseTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str

class AnthropicResponseThinkingBlock(BaseModel):
    type: Literal["thinking"] = "thinking"
    thinking: str

class AnthropicResponseToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]

AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock]

# Top-level response body for POST /anthropic/v1/messages.
class AnthropicMessageResponse(BaseModel):
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage

# Request/response bodies for token counting.
class AnthropicTokenCountRequest(BaseModel):
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None

class AnthropicTokenCountResponse(BaseModel):
    input_tokens: int
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---- OpenAI API: request models ----

class OpenAIMessage(BaseModel):
    role: Literal["system", "user", "assistant", "tool"]
    content: Optional[Union[str, List[Dict[str, Any]]]] = None   # text or multi-part list
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None   # assistant tool invocations
    tool_call_id: Optional[str] = None                  # set on role == "tool" replies

class OpenAITool(BaseModel):
    type: Literal["function"] = "function"
    function: Dict[str, Any]   # {"name", "description", "parameters"} schema

class OpenAIToolChoice(BaseModel):
    type: str
    function: Optional[Dict[str, str]] = None

# Request body for POST /v1/chat/completions.
class OpenAIChatRequest(BaseModel):
    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None
|
|
|
|
|
# ---- OpenAI API: response models ----

class OpenAIUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class OpenAIChoice(BaseModel):
    index: int
    message: Dict[str, Any]             # {"role": "assistant", "content": ...}
    finish_reason: Optional[str] = None

class OpenAIChatResponse(BaseModel):
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    created: int                        # unix timestamp
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None

class OpenAIModel(BaseModel):
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str

class OpenAIModelList(BaseModel):
    object: Literal["list"] = "list"
    data: List[OpenAIModel]
|
|
|
|
|
|
|
|
|
|
|
def extract_anthropic_text(content: "Union[str, List[AnthropicContentBlock]]") -> str:
    """Collect the plain-text pieces of an Anthropic message content field.

    Accepts either a raw string, or a list of blocks (dicts or parsed models);
    non-text blocks are ignored and text pieces are joined with single spaces.
    """
    if isinstance(content, str):
        return content
    parts = []
    for item in content:
        if isinstance(item, dict):
            if item.get("type") == "text":
                parts.append(item.get("text", ""))
        elif getattr(item, "type", None) == "text":
            parts.append(item.text)
    return " ".join(parts)
|
|
|
|
|
def extract_anthropic_system(system: "Optional[Union[str, List[AnthropicSystemContent]]]") -> Optional[str]:
    """Flatten an Anthropic `system` field (string or block list) into one string.

    Returns None when no system prompt was supplied; block text pieces are
    joined with single spaces.
    """
    if system is None:
        return None
    if isinstance(system, str):
        return system
    pieces = [
        entry.get("text", "") if isinstance(entry, dict) else entry.text
        for entry in system
        if isinstance(entry, dict) or hasattr(entry, "text")
    ]
    return " ".join(pieces)
|
|
|
|
|
def check_cache_control(system: "Optional[Union[str, List[AnthropicSystemContent]]]") -> bool:
    """Return True when any system block requests ephemeral prompt caching.

    Accepts raw dicts or parsed AnthropicSystemContent models; a plain-string
    (or missing) system prompt never enables caching.
    """
    if system is None or isinstance(system, str):
        return False
    for block in system:
        if isinstance(block, dict):
            # `or {}` also covers an explicit "cache_control": None entry,
            # which previously raised AttributeError (None.get).
            if (block.get("cache_control") or {}).get("type") == "ephemeral":
                return True
        elif getattr(block, "cache_control", None) and block.cache_control.type == "ephemeral":
            return True
    return False
|
|
|
|
|
def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Flatten OpenAI message content (None, string, or part list) to plain text.

    Only "text"-typed parts contribute; everything else is ignored.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return " ".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )
|
|
|
|
|
def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str:
    """Render a message list into the Qwen2.5 ChatML prompt format.

    An optional system turn comes first; the prompt always ends with an open
    assistant turn for the model to complete.
    """
    segments = []
    if system:
        segments.append(f"<|im_start|>system\n{system}<|im_end|>\n")
    segments.extend(
        f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>\n" for m in messages
    )
    segments.append("<|im_start|>assistant\n")
    return "".join(segments)
|
|
|
|
|
def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    tools: Optional[List[AnthropicTool]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Build a ChatML prompt from an Anthropic-style request.

    Tool definitions and (optionally) thinking instructions are injected into
    the system prompt, since the underlying model has no native tool/thinking
    API: final system text is [thinking instr]\n\n[tool instr]\n\n[user system].
    """
    formatted_messages = []
    system_text = extract_anthropic_system(system) or ""

    if tools:
        # Describe each tool as a JSON object the model can imitate.
        tool_defs = []
        for tool in tools:
            tool_def = {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema.model_dump()
            }
            tool_defs.append(tool_def)

        tool_instruction = f"""You have access to the following tools:

{json.dumps(tool_defs, indent=2)}

To use a tool, respond with a JSON object in this exact format:
{{"tool": "tool_name", "arguments": {{"arg1": "value1"}}}}

Only use tools when necessary. If you don't need a tool, respond normally."""
        system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction

    if thinking_enabled:
        # Emulates extended thinking by asking for <thinking> tags, which
        # parse_thinking_response() strips back out of the completion.
        thinking_instruction = f"""When solving complex problems:
1. Think through the problem step by step inside <thinking>...</thinking> tags
2. After thinking, provide your final answer outside the thinking tags
Budget for thinking: up to {budget_tokens} tokens."""
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction

    # Flatten each message's block list to plain text for the ChatML template.
    for msg in messages:
        content = extract_anthropic_text(msg.content)
        formatted_messages.append({"role": msg.role, "content": content})

    return format_chat_prompt(formatted_messages, system_text if system_text else None)
|
|
|
|
|
def format_openai_messages(messages: "List[OpenAIMessage]") -> str:
    """Convert OpenAI-style messages into a ChatML prompt string.

    System messages are pulled out of the turn sequence (the last one wins,
    as in the original implementation); everything else becomes a chat turn.
    """
    system_text = None
    chat_turns = []
    for message in messages:
        text = extract_openai_content(message.content)
        if message.role == "system":
            system_text = text
        else:
            chat_turns.append({"role": message.role, "content": text})
    return format_chat_prompt(chat_turns, system_text)
|
|
|
|
|
def parse_thinking_response(text: str) -> tuple:
    """Split generated text into (thinking, answer).

    All <thinking>...</thinking> spans are removed from the answer and joined
    with newlines; `thinking` is None when no such span exists.
    """
    pattern = r'<thinking>(.*?)</thinking>'
    spans = re.findall(pattern, text, re.DOTALL)
    if not spans:
        return None, text.strip()
    answer = re.sub(pattern, '', text, flags=re.DOTALL).strip()
    return "\n".join(spans).strip(), answer
|
|
|
|
|
def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a {"tool": ..., "arguments": ...} call.

    Returns the parsed dict when `text` is (or embeds) a JSON object carrying
    a "tool" key, otherwise None. Never raises: malformed JSON is skipped.

    Fixes over the original: narrow exception handling instead of bare
    `except:`; an invalid leading {...} no longer aborts the embedded-object
    scan (the old outer try swallowed it and returned None); `"tool" in parsed`
    is now guarded with isinstance so a top-level JSON string can no longer
    match by substring; stray closing braces no longer desync the depth count.
    """
    candidate = text.strip()
    if candidate.startswith("{") and candidate.endswith("}"):
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict) and "tool" in parsed:
            return parsed

    # Scan for balanced top-level {...} spans embedded in surrounding prose.
    depth = 0
    start_idx = None
    for i, char in enumerate(text):
        if char == '{':
            if depth == 0:
                start_idx = i
            depth += 1
        elif char == '}':
            if depth > 0:
                depth -= 1
            if depth == 0 and start_idx is not None:
                try:
                    parsed = json.loads(text[start_idx:i + 1])
                except json.JSONDecodeError:
                    parsed = None
                if isinstance(parsed, dict) and "tool" in parsed:
                    return parsed
                start_idx = None
    return None
|
|
|
|
|
def generate_id(prefix: str = "msg") -> str:
    """Return a short unique identifier of the form '<prefix>_<24 hex chars>'."""
    token = uuid.uuid4().hex[:24]
    return "_".join((prefix, token))
|
|
|
|
|
|
|
|
# Optional dashboard assets, served under /static when the directory exists.
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
if os.path.exists(STATIC_DIR):
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
    logger.info(f"Static files mounted from {STATIC_DIR}")
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the bundled dashboard when present, else a JSON status summary."""
    index_html = os.path.join(STATIC_DIR, "index.html")
    if os.path.exists(index_html):
        return FileResponse(index_html, media_type="text/html")

    return JSONResponse({
        "status": "healthy",
        "version": "3.0.0",
        "backend": "llama.cpp",
        "features": [
            "request-queue",
            "prompt-caching",
            "multi-model",
            "extended-thinking",
            "streaming",
            "tool-use",
            "dual-compatibility",
        ],
        "endpoints": {
            "openai": "/v1/chat/completions",
            "anthropic": "/anthropic/v1/messages",
        },
        "models": model_manager.list_models(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats(),
    })
|
|
|
|
|
@app.get("/api/status")
async def api_status():
    """JSON status snapshot consumed by the dashboard's AJAX polling."""
    features = [
        "request-queue",
        "prompt-caching",
        "multi-model",
        "extended-thinking",
        "streaming",
        "tool-use",
        "dual-compatibility",
    ]
    return {
        "status": "healthy",
        "version": "3.0.0",
        "backend": "llama.cpp",
        "features": features,
        "endpoints": {
            "openai": "/v1/chat/completions",
            "anthropic": "/anthropic/v1/messages",
        },
        "models": model_manager.list_models(),
        "queue": request_queue.get_status(),
        "cache": prompt_cache.get_stats(),
    }
|
|
|
|
|
@app.get("/logs")
async def get_logs(lines: int = 100):
    """Return the tail of the API log file.

    Args:
        lines: number of trailing lines to return; negative values are
               clamped to 0 (previously a negative value silently returned
               the *head*-truncated file via Python slicing).
    """
    lines = max(0, lines)
    try:
        # The file is written as UTF-8 by the rotating handler; read it the
        # same way, replacing any bytes corrupted mid-rotation.
        with open(LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            all_lines = f.readlines()
    except FileNotFoundError:
        return {"error": "Log file not found"}
    # Slicing already handles lines >= len(all_lines); [-0:] returns everything.
    recent_lines = all_lines[-lines:]
    return {"log_file": LOG_FILE, "total_lines": len(all_lines), "logs": "".join(recent_lines)}
|
|
|
|
|
@app.get("/health")
async def health():
    """Liveness probe reporting model, queue, and cache state."""
    report = {"status": "ok"}
    report["models"] = model_manager.get_stats()
    report["queue"] = request_queue.get_status()
    report["cache"] = prompt_cache.get_stats()
    return report
|
|
|
|
|
@app.get("/queue/status")
async def queue_status():
    """Expose the request queue's current depth and statistics."""
    status = request_queue.get_status()
    return status
|
|
|
|
|
@app.get("/models/status")
async def models_status():
    """List configured models together with the manager's load statistics."""
    payload = {"models": model_manager.list_models()}
    payload["stats"] = model_manager.get_stats()
    return payload
|
|
|
|
|
@app.post("/models/{model_id}/load")
async def load_model(model_id: str):
    """Eagerly load the given model into memory (raises 400/503/500 on failure)."""
    model_manager.load_model(model_id)
    return {"status": "loaded", "model": model_id}
|
|
|
|
|
@app.post("/models/{model_id}/unload")
async def unload_model(model_id: str):
    """Drop a loaded model to free memory; no-op when it is not loaded."""
    model_manager.unload_model(model_id)
    return {"status": "unloaded", "model": model_id}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/v1/models")
async def openai_list_models():
    """OpenAI-compatible listing of every configured model."""
    entries = [
        OpenAIModel(id=model_id, created=int(time.time()), owned_by="qwen")
        for model_id in MODEL_CONFIGS
    ]
    return OpenAIModelList(data=entries)
|
|
|
|
|
@app.post("/v1/chat/completions")
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """OpenAI-compatible chat completion endpoint (blocking or streaming).

    The `authorization` header is accepted for client compatibility but not
    validated. A queue slot is acquired first; for non-streaming requests it
    is released in `finally`, while streaming requests hand the slot to the
    SSE generator, which releases it when the stream finishes.

    Fixes: (1) the stream path previously released the slot BOTH here and in
    the generator's `finally`, double-counting completions; (2) HTTPException
    from model loading (400 unknown model / 503 file missing) was swallowed
    by `except Exception` and re-raised as a generic 500.
    """
    chat_id = generate_id("chatcmpl")

    position = await request_queue.acquire(chat_id)
    if position > 0:
        await request_queue.wait_for_turn(chat_id)

    handed_off_to_stream = False
    try:
        llm = model_manager.get_model(request.model)
        prompt = format_openai_messages(request.messages)

        if request.stream:
            # The streaming generator owns the queue slot from here on and
            # releases it in its own `finally`.
            handed_off_to_stream = True
            return await openai_stream_response(request, prompt, chat_id, llm)

        # ChatML terminators plus any caller-supplied stop sequences.
        stop_tokens = ["<|im_end|>", "<|endoftext|>"]
        if request.stop:
            if isinstance(request.stop, str):
                stop_tokens.append(request.stop)
            else:
                stop_tokens.extend(request.stop)

        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=request.max_tokens or 1024,
            temperature=request.temperature or 0.7,
            top_p=request.top_p or 0.95,
            stop=stop_tokens,
            echo=False
        )
        gen_time = time.time() - gen_start

        generated_text = output["choices"][0]["text"].strip()
        usage = output["usage"]

        logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")

        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text},
                finish_reason="stop"
            )],
            usage=OpenAIUsage(
                prompt_tokens=usage["prompt_tokens"],
                completion_tokens=usage["completion_tokens"],
                total_tokens=usage["total_tokens"]
            )
        )
    except HTTPException:
        # Preserve deliberate HTTP errors (unknown model, model file missing).
        raise
    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if not handed_off_to_stream:
            await request_queue.release()
|
|
|
|
|
async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama):
    """Stream an OpenAI-style chat completion as Server-Sent Events.

    Emits an initial role chunk, one delta chunk per generated text piece, a
    final chunk with finish_reason="stop", then the "[DONE]" sentinel. The
    generator releases the global request-queue slot in its `finally`, so it
    runs even when the client disconnects mid-stream.
    """
    async def generate():
        try:
            created = int(time.time())

            # First chunk announces the assistant role with empty content.
            initial_chunk = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}]
            }
            yield f"data: {json.dumps(initial_chunk)}\n\n"

            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop:
                if isinstance(request.stop, str):
                    stop_tokens.append(request.stop)
                else:
                    stop_tokens.extend(request.stop)

            # NOTE(review): llm(..., stream=True) is a synchronous generator;
            # each step blocks the event loop while llama.cpp computes --
            # acceptable with max_concurrent=1, confirm before raising it.
            for output in llm(
                prompt,
                max_tokens=request.max_tokens or 1024,
                temperature=request.temperature or 0.7,
                top_p=request.top_p or 0.95,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                text = output["choices"][0]["text"]
                if text:
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": request.model,
                        "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}]
                    }
                    yield f"data: {json.dumps(chunk)}\n\n"

            # Terminal chunk with an empty delta and the finish reason.
            final_chunk = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
            }
            yield f"data: {json.dumps(final_chunk)}\n\n"
            yield "data: [DONE]\n\n"
        finally:
            # Give the inference slot back whether the stream completed,
            # raised, or was cancelled by a client disconnect.
            await request_queue.release()

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/anthropic/v1/models") |
|
|
async def anthropic_list_models(): |
|
|
models = [] |
|
|
for model_id, config in MODEL_CONFIGS.items(): |
|
|
models.append({ |
|
|
"id": model_id, |
|
|
"object": "model", |
|
|
"created": int(time.time()), |
|
|
"owned_by": "qwen", |
|
|
"display_name": f"Qwen2.5 Coder {config['size']} ({config['quantization']})", |
|
|
"supports_thinking": True, |
|
|
"supports_tools": True, |
|
|
"loaded": model_id in model_manager.models, |
|
|
"available": os.path.exists(config["path"]) |
|
|
}) |
|
|
return {"object": "list", "data": models} |
|
|
|
|
|
@app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse) |
|
|
async def anthropic_create_message( |
|
|
request: AnthropicMessageRequest, |
|
|
x_api_key: Optional[str] = Header(None, alias="x-api-key"), |
|
|
anthropic_version: Optional[str] = Header(None, alias="anthropic-version"), |
|
|
anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta") |
|
|
): |
|
|
message_id = generate_id("msg") |
|
|
|
|
|
|
|
|
position = await request_queue.acquire(message_id) |
|
|
if position > 0: |
|
|
await request_queue.wait_for_turn(message_id) |
|
|
|
|
|
thinking_enabled = False |
|
|
budget_tokens = 1024 |
|
|
if request.thinking: |
|
|
thinking_enabled = request.thinking.type == "enabled" |
|
|
budget_tokens = request.thinking.budget_tokens or 1024 |
|
|
|
|
|
|
|
|
use_cache = check_cache_control(request.system) |
|
|
cache_hit = False |
|
|
cache_tokens = 0 |
|
|
|
|
|
try: |
|
|
llm = model_manager.get_model(request.model) |
|
|
|
|
|
|
|
|
system_text = extract_anthropic_system(request.system) |
|
|
tools_list = [t.model_dump() for t in request.tools] if request.tools else None |
|
|
|
|
|
if use_cache: |
|
|
cached = prompt_cache.get(system_text or "", tools_list) |
|
|
if cached: |
|
|
cache_hit = True |
|
|
cache_tokens = cached.get("tokens", 0) |
|
|
logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens") |
|
|
|
|
|
prompt = format_anthropic_messages( |
|
|
request.messages, |
|
|
request.system, |
|
|
request.tools, |
|
|
thinking_enabled, |
|
|
budget_tokens |
|
|
) |
|
|
|
|
|
|
|
|
if use_cache and not cache_hit: |
|
|
prompt_cache.set(system_text or "", tools_list, { |
|
|
"tokens": len(llm.tokenize(prompt.encode())) // 2, |
|
|
"created": time.time() |
|
|
}) |
|
|
|
|
|
if request.stream: |
|
|
return await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm) |
|
|
|
|
|
total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0) |
|
|
|
|
|
stop_tokens = ["<|im_end|>", "<|endoftext|>"] |
|
|
if request.stop_sequences: |
|
|
stop_tokens.extend(request.stop_sequences) |
|
|
|
|
|
gen_start = time.time() |
|
|
output = llm( |
|
|
prompt, |
|
|
max_tokens=total_max_tokens, |
|
|
temperature=request.temperature or 0.7, |
|
|
top_p=request.top_p or 0.95, |
|
|
top_k=request.top_k or 40, |
|
|
stop=stop_tokens, |
|
|
echo=False |
|
|
) |
|
|
gen_time = time.time() - gen_start |
|
|
|
|
|
generated_text = output["choices"][0]["text"].strip() |
|
|
usage = output["usage"] |
|
|
|
|
|
|
|
|
content_blocks = [] |
|
|
stop_reason = "end_turn" |
|
|
|
|
|
|
|
|
tool_call = parse_tool_use(generated_text) |
|
|
if tool_call and request.tools: |
|
|
tool_id = f"toolu_{uuid.uuid4().hex[:24]}" |
|
|
content_blocks.append(AnthropicResponseToolUseBlock( |
|
|
type="tool_use", |
|
|
id=tool_id, |
|
|
name=tool_call["tool"], |
|
|
input=tool_call.get("arguments", {}) |
|
|
)) |
|
|
stop_reason = "tool_use" |
|
|
elif thinking_enabled: |
|
|
thinking_text, answer_text = parse_thinking_response(generated_text) |
|
|
if thinking_text: |
|
|
content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text)) |
|
|
content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text)) |
|
|
else: |
|
|
content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text)) |
|
|
|
|
|
if usage["completion_tokens"] >= total_max_tokens: |
|
|
stop_reason = "max_tokens" |
|
|
|
|
|
logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}") |
|
|
|
|
|
return AnthropicMessageResponse( |
|
|
id=message_id, |
|
|
content=content_blocks, |
|
|
model=request.model, |
|
|
stop_reason=stop_reason, |
|
|
usage=AnthropicUsage( |
|
|
input_tokens=usage["prompt_tokens"], |
|
|
output_tokens=usage["completion_tokens"], |
|
|
cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None, |
|
|
cache_read_input_tokens=cache_tokens if cache_hit else None |
|
|
) |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"[{message_id}] Error: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
finally: |
|
|
await request_queue.release() |
|
|
|
|
|
async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama):
    """Stream an Anthropic Messages response as server-sent events.

    Event sequence: message_start -> content_block_start ->
    content_block_delta* -> content_block_stop -> message_delta ->
    message_stop. Releases the caller-acquired queue slot in the
    generator's finally clause, including on client disconnect.

    NOTE(review): thinking_enabled is accepted but unused — the stream
    emits a single text block and does not separate thinking deltas.
    """

    async def generate():
        try:
            start_event = {
                "type": "message_start",
                "message": {
                    "id": message_id, "type": "message", "role": "assistant", "content": [],
                    "model": request.model, "stop_reason": None, "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0}
                }
            }
            yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n"

            yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n"

            stop_tokens = ["<|im_end|>", "<|endoftext|>"]
            if request.stop_sequences:
                stop_tokens.extend(request.stop_sequences)

            # Chunk counter used as an output-token approximation (a chunk
            # is usually one token, but that is not guaranteed).
            total_tokens = 0
            # BUGFIX: "x or default" replaced a legitimate 0.0 (greedy
            # decoding) with the default, so compare against None.
            for output in llm(
                prompt,
                max_tokens=request.max_tokens,
                temperature=request.temperature if request.temperature is not None else 0.7,
                top_p=request.top_p if request.top_p is not None else 0.95,
                stop=stop_tokens,
                stream=True,
                echo=False
            ):
                text = output["choices"][0]["text"]
                if text:
                    total_tokens += 1
                    yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n"

            yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n"
            yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn'}, 'usage': {'output_tokens': total_tokens}})}\n\n"
            yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n"
        except Exception as e:
            # BUGFIX: surface mid-stream failures as an Anthropic error
            # event instead of silently dropping the connection.
            logger.error(f"[{message_id}] Streaming error: {e}", exc_info=True)
            yield f"event: error\ndata: {json.dumps({'type': 'error', 'error': {'type': 'api_error', 'message': str(e)}})}\n\n"
        finally:
            await request_queue.release()

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
|
|
|
|
|
@app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse) |
|
|
async def anthropic_count_tokens(request: AnthropicTokenCountRequest): |
|
|
llm = model_manager.get_model(request.model) |
|
|
prompt = format_anthropic_messages(request.messages, request.system) |
|
|
tokens = llm.tokenize(prompt.encode()) |
|
|
return AnthropicTokenCountResponse(input_tokens=len(tokens)) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None) |
|
|
|