"""
Dual-Compatible API Endpoint (OpenAI + Anthropic)

llama.cpp powered - Qwen2.5-Coder-7B-Instruct Q4_K_M
- OpenAI format: /v1/chat/completions
- Anthropic format: /anthropic/v1/messages
"""
import os
import time
import uuid
import logging
import re
import json
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import List, Optional, Union, Dict, Any, Literal
from contextlib import asynccontextmanager
from threading import Thread

from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from llama_cpp import Llama

# ============== Logging Configuration ==============
LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")

log_format = logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Rotating file log: 10 MiB per file, 5 backups, DEBUG and above.
file_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=10*1024*1024,
    backupCount=5,
    encoding='utf-8'
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.DEBUG)

# Console log: INFO and above.
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("llama-api")

for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]
    # The root logger already carries the same two handlers; without this the
    # record would also propagate upwards and be written more than once.
    uv_log.propagate = False

logger.info("=" * 60)
logger.info(f"llama.cpp API (OpenAI + Anthropic) Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("=" * 60)

# ============== Configuration ==============
MODEL_PATH = "/app/models/qwen2.5-coder-7b-instruct-q4_k_m.gguf"
N_CTX = 8192  # Context window
N_THREADS = 2  # CPU threads
N_BATCH = 128  # Batch size

# Set by ``lifespan`` once the model has been loaded; None before/after.
llm = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model at application startup and release it at shutdown.

    Raises:
        Exception: whatever ``Llama(...)`` raised; it is logged and re-raised
            so the application does not start without a model.
    """
    global llm
    logger.info(f"Loading model: {MODEL_PATH}")
    try:
        llm = Llama(
            model_path=MODEL_PATH,
            n_ctx=N_CTX,
            n_threads=N_THREADS,
            n_batch=N_BATCH,
            verbose=True
        )
        logger.info("Model loaded successfully!")
    except Exception as e:
        logger.error(f"Failed to load model: {e}", exc_info=True)
        raise
    yield
    logger.info("Shutting down...")
    # Rebind instead of ``del``: deleting the global would turn any later
    # ``llm is not None`` check into a NameError.
    llm = None


app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic)",
    description="llama.cpp powered API with dual SDK compatibility",
    version="2.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the start, the status code and the duration of every HTTP request.

    Exceptions raised by downstream handlers are logged and re-raised.
    """
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    logger.info(f"[{request_id}] {request.method} {request.url.path} - Started")
    try:
        response = await call_next(request)
        duration = (time.time() - start_time) * 1000
        logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")
        return response
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(f"[{request_id}] {request.method} {request.url.path} - Error: {e} ({duration:.2f}ms)")
        raise


# ============================================================
# ANTHROPIC-COMPATIBLE MODELS
# ============================================================

class AnthropicTextBlock(BaseModel):
    """Request content block of type ``text``."""
    type: Literal["text"] = "text"
    text: str


class AnthropicImageSource(BaseModel):
    """Image payload: either base64 ``data`` or a ``url``."""
    type: Literal["base64", "url"] = "base64"
    media_type: Optional[str] = None
    data: Optional[str] = None
    url: Optional[str] = None


class AnthropicImageBlock(BaseModel):
    """Request content block of type ``image``."""
    type: Literal["image"] = "image"
    source: AnthropicImageSource


class AnthropicToolUseBlock(BaseModel):
    """Request content block of type ``tool_use``."""
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


class AnthropicToolResultBlock(BaseModel):
    """Request content block of type ``tool_result``."""
    type: Literal["tool_result"] = "tool_result"
    tool_use_id: str
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False


AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock]


class AnthropicMessage(BaseModel):
    """One conversation turn; content is a plain string or a list of blocks."""
    role: Literal["user", "assistant"]
    content: Union[str, List[AnthropicContentBlock]]


class AnthropicToolInputSchema(BaseModel):
    """JSON-schema-like description of a tool's input object."""
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None


class AnthropicTool(BaseModel):
    """Tool definition supplied by the client."""
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema


