# Qwen3-Coder-0.6B / server.py
# naimulislam's picture
# Update server.py
# d2543ed verified
"""
OpenAI-Compatible API Server for Qwen3-0.6B-GGUF
Supports: streaming, tool calling, thinking modes (true/false/auto)
"""
import os
import sys
import json
import time
import uuid
import copy
import re
from typing import Optional, List, Dict, Any, Union, AsyncGenerator
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
from llama_cpp import Llama
# ─── Configuration ───────────────────────────────────────────────────────────
# Path to the GGUF model file; overridable through the MODEL_PATH environment variable.
MODEL_PATH = os.environ.get("MODEL_PATH", "/app/models/Qwen3-0.6B-Q4_K_M.gguf")
# Context window size handed to Llama as n_ctx.
CONTEXT_SIZE = 16384
# Default for max_tokens and the upper bound applied to every request.
MAX_OUTPUT_TOKENS = 8192
# Bind address and port passed to uvicorn.run in the __main__ guard.
HOST = "0.0.0.0"
PORT = 7860
# Model identifier reported by /health, /v1/models and every completion payload.
MODEL_NAME = "qwen3-0.6b"
# ─── Initialize FastAPI ──────────────────────────────────────────────────────
# FastAPI application object; the route decorators in this file register against it.
app = FastAPI(
    title="Qwen3-0.6B OpenAI-Compatible API",
    version="1.0.0",
    description="OpenAI-compatible server with thinking modes and tool calling"
)
# Jinja2 template loader rooted at the relative "templates" directory; used by the "/" route.
templates = Jinja2Templates(directory="templates")
# ─── Load Model ──────────────────────────────────────────────────────────────
# The model is constructed at module import time, so importing this module
# triggers the load and the progress messages below.
print(f"Loading model from {MODEL_PATH}...")
print(f"Context size: {CONTEXT_SIZE}, Max output: {MAX_OUTPUT_TOKENS}")
# Single shared Llama instance used by both the streaming and non-streaming paths.
llm = Llama(
    model_path=MODEL_PATH,
    n_ctx=CONTEXT_SIZE,
    n_threads=4,
    n_gpu_layers=0,  # presumably CPU-only inference — confirm against llama-cpp-python docs
    verbose=True,
    chat_format="chatml",
)
print("Model loaded successfully!")
# ─── Pydantic Models ─────────────────────────────────────────────────────────
class FunctionDefinition(BaseModel):
    """Name, description and parameter schema of a function offered as a tool."""

    # Function name the model is told to reference in a tool call.
    name: str
    # Optional description; rendered as "" in the tool prompt when missing.
    description: Optional[str] = None
    # Optional parameters object; rendered as {} in the tool prompt when missing.
    parameters: Optional[Dict[str, Any]] = None
class ToolDefinition(BaseModel):
    """One entry of the request's ``tools`` list, wrapping a function definition."""

    # Tool kind; defaults to "function".
    type: str = "function"
    # The function this tool exposes.
    function: FunctionDefinition
class ToolCallFunction(BaseModel):
    """Function name and arguments of a single tool call."""

    # Name of the function being called.
    name: str
    # Arguments as a string; parse_tool_calls fills this with json.dumps output.
    arguments: str
class ToolCall(BaseModel):
    """A tool call attached to an assistant message."""

    # Call identifier; parse_tool_calls generates values of the form "call_<24 hex chars>".
    id: str
    # Call kind; defaults to "function".
    type: str = "function"
    # The function name and its arguments.
    function: ToolCallFunction
class ChatMessage(BaseModel):
    """One message of the conversation sent in a chat completion request."""

    # Message role; this file special-cases "system", "user" and "tool".
    role: str
    # Text content; may be omitted.
    content: Optional[str] = None
    # Optional name field; not read elsewhere in this file.
    name: Optional[str] = None
    # Tool calls carried by the message, if any.
    tool_calls: Optional[List[ToolCall]] = None
    # Id of the call a "tool" message answers; echoed into the tool-result text.
    tool_call_id: Optional[str] = None
