fiewolf1000 commited on
Commit
ea17503
·
verified ·
1 Parent(s): 09674e8

Update inference_node.py

Browse files
Files changed (1) hide show
  1. inference_node.py +50 -107
inference_node.py CHANGED
@@ -7,7 +7,7 @@ import torch
7
  import asyncio
8
  from transformers import (
9
  AutoModelForCausalLM, AutoTokenizer,
10
- BitsAndBytesConfig
11
  )
12
 
13
  # 1. 基础配置
@@ -27,7 +27,7 @@ bnb_config = BitsAndBytesConfig(
27
  bnb_4bit_compute_dtype=torch.bfloat16
28
  )
29
 
30
- # 4. 加载模型(移除TextStreamer,避免兼容性问题
31
  try:
32
  logger.info(f"开始加载模型:{MODEL_NAME}(4bit量化)")
33
  tokenizer = AutoTokenizer.from_pretrained(
@@ -35,7 +35,7 @@ try:
35
  token=HF_TOKEN,
36
  padding_side="right", # 右侧padding,避免生成时截断
37
  trust_remote_code=True, # Qwen模型必需(加载自定义tokenizer)
38
- eos_token="<|endoftext|>", # 显式指定结束符
39
  pad_token="<|endoftext|>" # 显式指定padding符(避免生成警告)
40
  )
41
  model = AutoModelForCausalLM.from_pretrained(
@@ -46,8 +46,8 @@ try:
46
  trust_remote_code=True,
47
  torch_dtype=torch.bfloat16 # 匹配量化计算精度
48
  )
49
- # 关键:设置模型为评估模式(避免训练模式的随机行为
50
- model.eval()
51
  logger.info(f"模型 {MODEL_NAME} 加载成功!显存占用约 4-5GB(4bit 量化)")
52
  except Exception as e:
53
  logger.error(f"模型加载失败:{str(e)}", exc_info=True)
@@ -58,122 +58,66 @@ class NodeInferenceRequest(BaseModel):
58
  prompt: str # 用户提问内容
59
  max_tokens: int = 1024 # 最大生成长度(默认1024)
60
  temperature: float = 0.7 # 随机性(0-1,越大越多样)
61
- top_p: float = 0.9 # 采样Top-p(配合temperature使用)
62
 
63
- # 6. 流式推理接口(核心:手动逐token生成显式管理past_key_values
64
  @app.post("/node/stream-infer")
65
  async def stream_infer(req: NodeInferenceRequest, request: Request):
66
  try:
67
  # --------------------------
68
- # 1. 构建Qwen原生对话输入格式
 
69
  # --------------------------
70
  user_prompt = req.prompt.strip()
71
- if not user_prompt:
72
- raise ValueError("用户输入prompt不能为空")
73
-
74
- # Qwen-7B 原生对话格式:<|user|>提问<|end|><|assistant|>
75
  input_text = f"<|user|>{user_prompt}<|end|><|assistant|>"
76
 
77
- # 编码输入(转换为GPU张量,不进行padding,避免冗余
78
  inputs = tokenizer(
79
  input_text,
80
  return_tensors="pt", # 返回PyTorch张量
81
  truncation=True, # 截断过长输入(避免OOM)
82
- max_length=2048 # 输入最大长度(匹配模型能力)
83
  ).to(model.device)
84
 
85
- # --------------------------
86
- # 2. 初始化生成状态(显式管理past_key_values)
87
- # --------------------------
88
- input_ids = inputs["input_ids"] # 初始输入token
89
- attention_mask = inputs["attention_mask"] # 初始注意力掩码
90
- past_key_values = None # 初始化键值对缓存(后续逐token更新)
91
- generated_tokens = [] # 记录已生成的token(用于最终校验)
92
- max_new_tokens = min(req.max_tokens, 2048) # 限制最大生成长度(避免OOM)
93
-
94
- # --------------------------
95
- # 3. 异步流式生成(手动逐token生成,避免TextStreamer问题)
96
- # --------------------------
97
  async def generate_chunks():
98
- nonlocal past_key_values, generated_tokens
99
  loop = asyncio.get_running_loop()
100
-
101
- for _ in range(max_new_tokens):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
  # 检查客户端是否断开连接(避免无效生成)
103
  if await request.is_disconnected():
104
  logger.info("客户端已断开连接,停止生成")
