"""
OpenAI-Compatible API Server for Qwen3-0.6B-GGUF
Supports: streaming, tool calling, thinking modes (true/false/auto)
"""

import os
import sys
import json
import time
import uuid
import copy
import re
from typing import Optional, List, Dict, Any, Union, AsyncGenerator

import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
from llama_cpp import Llama

# ─── Configuration ───────────────────────────────────────────────────────────
# Only the model path can be overridden from the environment; the remaining
# settings are fixed constants.
MODEL_PATH = os.environ.get("MODEL_PATH", "/app/models/Qwen3-0.6B-Q4_K_M.gguf")
CONTEXT_SIZE = 16384
MAX_OUTPUT_TOKENS = 8192
HOST = "0.0.0.0"
PORT = 7860
MODEL_NAME = "qwen3-0.6b"

# ─── Initialize FastAPI ──────────────────────────────────────────────────────
app = FastAPI(
    title="Qwen3-0.6B OpenAI-Compatible API",
    version="1.0.0",
    description="OpenAI-compatible server with thinking modes and tool calling",
)

templates = Jinja2Templates(directory="templates")

# ─── Load Model ──────────────────────────────────────────────────────────────
# The model is loaded once, at import time, and shared by every request.
print(f"Loading model from {MODEL_PATH}...")
print(f"Context size: {CONTEXT_SIZE}, Max output: {MAX_OUTPUT_TOKENS}")

llm = Llama(
    model_path=MODEL_PATH,
    n_ctx=CONTEXT_SIZE,
    n_threads=4,
    n_gpu_layers=0,
    verbose=True,
    chat_format="chatml",
)
print("Model loaded successfully!")


# ─── Pydantic Models ─────────────────────────────────────────────────────────
class FunctionDefinition(BaseModel):
    """Name, description and parameter schema of a callable function."""

    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None


class ToolDefinition(BaseModel):
    """A tool offered to the model; wraps a single function definition."""

    type: str = "function"
    function: FunctionDefinition


class ToolCallFunction(BaseModel):
    """Function name plus its arguments serialized as a string."""

    name: str
    arguments: str


class ToolCall(BaseModel):
    """A single tool invocation attached to an assistant message."""

    id: str
    type: str = "function"
    function: ToolCallFunction


class ChatMessage(BaseModel):
    """One message of the conversation."""

    role: str
    content: Optional[str] = None
    name: Optional[str] = None
    tool_calls: Optional[List[ToolCall]] = None
    tool_call_id: Optional[str] = None


class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completions endpoint."""

    model: Optional[str] = MODEL_NAME
    messages: List[ChatMessage]
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = MAX_OUTPUT_TOKENS
    max_completion_tokens: Optional[int] = None
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    tools: Optional[List[ToolDefinition]] = None
    tool_choice: Optional[Union[str, Dict]] = None
    extra_body: Optional[Dict[str, Any]] = None
    # Thinking mode: true, false, "auto"
    enable_thinking: Optional[Any] = None
# ─── Helper Functions ─────────────────────────────────────────────────────────

# Substrings that, when found in the latest user message, count towards
# enabling thinking in "auto" mode. Kept at module level so the collection is
# built once instead of on every request.
_COMPLEXITY_INDICATORS = (
    "explain", "analyze", "compare", "why", "how does", "step by step",
    "reason", "think", "calculate", "solve", "debug", "implement",
    "design", "architect", "evaluate", "assess", "critique", "prove",
    "derive", "what are the implications", "trade-off", "pros and cons",
    "complex", "difficult", "challenging", "advanced", "algorithm",
    "optimize", "mathematics", "logic", "code", "program", "function",
    "write a", "plan", "strategy", "approach",
)


def generate_id():
    """Return a new completion id: ``chatcmpl-`` followed by 24 hex characters."""
    return f"chatcmpl-{uuid.uuid4().hex[:24]}"


def get_timestamp():
    """Return the current time as whole seconds since the epoch."""
    return int(time.time())


def build_system_prompt_for_tools(tools: "List[ToolDefinition]") -> str:
    """Build a system prompt that instructs the model about available tools.

    Args:
        tools: Tool definitions; each exposes ``function.name``,
            ``function.description`` and ``function.parameters``.

    Returns:
        Prompt text listing the tools as indented JSON, followed by the
        ``<tool_call>`` wrapper the model must use to invoke one.
    """
    tool_descriptions = [
        {
            "type": "function",
            "function": {
                "name": tool.function.name,
                "description": tool.function.description or "",
                "parameters": tool.function.parameters or {},
            },
        }
        for tool in tools
    ]
    tools_json = json.dumps(tool_descriptions, indent=2)

    # The <tool_call> delimiters are what the response parser keys on; without
    # them the "own block" wording has no meaning for the model.
    return f"""You are a helpful assistant with access to the following tools. Use them when needed.

