| | """ |
| | OpenAI-Compatible API Server for Qwen3-0.6B-GGUF |
| | Supports: streaming, tool calling, thinking modes (true/false/auto) |
| | """ |
| |
|
| | import os |
| | import sys |
| | import json |
| | import time |
| | import uuid |
| | import copy |
| | import re |
| | from typing import Optional, List, Dict, Any, Union, AsyncGenerator |
| |
|
| | import uvicorn |
| | from fastapi import FastAPI, Request, HTTPException |
| | from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse |
| | from fastapi.staticfiles import StaticFiles |
| | from fastapi.templating import Jinja2Templates |
| | from pydantic import BaseModel, Field |
| | from sse_starlette.sse import EventSourceResponse |
| |
|
| | from llama_cpp import Llama |
| |
|
| | |
| |
|
# --- Server configuration -------------------------------------------------
# Path to the GGUF model file (overridable via the MODEL_PATH env var).
MODEL_PATH = os.environ.get("MODEL_PATH", "/app/models/Qwen3-0.6B-Q4_K_M.gguf")
# Prompt + completion context window, in tokens.
CONTEXT_SIZE = 16384
# Hard cap on tokens generated per request (requests asking for more are clamped).
MAX_OUTPUT_TOKENS = 8192
# Bind address/port for uvicorn.
HOST = "0.0.0.0"
PORT = 7860
# Model id reported by the /v1/models endpoints and in responses.
MODEL_NAME = "qwen3-0.6b"
| |
|
| | |
| |
|
# ASGI application exposing the OpenAI-compatible endpoints.
app = FastAPI(
    title="Qwen3-0.6B OpenAI-Compatible API",
    version="1.0.0",
    description="OpenAI-compatible server with thinking modes and tool calling"
)

# Jinja2 templates for the demo landing page served at "/".
templates = Jinja2Templates(directory="templates")
| |
|
| | |
| |
|
print(f"Loading model from {MODEL_PATH}...")
print(f"Context size: {CONTEXT_SIZE}, Max output: {MAX_OUTPUT_TOKENS}")

# Load the model eagerly at import time so the first request pays no
# cold-start cost.  CPU-only inference (n_gpu_layers=0).
llm = Llama(
    model_path=MODEL_PATH,
    n_ctx=CONTEXT_SIZE,
    n_threads=4,
    n_gpu_layers=0,
    verbose=True,
    chat_format="chatml",  # template used to render the message list into a prompt
)

print("Model loaded successfully!")
| |
|
| | |
| |
|
class FunctionDefinition(BaseModel):
    """Description of one callable function (OpenAI function-calling shape)."""
    name: str
    # Human-readable summary surfaced to the model in the tool prompt.
    description: Optional[str] = None
    # JSON-schema object describing the function's arguments.
    parameters: Optional[Dict[str, Any]] = None
| |
|
class ToolDefinition(BaseModel):
    """One entry of the request's ``tools`` array ({"type": "function", ...})."""
    type: str = "function"
    function: FunctionDefinition
| |
|
class ToolCallFunction(BaseModel):
    """Function part of a tool call emitted by the model."""
    name: str
    # JSON-encoded argument object (a string, per the OpenAI API).
    arguments: str
| |
|
class ToolCall(BaseModel):
    """One tool call in an assistant message (OpenAI ``tool_calls`` shape)."""
    id: str
    type: str = "function"
    function: ToolCallFunction
| |
|
class ChatMessage(BaseModel):
    """A single chat message; covers system/user/assistant/tool roles."""
    role: str
    content: Optional[str] = None
    name: Optional[str] = None
    # Present on assistant messages that invoked tools.
    tool_calls: Optional[List[ToolCall]] = None
    # Present on role == "tool" messages; links the result to its call.
    tool_call_id: Optional[str] = None
| |
|
class ChatCompletionRequest(BaseModel):
    """Request body for /v1/chat/completions (OpenAI-compatible superset)."""
    model: Optional[str] = MODEL_NAME
    messages: List[ChatMessage]
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = MAX_OUTPUT_TOKENS
    # Newer OpenAI alias; takes precedence over max_tokens when set.
    max_completion_tokens: Optional[int] = None
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    tools: Optional[List[ToolDefinition]] = None
    tool_choice: Optional[Union[str, Dict]] = None
    # OpenAI SDKs tuck vendor extras here; drained by the endpoint before validation.
    extra_body: Optional[Dict[str, Any]] = None

    # Non-standard flag: True/"true", False/"false", "auto" or None.
    # Resolved to an effective mode by determine_thinking_mode().
    enable_thinking: Optional[Any] = None
