# Captured from a Hugging Face Space (page status banner: "Sleeping") —
# the lines below are the Space's application source.
import asyncio
import json
import logging
import os
import threading
import time
from typing import Optional

import psutil  # CPU / memory monitoring
import torch
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import (
    AutoModelForCausalLM, AutoTokenizer,
    BitsAndBytesConfig, TextStreamer
)
# --------------------------
# 1. Environment / performance tuning (core)
# --------------------------
# Pin math-library thread pools to the 2 dedicated cores to avoid
# thread-switching overhead.
os.environ["OMP_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # tokenizer parallelism is inefficient on 2 cores

# --------------------------
# 2. Logging (fine-grained monitoring)
# --------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s-%(name)s-%(levelname)s-%(module)s:%(lineno)d-%(message)s"
)
logger = logging.getLogger("optimized_deepseek_math")

app = FastAPI(title="优化版DeepSeek-Math推理服务(2核CPU适配)")

# --------------------------
# 3. Model configuration (quantization and loading)
# --------------------------
MODEL_NAME = os.getenv("MODEL_NAME", "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
MODEL_REVISION = "main"
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN")

# 4-bit quantization parameters tuned for CPU inference.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",  # nf4 recommended for math models: small accuracy loss
    bnb_4bit_compute_dtype=torch.float16,
    # FIX: the BitsAndBytesConfig keyword is `bnb_4bit_quant_storage`; the
    # original `bnb_4bit_quant_storage_dtype` is not a recognized parameter
    # and was silently dropped as an unused kwarg.
    bnb_4bit_quant_storage=torch.uint8  # uint8 storage reduces memory-access cost
)
# --------------------------
# 4. Model loading (with hardware-adaptation checks)
# --------------------------
try:
    logger.info(f"开始加载模型:{MODEL_NAME}(4bit量化,2核CPU优化)")
    # Slow (pure-Python) tokenizer: steadier memory behavior on 2 CPU cores.
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        token=HF_TOKEN,
        padding_side="right",
        trust_remote_code=True,
        use_fast=False
    )
    if tokenizer.pad_token is None:
        # Reuse EOS as the padding token when the checkpoint defines none.
        tokenizer.pad_token = tokenizer.eos_token
        logger.info(f"已设置pad_token: {tokenizer.eos_token}")
    # Force CPU placement — skips device auto-detection on the 2-core box.
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        quantization_config=bnb_config,
        device_map="cpu",
        token=HF_TOKEN,
        trust_remote_code=True,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True  # lowers peak RAM while loading weights
    )
    # Report AVX2 availability (noticeable speedup for math kernels).
    # FIX: read /proc/cpuinfo directly instead of the original
    # `subprocess.check_output("grep -c avx2 ...", shell=True)` — no shell
    # invocation, and `grep -c` exiting with status 1 on zero matches no
    # longer raises CalledProcessError on non-AVX2 hosts.
    try:
        with open("/proc/cpuinfo", encoding="utf-8") as cpuinfo:
            avx2_supported = "avx2" in cpuinfo.read()
        logger.info(f"CPU AVX2支持: {'是' if avx2_supported else '否'}")
    except Exception as e:
        logger.warning(f"AVX2检测失败: {str(e)}")
    # Console streamer handed to model.generate() during requests.
    # FIX: dropped `timeout=30.0` — that parameter belongs to
    # TextIteratorStreamer; TextStreamer forwards unknown kwargs to
    # tokenizer.decode() instead.
    streamer = TextStreamer(
        tokenizer,
        skip_prompt=True,
        skip_special_tokens=True
    )
    logger.info(f"模型加载完成!内存占用: {psutil.virtual_memory().used / 1024**3:.2f}GB")
except Exception as e:
    logger.error(f"模型加载失败: {str(e)}", exc_info=True)
    raise SystemExit(f"服务终止: {str(e)}")
# --------------------------
# 5. Request model (minimal parameters)
# --------------------------
class NodeInferenceRequest(BaseModel):
    """Body schema for the streaming inference endpoint."""
    prompt: str            # user question / task text
    max_tokens: int = 512  # shorter default bounds total latency on 2 cores
    is_math: bool = False  # switches to the step-by-step math prompt template
    # FIX: annotated Optional — the original `request_id: str = None`
    # declares a non-optional str with a None default, which pydantic either
    # rejects or mis-types depending on version.
    request_id: Optional[str] = None  # client-supplied id; auto-generated if absent
