| import logging |
| import json |
| import time |
| import uuid |
| from typing import List, Optional, Dict, Any, Union |
|
|
| from fastapi import FastAPI |
| from fastapi.responses import StreamingResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from llama_cpp import Llama |
|
|
| |
# Basic stdout logging so startup and diagnostic messages are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
| |
| |
# Hugging Face repo and quantized GGUF file to serve, plus the model id
# this server reports through the OpenAI-compatible /v1/models endpoint.
REPO_ID = "lmstudio-community/Qwen3.5-4B-GGUF"
FILENAME = "Qwen3.5-4B-Q4_K_M.gguf"
MODEL_ID = "qwen3.5-4b"
|
|
| |
# Download (on first run) and load the GGUF model from the Hugging Face Hub.
# n_ctx=4096 sets the context window; n_threads=None lets llama.cpp choose a
# thread count automatically. This runs at import time, so server startup
# blocks until the model is fully in memory.
logger.info(f"正在从 {REPO_ID} 加载模型 {FILENAME}...")
llm = Llama.from_pretrained(
    repo_id=REPO_ID,
    filename=FILENAME,
    n_ctx=4096,
    n_threads=None,
    verbose=False,
)
logger.info("模型加载完成!")
|
|
app = FastAPI(title="Qwen3.5-4B GGUF API (CoPaw兼容)")

# Wide-open CORS: this server is intended for local use behind a browser UI.
# NOTE(review): browsers reject allow_credentials=True combined with
# allow_origins=["*"] per the CORS spec — harmless for localhost tooling,
# but worth tightening to an explicit origin list if ever exposed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
@app.get("/health")
async def health():
    """Liveness probe: always reports the service as healthy."""
    return dict(status="healthy")
|
|
@app.get("/v1/me")
async def get_me():
    """Return a stub local-user profile so the CoPaw UI has an identity."""
    profile = dict(
        id="local-user",
        name="Local User",
        email="user@localhost",
        is_admin=True,
    )
    return profile
|
|
@app.get("/v1/dashboard/bots")
async def get_bots():
    """Dashboard stub: no bots are configured on this local server."""
    empty_page = dict(objects=[])
    return empty_page
|
|
@app.get("/v1/models")
async def list_models():
    """OpenAI-compatible model listing exposing the single local model."""
    model_entry = {
        "id": MODEL_ID,
        "object": "model",
        "created": 1773000000,  # fixed creation timestamp (cosmetic only)
        "owned_by": "user",
    }
    return {"object": "list", "data": [model_entry]}
|
|
| |
class Message(BaseModel):
    """One chat message in OpenAI request format."""
    # Presumably "system" / "user" / "assistant" / "tool" — not validated here.
    role: str
    # Either a plain string, or a list of typed content parts such as
    # [{"type": "text", "text": ...}]; flattened by convert_content_to_str.
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
|
|
class ChatRequest(BaseModel):
    """Request body for POST /v1/chat/completions (OpenAI-compatible subset)."""
    messages: List[Message]
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1024
    # Echoed back in streaming chunks; the local model is used regardless.
    model: Optional[str] = MODEL_ID
    stream: Optional[bool] = False
    # OpenAI tool definitions; when present they are injected into a system
    # prompt rather than passed to llama.cpp natively.
    tools: Optional[List[Dict[str, Any]]] = None
    # Accepted for API compatibility but not acted upon in this handler.
    tool_choice: Optional[str] = None
|
|
| |
def convert_content_to_str(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Flatten an OpenAI-style message ``content`` field into plain text.

    ``None`` becomes an empty string, a plain string passes through
    unchanged, and a list of content parts contributes the ``text`` of every
    ``{"type": "text"}`` dict, joined with newlines. Any other value is
    stringified with ``str()``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "\n".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return str(content)
|
|
| |
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatRequest):
    """OpenAI-compatible chat completion endpoint (streaming and non-streaming).

    Flattens multi-part message content into plain strings, injects a
    tool-description system prompt when ``tools`` are supplied, and forwards
    the request to the local llama.cpp model. Returns either an SSE
    ``StreamingResponse`` (when ``req.stream``) or the raw completion dict.
    """
    # Normalize every message's content to a plain string for llama.cpp.
    messages = [
        {"role": m.role, "content": convert_content_to_str(m.content)}
        for m in req.messages
    ]

    # If the client supplied tool definitions, describe them in a system
    # prompt so the model can emit <tool_call>...</tool_call> JSON blocks.
    if req.tools:
        tools_json = json.dumps(req.tools, ensure_ascii=False)
        tool_prompt = (
            f"你是一个助手,可以使用以下工具:\n{tools_json}\n"
            f"当用户的问题需要调用工具时,请输出 <tool_call>{{...}}</tool_call> 格式的 JSON。"
        )
        # Append to an existing system message, or prepend a fresh one.
        system_index = next((i for i, m in enumerate(messages) if m["role"] == "system"), None)
        if system_index is not None:
            messages[system_index]["content"] += "\n\n" + tool_prompt
        else:
            messages.insert(0, {"role": "system", "content": tool_prompt})

    if req.stream:
        stream = llm.create_chat_completion_openai_v1(
            messages=messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            stream=True,
        )

        async def generate():
            # NOTE(review): iterating the llama.cpp stream is synchronous and
            # blocks the event loop while tokens are produced; consider a
            # thread executor if concurrent requests matter.
            chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
            for chunk in stream:
                # Skip chunks with no choices instead of crashing: the old
                # code read `finish_reason` that was only assigned inside the
                # choices branch (NameError on an empty-choice chunk).
                if not chunk.choices:
                    continue
                choice = chunk.choices[0]
                response_chunk = {
                    "id": chunk_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": req.model,
                    "choices": [{
                        "index": 0,
                        "delta": choice.delta.model_dump(exclude_none=True),
                        "finish_reason": choice.finish_reason,
                    }],
                }
                yield f"data: {json.dumps(response_chunk)}\n\n"
            # Always terminate the SSE stream. Previously [DONE] was sent only
            # when a chunk carried a finish_reason, so a stream ending without
            # one left clients hanging.
            yield "data: [DONE]\n\n"

        return StreamingResponse(generate(), media_type="text/event-stream")

    # Non-streaming: llama.cpp already returns an OpenAI-shaped response dict.
    response = llm.create_chat_completion_openai_v1(
        messages=messages,
        temperature=req.temperature,
        max_tokens=req.max_tokens,
        stream=False,
    )
    return response
|
|
@app.get("/")
async def root():
    """Service banner reporting the loaded model's repo and filename."""
    return dict(status="running", model=f"{REPO_ID}/{FILENAME}")