105
- yield '{"chunk":"","finish":true}\n'
106
- return
107
-
108
- # --------------------------
109
- # 关键:在线程池中运行模型生成(每次只生成1个token)
110
- # --------------------------
111
- try:
112
- # 传入past_key_values,确保缓存不丢失
113
- generate_kwargs = {
114
- "input_ids": input_ids,
115
- "attention_mask": attention_mask,
116
- "past_key_values": past_key_values,
117
- "max_new_tokens": 1, # 每次只生成1个token流式核心
118
- "do_sample": True,
119
- "temperature": req.temperature,
120
- "top_p": req.top_p,
121
- "pad_token_id": tokenizer.pad_token_id,
122
- "eos_token_id": tokenizer.eos_token_id,
123
- "use_cache": True, # 必须开启缓存,否则past_key_values无效
124
- "return_dict_in_generate": True, # 返回字典格式,便于获取past_key_values
125
- "output_scores": False # 关闭分数输出,减少计算开销
126
- }
127
-
128
- # 同步生成(在独立线程中运行,不阻塞FastAPI事件循环)
129
- outputs = await loop.run_in_executor(
130
- None, # 使用默认线程池
131
- lambda: model.generate(**generate_kwargs)
132
- )
133
-
134
- # --------------------------
135
- # 4. 更新生成状态(关键:保存past_key_values)
136
- # --------------------------
137
- next_token = outputs.sequences[:, -1:] # 获取最新生成的1个token
138
- past_key_values = outputs.past_key_values # 更新缓存(避免下一轮为None)
139
- generated_tokens.append(next_token.item()) # 记录生成的token
140
-
141
- # --------------------------
142
- # 5. 解码token并返回(流式输出)
143
- # --------------------------
144
- # 解码单个token(跳过特殊符号,清理空格)
145
- token_text = tokenizer.decode(
146
- next_token[0], # 取batch中的第一个(仅单条请求)
147
- skip_special_tokens=True,
148
- clean_up_tokenization_spaces=True
149
- )
150
-
151
- # 转义双引号(避免JSON格式错误)
152
- escaped_text = token_text.replace('"', '\\"')
153
- # 按NDJSON格式返回(每行一个JSON对象,客户端可逐行解析)
154
- yield f'{{"chunk":"{escaped_text}","finish":false}}\n'
155
-
156
- # 检查是否生成结束符(eos_token),是的话终止生成
157
- if next_token.item() == tokenizer.eos_token_id:
158
- logger.info(f"生成结束符(eos_token),停止生成")
159
- break
160
-
161
- # 更新下一轮的输入(仅使用最新生成的token,减少计算量)
162
- input_ids = next_token
163
- # 扩展注意力掩码(新token的掩码为1)
164
- attention_mask = torch.cat([
165
- attention_mask,
166
- torch.ones((1, 1), device=model.device, dtype=torch.long)
167
- ], dim=-1)
168
-
169
- except Exception as gen_e:
170
- logger.error(f"逐token生成失败:{str(gen_e)}", exc_info=True)
171
- yield f'{{"chunk":"生成过程异常:{str(gen_e)}","finish":true}}\n'
172
- return
173
-
174
- # --------------------------
175
- # 6. 生成结束(返回终止标志)
176
- # --------------------------
177
  yield '{"chunk":"","finish":true}\n'
178
 
179
  # 返回流式响应(媒体类型为application/x-ndjson,支持逐行解析)
@@ -183,9 +127,8 @@ async def stream_infer(req: NodeInferenceRequest, request: Request):
183
  )
184
 
185
  except Exception as e:
186
- error_msg = f"推理服务异常:{str(e)}"
187
- logger.error(error_msg, exc_info=True)
188
- raise HTTPException(status_code=500, detail=error_msg)
189
 
190
  # 7. 健康检查接口(用于监控服务状态)
191
  @app.get("/node/health")
@@ -196,18 +139,18 @@ async def node_health():
196
  "status": "healthy" if is_model_ready else "unhealthy",
197
  "model": MODEL_NAME,
198
  "support_stream": True,
199
- "note": "Qwen-7B 4bit量化(手动逐token生成,解决past_key_values问题)",
200
  "timestamp": str(asyncio.get_event_loop().time())
201
  }
202
 
203
  # 8. 启动服务(仅在直接运行脚本时执行)
204
  if __name__ == "__main__":
205
  import uvicorn
206
- # 启动UVicorn服务(单进程避免模型重复加载
207
  uvicorn.run(
208
  app,
209
  host="0.0.0.0",
210
  port=7860,
211
  log_level="info",
212
- workers=1 # 模型不支持多进程共享,必须设为1
213
  )
 
7
  import asyncio
8
  from transformers import (
9
  AutoModelForCausalLM, AutoTokenizer,
10
+ BitsAndBytesConfig, TextStreamer
11
  )
12
 
13
  # 1. 基础配置
 
27
  bnb_4bit_compute_dtype=torch.bfloat16
28
  )
29
 
30
+ # 4. 加载模型(关键:显式处理tokenizer缺失的配置
31
  try:
32
  logger.info(f"开始加载模型:{MODEL_NAME}(4bit量化)")
33
  tokenizer = AutoTokenizer.from_pretrained(
 
35
  token=HF_TOKEN,
36
  padding_side="right", # 右侧padding,避免生成时截断
37
  trust_remote_code=True, # Qwen模型必需(加载自定义tokenizer)
38
+ eos_token="<|endoftext|>", # 显式指定结束符(兼容旧版本)
39
  pad_token="<|endoftext|>" # 显式指定padding符(避免生成警告)
40
  )
41
  model = AutoModelForCausalLM.from_pretrained(
 
46
  trust_remote_code=True,
47
  torch_dtype=torch.bfloat16 # 匹配量化计算精度
48
  )