# --------------------------
# 6. Streaming inference endpoint (core)
# --------------------------
@app.post("/infer")  # FIX: handler was defined but never registered as a route
async def stream_infer(req: NodeInferenceRequest, request: Request):
    """Run generation for one request and stream the result as NDJSON.

    Each emitted line is a JSON object:
    ``{"chunk": str, "finish": bool, "request_id": str}``.

    NOTE(review): `model.generate` runs to completion in a worker thread
    before any chunk is yielded, so the "streaming" here is a post-hoc token
    replay rather than true incremental generation.
    """
    request_id = req.request_id or f"req_{int(time.time()*1000)}"
    start_time = time.time()
    total_tokens = 0
    first_token_time = None
    try:
        logger.info(
            f"请求开始 | request_id={request_id} | "
            f"prompt_len={len(req.prompt)} | max_tokens={req.max_tokens}"
        )
        # Compact prompt template (math mode asks for worked steps).
        prompt = f"问题:{req.prompt}\n{'解答(含步骤)' if req.is_math else '回答'}:"
        # Strictly cap input length so input + 512 generated tokens stay in budget.
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=1536
        )
        input_tokens = len(inputs["input_ids"][0])
        logger.info(f"输入处理完成 | input_tokens={input_tokens}")

        async def generate_chunks():
            nonlocal total_tokens, first_token_time
            loop = asyncio.get_running_loop()
            # Precompute generation kwargs (math mode samples more greedily).
            gen_kwargs = {
                "input_ids": inputs.get("input_ids"),
                "attention_mask": inputs.get("attention_mask"),
                "streamer": streamer,
                "max_new_tokens": req.max_tokens,
                "do_sample": True,
                "temperature": 0.2 if req.is_math else 0.6,
                "top_p": 0.9 if req.is_math else 0.95,
                "pad_token_id": tokenizer.pad_token_id,
                "eos_token_id": tokenizer.eos_token_id,
                "repetition_penalty": 1.05
            }

            # Cooperative shutdown flag for the CPU monitor thread.
            # FIX: the original force-killed the thread via
            # ctypes.PyThreadState_SetAsyncExc, which is unsafe; an Event lets
            # the thread exit cleanly.
            stop_monitor = threading.Event()

            def log_cpu_usage():
                # Log overall + per-core CPU usage roughly once per second
                # while generation runs.
                # FIX: the original slept 100s between samples despite its
                # stated "every 1 second" intent.
                while not stop_monitor.is_set():
                    cpu_percent = psutil.cpu_percent(interval=1)
                    per_core = psutil.cpu_percent(percpu=True)
                    logger.info(
                        f"CPU实时监控 | request_id={request_id} | "
                        f"整体使用率={cpu_percent}% | 核心使用率={per_core}"
                    )
                    stop_monitor.wait(1.0)

            def generate_and_monitor():
                # Run blocking generation with a sidecar CPU-monitor thread.
                monitor = threading.Thread(target=log_cpu_usage, daemon=True)
                monitor.start()
                try:
                    return model.generate(**gen_kwargs)
                finally:
                    stop_monitor.set()  # monitor exits on its next wakeup

            # Run the blocking generate call off the event loop.
            outputs = await loop.run_in_executor(None, generate_and_monitor)

            generated_tokens = outputs[0][input_tokens:]
            total_tokens = len(generated_tokens)
            logger.info(
                f"生成完成 | request_id={request_id} | "
                f"generated_tokens={total_tokens} | "
                f"耗时={(time.time()-start_time):.2f}s"
            )

            # Replay the generated tokens to the client as NDJSON chunks.
            for i, token in enumerate(generated_tokens):
                if i == 0:
                    first_token_time = time.time()
                    logger.info(
                        f"首token生成 | request_id={request_id} | "
                        f"延迟={(first_token_time - start_time):.2f}s"
                    )
                # Stop early if the client went away.
                if await request.is_disconnected():
                    logger.warning(f"客户端断开 | request_id={request_id} | 已生成{i+1}token")
                    break
                token_text = tokenizer.decode(token, skip_special_tokens=True)
                if token_text.endswith(tokenizer.eos_token):
                    break
                # FIX: serialize with json.dumps. The original hand-escaped
                # only quotes and newlines (backslashes / control characters
                # produced invalid JSON), and its final str.format() call on a
                # literal containing an unescaped `{` raised KeyError at
                # runtime.
                yield json.dumps(
                    {"chunk": token_text, "finish": False, "request_id": request_id},
                    ensure_ascii=False
                ) + "\n"
            # Terminal marker so clients know the stream is complete.
            yield json.dumps(
                {"chunk": "", "finish": True, "request_id": request_id},
                ensure_ascii=False
            ) + "\n"

        return StreamingResponse(generate_chunks(), media_type="application/x-ndjson")
    except Exception as e:
        error_msg = f"推理失败: {str(e)}"
        logger.error(
            f"请求出错 | request_id={request_id} | "
            f"error={error_msg} | 耗时={(time.time()-start_time):.2f}s",
            exc_info=True
        )
        raise HTTPException(status_code=500, detail=error_msg)
    finally:
        # Per-request performance summary (token count, wall time, throughput).
        elapsed_time = time.time() - start_time
        if total_tokens > 0 and elapsed_time > 0:
            speed = total_tokens / elapsed_time
            logger.info(
                f"请求总结 | request_id={request_id} | "
                f"总token={total_tokens} | 总耗时={elapsed_time:.2f}s | "
                f"平均速率={speed:.2f}token/s | "
                f"内存占用={psutil.virtual_memory().used / 1024**3:.2f}GB"
            )
# --------------------------
# 7. Enhanced health-check endpoint
# --------------------------
@app.get("/health")  # FIX: handler was defined but never registered as a route
async def node_health():
    """Report service health plus live CPU/memory statistics."""
    # Real-time hardware status.
    cpu_percent = psutil.cpu_percent(interval=0.5)
    mem_usage = psutil.virtual_memory().percent
    # FIX: the original `isinstance(model, AutoModelForCausalLM)` is always
    # False — from_pretrained returns a concrete architecture class (e.g.
    # Qwen2ForCausalLM), not the Auto* factory class — so the endpoint always
    # reported "unhealthy" even when the model loaded fine.
    model_available = model is not None and hasattr(model, "generate")
    return {
        "status": "healthy" if model_available else "unhealthy",
        "model": MODEL_NAME,
        "hardware": {
            "cpu_cores": psutil.cpu_count(logical=False),
            "logical_cores": psutil.cpu_count(logical=True),
            "cpu_usage": f"{cpu_percent}%",
            "memory_usage": f"{mem_usage}%"
        },
        "performance": {
            "target_speed": "1.5-2 token/s (2核CPU)",
            "quantization": "4bit NF4"
        },
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }
if __name__ == "__main__":
    import uvicorn

    # Launch settings for a 2-core host: a single worker avoids resource
    # contention on the limited CPU budget.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "log_level": "info",
        "workers": 1,
    }
    uvicorn.run(app, **server_options)