from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel import os import logging import torch import asyncio import time import psutil # 新增:用于CPU监控 from transformers import ( AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextStreamer ) # -------------------------- # 1. 环境与性能优化配置(核心) # -------------------------- # 绑定CPU线程(2核专用配置,避免线程切换开销) os.environ["OMP_NUM_THREADS"] = "2" os.environ["MKL_NUM_THREADS"] = "2" os.environ["TOKENIZERS_PARALLELISM"] = "false" # 禁用tokenizer并行(2核效率低) # -------------------------- # 2. 日志配置(增强监控粒度) # -------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s-%(name)s-%(levelname)s-%(module)s:%(lineno)d-%(message)s" ) logger = logging.getLogger("optimized_deepseek_math") app = FastAPI(title="优化版DeepSeek-Math推理服务(2核CPU适配)") # -------------------------- # 3. 模型配置(量化与加载优化) # -------------------------- MODEL_NAME = os.getenv("MODEL_NAME", "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B") MODEL_REVISION = "main" HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN") # 4bit量化参数调优(适配2核CPU计算特性) bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", # 数学模型推荐nf4量化,精度损失小 bnb_4bit_compute_dtype=torch.float16, bnb_4bit_quant_storage_dtype=torch.uint8 # 存储类型降级,减少内存访问耗时 ) # -------------------------- # 4. 模型加载(添加硬件适配逻辑) # -------------------------- try: logger.info(f"开始加载模型:{MODEL_NAME}(4bit量化,2核CPU优化)") # 加载Tokenizer(禁用快速tokenizer,减少内存波动) tokenizer = AutoTokenizer.from_pretrained( MODEL_NAME, revision=MODEL_REVISION, token=HF_TOKEN, padding_side="right", trust_remote_code=True, use_fast=False # 2核CPU下,慢速tokenizer更稳定 ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token logger.info(f"已设置pad_token: {tokenizer.eos_token}") # 加载模型(强制CPU运行,禁用GPU检测) model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, revision=MODEL_REVISION, quantization_config=bnb_config, device_map="cpu", # 2核环境强制CPU,避免自动分配逻辑消耗资源 token=HF_TOKEN, trust_remote_code=True, torch_dtype=torch.float16, low_cpu_mem_usage=True # 启用低内存模式,减少加载时峰值占用 ) # 验证CPU指令集支持(AVX2对数学计算加速明显) try: import subprocess avx2_support = subprocess.check_output( "grep -c avx2 /proc/cpuinfo", shell=True ).decode().strip() logger.info(f"CPU AVX2支持: {'是' if int(avx2_support) > 0 else '否'}") except Exception as e: logger.warning(f"AVX2检测失败: {str(e)}") # 流式生成器配置(减少中间缓存) streamer = TextStreamer( tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=30.0 # 适配2核生成速度,避免超时 ) logger.info(f"模型加载完成!内存占用: {psutil.virtual_memory().used / 1024**3:.2f}GB") except Exception as e: logger.error(f"模型加载失败: {str(e)}", exc_info=True) raise SystemExit(f"服务终止: {str(e)}") # -------------------------- # 5. 请求模型(精简参数) # -------------------------- class NodeInferenceRequest(BaseModel): prompt: str max_tokens: int = 512 # 2核环境缩短默认长度,控制总耗时 is_math: bool = False request_id: str = None # -------------------------- # 6. 流式推理接口(核心优化) # -------------------------- @app.post("/node/stream-infer") async def stream_infer(req: NodeInferenceRequest, request: Request): request_id = req.request_id or f"req_{int(time.time()*1000)}" start_time = time.time() total_tokens = 0 first_token_time = None cpu_monitor_interval = 10 # 每生成10个token监控一次CPU try: # 记录请求基础信息 logger.info( f"请求开始 | request_id={request_id} | " f"prompt_len={len(req.prompt)} | max_tokens={req.max_tokens}" ) # 构建提示词(精简模板,减少无效计算) prompt = f"问题:{req.prompt}\n{'解答(含步骤)' if req.is_math else '回答'}:" # 输入处理(严格控制长度,避免2核CPU过载) inputs = tokenizer( prompt, return_tensors="pt", truncation=True, max_length=1536 # 预留512token给生成结果 ) input_tokens = len(inputs["input_ids"][0]) logger.info(f"输入处理完成 | input_tokens={input_tokens}") # 异步生成逻辑 async def generate_chunks(): nonlocal total_tokens, first_token_time loop = asyncio.get_running_loop() # 预计算生成参数(减少生成过程中的条件判断) gen_kwargs = { "input_ids": inputs.get("input_ids"), "attention_mask": inputs.get("attention_mask"), "streamer": streamer, "max_new_tokens": req.max_tokens, "do_sample": True, "temperature": 0.2 if req.is_math else 0.6, "top_p": 0.9 if req.is_math else 0.95, "pad_token_id": tokenizer.pad_token_id, "eos_token_id": tokenizer.eos_token_id, "repetition_penalty": 1.05 } # 启动生成并监控CPU def generate_and_monitor(): # 生成过程中每1秒记录一次CPU(独立线程) cpu_logger = None def log_cpu_usage(): while True: cpu_percent = psutil.cpu_percent(interval=1) per_core = psutil.cpu_percent(percpu=True) logger.info( f"CPU实时监控 | request_id={request_id} | " f"整体使用率={cpu_percent}% | 核心使用率={per_core}" ) time.sleep(100) # 启动CPU监控线程 import threading cpu_logger = threading.Thread(target=log_cpu_usage, daemon=True) cpu_logger.start() # 执行生成 try: return model.generate(** gen_kwargs) finally: # 生成结束后终止监控线程 if cpu_logger and cpu_logger.is_alive(): # 温和终止线程(避免资源泄漏) import ctypes ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(cpu_logger.ident), ctypes.py_object(SystemExit) ) # 在 executor 中运行生成逻辑(带CPU监控) outputs = await loop.run_in_executor(None, generate_and_monitor) # 处理生成结果 generated_tokens = outputs[0][input_tokens:] total_tokens = len(generated_tokens) logger.info( f"生成完成 | request_id={request_id} | " f"generated_tokens={total_tokens} | " f"耗时={(time.time()-start_time):.2f}s" ) # 流式返回处理 for i, token in enumerate(generated_tokens): if i == 0: first_token_time = time.time() logger.info( f"首token生成 | request_id={request_id} | " f"延迟={(first_token_time - start_time):.2f}s" ) # 客户端断开连接检测 if await request.is_disconnected(): logger.warning(f"客户端断开 | request_id={request_id} | 已生成{i+1}token") break # 解码与转义 token_text = tokenizer.decode(token, skip_special_tokens=True) if token_text.endswith(tokenizer.eos_token): break escaped_text = token_text.replace('"', '\\"').replace('\n', '\\n') yield '{{"chunk":"{}","finish":false,"request_id":"{}"}}\n'.format(escaped_text, request_id) # 结束标识 yield '{"chunk":"","finish":true,"request_id":"{}"}\n'.format(request_id) return StreamingResponse(generate_chunks(), media_type="application/x-ndjson") except Exception as e: error_msg = f"推理失败: {str(e)}" logger.error( f"请求出错 | request_id={request_id} | " f"error={error_msg} | 耗时={(time.time()-start_time):.2f}s", exc_info=True ) raise HTTPException(status_code=500, detail=error_msg) finally: # 输出性能总结 elapsed_time = time.time() - start_time if total_tokens > 0 and elapsed_time > 0: speed = total_tokens / elapsed_time logger.info( f"请求总结 | request_id={request_id} | " f"总token={total_tokens} | 总耗时={elapsed_time:.2f}s | " f"平均速率={speed:.2f}token/s | " f"内存占用={psutil.virtual_memory().used / 1024**3:.2f}GB" ) # -------------------------- # 7. 增强版健康检查接口 # -------------------------- @app.get("/node/health") async def node_health(): # 实时硬件状态 cpu_percent = psutil.cpu_percent(interval=0.5) mem_usage = psutil.virtual_memory().percent model_available = isinstance(model, AutoModelForCausalLM) return { "status": "healthy" if model_available else "unhealthy", "model": MODEL_NAME, "hardware": { "cpu_cores": psutil.cpu_count(logical=False), "logical_cores": psutil.cpu_count(logical=True), "cpu_usage": f"{cpu_percent}%", "memory_usage": f"{mem_usage}%" }, "performance": { "target_speed": "1.5-2 token/s (2核CPU)", "quantization": "4bit NF4" }, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } if __name__ == "__main__": import uvicorn # 启动参数优化(2核专用) uvicorn.run( app, host="0.0.0.0", port=7860, log_level="info", workers=1 # 2核环境禁用多worker,避免资源竞争 )