class ChatCompletionRequest(BaseModel):
    """Validated body of a POST /v1/chat/completions request."""

    # Requested model name; not read by the handlers in this file, which always report MODEL_NAME.
    model: Optional[str] = MODEL_NAME
    # Conversation history.
    messages: List[ChatMessage]
    # Sampling parameters forwarded to llm.create_chat_completion.
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    # Output budget; max_completion_tokens takes precedence when set, and the
    # result is capped at MAX_OUTPUT_TOKENS by chat_completions.
    max_tokens: Optional[int] = MAX_OUTPUT_TOKENS
    max_completion_tokens: Optional[int] = None
    # Selects the SSE streaming path when true.
    stream: Optional[bool] = False
    # Extra stop sequence(s), appended to the built-in stop sequences.
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    # Tool definitions; when present they are described in the system prompt.
    tools: Optional[List[ToolDefinition]] = None
    # Accepted for compatibility; not read anywhere in this file.
    tool_choice: Optional[Union[str, Dict]] = None
    # chat_completions removes "extra_body" from the raw body before validation.
    extra_body: Optional[Dict[str, Any]] = None
    # Thinking mode: true, false, "auto"
    # (chat_completions assigns this after validation from the raw body).
    enable_thinking: Optional[Any] = None
# ─── Helper Functions ─────────────────────────────────────────────────────────
def generate_id():
    """Return a fresh completion id: ``chatcmpl-`` followed by 24 hex characters."""
    random_hex = uuid.uuid4().hex
    return "chatcmpl-" + random_hex[:24]
def get_timestamp():
    """Return the current Unix time truncated to whole seconds."""
    now = time.time()
    return int(now)
def build_system_prompt_for_tools(tools: List[ToolDefinition]) -> str:
    """Render the system prompt that lists the tools and the call format.

    Each tool is serialised as a ``{"type": "function", "function": {...}}``
    entry (missing description -> "", missing parameters -> {}) and the
    resulting list is embedded as indented JSON in a fixed instruction text.
    """
    catalog = [
        {
            "type": "function",
            "function": {
                "name": entry.function.name,
                "description": entry.function.description or "",
                "parameters": entry.function.parameters or {}
            }
        }
        for entry in tools
    ]
    tools_json = json.dumps(catalog, indent=2)
    return f"""You are a helpful assistant with access to the following tools. Use them when needed.
# Tools
You have access to the following tools:
{tools_json}
# Tool Call Format
When you need to call a tool, respond with the following format:
<tool_call>
{{"name": "function_name", "arguments": {{"param1": "value1", "param2": "value2"}}}}
</tool_call>
You can call multiple tools. Each tool call should be in its own <tool_call> block.
If you don't need to use any tool, just respond normally."""
def determine_thinking_mode(enable_thinking, messages, tools) -> str:
    """Decide whether the model should be asked to think before answering.

    Args:
        enable_thinking: ``True``/``False``, the strings "true"/"false"/"auto"
            (matched case-insensitively, surrounding whitespace ignored), or
            ``None`` which is treated like "auto". Any other value disables
            thinking.
        messages: Conversation messages; only ``role`` and ``content`` of the
            most recent user message with non-empty content are inspected.
        tools: Tool definitions of the request, or ``None``.

    Returns:
        ``"enabled"`` or ``"disabled"``.
    """
    # Clients send the flag with varying capitalisation ("True", "AUTO");
    # normalise so those are not silently treated as "disabled".
    if isinstance(enable_thinking, str):
        enable_thinking = enable_thinking.strip().lower()

    if enable_thinking is True or enable_thinking == "true":
        return "enabled"
    if enable_thinking is False or enable_thinking == "false":
        return "disabled"
    if enable_thinking is not None and enable_thinking != "auto":
        # Unrecognised value: fall back to the cheaper mode.
        return "disabled"

    # Auto mode: decide based on task complexity.
    if tools:
        return "enabled"

    last_user_msg = next(
        (
            msg.content.lower()
            for msg in reversed(messages)
            if msg.role == "user" and msg.content
        ),
        "",
    )
    complexity_indicators = (
        "explain", "analyze", "compare", "why", "how does",
        "step by step", "reason", "think", "calculate",
        "solve", "debug", "implement", "design", "architect",
        "evaluate", "assess", "critique", "prove", "derive",
        "what are the implications", "trade-off", "pros and cons",
        "complex", "difficult", "challenging", "advanced",
        "algorithm", "optimize", "mathematics", "logic",
        "code", "program", "function", "write a",
        "plan", "strategy", "approach",
    )
    # Plain substring matching: two or more hits, or a long prompt, enables thinking.
    complexity_score = sum(
        1 for indicator in complexity_indicators if indicator in last_user_msg
    )
    if complexity_score >= 2 or len(last_user_msg) > 200:
        return "enabled"
    return "disabled"