# Tools

You have access to the following tools:
{tools_json}

# Tool Call Format

When you need to call a tool, respond with the following format:
<tool_call>
{{"name": "function_name", "arguments": {{"param1": "value1", "param2": "value2"}}}}
</tool_call>

You can call multiple tools. Each tool call should be in its own <tool_call> block.
If you don't need to use any tool, just respond normally."""


def determine_thinking_mode(enable_thinking, messages, tools) -> str:
    """Determine the thinking mode.

    Args:
        enable_thinking: ``True``/``False``, the strings ``"true"``,
            ``"false"`` or ``"auto"`` (case and surrounding whitespace are
            ignored), or ``None``, which is treated as ``"auto"``.
        messages: Conversation messages exposing ``role`` and ``content``.
        tools: Tool definitions supplied with the request, if any.

    Returns:
        ``'enabled'`` or ``'disabled'``. Any unrecognised flag value yields
        ``'disabled'``.
    """
    flag = enable_thinking
    if isinstance(flag, str):
        # Clients send the flag in assorted spellings ("True", "AUTO", ...).
        flag = flag.strip().lower()

    if flag is True or flag == "true":
        return "enabled"
    if flag is False or flag == "false":
        return "disabled"
    if flag is not None and flag != "auto":
        return "disabled"

    # Auto mode: decide based on task complexity.
    if tools:
        return "enabled"

    last_user_msg = next(
        (
            msg.content.lower()
            for msg in reversed(messages)
            if msg.role == "user" and msg.content
        ),
        "",
    )

    # Plain substring matching: "code" also matches "decode", for example.
    complexity_score = sum(
        1 for indicator in _COMPLEXITY_INDICATORS if indicator in last_user_msg
    )
    if complexity_score >= 2 or len(last_user_msg) > 200:
        return "enabled"
    return "disabled"
def apply_thinking_prompt(
    messages: "List[ChatMessage]", thinking_mode: str
) -> "List[Dict[str, str]]":
    """Apply thinking mode instructions to the messages.

    Each message is reduced to a ``{"role", "content"}`` dict. Messages with
    role ``"tool"`` are rewritten as user messages whose content is prefixed
    with a ``[Tool Result (call_id: ...)]`` header line.

    Args:
        messages: Conversation messages exposing ``role``, ``content`` and
            ``tool_call_id``.
        thinking_mode: ``"enabled"`` adds the step-by-step instruction,
            ``"disabled"`` adds ``/no_think``; any other value adds nothing.

    Returns:
        A new list of message dicts. The instruction is appended to the first
        message when that message is a system message, otherwise inserted as
        a new leading system message.
    """
    formatted = []
    for msg in messages:
        # A missing content must not be rendered as the text "None".
        content = msg.content or ""
        if msg.role == "tool":
            formatted.append({
                "role": "user",
                "content": f"[Tool Result (call_id: {msg.tool_call_id})]\n{content}",
            })
        else:
            formatted.append({"role": msg.role, "content": content})

    if thinking_mode == "enabled":
        instruction = (
            "You should think step by step before responding. "
            "Put your reasoning inside <think>...</think> tags, "
            "then provide your final answer outside the tags."
        )
    elif thinking_mode == "disabled":
        instruction = "/no_think"
    else:
        return formatted

    # Prepend or merge with existing system message.
    if formatted and formatted[0]["role"] == "system":
        formatted[0]["content"] = formatted[0]["content"] + "\n\n" + instruction
    else:
        formatted.insert(0, {"role": "system", "content": instruction})
    return formatted