| |
|
| | |
| |
|
def generate_id():
    """Return a fresh OpenAI-style completion id ("chatcmpl-" + 24 hex chars)."""
    suffix = uuid.uuid4().hex[:24]
    return "chatcmpl-" + suffix
| |
|
def get_timestamp():
    """Return the current Unix time in whole seconds."""
    now = time.time()
    return int(now)
| |
|
def build_system_prompt_for_tools(tools: List[ToolDefinition]) -> str:
    """Build a system prompt advertising the available tools.

    Serializes each tool into the OpenAI function-calling JSON shape and
    embeds the list in an instruction block telling the model how to emit
    <tool_call> markup.
    """
    specs = [
        {
            "type": "function",
            "function": {
                "name": tool.function.name,
                "description": tool.function.description or "",
                "parameters": tool.function.parameters or {},
            },
        }
        for tool in tools
    ]
    tools_json = json.dumps(specs, indent=2)

    return f"""You are a helpful assistant with access to the following tools. Use them when needed.

# Tools

You have access to the following tools:

{tools_json}

# Tool Call Format

When you need to call a tool, respond with the following format:

<tool_call>
{{"name": "function_name", "arguments": {{"param1": "value1", "param2": "value2"}}}}
</tool_call>

You can call multiple tools. Each tool call should be in its own <tool_call> block.
If you don't need to use any tool, just respond normally."""
| |
|
def determine_thinking_mode(enable_thinking, messages, tools) -> str:
    """
    Resolve the effective thinking mode from the request flag.

    Returns "enabled" or "disabled".  True/"true" and False/"false" are
    explicit overrides; "auto" or None falls back to a heuristic based on
    tool availability and how "complex" the latest user message looks.
    Any other value disables thinking.
    """
    explicitly_on = enable_thinking is True or enable_thinking == "true"
    explicitly_off = enable_thinking is False or enable_thinking == "false"
    is_auto = enable_thinking == "auto" or enable_thinking is None

    if explicitly_on:
        return "enabled"
    if explicitly_off or not is_auto:
        return "disabled"

    # Auto mode: tool usage always benefits from reasoning.
    if tools and len(tools) > 0:
        return "enabled"

    # Grab the most recent user message, lower-cased for keyword matching.
    last_user_msg = ""
    for message in reversed(messages):
        if message.role == "user" and message.content:
            last_user_msg = message.content.lower()
            break

    keywords = (
        "explain", "analyze", "compare", "why", "how does",
        "step by step", "reason", "think", "calculate",
        "solve", "debug", "implement", "design", "architect",
        "evaluate", "assess", "critique", "prove", "derive",
        "what are the implications", "trade-off", "pros and cons",
        "complex", "difficult", "challenging", "advanced",
        "algorithm", "optimize", "mathematics", "logic",
        "code", "program", "function", "write a",
        "plan", "strategy", "approach",
    )
    hits = sum(kw in last_user_msg for kw in keywords)

    # Two keyword hits, or a long prompt, is treated as "complex".
    return "enabled" if hits >= 2 or len(last_user_msg) > 200 else "disabled"
| |
|
def apply_thinking_prompt(messages: List[ChatMessage], thinking_mode: str) -> List[Dict[str, str]]:
    """Convert ChatMessage objects to plain dicts and inject thinking directives.

    Tool-result messages are rewritten as user messages carrying the tool
    output.  When thinking is enabled, a step-by-step instruction is merged
    into (or prepended as) the system message; when disabled, the
    "/no_think" directive is injected the same way.
    """
    converted = []
    for message in messages:
        entry = {"role": message.role, "content": message.content or ""}
        if message.role == "tool":
            entry["role"] = "user"
            entry["content"] = f"[Tool Result (call_id: {message.tool_call_id})]\n{message.content}"
        converted.append(entry)

    if thinking_mode == "enabled":
        directive = (
            "You should think step by step before responding. "
            "Put your reasoning inside <think>...</think> tags, "
            "then provide your final answer outside the tags."
        )
    elif thinking_mode == "disabled":
        directive = "/no_think"
    else:
        directive = None

    if directive is not None:
        if converted and converted[0]["role"] == "system":
            converted[0]["content"] = converted[0]["content"] + "\n\n" + directive
        else:
            converted.insert(0, {"role": "system", "content": directive})

    return converted