def _render_tool_call(tool_call) -> str:
    """Serialise one assistant tool call back into a ``<tool_call>`` block."""
    raw_arguments = tool_call.function.arguments
    try:
        # Arguments are stored as a JSON string; embed them as an object again.
        arguments = json.loads(raw_arguments) if isinstance(raw_arguments, str) else raw_arguments
    except json.JSONDecodeError:
        arguments = raw_arguments
    payload = {"name": tool_call.function.name, "arguments": arguments}
    return f"<tool_call>\n{json.dumps(payload)}\n</tool_call>"


def apply_thinking_prompt(messages: List[ChatMessage], thinking_mode: str) -> List[Dict[str, str]]:
    """Convert messages to plain role/content dicts and add the thinking instruction.

    Args:
        messages: Conversation messages (objects exposing ``role``,
            ``content``, ``tool_calls`` and ``tool_call_id``).
        thinking_mode: ``"enabled"`` appends a step-by-step instruction,
            ``"disabled"`` appends the ``/no_think`` switch; any other value
            leaves the messages without an instruction.

    Returns:
        A new list of ``{"role": ..., "content": ...}`` dicts. ``tool``
        messages become ``user`` messages prefixed with their call id, and
        assistant tool calls are re-rendered as ``<tool_call>`` blocks so the
        model can see which calls the tool results belong to.
    """
    formatted = []
    for msg in messages:
        role = msg.role
        # A missing content must not be rendered as the literal text "None".
        content = msg.content or ""
        if role == "tool":
            role = "user"
            content = f"[Tool Result (call_id: {msg.tool_call_id})]\n{content}"
        elif role == "assistant" and msg.tool_calls:
            # Without this the assistant's earlier calls vanish from the history.
            rendered_calls = "\n".join(_render_tool_call(tc) for tc in msg.tool_calls)
            content = f"{content}\n{rendered_calls}" if content else rendered_calls
        formatted.append({"role": role, "content": content})

    if thinking_mode == "enabled":
        instruction = (
            "You should think step by step before responding. "
            "Put your reasoning inside <think>...</think> tags, "
            "then provide your final answer outside the tags."
        )
    elif thinking_mode == "disabled":
        instruction = "/no_think"
    else:
        return formatted

    # Merge with an existing leading system message, otherwise prepend one.
    if formatted and formatted[0]["role"] == "system":
        formatted[0]["content"] = formatted[0]["content"] + "\n\n" + instruction
    else:
        formatted.insert(0, {"role": "system", "content": instruction})
    return formatted