49
+ # 输出配置跳过提示词,只返回生成内容
50
+ streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
51
  logger.info(f"模型 {MODEL_NAME} 加载成功!显存占用约 4-5GB(4bit 量化)")
52
  except Exception as e:
53
  logger.error(f"模型加载失败:{str(e)}", exc_info=True)
 
58
  prompt: str # 用户提问内容
59
  max_tokens: int = 1024 # 最大生成长度(默认1024)
60
  temperature: float = 0.7 # 随机性(0-1,越大越多样)
 
61
 
62
+ # 6. 流式推理接口(核心修复绕开chat_template直接构建输入
63
  @app.post("/node/stream-infer")
64
  async def stream_infer(req: NodeInferenceRequest, request: Request):
65
  try:
66
  # --------------------------
67
+ # 关键修复:手动构建Qwen原生对话格式
68
+ # Qwen要求格式:<|user|>用户输入<|end|><|assistant|>
69
  # --------------------------
70
  user_prompt = req.prompt.strip()
71
+ # 构建模型能理解的输入文本(无需依赖chat_template)
 
 
 
72
  input_text = f"<|user|>{user_prompt}<|end|><|assistant|>"
73
 
74
+ # 编码输入(转换为模型可处理的张量,并移动到GPU
75
  inputs = tokenizer(
76
  input_text,
77
  return_tensors="pt", # 返回PyTorch张量
78
  truncation=True, # 截断过长输入(避免OOM)
79
+ max_length=2048 # 输入最大长度(根据模型能力调整
80
  ).to(model.device)
81
 
82
+ # 异步生成流式内容(避免阻塞FastAPI主线程)
 
 
 
 
 
 
 
 
 
 
 
83
  async def generate_chunks():
 
84
  loop = asyncio.get_running_loop()
85
+ # 在线程池中运行同步的模型生成(不阻塞事件循环)
86
+ outputs = await loop.run_in_executor(
87
+ None, # 使用默认线程池
88
+ lambda: model.generate(
89
+ **inputs,
90
+ streamer=streamer, # 流式输出支持
91
+ max_new_tokens=req.max_tokens, # 最大生成长度
92
+ do_sample=True, # 启用采样(生成多样内容)
93
+ temperature=req.temperature, # 随机性控制
94
+ pad_token_id=tokenizer.pad_token_id, # padding符ID
95
+ eos_token_id=tokenizer.eos_token_id # 结束符ID(生成停止标志)
96
+ )
97
+ )
98
+
99
+ # 提取生成的内容(排除输入部分,只取新生成的token)
100
+ input_token_len = inputs["input_ids"].shape[1] # 输入token长度
101
+ generated_tokens = outputs[0][input_token_len:] # 仅保留新生成的token
102
+
103
+ # 逐token解码并返回(流式输出核心)
104
+ for token in generated_tokens:
105
  # 检查客户端是否断开连接(避免无效生成)
106
  if await request.is_disconnected():
107
  logger.info("客户端已断开连接,停止生成")
108
+ break
109
+ # 解码单个token(跳过特殊符号,如<|end|>)
110
+ token_text = tokenizer.decode(
111
+ token,
112
+ skip_special_tokens=True, # 跳过特殊token(如结束符、分隔符
113
+ clean_up_tokenization_spaces=True # 清理多余空格
114
+ )
115
+ # 转义双引号(避免JSON格错误)
116
+ escaped_text = token_text.replace('"', '\\"')
117
+ # 按NDJSON格式返回(每行一个JSON对象,兼容流式解析)
118
+ yield f'{{"chunk":"{escaped_text}","finish":false}}\n'
119
+
120
+ # 生成结束标志告知客户端生成完成
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  yield '{"chunk":"","finish":true}\n'
122
 
123
  # 返回流式响应(媒体类型为application/x-ndjson,支持逐行解析)
 
127
  )
128
 
129
  except Exception as e:
130
+ logger.error(f"推理失败:{str(e)}", exc_info=True) # 记录详细错误堆栈
131
+ raise HTTPException(status_code=500, detail=f"推理服务异常:{str(e)}")
 
132
 
133
  # 7. 健康检查接口(用于监控服务状态)
134
  @app.get("/node/health")
 
139
  "status": "healthy" if is_model_ready else "unhealthy",
140
  "model": MODEL_NAME,
141
  "support_stream": True,
142
+ "note": "Qwen-7B 4bit量化(适配16G内存,绕开chat_template兼容旧版本",
143
  "timestamp": str(asyncio.get_event_loop().time())
144
  }
145
 
146
  # 8. 启动服务(仅在直接运行脚本时执行)
147
  if __name__ == "__main__":
148
  import uvicorn
149
+ # 启动UVicorn服务(host=0.0.0.0允许外部访问port=7860为默认端口
150
  uvicorn.run(
151
  app,
152
  host="0.0.0.0",
153
  port=7860,
154
  log_level="info",
155
+ workers=1 # 单进程(模型不支持多进程共享,避免重复加载)
156
  )