| |
|
def parse_tool_calls(text: str) -> tuple:
    """Extract <tool_call> blocks from model output.

    Returns (content, tool_calls): ``content`` is the text with all
    tool-call markup removed (None if nothing remains) and ``tool_calls``
    is a list of ToolCall objects (None if no valid calls were found).
    Blocks whose body is not valid JSON are skipped.
    """
    tool_call_pattern = r'<tool_call>\s*(\{.*?\})\s*</tool_call>'
    matches = re.findall(tool_call_pattern, text, re.DOTALL)

    if not matches:
        return text, None

    tool_calls = []
    # Fix: dropped the unused enumerate() index from the original loop.
    for raw in matches:
        try:
            call_data = json.loads(raw)
        except json.JSONDecodeError:
            # Malformed body: skip this block, keep the rest.
            continue
        tool_calls.append(ToolCall(
            id=f"call_{uuid.uuid4().hex[:24]}",
            type="function",
            function=ToolCallFunction(
                name=call_data.get("name", ""),
                arguments=json.dumps(call_data.get("arguments", {}))
            )
        ))

    # Strip the tool-call markup from the visible content.
    clean_content = re.sub(tool_call_pattern, '', text, flags=re.DOTALL).strip()

    return clean_content if clean_content else None, tool_calls if tool_calls else None
| |
|
def parse_thinking(text: str) -> tuple:
    """
    Split a raw completion into (thinking_content, main_content).

    Everything inside <think>...</think> tags is collected (multiple blocks
    are joined with blank lines); the remainder, stripped, is the visible
    answer.  Returns (None, text) when no thinking tags are present.
    """
    pattern = r'<think>(.*?)</think>'
    blocks = re.findall(pattern, text, re.DOTALL)

    if not blocks:
        return None, text

    thinking = "\n\n".join(block.strip() for block in blocks)
    visible = re.sub(pattern, '', text, flags=re.DOTALL).strip()
    return thinking, visible
| |
|
| | |
| |
|
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Serve the bundled demo landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
| |
|
@app.get("/health")
async def health():
    """Liveness probe: reports server status and the served model name."""
    payload = {"status": "healthy", "model": MODEL_NAME}
    return payload
| |
|
@app.get("/v1/models")
async def list_models():
    """OpenAI-compatible model listing (exactly one local model)."""
    model_entry = {
        "id": MODEL_NAME,
        "object": "model",
        "created": get_timestamp(),
        "owned_by": "local",
        "permission": [],
        "root": MODEL_NAME,
        "parent": None,
    }
    return {"object": "list", "data": [model_entry]}
| |
|
@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    """
    OpenAI-compatible single-model lookup.

    Fix: the previous version ignored ``model_id`` entirely and answered
    for any id.  Unknown ids now return 404, matching OpenAI semantics;
    the response for the served model is unchanged.
    """
    if model_id != MODEL_NAME:
        raise HTTPException(status_code=404, detail=f"Model '{model_id}' not found")
    return {
        "id": MODEL_NAME,
        "object": "model",
        "created": get_timestamp(),
        "owned_by": "local",
    }