class AnthropicToolChoiceAuto(BaseModel):
    """``tool_choice`` variant of type ``auto``."""
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceAny(BaseModel):
    """``tool_choice`` variant of type ``any``."""
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None


class AnthropicToolChoiceTool(BaseModel):
    """``tool_choice`` variant of type ``tool`` naming a specific tool."""
    type: Literal["tool"] = "tool"
    name: str
    disable_parallel_tool_use: Optional[bool] = None


AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool]


class AnthropicMetadata(BaseModel):
    """Optional request metadata."""
    user_id: Optional[str] = None


class AnthropicSystemContent(BaseModel):
    """Text block used when ``system`` is given as a list."""
    type: Literal["text"] = "text"
    text: str
    cache_control: Optional[Dict[str, str]] = None


class AnthropicThinkingConfig(BaseModel):
    """Extended-thinking switch and token budget (1..128000, default 1024)."""
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000)


class AnthropicMessageRequest(BaseModel):
    """Request body of ``POST /anthropic/v1/messages``."""
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicUsage(BaseModel):
    """Token usage reported in an Anthropic-format response."""
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None


class AnthropicResponseTextBlock(BaseModel):
    """Response content block of type ``text``."""
    type: Literal["text"] = "text"
    text: str


class AnthropicResponseThinkingBlock(BaseModel):
    """Response content block of type ``thinking``."""
    type: Literal["thinking"] = "thinking"
    thinking: str


class AnthropicResponseToolUseBlock(BaseModel):
    """Response content block of type ``tool_use``."""
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]


AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock]


class AnthropicMessageResponse(BaseModel):
    """Response body of ``POST /anthropic/v1/messages`` (non-streaming)."""
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage


class AnthropicTokenCountRequest(BaseModel):
    """Request body of ``POST /anthropic/v1/messages/count_tokens``."""
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None


class AnthropicTokenCountResponse(BaseModel):
    """Response body of the token-count endpoint."""
    input_tokens: int


# ============================================================
# OPENAI-COMPATIBLE MODELS
# ============================================================

class OpenAIMessage(BaseModel):
    """One chat message; content is a string or a list of part dicts."""
    role: Literal["system", "user", "assistant", "tool"]
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None


class OpenAITool(BaseModel):
    """Function tool definition."""
    type: Literal["function"] = "function"
    function: Dict[str, Any]


class OpenAIToolChoice(BaseModel):
    """Object form of ``tool_choice``."""
    type: str
    function: Optional[Dict[str, str]] = None


class OpenAIChatRequest(BaseModel):
    """Request body of ``POST /v1/chat/completions``."""
    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=0.95, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None


