File size: 11,182 Bytes
91bd68b
 
 
 
 
 
aa269af
59a13f0
ca2bc68
91bd68b
 
 
 
 
ca2bc68
 
 
 
 
 
 
 
 
 
 
59a13f0
ca2bc68
59a13f0
 
ca2bc68
 
91bd68b
ca2bc68
 
 
8e7b0f7
ca2bc68
 
91bd68b
ca2bc68
91bd68b
 
 
ca2bc68
 
 
91bd68b
 
ca2bc68
 
 
aa269af
ca2bc68
 
 
aa269af
 
59a13f0
1b4691b
a31d0b8
ca2bc68
 
aa269af
1b4691b
 
ca2bc68
1b4691b
ca2bc68
aa269af
 
59a13f0
aa269af
ca2bc68
a31d0b8
59a13f0
ca2bc68
 
aa269af
59a13f0
ca2bc68
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa269af
ca2bc68
 
91bd68b
ca2bc68
 
 
91bd68b
ca2bc68
 
 
 
91bd68b
ca2bc68
 
 
91bd68b
 
ca2bc68
59a13f0
 
 
ca2bc68
59a13f0
91bd68b
ca2bc68
59a13f0
ca2bc68
 
59a13f0
 
ca2bc68
 
9b13845
ca2bc68
ec8e47c
9b13845
ec8e47c
 
ca2bc68
59a13f0
ca2bc68
 
aa269af
ca2bc68
d5049a2
59a13f0
 
6b2cd32
ca2bc68
5847836
ca2bc68
5847836
 
 
 
 
 
 
 
 
 
 
59a13f0
ca2bc68
 
 
 
 
 
 
 
 
 
 
 
276d1d1
ca2bc68
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59a13f0
 
ca2bc68
59a13f0
 
ca2bc68
 
 
59a13f0
 
ca2bc68
59a13f0
 
 
 
ca2bc68
 
59a13f0
 
ca2bc68
91bd68b
ca2bc68
91bd68b
59a13f0
ca2bc68
91bd68b
9b13845
 
59a13f0
1b4691b
59a13f0
 
ca2bc68
59a13f0
91bd68b
a31d0b8
91bd68b
 
ca2bc68
59a13f0
ca2bc68
 
59a13f0
 
1b4691b
59a13f0
ca2bc68
59a13f0
 
 
 
ca2bc68
 
 
 
59a13f0
91bd68b
ca2bc68
 
 
91bd68b
 
ca2bc68
 
 
59a13f0
 
aa269af
ca2bc68
aa269af
ca2bc68
 
 
 
 
 
 
 
 
 
 
aa269af
91bd68b
 
 
ca2bc68
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import os
import logging
import torch
import asyncio
import time
import psutil  # 新增:用于CPU监控
from transformers import (
    AutoModelForCausalLM, AutoTokenizer,
    BitsAndBytesConfig, TextStreamer
)

# --------------------------
# 1. 环境与性能优化配置(核心)
# --------------------------
# 绑定CPU线程(2核专用配置,避免线程切换开销)
os.environ["OMP_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # 禁用tokenizer并行(2核效率低)

# --------------------------
# 2. 日志配置(增强监控粒度)
# --------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s-%(name)s-%(levelname)s-%(module)s:%(lineno)d-%(message)s"
)
logger = logging.getLogger("optimized_deepseek_math")
app = FastAPI(title="优化版DeepSeek-Math推理服务(2核CPU适配)")

# --------------------------
# 3. 模型配置(量化与加载优化)
# --------------------------
MODEL_NAME = os.getenv("MODEL_NAME", "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
MODEL_REVISION = "main"
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN")

# 4bit量化参数调优(适配2核CPU计算特性)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",  # 数学模型推荐nf4量化,精度损失小
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_storage_dtype=torch.uint8  # 存储类型降级,减少内存访问耗时
)