def parse_tool_calls(text: str) -> tuple:
    """Extract ``<tool_call>`` blocks from model output.

    Args:
        text: Raw model output (thinking already removed by the caller).

    Returns:
        ``(content, tool_calls)``. Without any ``<tool_call>`` block the text
        is returned unchanged together with ``None``. Otherwise ``content`` is
        the text with every block removed (``None`` when nothing is left) and
        ``tool_calls`` is the list of parsed ``ToolCall`` objects (``None``
        when no block could be parsed).
    """
    tool_call_pattern = r'<tool_call>\s*(\{.*?\})\s*</tool_call>'
    matches = re.findall(tool_call_pattern, text, re.DOTALL)
    if not matches:
        return text, None

    tool_calls = []
    for match in matches:
        try:
            call_data = json.loads(match)
        except json.JSONDecodeError:
            # Malformed payload: skip the call; its block is still stripped below.
            continue
        name = call_data.get("name", "")
        if not isinstance(name, str):
            # ToolCallFunction.name is declared as str; a non-string name
            # would fail model validation, so skip this call instead.
            continue
        arguments = call_data.get("arguments", {})
        if not isinstance(arguments, str):
            # Callers expect a JSON string; a model that already emitted the
            # arguments as a string must not be encoded a second time.
            arguments = json.dumps(arguments)
        tool_calls.append(
            ToolCall(
                id=f"call_{uuid.uuid4().hex[:24]}",
                type="function",
                function=ToolCallFunction(name=name, arguments=arguments),
            )
        )

    # Remove tool call blocks from content
    clean_content = re.sub(tool_call_pattern, '', text, flags=re.DOTALL).strip()
    return clean_content or None, tool_calls or None
def parse_thinking(text: str) -> tuple:
    """Split ``<think>...</think>`` sections off the model output.

    Returns ``(thinking, main_content)``: the stripped think sections joined
    by blank lines plus the stripped remaining text, or ``(None, text)`` when
    the output contains no complete think section.
    """
    think_re = re.compile(r'<think>(.*?)</think>', re.DOTALL)
    segments = think_re.findall(text)
    if not segments:
        return None, text
    reasoning = "\n\n".join(segment.strip() for segment in segments)
    remainder = think_re.sub('', text).strip()
    return reasoning, remainder
# ─── API Endpoints ────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render the ``index.html`` template for the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/health")
async def health():
    """Report a static health status together with the served model name."""
    status_report = {"status": "healthy"}
    status_report["model"] = MODEL_NAME
    return status_report
@app.get("/v1/models")
async def list_models():
    """Return a model list containing the single served model."""
    model_entry = {
        "id": MODEL_NAME,
        "object": "model",
        "created": get_timestamp(),
        "owned_by": "local",
        "permission": [],
        "root": MODEL_NAME,
        "parent": None,
    }
    return {"object": "list", "data": [model_entry]}
