Update inference_node.py

inference_node.py · CHANGED (+77 −49)
Removed by this commit (unchanged context lines omitted; several removed lines are truncated in the diff view):

```diff
@@ -15,11 +15,11 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s-%(name)s-%(levelname
-# 2.
-HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN")
-# 3. 4bit
@@ -27,102 +27,130 @@ bnb_config = BitsAndBytesConfig(
-# 4.
-        padding_side="right",
-        trust_remote_code=True
-    # Add the missing pad_token (if needed)
-    if tokenizer.pad_token is None:
-        tokenizer.pad_token = tokenizer.eos_token
-        device_map="auto",
-        trust_remote_code=True
-# 5.
-    prompt: str
-    max_tokens: int = 1024
-# 6.
-        input_text = tokenizer.apply_chat_template(
-            conversation,
-            tokenize=False,
-            add_generation_prompt=True
-        )
-            return_tensors="pt",
-                None,
-                    streamer=streamer,
-                    max_new_tokens=req.max_tokens,
-                    do_sample=True,
-                    temperature=
-                    pad_token_id=tokenizer.
-        logger.error(f"Inference failed: {str(e)}", exc_info=True)  #
-        raise HTTPException(status_code=500, detail=str(e))
-# 7.
-        "status": "healthy",
-        "note": "Qwen-7B 4bit
```
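For context on the "bypass chat_template" fix: `tokenizer.apply_chat_template` raises when a tokenizer ships without a `chat_template`, which is the case for older remote-code Qwen tokenizers. A minimal defensive sketch (assuming the same `tokenizer` object; `build_prompt` is a hypothetical helper, not part of the commit):

```python
def build_prompt(tokenizer, user_prompt: str) -> str:
    """Prefer the tokenizer's chat template; fall back to a hand-built prompt."""
    conversation = [{"role": "user", "content": user_prompt.strip()}]
    if getattr(tokenizer, "chat_template", None):
        return tokenizer.apply_chat_template(
            conversation, tokenize=False, add_generation_prompt=True
        )
    # Fallback mirrors the hand-built format used in the new code below.
    return f"<|user|>{user_prompt.strip()}<|end|><|assistant|>"
```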
The new version (diff lines 15-156; line 26 is unchanged and not shown):

```python
logger = logging.getLogger("inference_node")
app = FastAPI(title="Inference Node Service (Qwen-7B)")

# 2. Model configuration (Qwen-7B is a public model; no HF token needed)
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen-7B")
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN", "")  # fine to leave empty

# 3. 4-bit quantization config (fits 16 GB of RAM; roughly 4-5 GB of VRAM)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    # ... (line 26 unchanged, not shown in the diff)
    bnb_4bit_compute_dtype=torch.bfloat16
)

# 4. Load the model (key point: explicitly supply config the tokenizer is missing)
try:
    logger.info(f"Loading model: {MODEL_NAME} (4-bit quantized)")
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        token=HF_TOKEN,
        padding_side="right",       # pad on the right to avoid truncation during generation
        trust_remote_code=True,     # required for Qwen (loads its custom tokenizer)
        eos_token="<|endoftext|>",  # set the EOS token explicitly (older-version compatibility)
        pad_token="<|endoftext|>"   # set the pad token explicitly (avoids generation warnings)
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        quantization_config=bnb_config,
        device_map="auto",          # assign devices automatically (GPU first)
        token=HF_TOKEN,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16  # match the quantized compute dtype
    )
    # Streaming output setup (skip the prompt; return only generated content)
    streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    logger.info(f"Model {MODEL_NAME} loaded successfully! VRAM usage roughly 4-5 GB (4-bit quantization)")
except Exception as e:
    logger.error(f"Model loading failed: {str(e)}", exc_info=True)
    raise SystemExit(f"Service terminated: {str(e)}")
```
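The 4-5 GB figure in the log message can be sanity-checked after loading. A small sketch, assuming the `model` object above (`get_memory_footprint()` is a standard `transformers` model method):

```python
# Weights-only footprint; excludes activation and KV-cache memory used at generation time.
footprint_gb = model.get_memory_footprint() / 1024**3
logger.info(f"Model weight footprint: {footprint_gb:.2f} GB")
```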
```python
# 5. Request body (the user's prompt plus generation parameters)
class NodeInferenceRequest(BaseModel):
    prompt: str               # the user's question
    max_tokens: int = 1024    # maximum generation length (default 1024)
    temperature: float = 0.7  # randomness (0-1; higher is more varied)
```
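A request body matching this model might look like the following (illustrative values only):

```python
payload = {
    "prompt": "Briefly explain what 4-bit quantization does.",
    "max_tokens": 256,   # optional; defaults to 1024
    "temperature": 0.7,  # optional; defaults to 0.7
}
```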
```python
# 6. Streaming inference endpoint (core fix: bypass chat_template, build the input directly)
@app.post("/node/stream-infer")
async def stream_infer(req: NodeInferenceRequest, request: Request):
    try:
        # --------------------------
        # Key fix: build Qwen's native conversation format by hand.
        # Qwen expects: <|user|>user input<|end|><|assistant|>
        # --------------------------
        user_prompt = req.prompt.strip()
        # Build input text the model understands (no dependency on chat_template)
        input_text = f"<|user|>{user_prompt}<|end|><|assistant|>"

        # Encode the input (convert to tensors the model can process, move to the GPU)
        inputs = tokenizer(
            input_text,
            return_tensors="pt",  # return PyTorch tensors
            truncation=True,      # truncate overlong input (avoids OOM)
            max_length=2048       # maximum input length (adjust to the model's limits)
        ).to(model.device)

        # Generate streamed content asynchronously (avoid blocking FastAPI's main thread)
        async def generate_chunks():
            loop = asyncio.get_running_loop()
            # Run the synchronous model.generate in a thread pool (keeps the event loop free)
            outputs = await loop.run_in_executor(
                None,  # use the default thread pool
                lambda: model.generate(
                    **inputs,
                    streamer=streamer,                    # streaming output support
                    max_new_tokens=req.max_tokens,        # maximum generation length
                    do_sample=True,                       # enable sampling (varied output)
                    temperature=req.temperature,          # randomness control
                    pad_token_id=tokenizer.pad_token_id,  # padding token id
                    eos_token_id=tokenizer.eos_token_id   # end-of-sequence id (stop signal)
                )
            )

            # Extract the generated content (drop the input part, keep only new tokens)
            input_token_len = inputs["input_ids"].shape[1]   # input token length
            generated_tokens = outputs[0][input_token_len:]  # keep only newly generated tokens

            # Decode and yield token by token (the core of the streaming output)
            for token in generated_tokens:
                # Check whether the client has disconnected (avoid wasted generation)
                if await request.is_disconnected():
                    logger.info("Client disconnected; stopping generation")
                    break
                # Decode a single token (skipping special symbols such as <|end|>)
                token_text = tokenizer.decode(
                    token,
                    skip_special_tokens=True,          # skip special tokens (EOS, separators)
                    clean_up_tokenization_spaces=True  # clean up extra spaces
                )
                # Escape double quotes (avoid malformed JSON)
                escaped_text = token_text.replace('"', '\\"')
                # Emit NDJSON (one JSON object per line, friendly to streaming parsers)
                yield f'{{"chunk":"{escaped_text}","finish":false}}\n'

            # End-of-generation marker (tells the client the stream is complete)
            yield '{"chunk":"","finish":true}\n'

        # Return a streaming response (media type application/x-ndjson, parsed line by line)
        return StreamingResponse(
            generate_chunks(),
            media_type="application/x-ndjson"
        )

    except Exception as e:
        logger.error(f"Inference failed: {str(e)}", exc_info=True)  # log the full traceback
        raise HTTPException(status_code=500, detail=f"Inference service error: {str(e)}")
```
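Two caveats worth noting about the endpoint above. First, `run_in_executor` awaits the entire `generate` call, so chunks are only yielded after generation has finished (and the `TextStreamer` passed to `generate` prints to stdout rather than feeding the HTTP response); `transformers.TextIteratorStreamer` is the usual pattern for true token-by-token streaming. Second, `replace('"', '\\"')` does not escape backslashes or newlines, while `json.dumps` handles all of them. A minimal sketch under those assumptions, reusing the module-level `model` and `tokenizer` (not part of the commit):

```python
import json
import threading

from transformers import TextIteratorStreamer

async def generate_chunks_streaming(inputs, req, request):
    # Buffers decoded text and exposes it as a plain iterator.
    it_streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=True
    )
    gen_kwargs = dict(
        **inputs,
        streamer=it_streamer,
        max_new_tokens=req.max_tokens,
        do_sample=True,
        temperature=req.temperature,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    # generate() runs in a plain thread; the streamer yields text as it is produced.
    threading.Thread(target=model.generate, kwargs=gen_kwargs, daemon=True).start()
    # Note: iterating the streamer blocks the event loop between chunks;
    # a production version would hop through run_in_executor here as well.
    for text in it_streamer:
        if await request.is_disconnected():
            break
        yield json.dumps({"chunk": text, "finish": False}, ensure_ascii=False) + "\n"
    yield '{"chunk":"","finish":true}\n'
```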
```python
# 7. Health-check endpoint (for monitoring the service)
@app.get("/node/health")
async def node_health():
    # Check that the model and tokenizer loaded correctly
    is_model_ready = model is not None and tokenizer is not None
    return {
        "status": "healthy" if is_model_ready else "unhealthy",
        "model": MODEL_NAME,
        "support_stream": True,
        "note": "Qwen-7B 4-bit quantized (fits 16 GB RAM); bypasses chat_template for older-version compatibility",
        "timestamp": str(asyncio.get_event_loop().time())
    }
```
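A quick smoke test against a running instance (host and port taken from the `uvicorn.run` call below):

```python
import requests

resp = requests.get("http://localhost:7860/node/health", timeout=5)
resp.raise_for_status()
body = resp.json()
assert body["status"] == "healthy", body
print(body["model"], "-", body["note"])
```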
```python
# 8. Start the service (only when the script is run directly)
if __name__ == "__main__":
    import uvicorn
    # Start the Uvicorn server (host=0.0.0.0 allows external access; 7860 is the default port)
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        workers=1  # single process (the model cannot be shared across workers; avoids loading it twice)
    )
```
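On the client side, the stream is consumed by reading the response line by line and parsing each NDJSON object. A sketch using `requests` (URL and prompt are illustrative):

```python
import json
import requests

resp = requests.post(
    "http://localhost:7860/node/stream-infer",
    json={"prompt": "Say hello in one sentence.", "max_tokens": 64},
    stream=True,
    timeout=300,
)
resp.raise_for_status()
for line in resp.iter_lines(decode_unicode=True):
    if not line:
        continue
    event = json.loads(line)
    if event["finish"]:
        break
    print(event["chunk"], end="", flush=True)
print()
```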