# --------------------------
# 4. 模型加载(添加硬件适配逻辑)
# --------------------------
try:
    logger.info(f"开始加载模型:{MODEL_NAME}(4bit量化,2核CPU优化)")
    
    # 加载Tokenizer(禁用快速tokenizer,减少内存波动)
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        token=HF_TOKEN,
        padding_side="right",
        trust_remote_code=True,
        use_fast=False  # 2核CPU下,慢速tokenizer更稳定
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        logger.info(f"已设置pad_token: {tokenizer.eos_token}")
    
    # 加载模型(强制CPU运行,禁用GPU检测)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        revision=MODEL_REVISION,
        quantization_config=bnb_config,
        device_map="cpu",  # 2核环境强制CPU,避免自动分配逻辑消耗资源
        token=HF_TOKEN,
        trust_remote_code=True,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True  # 启用低内存模式,减少加载时峰值占用
    )
    
    # 验证CPU指令集支持(AVX2对数学计算加速明显)
    try:
        import subprocess
        avx2_support = subprocess.check_output(
            "grep -c avx2 /proc/cpuinfo", shell=True
        ).decode().strip()
        logger.info(f"CPU AVX2支持: {'是' if int(avx2_support) > 0 else '否'}")
    except Exception as e:
        logger.warning(f"AVX2检测失败: {str(e)}")
    
    # 流式生成器配置(减少中间缓存)
    streamer = TextStreamer(
        tokenizer,
        skip_prompt=True,
        skip_special_tokens=True,
        timeout=30.0  # 适配2核生成速度,避免超时
    )
    
    logger.info(f"模型加载完成!内存占用: {psutil.virtual_memory().used / 1024**3:.2f}GB")
except Exception as e:
    logger.error(f"模型加载失败: {str(e)}", exc_info=True)
    raise SystemExit(f"服务终止: {str(e)}")

# --------------------------
# 5. 请求模型(精简参数)
# --------------------------
class NodeInferenceRequest(BaseModel):
    prompt: str
    max_tokens: int = 512  # 2核环境缩短默认长度,控制总耗时
    is_math: bool = False
    request_id: str = None