@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    """Return the model card of the served model.

    ``model_id`` is accepted but not compared: every id yields the card for
    MODEL_NAME.
    """
    card = {"id": MODEL_NAME, "object": "model"}
    card["created"] = get_timestamp()
    card["owned_by"] = "local"
    return card
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Handle an OpenAI-style chat completion request.

    The raw JSON body is validated into a ``ChatCompletionRequest``. The
    thinking flag is read from the top-level ``enable_thinking`` key or, when
    that is absent, from ``extra_body.enable_thinking``. Tools are described
    in the system prompt, the thinking instruction is applied, and the request
    is dispatched to the streaming or non-streaming generator.

    Raises:
        HTTPException: 400 when the body is not valid JSON or fails validation.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    try:
        # Extract enable_thinking from the top level, falling back to extra_body.
        enable_thinking = body.pop("enable_thinking", None)
        # extra_body is always removed before validation.
        extra_body = body.pop("extra_body", None)
        if enable_thinking is None and isinstance(extra_body, dict):
            enable_thinking = extra_body.get("enable_thinking")
        req = ChatCompletionRequest(**body)
        req.enable_thinking = enable_thinking
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")

    requested_tokens = req.max_completion_tokens or req.max_tokens or MAX_OUTPUT_TOKENS
    if requested_tokens <= 0:
        # llama-cpp-python treats max_tokens <= 0 as "unlimited", which would
        # bypass the cap below; map it to the configured maximum instead.
        requested_tokens = MAX_OUTPUT_TOKENS
    max_tokens = min(requested_tokens, MAX_OUTPUT_TOKENS)

    # An explicit JSON null overrides the model defaults with None; restore
    # the defaults so None is never forwarded to the sampler.
    temperature = 0.7 if req.temperature is None else req.temperature
    top_p = 0.9 if req.top_p is None else req.top_p
    presence_penalty = 0.0 if req.presence_penalty is None else req.presence_penalty
    frequency_penalty = 0.0 if req.frequency_penalty is None else req.frequency_penalty

    # tool_choice="none" asks for a plain answer: do not advertise any tools.
    tools = None if req.tool_choice == "none" else req.tools

    # Determine thinking mode
    thinking_mode = determine_thinking_mode(req.enable_thinking, req.messages, tools)

    # Build messages with tool support (copy so the request object stays untouched)
    messages = list(req.messages)
    if tools:
        tool_system = build_system_prompt_for_tools(tools)
        if messages and messages[0].role == "system":
            # The system content may be None/empty; join only the non-empty parts.
            merged = "\n\n".join(
                part for part in (messages[0].content, tool_system) if part
            )
            messages[0] = ChatMessage(role="system", content=merged)
        else:
            messages.insert(0, ChatMessage(role="system", content=tool_system))

    # Apply thinking mode
    formatted_messages = apply_thinking_prompt(messages, thinking_mode)

    # Build stop sequences
    stop_sequences = ["<|endoftext|>", "<|im_end|>"]
    if req.stop:
        if isinstance(req.stop, str):
            stop_sequences.append(req.stop)
        else:
            stop_sequences.extend(req.stop)

    if req.stream:
        return EventSourceResponse(
            stream_response(
                formatted_messages, max_tokens, temperature,
                top_p, stop_sequences, presence_penalty,
                frequency_penalty, thinking_mode, tools
            ),
            media_type="text/event-stream"
        )
    return await non_stream_response(
        formatted_messages, max_tokens, temperature,
        top_p, stop_sequences, presence_penalty,
        frequency_penalty, thinking_mode, tools
    )
async def non_stream_response(
    messages, max_tokens, temperature, top_p,
    stop, presence_penalty, frequency_penalty,
    thinking_mode, tools
):
    """Run one complete generation and return it as a ``chat.completion`` JSON response.

    The raw output is split into thinking text, visible content and tool
    calls. Tool calls force ``finish_reason`` to ``"tool_calls"``; thinking
    text is attached under the top-level ``"thinking"`` key unless thinking is
    disabled. Any failure is reported as an HTTP 500 error.
    """
    completion_id = generate_id()
    created = get_timestamp()
    try:
        generation_kwargs = dict(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
        )
        response = llm.create_chat_completion(**generation_kwargs)
        first_choice = response["choices"][0]
        raw_content = first_choice["message"].get("content", "") or ""

        # Separate reasoning from the visible answer, then pull out tool calls.
        thinking_content, visible_text = parse_thinking(raw_content)
        remaining_text, tool_calls = parse_tool_calls(visible_text)

        if tool_calls:
            serialised_calls = []
            for call in tool_calls:
                serialised_calls.append({
                    "id": call.id,
                    "type": call.type,
                    "function": {
                        "name": call.function.name,
                        "arguments": call.function.arguments
                    }
                })
            message = {
                "role": "assistant",
                "content": remaining_text,
                "tool_calls": serialised_calls,
            }
            finish_reason = "tool_calls"
        else:
            message = {
                "role": "assistant",
                "content": remaining_text or visible_text,
            }
            finish_reason = first_choice.get("finish_reason", "stop")

        empty_usage = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
        }
        result = {
            "id": completion_id,
            "object": "chat.completion",
            "created": created,
            "model": MODEL_NAME,
            "choices": [
                {"index": 0, "message": message, "finish_reason": finish_reason}
            ],
            "usage": response.get("usage", empty_usage),
        }

        # Expose the reasoning only when thinking was not disabled.
        if thinking_mode != "disabled" and thinking_content:
            result["thinking"] = thinking_content
        return JSONResponse(content=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Generation error: {str(e)}")
_THINK_OPEN, _THINK_CLOSE = "<think>", "</think>"
_TOOL_OPEN, _TOOL_CLOSE = "<tool_call>", "</tool_call>"


def _sse_chunk(completion_id, created, delta, finish_reason=None):
    """Wrap *delta* in a ``chat.completion.chunk`` payload as an SSE event dict."""
    payload = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": MODEL_NAME,
        "choices": [{
            "index": 0,
            "delta": delta,
            "finish_reason": finish_reason
        }]
    }
    return {"data": json.dumps(payload)}