class OpenAIUsage(BaseModel):
    """Token usage reported in an OpenAI-format response."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class OpenAIChoice(BaseModel):
    """One completion choice."""
    index: int
    message: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIChatResponse(BaseModel):
    """Response body of ``POST /v1/chat/completions`` (non-streaming)."""
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None


class OpenAIModel(BaseModel):
    """One entry of the model list."""
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str


class OpenAIModelList(BaseModel):
    """Response body of ``GET /v1/models``."""
    object: Literal["list"] = "list"
    data: List[OpenAIModel]


# ============== Helper Functions ==============

def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str:
    """Return the text of an Anthropic message content.

    A string is returned unchanged. For a list, the ``text`` of every block
    whose type is ``"text"`` (dict or model instance) is collected and joined
    with single spaces; all other block types are ignored.
    """
    if isinstance(content, str):
        return content
    texts = []
    for block in content:
        if isinstance(block, dict):
            if block.get("type") == "text":
                texts.append(block.get("text", ""))
        elif hasattr(block, "type") and block.type == "text":
            texts.append(block.text)
    return " ".join(texts)


def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]:
    """Flatten an Anthropic ``system`` value to a string.

    Returns None for None, the string itself for a string, and otherwise the
    ``text`` of every block joined with single spaces.
    """
    if system is None:
        return None
    if isinstance(system, str):
        return system
    texts = []
    for block in system:
        if isinstance(block, dict):
            texts.append(block.get("text", ""))
        elif hasattr(block, "text"):
            texts.append(block.text)
    return " ".join(texts)


def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Flatten an OpenAI message ``content`` to a string.

    None becomes ``""``, a string is returned unchanged, and for a list the
    ``text`` of every ``{"type": "text"}`` part is joined with single spaces.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    texts = [
        item.get("text", "")
        for item in content
        if isinstance(item, dict) and item.get("type") == "text"
    ]
    return " ".join(texts)


def format_chat_prompt(messages: List[Dict[str, str]], system: Optional[str] = None) -> str:
    """Format messages for Qwen2.5 chat template

    Each message becomes ``<|im_start|>{role}\\n{content}<|im_end|>\\n``; a
    non-empty ``system`` is emitted first, and the prompt ends with an open
    assistant turn for the model to complete.
    """
    parts = []
    if system:
        parts.append(f"<|im_start|>system\n{system}<|im_end|>\n")
    for msg in messages:
        parts.append(f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n")
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    tools: Optional[List[AnthropicTool]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Build the model prompt for an Anthropic-format request.

    Tool definitions and, when enabled, the thinking instruction are
    prepended to the system text (thinking instruction first). Only text
    content of the messages is forwarded; see ``extract_anthropic_text``.

    Returns:
        The prompt string produced by ``format_chat_prompt``.
    """
    system_text = extract_anthropic_system(system) or ""

    # Add tool definitions to system prompt if provided
    if tools:
        tool_defs = [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.input_schema.model_dump()
            }
            for tool in tools
        ]
        tool_instruction = (
            "You have access to the following tools:\n\n"
            f"{json.dumps(tool_defs, indent=2)}\n\n"
            "To use a tool, respond with a JSON object in this exact format:\n"
            '{"tool": "tool_name", "arguments": {"arg1": "value1"}}\n\n'
            "Only use tools when necessary. If you don't need a tool, respond normally."
        )
        system_text = f"{tool_instruction}\n\n{system_text}" if system_text else tool_instruction

    if thinking_enabled:
        # The tag names here must match the pattern used by
        # parse_thinking_response.
        thinking_instruction = (
            "When solving complex problems:\n"
            "1. Think through the problem step by step inside <thinking>...</thinking> tags\n"
            "2. After thinking, provide your final answer outside the thinking tags\n\n"
            f"Budget for thinking: up to {budget_tokens} tokens."
        )
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction

    formatted_messages = [
        {"role": msg.role, "content": extract_anthropic_text(msg.content)}
        for msg in messages
    ]
    return format_chat_prompt(formatted_messages, system_text if system_text else None)


def format_openai_messages(messages: List[OpenAIMessage]) -> str:
    """Build the model prompt for an OpenAI-format request.

    All ``system`` messages are merged (blank-line separated) into the single
    system turn; every other message is forwarded in order with its role.
    """
    system_parts = []
    formatted_messages = []
    for msg in messages:
        content = extract_openai_content(msg.content)
        if msg.role == "system":
            system_parts.append(content)
        else:
            formatted_messages.append({"role": msg.role, "content": content})
    # Previously each system message overwrote the one before it.
    system_text = "\n\n".join(part for part in system_parts if part) or None
    return format_chat_prompt(formatted_messages, system_text)


_THINKING_RE = re.compile(r'<thinking>(.*?)</thinking>', re.DOTALL)


def parse_thinking_response(text: str) -> tuple:
    """Split model output into ``(thinking, answer)``.

    The contents of every ``<thinking>...</thinking>`` section are joined
    with newlines to form the thinking text; the answer is the output with
    those sections removed. When no section is present the result is
    ``(None, text.strip())``.
    """
    thinking_matches = _THINKING_RE.findall(text)
    if thinking_matches:
        thinking_text = "\n".join(thinking_matches).strip()
        answer_text = _THINKING_RE.sub('', text).strip()
        return thinking_text, answer_text
    return None, text.strip()


