# gpt-chat-api-deepseek / inference_node.py
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import os
import json
import logging
import threading
import torch
import asyncio
import time
import psutil  # used for CPU/memory monitoring
from typing import Optional
from transformers import (
AutoModelForCausalLM, AutoTokenizer,
BitsAndBytesConfig, TextStreamer
)
# --------------------------
# 1. Environment and performance tuning (core)
# --------------------------
# Pin CPU threads (dedicated 2-core setup; avoids thread-switching overhead)
os.environ["OMP_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # disable tokenizer parallelism (inefficient on 2 cores)
# --------------------------
# 2. Logging (finer-grained monitoring)
# --------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s-%(name)s-%(levelname)s-%(module)s:%(lineno)d-%(message)s"
)
logger = logging.getLogger("optimized_deepseek_math")
app = FastAPI(title="Optimized DeepSeek-Math inference service (tuned for 2 CPU cores)")
# --------------------------
# 3. Model configuration (quantization and loading)
# --------------------------
MODEL_NAME = os.getenv("MODEL_NAME", "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
MODEL_REVISION = "main"
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN")
# 4-bit quantization parameters (tuned for 2-core CPU inference)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",  # nf4 is a good fit for math models; small accuracy loss
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_storage=torch.uint8  # smaller storage dtype reduces memory traffic
)
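# Note: 4-bit bitsandbytes quantization has historically required a CUDA device;
# loading with device_map="cpu" below assumes a bitsandbytes build with CPU
# backend support. If this raises at load time, the usual fallback is to drop
# quantization_config and load the model unquantized.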
# --------------------------
# 4. Model loading (with hardware-adaptation logic)
# --------------------------
try:
    logger.info(f"Loading model: {MODEL_NAME} (4-bit quantized, tuned for 2 CPU cores)")
    # Load the tokenizer (slow tokenizer keeps memory usage steadier)
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        token=HF_TOKEN,
        padding_side="right",
        trust_remote_code=True,
        use_fast=False  # on a 2-core CPU the slow tokenizer is more stable
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        logger.info(f"pad_token set to: {tokenizer.eos_token}")
    # Load the model (forced onto CPU; skips GPU detection)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        quantization_config=bnb_config,
        device_map="cpu",  # force CPU in the 2-core environment; avoids device-mapping overhead
        token=HF_TOKEN,
        trust_remote_code=True,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True  # low-memory loading mode; reduces peak usage
    )
    # Check CPU instruction-set support (AVX2 noticeably speeds up math workloads)
    try:
        import subprocess
        # grep -c exits non-zero when there are no matches, so a missing AVX2
        # flag lands in the except branch below rather than logging "no"
        avx2_support = subprocess.check_output(
            "grep -c avx2 /proc/cpuinfo", shell=True
        ).decode().strip()
        logger.info(f"CPU AVX2 support: {'yes' if int(avx2_support) > 0 else 'no'}")
    except Exception as e:
        logger.warning(f"AVX2 detection failed: {str(e)}")
    # Streaming printer: echoes generated text to stdout as it is produced.
    # TextStreamer accepts no timeout parameter (that belongs to
    # TextIteratorStreamer), so none is passed here.
    streamer = TextStreamer(
        tokenizer,
        skip_prompt=True,
        skip_special_tokens=True
    )
    logger.info(f"Model loaded. Memory in use: {psutil.virtual_memory().used / 1024**3:.2f}GB")
except Exception as e:
    logger.error(f"Model loading failed: {str(e)}", exc_info=True)
    raise SystemExit(f"Service aborted: {str(e)}")
# --------------------------
# 5. Request model (trimmed-down parameters)
# --------------------------
class NodeInferenceRequest(BaseModel):
    prompt: str
    max_tokens: int = 512  # shorter default on 2 cores to bound total latency
    is_math: bool = False
    request_id: Optional[str] = None
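# Illustrative request body (field values are examples only):
#   {"prompt": "What is 12 * 34?", "max_tokens": 256, "is_math": true, "request_id": "req_demo"}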
# --------------------------
# 6. Streaming inference endpoint (main optimization target)
# --------------------------
@app.post("/node/stream-infer")
async def stream_infer(req: NodeInferenceRequest, request: Request):
    request_id = req.request_id or f"req_{int(time.time()*1000)}"
    start_time = time.time()
    total_tokens = 0
    first_token_time = None
    try:
        # Log basic request info
        logger.info(
            f"Request start | request_id={request_id} | "
            f"prompt_len={len(req.prompt)} | max_tokens={req.max_tokens}"
        )
        # Build the prompt (lean template; avoids wasted computation)
        prompt = f"Question: {req.prompt}\n{'Solution (with steps)' if req.is_math else 'Answer'}:"
        # Input handling (hard length cap to keep the 2-core CPU from overloading)
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=1536  # leaves 512 tokens of headroom for generation
        )
        input_tokens = len(inputs["input_ids"][0])
        logger.info(f"Input processed | input_tokens={input_tokens}")
        # Asynchronous generation logic
        async def generate_chunks():
            nonlocal total_tokens, first_token_time
            loop = asyncio.get_running_loop()
            # Precompute generation kwargs (avoids per-step branching during generation)
            gen_kwargs = {
                "input_ids": inputs["input_ids"],
                "attention_mask": inputs["attention_mask"],
                "streamer": streamer,
                "max_new_tokens": req.max_tokens,
                "do_sample": True,
                "temperature": 0.2 if req.is_math else 0.6,
                "top_p": 0.9 if req.is_math else 0.95,
                "pad_token_id": tokenizer.pad_token_id,
                "eos_token_id": tokenizer.eos_token_id,
                "repetition_penalty": 1.05
            }
            # Run generation while monitoring CPU usage
            def generate_and_monitor():
                # Helper thread: periodically log overall and per-core CPU usage
                stop_event = threading.Event()
                def log_cpu_usage():
                    while not stop_event.is_set():
                        cpu_percent = psutil.cpu_percent(interval=1)
                        per_core = psutil.cpu_percent(percpu=True)
                        logger.info(
                            f"CPU monitor | request_id={request_id} | "
                            f"overall={cpu_percent}% | per_core={per_core}"
                        )
                        stop_event.wait(10)  # sample roughly every 10 seconds
                # Start the CPU monitor thread
                cpu_logger = threading.Thread(target=log_cpu_usage, daemon=True)
                cpu_logger.start()
                # Run generation
                try:
                    return model.generate(**gen_kwargs)
                finally:
                    # Tell the monitor thread to exit cleanly; a daemon thread plus
                    # an Event is safer than forcibly injecting exceptions via ctypes
                    stop_event.set()
            # Run the blocking generate() call (with CPU monitoring) in an executor
            outputs = await loop.run_in_executor(None, generate_and_monitor)
            # Post-process the generated output
            generated_tokens = outputs[0][input_tokens:]
            total_tokens = len(generated_tokens)
            logger.info(
                f"Generation done | request_id={request_id} | "
                f"generated_tokens={total_tokens} | "
                f"elapsed={(time.time()-start_time):.2f}s"
            )
            # Stream the result back chunk by chunk. Generation already completed
            # above, so the "first chunk" latency below measures full generation
            # time, not true time-to-first-token.
            for i, token in enumerate(generated_tokens):
                if i == 0:
                    first_token_time = time.time()
                    logger.info(
                        f"First chunk | request_id={request_id} | "
                        f"latency={(first_token_time - start_time):.2f}s"
                    )
                # Detect client disconnects mid-stream
                if await request.is_disconnected():
                    logger.warning(f"Client disconnected | request_id={request_id} | {i+1} tokens sent")
                    break
                # Stop at the end-of-sequence token (compare ids: with
                # skip_special_tokens=True the decoded text never contains eos_token)
                if token.item() == tokenizer.eos_token_id:
                    break
                # Decode one token at a time; json.dumps handles all JSON escaping
                # (quotes, newlines, backslashes). Caveat: per-token decoding can
                # split multi-byte characters for some tokenizers.
                token_text = tokenizer.decode(token, skip_special_tokens=True)
                yield json.dumps({"chunk": token_text, "finish": False, "request_id": request_id}) + "\n"
            # End-of-stream marker
            yield json.dumps({"chunk": "", "finish": True, "request_id": request_id}) + "\n"
            # Performance summary. This runs inside the generator so total_tokens
            # is final; the endpoint itself returns the StreamingResponse before
            # any chunk is produced.
            elapsed_time = time.time() - start_time
            if total_tokens > 0 and elapsed_time > 0:
                speed = total_tokens / elapsed_time
                logger.info(
                    f"Request summary | request_id={request_id} | "
                    f"total_tokens={total_tokens} | elapsed={elapsed_time:.2f}s | "
                    f"avg_rate={speed:.2f} token/s | "
                    f"memory_used={psutil.virtual_memory().used / 1024**3:.2f}GB"
                )
return StreamingResponse(generate_chunks(), media_type="application/x-ndjson")
    except Exception as e:
        error_msg = f"Inference failed: {str(e)}"
        logger.error(
            f"Request error | request_id={request_id} | "
            f"error={error_msg} | elapsed={(time.time()-start_time):.2f}s",
            exc_info=True
        )
        raise HTTPException(status_code=500, detail=error_msg)
# --------------------------
# 7. Extended health-check endpoint
# --------------------------
@app.get("/node/health")
async def node_health():
    # Live hardware status
    cpu_percent = psutil.cpu_percent(interval=0.5)
    mem_usage = psutil.virtual_memory().percent
    # Auto* classes are loader factories, so isinstance(model, AutoModelForCausalLM)
    # is always False; check for a loaded nn.Module instead
    model_available = isinstance(model, torch.nn.Module)
return {
"status": "healthy" if model_available else "unhealthy",
"model": MODEL_NAME,
"hardware": {
"cpu_cores": psutil.cpu_count(logical=False),
"logical_cores": psutil.cpu_count(logical=True),
"cpu_usage": f"{cpu_percent}%",
"memory_usage": f"{mem_usage}%"
},
"performance": {
"target_speed": "1.5-2 token/s (2核CPU)",
"quantization": "4bit NF4"
},
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
if __name__ == "__main__":
    import uvicorn
    # Launch parameters (tuned for 2 cores)
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        workers=1  # single worker on 2 cores; avoids resource contention
    )
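# Example client call (a sketch; assumes the service is reachable at
# localhost:7860 and that the `requests` package is available -- it is not a
# dependency of this file):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/node/stream-infer",
#       json={"prompt": "Compute 12 * 34 step by step.", "is_math": True},
#       stream=True,
#   )
#   for raw in resp.iter_lines():
#       if raw:
#           print(raw.decode("utf-8"))  # one NDJSON object per line, e.g.
#                                       # {"chunk": "4", "finish": false, "request_id": "req_..."}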