# Compiled once. The literal tags are required: without them the patterns
# match arbitrary (or empty) text instead of the delimited blocks.
_TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)
_THINK_RE = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def parse_tool_calls(text: str) -> tuple:
    """Parse tool calls from model output. Returns (content, tool_calls).

    Args:
        text: Model output that may contain ``<tool_call>{json}</tool_call>``
            blocks.

    Returns:
        ``(text, None)`` unchanged when no block is present. Otherwise the
        text with every block removed and stripped (``None`` if nothing is
        left), plus the list of parsed ``ToolCall`` objects (``None`` if no
        block held valid JSON).
    """
    payloads = _TOOL_CALL_RE.findall(text)
    if not payloads:
        return text, None

    tool_calls = []
    for payload in payloads:
        try:
            call_data = json.loads(payload)
        except json.JSONDecodeError:
            # Best effort: a malformed block is dropped, the rest still parse.
            continue

        arguments = call_data.get("arguments", {})
        if not isinstance(arguments, str):
            # Already-serialized arguments are passed through as they are;
            # dumping a string again would wrap it in a second layer of quotes.
            arguments = json.dumps(arguments)

        tool_calls.append(
            ToolCall(
                id=f"call_{uuid.uuid4().hex[:24]}",
                type="function",
                function=ToolCallFunction(
                    name=str(call_data.get("name") or ""),
                    arguments=arguments,
                ),
            )
        )

    # Remove tool call blocks from content.
    clean_content = _TOOL_CALL_RE.sub("", text).strip()
    return clean_content or None, tool_calls or None


def parse_thinking(text: str) -> tuple:
    """Extract thinking content from response.

    Args:
        text: Model output that may contain ``<think>...</think>`` blocks.

    Returns:
        ``(thinking_content, main_content)``. When no thinking markup is
        present this is ``(None, text)`` with ``text`` untouched. Otherwise
        the stripped thinking segments joined by blank lines, and the
        remaining text stripped. An opening ``<think>`` that is never closed
        (for example because generation stopped early) is treated as thinking
        up to the end of the text.
    """
    segments = [segment.strip() for segment in _THINK_RE.findall(text)]
    remainder = _THINK_RE.sub("", text)

    # Anything after a dangling opener is unfinished reasoning, not the answer.
    head, opener, tail = remainder.partition("<think>")
    if opener:
        segments.append(tail.strip())
        remainder = head

    if not segments:
        return None, text
    return "\n\n".join(segments), remainder.strip()
# ─── API Endpoints ────────────────────────────────────────────────────────────

@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render the ``index.html`` template."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/health")
async def health():
    """Report a static healthy status together with the model name."""
    return {"status": "healthy", "model": MODEL_NAME}


@app.get("/v1/models")
async def list_models():
    """Return a one-element model list describing the served model."""
    return {
        "object": "list",
        "data": [
            {
                "id": MODEL_NAME,
                "object": "model",
                "created": get_timestamp(),
                "owned_by": "local",
                "permission": [],
                "root": MODEL_NAME,
                "parent": None,
            }
        ],
    }


