import logging
import json
import time
import uuid
from typing import List, Optional, Dict, Any, Union
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from llama_cpp import Llama
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ====================== Model configuration ======================
# Use the GGUF model hosted on Hugging Face (4B, Q4_K_M quantization)
REPO_ID = "lmstudio-community/Qwen3.5-4B-GGUF"
FILENAME = "Qwen3.5-4B-Q4_K_M.gguf"
MODEL_ID = "qwen3.5-4b"  # Model name configured in CoPaw
# Load the model (downloaded from HF automatically and cached)
logger.info(f"Loading model {FILENAME} from {REPO_ID}...")
llm = Llama.from_pretrained(
repo_id=REPO_ID,
filename=FILENAME,
n_ctx=4096,  # Context window; adjust to your needs
n_threads=None,  # Use all available CPU threads automatically
verbose=False,
)
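# Note: n_threads=None lets llama.cpp choose a thread count for CPU inference. With a
# CUDA/Metal build of llama-cpp-python, passing n_gpu_layers=-1 above would offload all
# layers to the GPU (not done here, as the Space presumably runs on CPU).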
logger.info("模型加载完成!")
app = FastAPI(title="Qwen3.5-4B GGUF API (CoPaw兼容)")
# ====================== CORS 中间件 ======================
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ====================== Endpoints required by CoPaw ======================
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.get("/v1/me")
async def get_me():
return {
"id": "local-user",
"name": "Local User",
"email": "user@localhost",
"is_admin": True
}
@app.get("/v1/dashboard/bots")
async def get_bots():
return {"objects": []}
@app.get("/v1/models")
async def list_models():
return {
"object": "list",
"data": [
{
"id": MODEL_ID,
"object": "model",
"created": 1773000000,
"owned_by": "user"
}
]
}
# ====================== Request/response data models ======================
class Message(BaseModel):
role: str
content: Optional[Union[str, List[Dict[str, Any]]]] = None
class ChatRequest(BaseModel):
messages: List[Message]
temperature: Optional[float] = 0.7
max_tokens: Optional[int] = 1024
model: Optional[str] = MODEL_ID
stream: Optional[bool] = False
tools: Optional[List[Dict[str, Any]]] = None
tool_choice: Optional[str] = None
# ====================== Helper functions ======================
def convert_content_to_str(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
texts = []
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
texts.append(part.get("text", ""))
return "\n".join(texts)
return str(content)
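# Example: an OpenAI-style multi-part content list such as
#   [{"type": "text", "text": "Describe this file"}, {"type": "image_url", "image_url": {...}}]
# is flattened to "Describe this file"; non-text parts are simply dropped.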
# ====================== Chat completions endpoint ======================
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatRequest):
# Normalize incoming messages to plain-text content
messages = [{"role": m.role, "content": convert_content_to_str(m.content)} for m in req.messages]
# Handle tools: merge the tool descriptions into the system message
if req.tools:
tools_json = json.dumps(req.tools, ensure_ascii=False)
tool_prompt = (
f"你是一个助手,可以使用以下工具:\n{tools_json}\n"
f"当用户的问题需要调用工具时,请输出 <tool_call>{{...}}</tool_call> 格式的 JSON。"
)
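# Illustrative example of the tagged output the prompt above asks the model to produce
# (the exact JSON shape depends on the tool schemas the client supplies):
#   <tool_call>{"name": "get_weather", "arguments": {"city": "Paris"}}</tool_call>
# Note that this server does not parse the tag; the raw text is passed back to the client.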
# Look for an existing system message; merge into it if found, otherwise create one
system_index = next((i for i, m in enumerate(messages) if m["role"] == "system"), None)
if system_index is not None:
messages[system_index]["content"] += "\n\n" + tool_prompt
else:
messages.insert(0, {"role": "system", "content": tool_prompt})
# Streaming path
if req.stream:
stream = llm.create_chat_completion_openai_v1(
messages=messages,
temperature=req.temperature,
max_tokens=req.max_tokens,
stream=True,
)
async def generate():
chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
for chunk in stream:
if chunk.choices:
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason
response_chunk = {
"id": chunk_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": req.model,
"choices": [{
"index": 0,
"delta": delta.model_dump(exclude_none=True),
"finish_reason": finish_reason
}]
}
yield f"data: {json.dumps(response_chunk)}\n\n"
if finish_reason:
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
# Non-streaming path
else:
response = llm.create_chat_completion_openai_v1(
messages=messages,
temperature=req.temperature,
max_tokens=req.max_tokens,
stream=False,
)
return response
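# Illustrative request (assuming the server listens on port 7860, the Hugging Face Spaces default);
# any OpenAI-compatible client can target this endpoint:
#   curl -X POST http://localhost:7860/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "qwen3.5-4b", "messages": [{"role": "user", "content": "Hello"}]}'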
@app.get("/")
async def root():
return {"status": "running", "model": f"{REPO_ID}/{FILENAME}"}