from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextIteratorStreamer
import torch
import json
import time
import uuid
import re
from typing import List, Optional, Dict, Any
from threading import Thread
# ====================== Model configuration ======================
MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"
MODEL_ID = "qwen2.5-7b"  # Custom model identifier; the frontend must send this same value

# 4-bit quantization config (bitsandbytes needs a CUDA GPU; it does not quantize on CPU)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)
print("🔹 加载模型:Qwen2.5-7B-Instruct (4-bit 量化)")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
# 确保 tokenizer 有 pad_token
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    quantization_config=bnb_config,
    device_map="auto",  # let accelerate place the weights automatically
    trust_remote_code=True,
    low_cpu_mem_usage=True,
)
print("✅ Model loaded")

app = FastAPI(title="Qwen2.5-7B API (OpenAI-compatible)")

# ====================== CORS middleware ======================
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ====================== Extra endpoints required by CoPaw ======================
@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/v1/me")
async def get_me():
    return {
        "id": "local-user",
        "name": "Local User",
        "email": "user@localhost",
        "is_admin": True,
    }

@app.get("/v1/dashboard/bots")
async def get_bots():
    return {"objects": []}
@app.get("/v1/models")
async def list_models():
"""返回 OpenAI 格式的模型列表"""
return {
"object": "list",
"data": [
{
"id": MODEL_ID,
"object": "model",
"created": 1773000000,
"owned_by": "qwen"
}
]
}

# ====================== Request/response schemas ======================
class Message(BaseModel):
    role: str
    content: Optional[str] = None

class ChatRequest(BaseModel):
    messages: List[Message]
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1024
    model: Optional[str] = MODEL_ID
    stream: Optional[bool] = False
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[str] = None

# ====================== Streaming generation ======================
def stream_generate(messages, temperature=0.7, max_new_tokens=1024):
    try:
        # Build the prompt with the model's chat template
        text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = tokenizer([text], return_tensors="pt", padding=True).to(model.device)
        streamer = TextIteratorStreamer(
            tokenizer,
            skip_prompt=True,
            skip_special_tokens=True,
            timeout=60.0,
        )
        gen_kwargs = {
            **inputs,
            "streamer": streamer,
            "max_new_tokens": max_new_tokens,
            "temperature": temperature,
            "do_sample": temperature > 0,
            "pad_token_id": tokenizer.pad_token_id,
            "eos_token_id": tokenizer.eos_token_id,
        }
        # generate() blocks, so run it in a worker thread and read tokens off the streamer
        thread = Thread(target=model.generate, kwargs=gen_kwargs)
        thread.start()
        # Send the role first (required by the OpenAI chunk format)
        chunk_id = f"chatcmpl-{uuid.uuid4().hex}"
        yield f"data: {json.dumps({'id': chunk_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': MODEL_ID, 'choices': [{'index': 0, 'delta': {'role': 'assistant'}, 'finish_reason': None}]})}\n\n"
        for new_text in streamer:
            if new_text:
                chunk = {
                    "id": chunk_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": MODEL_ID,
                    "choices": [{
                        "index": 0,
                        "delta": {"content": new_text},
                        "finish_reason": None,
                    }],
                }
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
        # Send the final chunk with a finish_reason, then the SSE terminator
        yield f"data: {json.dumps({'id': chunk_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': MODEL_ID, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"

# ====================== Chat completions (streaming + tool-calling) ======================
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatRequest):
    # Build the base message list
    base_messages = [{"role": m.role, "content": m.content} for m in req.messages]
    # If tools were provided, fold them into a system prompt (the approach recommended for Qwen2.5)
    if req.tools:
        tools_json = json.dumps(req.tools, ensure_ascii=False)
        # Tool-calling prompt that asks the model to emit a specific output format
        tool_prompt = f"""You are an assistant with access to the following tools:
{tools_json}
When the user's question requires a tool, emit a <tool_call>...</tool_call> tag containing a JSON object that must have "name" and "arguments" fields. "arguments" is an object holding the parameters the tool needs.
For example: <tool_call>{{"name": "get_weather", "arguments": {{"location": "Beijing"}}}}</tool_call>
If no tool is needed, answer normally."""
        messages = [{"role": "system", "content": tool_prompt}] + base_messages
    else:
        messages = base_messages
    # Streaming path
    if req.stream:
        return StreamingResponse(
            stream_generate(messages, req.temperature, req.max_tokens),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "text/event-stream",
            },
        )
    # Non-streaming generation
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer([text], return_tensors="pt", padding=True).to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=req.max_tokens,
            temperature=req.temperature,
            do_sample=req.temperature > 0,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    response = tokenizer.decode(outputs[0][len(inputs.input_ids[0]):], skip_special_tokens=True)
    # Parse tool calls (Qwen emits <tool_call>...</tool_call> tags)
    tool_calls = None
    clean_response = response
    tool_call_matches = re.findall(r'<tool_call>(.*?)</tool_call>', response, re.DOTALL)
    if tool_call_matches:
        tool_calls = []
        for match in tool_call_matches:
            try:
                tool_call_data = json.loads(match)
                # Convert to the OpenAI tool-call format
                tool_calls.append({
                    "id": f"call_{uuid.uuid4().hex[:8]}",
                    "type": "function",
                    "function": {
                        "name": tool_call_data.get("name"),
                        "arguments": json.dumps(tool_call_data.get("arguments", {}), ensure_ascii=False),
                    },
                })
            except Exception as e:
                print(f"Failed to parse tool call: {e}")
        # Strip all tool_call tags, keeping any remaining text
        clean_response = re.sub(r'<tool_call>.*?</tool_call>', '', response, flags=re.DOTALL).strip()
    # Count token usage
    prompt_tokens = len(inputs.input_ids[0])
    completion_tokens = len(outputs[0]) - prompt_tokens
    # Build the OpenAI-format response
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": req.model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": clean_response if not tool_calls else None,
                "tool_calls": tool_calls,
            },
            "finish_reason": "tool_calls" if tool_calls else "stop",
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }
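
# Tool-call sketch (the get_weather tool here is hypothetical, mirroring the example
# baked into the system prompt above). A request that offers a tool looks like:
#
#   {"model": "qwen2.5-7b",
#    "messages": [{"role": "user", "content": "Weather in Beijing?"}],
#    "tools": [{"type": "function",
#               "function": {"name": "get_weather",
#                            "parameters": {"type": "object",
#                                           "properties": {"location": {"type": "string"}}}}}]}
#
# If the model answers with <tool_call>...</tool_call>, the handler above returns
# finish_reason "tool_calls" and OpenAI-style entries in message.tool_calls; the caller
# would then run the tool and send the result back in a follow-up message.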
@app.get("/")
async def root():
return {"status": "running", "model": MODEL_NAME}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
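
# Client sketch (assumption: the `openai` Python package is installed; any
# OpenAI-compatible client should work against this server):
#
#   from openai import OpenAI
#   client = OpenAI(base_url="http://localhost:7860/v1", api_key="unused")
#   resp = client.chat.completions.create(
#       model="qwen2.5-7b",
#       messages=[{"role": "user", "content": "Hi"}],
#   )
#   print(resp.choices[0].message.content)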