@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    """Return the served model's description.

    ``model_id`` is not checked: every id receives the same answer.
    """
    return {
        "id": MODEL_NAME,
        "object": "model",
        "created": get_timestamp(),
        "owned_by": "local",
    }


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Handle a chat-completion request, streaming or not.

    ``enable_thinking`` is read from the top level of the body or, failing
    that, from ``extra_body``; both keys are removed before validation.

    Raises:
        HTTPException: 400 when the body is not valid JSON or does not
            validate as a ``ChatCompletionRequest``.
    """
    try:
        body = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc

    try:
        # Extract enable_thinking from various places.
        enable_thinking = body.pop("enable_thinking", None)
        extra_body = body.pop("extra_body", None)
        if enable_thinking is None and extra_body and "enable_thinking" in extra_body:
            enable_thinking = extra_body["enable_thinking"]

        req = ChatCompletionRequest(**body)
        req.enable_thinking = enable_thinking
    except Exception as exc:
        raise HTTPException(
            status_code=400, detail=f"Invalid request: {str(exc)}"
        ) from exc

    max_tokens = req.max_completion_tokens or req.max_tokens or MAX_OUTPUT_TOKENS
    if max_tokens < 1:
        # min() alone would let a negative value through untouched, and
        # llama-cpp-python documents max_tokens <= 0 as "unlimited".
        max_tokens = MAX_OUTPUT_TOKENS
    max_tokens = min(max_tokens, MAX_OUTPUT_TOKENS)

    # Determine thinking mode.
    thinking_mode = determine_thinking_mode(req.enable_thinking, req.messages, req.tools)

    # Build messages with tool support.
    messages = list(req.messages)
    if req.tools:
        tool_system = build_system_prompt_for_tools(req.tools)
        if messages and messages[0].role == "system":
            # content is optional, so guard against None before concatenating.
            messages[0] = ChatMessage(
                role="system",
                content=(messages[0].content or "") + "\n\n" + tool_system,
            )
        else:
            messages.insert(0, ChatMessage(role="system", content=tool_system))

    # Apply thinking mode.
    formatted_messages = apply_thinking_prompt(messages, thinking_mode)

    # Build stop sequences.
    stop_sequences = ["<|endoftext|>", "<|im_end|>"]
    if req.stop:
        if isinstance(req.stop, str):
            stop_sequences.append(req.stop)
        else:
            stop_sequences.extend(req.stop)

    generation_args = (
        formatted_messages,
        max_tokens,
        req.temperature,
        req.top_p,
        stop_sequences,
        req.presence_penalty,
        req.frequency_penalty,
        thinking_mode,
        req.tools,
    )

    if req.stream:
        return EventSourceResponse(
            stream_response(*generation_args),
            media_type="text/event-stream",
        )
    return await non_stream_response(*generation_args)
async def non_stream_response(
    messages, max_tokens, temperature, top_p, stop,
    presence_penalty, frequency_penalty, thinking_mode, tools
):
    """Generate a non-streaming response.

    Args:
        messages: Message dicts passed to the model as-is.
        max_tokens, temperature, top_p, stop, presence_penalty,
            frequency_penalty: Sampling settings forwarded to
            ``llm.create_chat_completion``.
        thinking_mode: When ``"disabled"``, extracted thinking text is left
            out of the result.
        tools: Accepted for signature parity with the streaming path; not
            used here.

    Returns:
        A ``JSONResponse`` holding a ``chat.completion`` object. Thinking
        text is stripped from the message content and, unless thinking is
        disabled, returned under a top-level ``"thinking"`` key.

    Raises:
        HTTPException: 500 wrapping any error raised during generation or
            post-processing.
    """
    completion_id = generate_id()
    created = get_timestamp()

    try:
        # NOTE(review): this call is not awaited, so the event loop appears to
        # be blocked while the model generates — confirm that is acceptable.
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
        )

        choice = response["choices"][0]
        raw_content = choice["message"].get("content", "") or ""

        # Parse thinking first so tool-call parsing only sees the answer text.
        thinking_content, main_content = parse_thinking(raw_content)

        # Parse tool calls.
        final_content, tool_calls = parse_tool_calls(main_content)

        # Build response message.
        message = {"role": "assistant"}
        if tool_calls:
            message["content"] = final_content
            message["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": tc.type,
                    "function": {
                        "name": tc.function.name,
                        "arguments": tc.function.arguments,
                    },
                }
                for tc in tool_calls
            ]
            finish_reason = "tool_calls"
        else:
            message["content"] = final_content or main_content
            finish_reason = choice.get("finish_reason", "stop")

        result = {
            "id": completion_id,
            "object": "chat.completion",
            "created": created,
            "model": MODEL_NAME,
            "choices": [
                {
                    "index": 0,
                    "message": message,
                    "finish_reason": finish_reason,
                }
            ],
            "usage": response.get("usage", {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0,
            }),
        }

        # Add thinking metadata.
        if thinking_content and thinking_mode != "disabled":
            result["thinking"] = thinking_content

        return JSONResponse(content=result)

    except Exception as exc:
        raise HTTPException(
            status_code=500, detail=f"Generation error: {str(exc)}"
        ) from exc
for tc in tool_calls ] finish_reason = "tool_calls" else: message["content"] = final_content or main_content finish_reason = response["choices"][0].get("finish_reason", "stop") result = { "id": completion_id, "object": "chat.completion", "created": created, "model": MODEL_NAME, "choices": [ { "index": 0, "message": message, "finish_reason": finish_reason } ], "usage": response.get("usage", { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0 }) } # Add thinking metadata if thinking_content and thinking_mode != "disabled": result["thinking"] = thinking_content return JSONResponse(content=result) except Exception as e: raise HTTPException(status_code=500, detail=f"Generation error: {str(e)}") async def stream_response( messages, max_tokens, temperature, top_p, stop, presence_penalty, frequency_penalty, thinking_mode, tools ): """Generate a streaming response with proper SSE formatting.""" completion_id = generate_id() created = get_timestamp() # Send initial chunk with role initial_chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": MODEL_NAME, "choices": [ { "index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None } ] } yield {"data": json.dumps(initial_chunk)} try: stream = llm.create_chat_completion( messages=messages, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stop=stop, presence_penalty=presence_penalty, frequency_penalty=frequency_penalty, stream=True, ) full_response = "" buffer = "" in_think_tag = False think_buffer = "" in_tool_call = False tool_buffer = "" sent_thinking_start = False sent_thinking_end = False pending_tool_calls = [] for chunk_data in stream: delta = chunk_data["choices"][0].get("delta", {}) content = delta.get("content", "") finish_reason = chunk_data["choices"][0].get("finish_reason") if content: full_response += content buffer += content # Process buffer for think tags and tool calls while buffer: if in_tool_call: end_idx = buffer.find("") if 
end_idx != -1: tool_buffer += buffer[:end_idx] buffer = buffer[end_idx + len(""):] in_tool_call = False # Parse the tool call try: call_data = json.loads(tool_buffer.strip()) tc = { "id": f"call_{uuid.uuid4().hex[:24]}", "type": "function", "function": { "name": call_data.get("name", ""), "arguments": json.dumps(call_data.get("arguments", {})) } } pending_tool_calls.append(tc) except json.JSONDecodeError: pass tool_buffer = "" else: tool_buffer += buffer buffer = "" elif in_think_tag: end_idx = buffer.find("") if end_idx != -1: think_content = buffer[:end_idx] buffer = buffer[end_idx + len(""):] in_think_tag = False # Send thinking content as special chunk if thinking_mode != "disabled" and think_content.strip(): think_chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": MODEL_NAME, "choices": [{ "index": 0, "delta": { "content": f"{think_content}" }, "finish_reason": None }] } yield {"data": json.dumps(think_chunk)} else: think_buffer += buffer buffer = "" else: # Check for start think_start = buffer.find("") tool_start = buffer.find("") # Find the earliest tag next_tag = -1 tag_type = None if think_start != -1: next_tag = think_start tag_type = "think" if tool_start != -1 and (next_tag == -1 or tool_start < next_tag): next_tag = tool_start tag_type = "tool" if next_tag != -1: # Send content before the tag before = buffer[:next_tag] if before: content_chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": MODEL_NAME, "choices": [{ "index": 0, "delta": {"content": before}, "finish_reason": None }] } yield {"data": json.dumps(content_chunk)} if tag_type == "think": in_think_tag = True buffer = buffer[next_tag + len(""):] elif tag_type == "tool": in_tool_call = True buffer = buffer[next_tag + len(""):] else: # Check for partial tags at end of buffer partial_tags = ["