# --------------------------
# 6. 流式推理接口(核心优化)
# --------------------------
@app.post("/node/stream-infer")
async def stream_infer(req: NodeInferenceRequest, request: Request):
    request_id = req.request_id or f"req_{int(time.time()*1000)}"
    start_time = time.time()
    total_tokens = 0
    first_token_time = None
    cpu_monitor_interval = 10  # 每生成10个token监控一次CPU
    
    try:
        # 记录请求基础信息
        logger.info(
            f"请求开始 | request_id={request_id} | "
            f"prompt_len={len(req.prompt)} | max_tokens={req.max_tokens}"
        )
        
        # 构建提示词(精简模板,减少无效计算)
        prompt = f"问题:{req.prompt}\n{'解答(含步骤)' if req.is_math else '回答'}:"
        
        # 输入处理(严格控制长度,避免2核CPU过载)
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=1536  # 预留512token给生成结果
        )
        input_tokens = len(inputs["input_ids"][0])
        logger.info(f"输入处理完成 | input_tokens={input_tokens}")

        # 异步生成逻辑
        async def generate_chunks():
            nonlocal total_tokens, first_token_time
            
            loop = asyncio.get_running_loop()
            # 预计算生成参数(减少生成过程中的条件判断)

            gen_kwargs = {
                "input_ids": inputs.get("input_ids"),
                "attention_mask": inputs.get("attention_mask"),
                "streamer": streamer,
                "max_new_tokens": req.max_tokens,
                "do_sample": True,
                "temperature": 0.2 if req.is_math else 0.6,
                "top_p": 0.9 if req.is_math else 0.95,
                "pad_token_id": tokenizer.pad_token_id,
                "eos_token_id": tokenizer.eos_token_id,
                "repetition_penalty": 1.05
            } 
            
            # 启动生成并监控CPU
            def generate_and_monitor():
                # 生成过程中每1秒记录一次CPU(独立线程)
                cpu_logger = None
                def log_cpu_usage():
                    while True:
                        cpu_percent = psutil.cpu_percent(interval=1)
                        per_core = psutil.cpu_percent(percpu=True)
                        logger.info(
                            f"CPU实时监控 | request_id={request_id} | "
                            f"整体使用率={cpu_percent}% | 核心使用率={per_core}"
                        )
                        time.sleep(100)
                
                # 启动CPU监控线程
                import threading
                cpu_logger = threading.Thread(target=log_cpu_usage, daemon=True)
                cpu_logger.start()
                
                # 执行生成
                try:
                    return model.generate(** gen_kwargs)
                finally:
                    # 生成结束后终止监控线程
                    if cpu_logger and cpu_logger.is_alive():
                        # 温和终止线程(避免资源泄漏)
                        import ctypes
                        ctypes.pythonapi.PyThreadState_SetAsyncExc(
                            ctypes.c_long(cpu_logger.ident),
                            ctypes.py_object(SystemExit)
                        )
            
            # 在 executor 中运行生成逻辑(带CPU监控)
            outputs = await loop.run_in_executor(None, generate_and_monitor)
            
            # 处理生成结果
            generated_tokens = outputs[0][input_tokens:]
            total_tokens = len(generated_tokens)
            logger.info(
                f"生成完成 | request_id={request_id} | "
                f"generated_tokens={total_tokens} | "
                f"耗时={(time.time()-start_time):.2f}s"
            )
            
            # 流式返回处理
            for i, token in enumerate(generated_tokens):
                if i == 0:
                    first_token_time = time.time()
                    logger.info(
                        f"首token生成 | request_id={request_id} | "
                        f"延迟={(first_token_time - start_time):.2f}s"
                    )
                
                # 客户端断开连接检测
                if await request.is_disconnected():
                    logger.warning(f"客户端断开 | request_id={request_id} | 已生成{i+1}token")
                    break
                
                # 解码与转义
                token_text = tokenizer.decode(token, skip_special_tokens=True)
                if token_text.endswith(tokenizer.eos_token):
                    break
                
                escaped_text = token_text.replace('"', '\\"').replace('\n', '\\n')
                yield '{{"chunk":"{}","finish":false,"request_id":"{}"}}\n'.format(escaped_text, request_id)
            
            # 结束标识
            yield '{"chunk":"","finish":true,"request_id":"{}"}\n'.format(request_id)

        return StreamingResponse(generate_chunks(), media_type="application/x-ndjson")

    except Exception as e:
        error_msg = f"推理失败: {str(e)}"
        logger.error(
            f"请求出错 | request_id={request_id} | "
            f"error={error_msg} | 耗时={(time.time()-start_time):.2f}s",
            exc_info=True
        )
        raise HTTPException(status_code=500, detail=error_msg)
    finally:
        # 输出性能总结
        elapsed_time = time.time() - start_time
        if total_tokens > 0 and elapsed_time > 0:
            speed = total_tokens / elapsed_time
            logger.info(
                f"请求总结 | request_id={request_id} | "
                f"总token={total_tokens} | 总耗时={elapsed_time:.2f}s | "
                f"平均速率={speed:.2f}token/s | "
                f"内存占用={psutil.virtual_memory().used / 1024**3:.2f}GB"
            )

# --------------------------
# 7. 增强版健康检查接口
# --------------------------
@app.get("/node/health")
async def node_health():
    # 实时硬件状态
    cpu_percent = psutil.cpu_percent(interval=0.5)
    mem_usage = psutil.virtual_memory().percent
    model_available = isinstance(model, AutoModelForCausalLM)
    
    return {
        "status": "healthy" if model_available else "unhealthy",
        "model": MODEL_NAME,
        "hardware": {
            "cpu_cores": psutil.cpu_count(logical=False),
            "logical_cores": psutil.cpu_count(logical=True),
            "cpu_usage": f"{cpu_percent}%",
            "memory_usage": f"{mem_usage}%"
        },
        "performance": {
            "target_speed": "1.5-2 token/s (2核CPU)",
            "quantization": "4bit NF4"
        },
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }

if __name__ == "__main__":
    import uvicorn
    # 启动参数优化(2核专用)
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        workers=1  # 2核环境禁用多worker,避免资源竞争
    )