def _held_back_len(text, tags):
    """Return the length of the longest suffix of *text* that could start one of *tags*."""
    longest = 0
    for tag in tags:
        # Only proper prefixes matter: a complete tag is found by str.find.
        for size in range(min(len(tag) - 1, len(text)), longest, -1):
            if text.endswith(tag[:size]):
                longest = size
                break
    return longest


def _parse_streamed_tool_call(raw):
    """Parse the JSON body of a ``<tool_call>`` block; return a call dict or None."""
    try:
        call_data = json.loads(raw.strip())
    except json.JSONDecodeError:
        return None
    if not isinstance(call_data, dict):
        return None
    arguments = call_data.get("arguments", {})
    if not isinstance(arguments, str):
        # Already-stringified arguments must not be JSON-encoded a second time.
        arguments = json.dumps(arguments)
    return {
        "id": f"call_{uuid.uuid4().hex[:24]}",
        "type": "function",
        "function": {
            "name": call_data.get("name", ""),
            "arguments": arguments
        }
    }


async def stream_response(
    messages, max_tokens, temperature, top_p,
    stop, presence_penalty, frequency_penalty,
    thinking_mode, tools
):
    """Generate a streaming response with proper SSE formatting.

    Yields ``{"data": <json>}`` dicts: first a role chunk, then content
    chunks, then (if any) one chunk per tool call, a final chunk carrying the
    finish reason, and finally the literal ``[DONE]`` marker.

    Model output is scanned incrementally. Text inside ``<think>`` is
    collected and emitted as one ``<think>...</think>`` content chunk once the
    block closes (dropped entirely when ``thinking_mode`` is "disabled"). Text
    inside ``<tool_call>`` is collected and parsed into a tool call; blocks
    that do not parse are dropped. Text that might be the beginning of an
    opening tag is held back until the next chunk disambiguates it.

    ``tools`` is accepted for signature parity with ``non_stream_response``
    and is not used here.

    NOTE(review): the llama_cpp stream is iterated synchronously inside this
    async generator, so generation appears to block the event loop while it
    runs — confirm whether that is acceptable for the deployment.
    """
    completion_id = generate_id()
    created = get_timestamp()

    def emit(delta, finish_reason=None):
        return _sse_chunk(completion_id, created, delta, finish_reason)

    # Send initial chunk with role
    yield emit({"role": "assistant", "content": ""})
    try:
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            stream=True,
        )
        buffer = ""          # unprocessed text outside of any block
        think_buffer = ""    # text collected since the last <think>
        tool_buffer = ""     # text collected since the last <tool_call>
        in_think_tag = False
        in_tool_call = False
        pending_tool_calls = []

        for chunk_data in stream:
            choice = chunk_data["choices"][0]
            content = choice.get("delta", {}).get("content", "")
            finish_reason = choice.get("finish_reason")

            if content:
                buffer += content
                # Process buffer for think tags and tool calls
                while buffer:
                    if in_tool_call:
                        # Search the accumulated text so that a closing tag
                        # split across two chunks is still recognised.
                        search_from = max(0, len(tool_buffer) - (len(_TOOL_CLOSE) - 1))
                        tool_buffer += buffer
                        buffer = ""
                        end_idx = tool_buffer.find(_TOOL_CLOSE, search_from)
                        if end_idx == -1:
                            break
                        raw_call = tool_buffer[:end_idx]
                        buffer = tool_buffer[end_idx + len(_TOOL_CLOSE):]
                        tool_buffer = ""
                        in_tool_call = False
                        tool_call = _parse_streamed_tool_call(raw_call)
                        if tool_call is not None:
                            pending_tool_calls.append(tool_call)
                    elif in_think_tag:
                        search_from = max(0, len(think_buffer) - (len(_THINK_CLOSE) - 1))
                        think_buffer += buffer
                        buffer = ""
                        end_idx = think_buffer.find(_THINK_CLOSE, search_from)
                        if end_idx == -1:
                            break
                        # The whole block, not just the last fragment, is the thinking text.
                        think_content = think_buffer[:end_idx]
                        buffer = think_buffer[end_idx + len(_THINK_CLOSE):]
                        think_buffer = ""
                        in_think_tag = False
                        # Send thinking content as special chunk
                        if thinking_mode != "disabled" and think_content.strip():
                            yield emit({"content": f"<think>{think_content}</think>"})
                    else:
                        # Find the earliest opening tag, if any.
                        openings = [
                            (position, tag)
                            for position, tag in (
                                (buffer.find(_THINK_OPEN), _THINK_OPEN),
                                (buffer.find(_TOOL_OPEN), _TOOL_OPEN),
                            )
                            if position != -1
                        ]
                        if openings:
                            position, tag = min(openings)
                            # Send content before the tag
                            if position:
                                yield emit({"content": buffer[:position]})
                            buffer = buffer[position + len(tag):]
                            if tag == _THINK_OPEN:
                                in_think_tag = True
                            else:
                                in_tool_call = True
                            continue
                        # No complete tag: emit everything except a possible
                        # partial opening tag, then wait for more text.
                        held = _held_back_len(buffer, (_THINK_OPEN, _TOOL_OPEN))
                        safe_end = len(buffer) - held
                        if safe_end:
                            yield emit({"content": buffer[:safe_end]})
                        buffer = buffer[safe_end:]
                        break

            if finish_reason:
                # Flush whatever is still buffered so generated text is not
                # silently lost when generation stops inside a block.
                if in_think_tag:
                    if thinking_mode != "disabled" and think_buffer.strip():
                        yield emit({"content": f"{_THINK_OPEN}{think_buffer}"})
                elif in_tool_call:
                    if tool_buffer.strip():
                        yield emit({"content": f"{_TOOL_OPEN}{tool_buffer}"})
                elif buffer:
                    yield emit({"content": buffer})
                buffer = think_buffer = tool_buffer = ""
                in_think_tag = in_tool_call = False

                # Send tool calls if any
                if pending_tool_calls:
                    for index, tool_call in enumerate(pending_tool_calls):
                        yield emit({
                            "tool_calls": [{
                                "index": index,
                                "id": tool_call["id"],
                                "type": "function",
                                "function": {
                                    "name": tool_call["function"]["name"],
                                    "arguments": tool_call["function"]["arguments"]
                                }
                            }]
                        })
                    finish_reason = "tool_calls"

                # Send final chunk
                yield emit({}, finish_reason)
    except Exception as e:
        # Report the failure in-band; the stream is still terminated with [DONE].
        yield emit({"content": f"\n\n[Error: {str(e)}]"}, "stop")
    yield {"data": "[DONE]"}
# ─── Main ─────────────────────────────────────────────────────────────────────
# Start the uvicorn server only when this file is executed directly.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host=HOST,
        port=PORT,
        log_level="info",
        timeout_keep_alive=300,  # keep-alive timeout handed to uvicorn (presumably seconds — confirm)
    )