| |
|
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """
    OpenAI-compatible chat completions endpoint.

    Accepts the standard payload plus a non-standard ``enable_thinking``
    flag (true/false/"auto"), either top-level or nested under
    ``extra_body`` (where OpenAI SDKs place vendor extras).  Builds the
    tool/thinking-augmented message list and dispatches to the streaming
    or non-streaming generator.

    Fix: the original conditionally popped ``extra_body`` and then
    unconditionally popped it again two lines later; the dead conditional
    pop is gone.  Behavior is unchanged: ``extra_body`` is always drained
    before validation.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    try:
        # enable_thinking may arrive top-level or inside extra_body;
        # top-level wins.  extra_body is always removed before validation.
        enable_thinking = body.pop("enable_thinking", None)
        extra_body = body.pop("extra_body", None) or {}
        if enable_thinking is None:
            enable_thinking = extra_body.get("enable_thinking")

        req = ChatCompletionRequest(**body)
        req.enable_thinking = enable_thinking
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")

    # Clamp requested tokens to the server-wide maximum.
    max_tokens = req.max_completion_tokens or req.max_tokens or MAX_OUTPUT_TOKENS
    max_tokens = min(max_tokens, MAX_OUTPUT_TOKENS)

    thinking_mode = determine_thinking_mode(req.enable_thinking, req.messages, req.tools)

    messages = list(req.messages)

    # Merge tool instructions into the system message (or prepend one).
    if req.tools:
        tool_system = build_system_prompt_for_tools(req.tools)
        if messages and messages[0].role == "system":
            messages[0] = ChatMessage(
                role="system",
                content=messages[0].content + "\n\n" + tool_system
            )
        else:
            messages.insert(0, ChatMessage(role="system", content=tool_system))

    formatted_messages = apply_thinking_prompt(messages, thinking_mode)

    # Model end-of-turn markers plus any caller-supplied stop sequences.
    stop_sequences = ["<|endoftext|>", "<|im_end|>"]
    if req.stop:
        if isinstance(req.stop, str):
            stop_sequences.append(req.stop)
        else:
            stop_sequences.extend(req.stop)

    if req.stream:
        return EventSourceResponse(
            stream_response(
                formatted_messages, max_tokens, req.temperature,
                req.top_p, stop_sequences, req.presence_penalty,
                req.frequency_penalty, thinking_mode, req.tools
            ),
            media_type="text/event-stream"
        )
    return await non_stream_response(
        formatted_messages, max_tokens, req.temperature,
        req.top_p, stop_sequences, req.presence_penalty,
        req.frequency_penalty, thinking_mode, req.tools
    )
| |
|
| |
|
async def non_stream_response(
    messages, max_tokens, temperature, top_p,
    stop, presence_penalty, frequency_penalty,
    thinking_mode, tools
):
    """
    Run one blocking completion and package it as an OpenAI-style
    chat.completion JSON response.

    <think>...</think> text is stripped from the message and, unless
    thinking is disabled, surfaced in a non-standard top-level "thinking"
    field.  <tool_call> blocks become structured tool_calls and force
    finish_reason to "tool_calls".
    """
    completion_id = generate_id()
    created = get_timestamp()

    try:
        completion = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
        )

        text = completion["choices"][0]["message"].get("content", "") or ""

        # Separate reasoning from the visible answer, then pull out tool calls.
        thinking_content, visible = parse_thinking(text)
        answer, tool_calls = parse_tool_calls(visible)

        message = {"role": "assistant"}
        if tool_calls:
            message["content"] = answer
            message["tool_calls"] = [
                {
                    "id": call.id,
                    "type": call.type,
                    "function": {
                        "name": call.function.name,
                        "arguments": call.function.arguments,
                    },
                }
                for call in tool_calls
            ]
            finish_reason = "tool_calls"
        else:
            message["content"] = answer or visible
            finish_reason = completion["choices"][0].get("finish_reason", "stop")

        default_usage = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
        }
        result = {
            "id": completion_id,
            "object": "chat.completion",
            "created": created,
            "model": MODEL_NAME,
            "choices": [
                {
                    "index": 0,
                    "message": message,
                    "finish_reason": finish_reason
                }
            ],
            "usage": completion.get("usage", default_usage),
        }

        # Non-standard extension: expose the reasoning alongside the answer.
        if thinking_content and thinking_mode != "disabled":
            result["thinking"] = thinking_content

        return JSONResponse(content=result)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Generation error: {str(e)}")
| |
|
| |
|
async def stream_response(
    messages, max_tokens, temperature, top_p,
    stop, presence_penalty, frequency_penalty,
    thinking_mode, tools
):
    """
    Generate a streaming response with proper SSE formatting.

    Yields dicts of the form {"data": <json str>} for EventSourceResponse.
    Model output is scanned incrementally for <think>...</think> and
    <tool_call>...</tool_call> tags: thinking text is forwarded wrapped in
    <think> tags (unless thinking is disabled), tool calls are parsed and
    emitted as structured tool_calls deltas when the model finishes, and
    everything else streams through as plain content.

    Fixes over the previous version:
      * A chunk ending in a partial tag prefix (even a lone "<") left the
        buffer unchanged inside ``while buffer:``, spinning forever.  We
        now break out and wait for the next model chunk.
      * Thinking text spanning multiple chunks was accumulated in
        ``think_buffer`` but dropped when </think> arrived; it is now
        prepended before emitting.
    """
    completion_id = generate_id()
    created = get_timestamp()

    def _chunk(delta, finish_reason=None):
        # One OpenAI-style chat.completion.chunk payload.
        return {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": MODEL_NAME,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        }

    # Initial chunk announcing the assistant role.
    yield {"data": json.dumps(_chunk({"role": "assistant", "content": ""}))}

    try:
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            stream=True,
        )

        buffer = ""              # unprocessed model text
        in_think_tag = False     # currently inside <think>...</think>
        think_buffer = ""        # think text carried across chunks
        in_tool_call = False     # currently inside <tool_call>...</tool_call>
        tool_buffer = ""         # tool-call JSON carried across chunks
        pending_tool_calls = []  # parsed tool calls, emitted at stream end

        for chunk_data in stream:
            delta = chunk_data["choices"][0].get("delta", {})
            content = delta.get("content", "")
            finish_reason = chunk_data["choices"][0].get("finish_reason")

            if content:
                buffer += content

                while buffer:
                    if in_tool_call:
                        end_idx = buffer.find("</tool_call>")
                        if end_idx != -1:
                            tool_buffer += buffer[:end_idx]
                            buffer = buffer[end_idx + len("</tool_call>"):]
                            in_tool_call = False
                            # Parse the completed tool-call body.
                            try:
                                call_data = json.loads(tool_buffer.strip())
                                pending_tool_calls.append({
                                    "id": f"call_{uuid.uuid4().hex[:24]}",
                                    "type": "function",
                                    "function": {
                                        "name": call_data.get("name", ""),
                                        "arguments": json.dumps(call_data.get("arguments", {}))
                                    }
                                })
                            except json.JSONDecodeError:
                                # Malformed tool call: drop it silently.
                                pass
                            tool_buffer = ""
                        else:
                            tool_buffer += buffer
                            buffer = ""

                    elif in_think_tag:
                        end_idx = buffer.find("</think>")
                        if end_idx != -1:
                            # Include think text carried over from earlier
                            # chunks (previously dropped -- bug fix).
                            think_content = think_buffer + buffer[:end_idx]
                            think_buffer = ""
                            buffer = buffer[end_idx + len("</think>"):]
                            in_think_tag = False

                            if thinking_mode != "disabled" and think_content.strip():
                                yield {"data": json.dumps(_chunk(
                                    {"content": f"<think>{think_content}</think>"}
                                ))}
                        else:
                            think_buffer += buffer
                            buffer = ""

                    else:
                        # Find the earliest opening tag in the buffer.
                        think_start = buffer.find("<think>")
                        tool_start = buffer.find("<tool_call>")

                        next_tag = -1
                        tag_type = None
                        if think_start != -1:
                            next_tag = think_start
                            tag_type = "think"
                        if tool_start != -1 and (next_tag == -1 or tool_start < next_tag):
                            next_tag = tool_start
                            tag_type = "tool"

                        if next_tag != -1:
                            # Flush plain content preceding the tag.
                            before = buffer[:next_tag]
                            if before:
                                yield {"data": json.dumps(_chunk({"content": before}))}

                            if tag_type == "think":
                                in_think_tag = True
                                buffer = buffer[next_tag + len("<think>"):]
                            elif tag_type == "tool":
                                in_tool_call = True
                                buffer = buffer[next_tag + len("<tool_call>"):]
                        else:
                            # No complete tag: hold back any suffix that might
                            # be the start of a tag split across chunks.
                            partial_tags = ["<think", "<tool_call", "</think", "</tool_call"]
                            has_partial = False
                            for pt in partial_tags:
                                for i in range(1, len(pt) + 1):
                                    if buffer.endswith(pt[:i]):
                                        safe = buffer[:-i]
                                        if safe:
                                            yield {"data": json.dumps(_chunk({"content": safe}))}
                                        buffer = buffer[-i:]
                                        has_partial = True
                                        break
                                if has_partial:
                                    break

                            if has_partial:
                                # Wait for the next chunk to complete the tag
                                # (previously this spun forever -- bug fix).
                                break

                            if buffer:
                                yield {"data": json.dumps(_chunk({"content": buffer}))}
                                buffer = ""

            if finish_reason:
                # Flush any leftover plain content.
                if buffer and not in_tool_call and not in_think_tag:
                    yield {"data": json.dumps(_chunk({"content": buffer}))}

                # Emit parsed tool calls as structured deltas.
                if pending_tool_calls:
                    for i, tc in enumerate(pending_tool_calls):
                        yield {"data": json.dumps(_chunk({
                            "tool_calls": [{
                                "index": i,
                                "id": tc["id"],
                                "type": "function",
                                "function": {
                                    "name": tc["function"]["name"],
                                    "arguments": tc["function"]["arguments"]
                                }
                            }]
                        }))}
                    finish_reason = "tool_calls"

                # Terminal chunk carrying the finish reason.
                yield {"data": json.dumps(_chunk({}, finish_reason))}

    except Exception as e:
        # Surface the failure to the client inside the stream, then finish.
        yield {"data": json.dumps(_chunk({"content": f"\n\n[Error: {str(e)}]"}, "stop"))}

    yield {"data": "[DONE]"}
| |
|
| |
|
| | |
| |
|
if __name__ == "__main__":
    # Run the ASGI app directly; long keep-alive suits slow streaming clients.
    server_options = {
        "host": HOST,
        "port": PORT,
        "log_level": "info",
        "timeout_keep_alive": 300,
    }
    uvicorn.run(app, **server_options)