_JSON_DECODER = json.JSONDecoder()


def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
    """Parse tool use from model response

    Scans ``text`` for the first JSON object that has a string ``"tool"``
    key and returns it as a dict, or None when there is none. A real JSON
    decoder is used rather than a regex so that objects with nested braces
    (such as ``"arguments": {...}``) are recognised.
    """
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = _JSON_DECODER.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict) and isinstance(candidate.get("tool"), str):
            return candidate
        start = text.find("{", start + 1)
    return None


def generate_id(prefix: str = "msg") -> str:
    """Return ``{prefix}_`` followed by 24 random hex characters."""
    return f"{prefix}_{uuid.uuid4().hex[:24]}"


# ============== ROOT ENDPOINTS ==============

@app.get("/")
async def root():
    """Return static service information."""
    return {
        "status": "healthy",
        "model": "qwen2.5-coder-7b-instruct-q4_k_m",
        "backend": "llama.cpp",
        "endpoints": {
            "openai": "/v1/chat/completions",
            "anthropic": "/anthropic/v1/messages"
        },
        "features": ["extended-thinking", "streaming", "tool-use", "dual-compatibility"],
        "context_length": N_CTX
    }


@app.get("/logs")
async def get_logs(lines: int = 100):
    """Return the last ``lines`` lines of the API log file.

    A non-positive ``lines`` yields an empty ``logs`` string. If the log file
    does not exist an ``{"error": ...}`` dict is returned instead.

    NOTE(review): this endpoint is unauthenticated and exposes server logs;
    confirm that is intended for the deployment.
    """
    try:
        # The log is written as UTF-8; tolerate a partially written tail.
        with open(LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            all_lines = f.readlines()
    except FileNotFoundError:
        return {"error": "Log file not found"}
    # Guard lines <= 0: ``all_lines[-0:]`` would be the whole file and a
    # negative value would slice from the front.
    recent_lines = all_lines[-lines:] if lines > 0 else []
    return {"log_file": LOG_FILE, "total_lines": len(all_lines), "logs": "".join(recent_lines)}


@app.get("/health")
async def health():
    """Report liveness and whether the model object is set."""
    return {"status": "ok", "model_loaded": llm is not None, "backend": "llama.cpp"}


def _or_default(value, default):
    """Return ``default`` only when ``value`` is None.

    Unlike ``value or default`` this keeps an explicit ``0`` / ``0.0``, which
    is a meaningful sampling setting (e.g. temperature 0).
    """
    return default if value is None else value


def _build_stop_tokens(extra: Optional[Union[str, List[str]]] = None) -> List[str]:
    """Return the chat-template stop tokens plus any client-supplied ones."""
    stop_tokens = ["<|im_end|>", "<|endoftext|>"]
    if extra:
        if isinstance(extra, str):
            stop_tokens.append(extra)
        else:
            stop_tokens.extend(extra)
    return stop_tokens


def _sse(event: str, payload: Dict[str, Any]) -> str:
    """Format one named server-sent event with a JSON data line."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


# ============================================================
# OPENAI-COMPATIBLE ENDPOINTS (/v1)
# ============================================================

@app.get("/v1/models")
async def openai_list_models():
    """List the single served model in OpenAI format."""
    return OpenAIModelList(
        data=[OpenAIModel(id="qwen2.5-coder-7b", created=int(time.time()), owned_by="qwen")]
    )


@app.post("/v1/chat/completions")
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """Handle an OpenAI-format chat completion (streaming or not).

    Raises:
        HTTPException: status 500 with the error text when prompt building
            or generation fails.
    """
    chat_id = generate_id("chatcmpl")
    logger.info(f"[{chat_id}] OpenAI chat - model: {request.model}, max_tokens: {request.max_tokens}")

    try:
        prompt = format_openai_messages(request.messages)

        if request.stream:
            return await openai_stream_response(request, prompt, chat_id)

        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=request.max_tokens or 1024,
            temperature=_or_default(request.temperature, 0.7),
            top_p=_or_default(request.top_p, 0.95),
            stop=_build_stop_tokens(request.stop),
            echo=False
        )
        gen_time = time.time() - gen_start

        choice = output["choices"][0]
        generated_text = choice["text"].strip()
        usage = output["usage"]

        logger.info(f"[{chat_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")

        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text},
                # Report truncation ("length") instead of always "stop".
                finish_reason=choice.get("finish_reason") or "stop"
            )],
            usage=OpenAIUsage(
                prompt_tokens=usage["prompt_tokens"],
                completion_tokens=usage["completion_tokens"],
                total_tokens=usage["total_tokens"]
            )
        )
    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e


async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str):
    """Return a ``text/event-stream`` response of OpenAI chat chunks.

    The stream is: one role chunk, one chunk per non-empty piece of generated
    text, a final chunk carrying ``finish_reason``, then ``data: [DONE]``.
    """
    async def generate():
        created = int(time.time())

        def make_chunk(delta, finish_reason=None):
            payload = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
            }
            return f"data: {json.dumps(payload)}\n\n"

        yield make_chunk({"role": "assistant", "content": ""})

        finish_reason = "stop"
        # NOTE(review): this is a synchronous generation loop inside an async
        # generator, so the event loop is busy while tokens are produced.
        for output in llm(
            prompt,
            max_tokens=request.max_tokens or 1024,
            temperature=_or_default(request.temperature, 0.7),
            top_p=_or_default(request.top_p, 0.95),
            stop=_build_stop_tokens(request.stop),
            stream=True,
            echo=False
        ):
            choice = output["choices"][0]
            if choice.get("finish_reason"):
                finish_reason = choice["finish_reason"]
            text = choice["text"]
            if text:
                yield make_chunk({"content": text})

        yield make_chunk({}, finish_reason)
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})


# ============================================================
# ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic)
# ============================================================

@app.get("/anthropic/v1/models")
async def anthropic_list_models():
    """List the single served model with its capability flags."""
    return {
        "object": "list",
        "data": [{
            "id": "qwen2.5-coder-7b",
            "object": "model",
            "created": int(time.time()),
            "owned_by": "qwen",
            "display_name": "Qwen2.5 Coder 7B Instruct (Q4_K_M)",
            "supports_thinking": True,
            "supports_tools": True
        }]
    }


@app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
async def anthropic_create_message(
    request: AnthropicMessageRequest,
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
    anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta")
):
    """Handle an Anthropic-format message request (streaming or not).

    The generated text is turned into content blocks: a ``tool_use`` block
    when tools were supplied and a tool call is detected, otherwise an
    optional ``thinking`` block plus a ``text`` block. When thinking is
    enabled its budget is added to ``max_tokens`` for generation.

    Raises:
        HTTPException: status 500 with the error text on any failure.
    """
    message_id = generate_id("msg")

    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024

    logger.info(f"[{message_id}] Anthropic msg - model: {request.model}, max_tokens: {request.max_tokens}, thinking: {thinking_enabled}, tools: {len(request.tools) if request.tools else 0}")

    try:
        prompt = format_anthropic_messages(
            request.messages, request.system, request.tools, thinking_enabled, budget_tokens
        )

        if request.stream:
            return await anthropic_stream_response(request, prompt, message_id, thinking_enabled)

        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)

        gen_start = time.time()
        output = llm(
            prompt,
            max_tokens=total_max_tokens,
            temperature=_or_default(request.temperature, 0.7),
            top_p=_or_default(request.top_p, 0.95),
            top_k=_or_default(request.top_k, 40),
            stop=_build_stop_tokens(request.stop_sequences),
            echo=False
        )
        gen_time = time.time() - gen_start

        choice = output["choices"][0]
        generated_text = choice["text"].strip()
        usage = output["usage"]

        # Parse response for tool use, thinking, etc.
        content_blocks = []
        stop_reason = "end_turn"

        # Check for tool use
        tool_call = parse_tool_use(generated_text)
        if tool_call and request.tools:
            arguments = tool_call.get("arguments", {})
            content_blocks.append(AnthropicResponseToolUseBlock(
                type="tool_use",
                id=f"toolu_{uuid.uuid4().hex[:24]}",
                name=tool_call["tool"],
                # The response model requires a dict; a malformed value from
                # the model must not turn into a validation error.
                input=arguments if isinstance(arguments, dict) else {}
            ))
            stop_reason = "tool_use"
        elif thinking_enabled:
            thinking_text, answer_text = parse_thinking_response(generated_text)
            if thinking_text:
                content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text))
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text))
        else:
            content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text))

        if choice.get("finish_reason") == "length" or usage["completion_tokens"] >= total_max_tokens:
            stop_reason = "max_tokens"

        logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")

        return AnthropicMessageResponse(
            id=message_id,
            content=content_blocks,
            model=request.model,
            stop_reason=stop_reason,
            usage=AnthropicUsage(
                input_tokens=usage["prompt_tokens"],
                output_tokens=usage["completion_tokens"]
            )
        )
    except Exception as e:
        logger.error(f"[{message_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e


async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool):
    """Return a ``text/event-stream`` response of Anthropic message events.

    Event order: ``message_start``, ``content_block_start`` (one text block
    at index 0), one ``content_block_delta`` per non-empty piece of text,
    ``content_block_stop``, ``message_delta`` and ``message_stop``. The
    generated text is streamed as-is; it is not split into thinking/tool
    blocks.
    """
    # Same generation settings as the non-streaming path: the thinking budget
    # is added on top of max_tokens.
    budget_tokens = 0
    if thinking_enabled:
        budget_tokens = (request.thinking.budget_tokens if request.thinking else None) or 1024
    total_max_tokens = request.max_tokens + budget_tokens

    async def generate():
        start_event = {
            "type": "message_start",
            "message": {
                "id": message_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": request.model,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": 0, "output_tokens": 0}
            }
        }
        yield _sse("message_start", start_event)

        # Start text block
        yield _sse("content_block_start", {
            'type': 'content_block_start',
            'index': 0,
            'content_block': {'type': 'text', 'text': ''}
        })

        # Counts streamed pieces; used as the reported output token count.
        total_tokens = 0
        stop_reason = "end_turn"
        for output in llm(
            prompt,
            max_tokens=total_max_tokens,
            temperature=_or_default(request.temperature, 0.7),
            top_p=_or_default(request.top_p, 0.95),
            top_k=_or_default(request.top_k, 40),
            stop=_build_stop_tokens(request.stop_sequences),
            stream=True,
            echo=False
        ):
            choice = output["choices"][0]
            if choice.get("finish_reason") == "length":
                stop_reason = "max_tokens"
            text = choice["text"]
            if text:
                total_tokens += 1
                yield _sse("content_block_delta", {
                    'type': 'content_block_delta',
                    'index': 0,
                    'delta': {'type': 'text_delta', 'text': text}
                })

        yield _sse("content_block_stop", {'type': 'content_block_stop', 'index': 0})
        yield _sse("message_delta", {
            'type': 'message_delta',
            'delta': {'stop_reason': stop_reason},
            'usage': {'output_tokens': total_tokens}
        })
        yield _sse("message_stop", {'type': 'message_stop'})

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


@app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
    """Count the tokens of the prompt that a message request would produce.

    Tools and the thinking configuration are included because they are
    injected into the system prompt when the message is actually generated.
    """
    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024

    prompt = format_anthropic_messages(
        request.messages, request.system, request.tools, thinking_enabled, budget_tokens
    )
    tokens = llm.tokenize(prompt.encode())
    return AnthropicTokenCountResponse(input_tokens=len(tokens))


if __name__ == "__main__":
    import uvicorn
    # log_config=None keeps the logging configuration set up by this module.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None)