# -*- coding: utf-8 -*-
"""
飞行汉化 (Flying Translation) 云端多模态引擎 Pro
==================================================
优化的 FastAPI 云端翻译服务
主要优化:
1. 完善的异常处理和日志记录
2. 排队状态监控 API
3. 请求限流和频率控制
4. 增强的安全验证
5. 更详细的性能指标
"""
import os
import sys
import json
import asyncio
import time
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from collections import defaultdict, deque
from functools import wraps
from contextlib import asynccontextmanager
# Web 框架
from fastapi import FastAPI, HTTPException, Security, Depends, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader
from pydantic import BaseModel, Field
# Hugging Face
from huggingface_hub import AsyncInferenceClient
# ==========================================
# 日志配置
# ==========================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# ==========================================
# 应用配置
# ==========================================
API_KEY_NAME = "X-API-Key"
SECRET_KEY = os.environ.get("SERVER_SECRET_KEY", "flying_dev_key")
HF_TOKEN = os.environ.get("HF_TOKEN")
# 限流配置
RATE_LIMIT_REQUESTS = 30 # 每分钟最大请求数
RATE_LIMIT_WINDOW = 60 # 时间窗口(秒)
MAX_QUEUE_SIZE = 100 # 最大排队数
# 初始化客户端
async_client = AsyncInferenceClient(token=HF_TOKEN) if HF_TOKEN else None
# ==========================================
# 限流器
# ==========================================
class 限流器:
"""令牌桶算法的简单限流实现"""
def __init__(self, max_requests: int = 30, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(deque)
self._lock = asyncio.Lock()
async def is_allowed(self, client_id: str) -> bool:
"""检查是否允许请求"""
async with self._lock:
now = datetime.now()
cutoff = now - timedelta(seconds=self.window_seconds)
# 清理过期记录
while self.requests[client_id] and self.requests[client_id][0] < cutoff:
self.requests[client_id].popleft()
# 检查是否超限
if len(self.requests[client_id]) >= self.max_requests:
return False
# 记录请求
self.requests[client_id].append(now)
return True
def get_remaining(self, client_id: str) -> int:
"""获取剩余请求数"""
now = datetime.now()
cutoff = now - timedelta(seconds=self.window_seconds)
# 清理过期记录
while self.requests[client_id] and self.requests[client_id][0] < cutoff:
self.requests[client_id].popleft()
return max(0, self.max_requests - len(self.requests[client_id]))
# 全局限流器实例
rate_limiter = 限流器(
max_requests=RATE_LIMIT_REQUESTS,
window_seconds=RATE_LIMIT_WINDOW
)
# ==========================================
# 排队状态跟踪器
# ==========================================
class 排队跟踪器:
"""跟踪当前请求排队状态"""
def __init__(self, max_size: int = 100):
self.max_size = max_size
self.current_requests: Dict[str, dict] = {}
self._lock = asyncio.Lock()
self._request_counter = 0
async def 添加请求(self, request_id: str, client_info: dict) -> bool:
"""添加新请求到队列"""
async with self._lock:
if len(self.current_requests) >= self.max_size:
return False
self._request_counter += 1
self.current_requests[request_id] = {
**client_info,
"request_id": request_id,
"start_time": time.time(),
"counter": self._request_counter
}
return True
async def 移除请求(self, request_id: str):
"""移除完成的请求"""
async with self._lock:
self.current_requests.pop(request_id, None)
async def 获取状态(self) -> dict:
"""获取当前排队状态"""
async with self._lock:
return {
"total_requests": len(self.current_requests),
"queue_position": self._request_counter,
"waiting": len(self.current_requests),
"requests": [
{
"request_id": req["request_id"],
"model": req.get("model", "unknown"),
"elapsed_seconds": round(time.time() - req["start_time"], 1)
}
for req in sorted(
self.current_requests.values(),
key=lambda x: x["counter"]
)
]
}
queue_tracker = 排队跟踪器(max_size=MAX_QUEUE_SIZE)
# ==========================================
# FastAPI 应用
# ==========================================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
logger.info("🚀 云端翻译引擎启动中...")
if not HF_TOKEN:
logger.warning("⚠️ 未配置 HF_TOKEN,部分功能可能受限")
yield
logger.info("🛑 云端翻译引擎关闭中...")
if async_client:
await async_client.close()
app = FastAPI(
title="飞行汉化 (Flying Translation) 云端多模态引擎 Pro",
description="""
## 🚀 功能特性
- **多模态翻译**: 支持文本翻译和视觉截图翻译
- **智能重试**: 自动处理 HF 集群过载情况
- **实时监控**: 排队状态实时查询
- **频率限制**: 保护服务稳定性
## 🔐 认证方式
需要在请求头中添加 `X-API-密匙` 字段
""",
version="2.0.0",
lifespan=lifespan
)
# CORS 配置(生产环境建议限制来源)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应限制为特定域名
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
# ==========================================
# 安全验证
# ==========================================
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
"""验证 API 密匙"""
if not api_key:
raise HTTPException(
status_code=401,
detail="缺少 API 密匙,请检查请求头中的 X-API-Key"
)
if api_key != SECRET_KEY:
raise HTTPException(
status_code=401,
detail="无效的 API 密匙"
)
return api_key
async def check_rate_limit(request: Request):
"""检查请求频率限制"""
client_id = request.client.host if request.client else "unknown"
if not await rate_limiter.is_allowed(client_id):
remaining = rate_limiter.get_remaining(client_id)
raise HTTPException(
status_code=429,
detail=f"请求过于频繁,请在 {RATE_LIMIT_WINDOW} 秒后重试。剩余配额: {remaining}"
)
# ==========================================
# 请求模型
# ==========================================
class 翻译请求(BaseModel):
"""翻译请求模型"""
target_language: str = Field(..., description="目标语言,如 '中文', '英文'")
data: Optional[dict] = Field(None, description="要翻译的 JSON 数据")
image_base64: Optional[str] = Field(None, description="Base64 编码的图片")
model_id: str = Field(..., description="要使用的模型 ID")
custom_glossary: Optional[Dict[str, str]] = Field(
default_factory=dict,
description="自定义术语词典"
)
model_config = {
"json_schema_extra": {
"example": {
"target_language": "中文",
"data": {"NodeClass": {"title": "Example", "inputs": {"image": "image"}}},
"model_id": "Qwen/Qwen2.5-7B-Instruct"
}
}
}
# ==========================================
# 路由定义
# ==========================================
@app.get("/")
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"service": "Flying-Translation API",
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"hf_client_ready": async_client is not None
}
@app.get("/api/queue_status")
async def get_queue_status(request: Request):
"""获取当前排队状态"""
client_ip = request.client.host if request.client else "unknown"
status = await queue_tracker.获取状态()
return {
"status": "ok",
"your_ip": client_ip,
"rate_limit": {
"remaining": rate_limiter.get_remaining(client_ip),
"window_seconds": RATE_LIMIT_WINDOW,
"max_per_window": RATE_LIMIT_REQUESTS
},
**status
}
@app.post("/api/translate", dependencies=[Depends(verify_api_key), Depends(check_rate_limit)])
async def execute_cloud_translation(
request: Request,
request_data: 翻译请求
):
"""执行云端翻译"""
request_id = f"{int(time.time() * 1000)}"
# 记录请求
logger.info(f"[{request_id}] 收到翻译请求: 语言={request_data.target_language}, 模型={request_data.model_id}")
# 添加到队列
client_ip = request.client.host if request.client else "unknown"
await queue_tracker.添加请求(request_id, {
"client_ip": client_ip,
"model": request_data.model_id,
"language": request_data.target_language
})
try:
# 根据请求类型构建消息
if request_data.image_base64:
result = await _handle_vision_translation(
request_data.image_base64,
request_data.target_language,
request_data.model_id
)
elif request_data.data:
result = await _handle_text_translation(
request_data.data,
request_data.target_language,
request_data.model_id,
request_data.custom_glossary
)
else:
raise HTTPException(
status_code=400,
detail="必须提供 data 或 image_base64"
)
logger.info(f"[{request_id}] 翻译完成")
return {
"status": "success",
"data": result,
"model_used": request_data.model_id,
"request_id": request_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"[{request_id}] 翻译失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"云端翻译异常: {str(e)}"
)
finally:
await queue_tracker.移除请求(request_id)
async def _handle_vision_translation(
image_base64: str,
target_language: str,
model_id: str
) -> dict:
"""处理视觉翻译请求"""
sys_prompt = f"""你是一个 ComfyUI 节点翻译专家。请分析用户提供的节点截图,提取并翻译为:【{target_language}】。
【提取规则】:
1. 图像右上角通常有一个带有背景色的小字(如 Image-Filters),这是该节点的归属插件名,请务必将其提取并放在最外层的 "_plugin_guess" 字段中!
2. 图像内部左上角的大字(如 Keyer)才是节点真正的类名,将其作为 JSON 数据的主 Key,并将它的翻译放在内部的 "title" 中。
3. 左侧连接点是 inputs。右侧连接点是 outputs。中间输入框是 widgets。
4. 根据功能理解,用【{target_language}】写一段专业 description。
5. 必须且只输出 JSON 代码块!
【输出格式严格规范】:
{{
"_plugin_guess": "右上角提取的插件名",
"Original Node Class Name": {{
"title": "[节点名称的 {target_language} 翻译]",
"inputs": {{"original_variable_name": "[{target_language} 翻译]"}},
"widgets": {{"original_variable_name": "[{target_language} 翻译]"}},
"outputs": {{"original_variable_name": "[{target_language} 翻译]"}},
"description": "[详细的功能 {target_language} 描述]"
}}
}}
【额外规则】:
4. 字典格式:内部结构必须严格是 "原始变量名": "{target_language}翻译"。绝对禁止编造嵌套字典(例如禁止生成 type, default 等无关属性)!
5. 必须意译:对于 "source_path", "image", "max_pixels" 等参数,必须直接翻译为通顺的 {target_language}。"""
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": [
{"type": "text", "text": f"请提取图中节点信息并严格按照示例格式翻译为 {target_language}。"},
{"type": "image_url", "image_url": {"url": image_base64}}
]}
]
return await _call_model_with_retry(model_id, messages)
async def _handle_text_translation(
data: dict,
target_language: str,
model_id: str,
custom_glossary: Optional[Dict[str, str]]
) -> dict:
"""处理文本翻译请求"""
sys_prompt = f"""你是一个专业的 ComfyUI 插件多语言本地化翻译专家。任务是将 JSON 文件中的原始字符串(Value)翻译成:【{target_language}】。
【核心规则】:
1. 绝对禁止修改 JSON 的键名(Key)!只能翻译值(Value)。
2. "title" 和 "description" 必须翻译为通顺的 {target_language}。
3. 带下划线、数字或大写的变量(如 image_1, MAX_SIZE)必须根据 {target_language} 表达习惯意译。
4. 专业术语必须保留原文:clip, vae, latent, lora, cond, uncond, seed, step 等。"""
# 智能检测输入数据的源语言方向
输入样本 = json.dumps(data, ensure_ascii=False)[:500]
源含中文 = any('\u4e00' <= c <= '\u9fff' for c in 输入样本)
是否翻译为中文 = "中文" in target_language or "zh" in target_language.lower()
# 按翻译方向注入特化规则
if 是否翻译为中文:
sys_prompt += "\n5. 【中文特有规则】'mask' 统一译为 '遮罩';'image' 统一译为 '图像'。"
elif 源含中文:
sys_prompt += f"""
5. 【关键】当前输入的 Value 中包含中文,你必须将所有中文精准翻译为地道的【{target_language}】,最终输出中绝对不允许残留任何中文字符!
6. 常见术语参考:遮罩=Mask、图像=Image、模型=Model、采样器=Sampler、宽度=Width、高度=Height、步数=Steps、种子=Seed、批次=Batch、通道=Channel、降噪=Denoise、提示词=Prompt。"""
sys_prompt += "\n【输出要求】必须且只能输出合法的 JSON 代码块,绝不要包含 ``` json 等 Markdown 标记或解释说明!"
sys_prompt += f"""
【翻译参考示例】
(注意:以下示例仅用于演示 JSON 结构,不管示例中输出的是什么语言,你的实际输出必须全部使用【{target_language}】!)"""
# 根据翻译方向提供匹配的参考示例
if 源含中文 and not 是否翻译为中文:
sys_prompt += f"""
输入:
{{
"ImageScaler": {{
"title": "图像缩放",
"inputs": {{"图像": "图像", "遮罩": "遮罩"}},
"widgets": {{"宽度": "宽度", "高度": "高度", "缩放模式": "缩放模式"}},
"outputs": {{"IMAGE": "IMAGE", "LATENT": "LATENT"}},
"description": "分类: 图像处理"
}}
}}
输出:
{{
"ImageScaler": {{
"title": "[图像缩放 的 {target_language} 翻译]",
"inputs": {{"图像": "[图像 的 {target_language} 翻译]", "遮罩": "[遮罩 的 {target_language} 翻译]"}},
"widgets": {{"宽度": "[宽度 的 {target_language} 翻译]", "高度": "[高度 的 {target_language} 翻译]", "缩放模式": "[缩放模式 的 {target_language} 翻译]"}},
"outputs": {{"IMAGE": "[IMAGE 的 {target_language} 翻译]", "LATENT": "LATENT"}},
"description": "[分类: 图像处理 的 {target_language} 翻译]"
}}
}}
"""
else:
sys_prompt += f"""
输入:
{{
"IntLiteral": {{
"title": "IntLiteral",
"inputs": {{"image_1": "image_1", "mask": "mask"}},
"widgets": {{"seed": "seed", "resize_mode": "resize_mode"}},
"outputs": {{"IMAGE": "IMAGE", "LATENT": "LATENT"}},
"description": "Category: Logic"
}}
}}
输出:
{{
"IntLiteral": {{
"title": "[IntLiteral 的 {target_language} 翻译]",
"inputs": {{"image_1": "[image 1 的 {target_language} 翻译]", "mask": "[mask 的 {target_language} 翻译]"}},
"widgets": {{"seed": "seed", "resize_mode": "[resize_mode 的 {target_language} 翻译]"}},
"outputs": {{"IMAGE": "[IMAGE 的 {target_language} 翻译]", "LATENT": "LATENT"}},
"description": "[Category: Logic 的 {target_language} 翻译]"
}}
}}
"""
user_prompt = f"请翻译以下 JSON 为 {target_language}(保持 Key 不变):\n{json.dumps(data, indent=2, ensure_ascii=False)}"
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": user_prompt}
]
result = await _call_model_with_retry(model_id, messages)
# 应用自定义词典和后处理
return _执行规则后处理(result, target_language, custom_glossary or {})
async def _call_model_with_retry(
model_id: str,
messages: List[dict],
max_retries: int = 3
) -> dict:
"""带重试的模型调用"""
if not async_client:
raise HTTPException(
status_code=503,
detail="Hugging Face 客户端未初始化,请检查 HF_TOKEN 配置"
)
for attempt in range(max_retries):
try:
start_time = time.time()
response = await asyncio.wait_for(
async_client.chat_completion(
model=model_id,
messages=messages,
max_tokens=4096,
temperature=0.1
),
timeout=120.0 # 2分钟超时
)
elapsed = time.time() - start_time
logger.debug(f"模型调用成功,耗时: {elapsed:.2f}秒")
output_content = response.choices[0].message.content
# 提取并解析 JSON
extracted_json = _清理提取JSON(output_content)
parsed_dict = json.loads(extracted_json)
return parsed_dict
except asyncio.TimeoutError:
logger.warning(f"模型调用超时 (尝试 {attempt + 1}/{max_retries})")
if attempt == max_retries - 1:
raise HTTPException(status_code=504, detail="请求超时,请稍后重试")
await asyncio.sleep(2 ** attempt)
except json.JSONDecodeError as e:
logger.warning(f"JSON 解析失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
if attempt == max_retries - 1:
raise HTTPException(status_code=500, detail=f"JSON 解析失败: {str(e)}")
await asyncio.sleep(1)
except Exception as e:
error_msg = str(e)
logger.warning(f"模型调用异常 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
# 【关键】计费额度耗尽 - 不可恢复,立即终止,不重试
if any(keyword in error_msg for keyword in ["402", "Payment Required", "depleted", "credits"]):
logger.error(f"HuggingFace 额度耗尽,立即终止: {error_msg}")
raise HTTPException(
status_code=402,
detail="HuggingFace 月度免费额度已耗尽,请前往 https://huggingface.co/settings/billing 购买额度或升级 PRO 订阅。"
)
# HF 集群过载 - 可重试
if any(keyword in error_msg for keyword in ["503", "Overloaded", "overloaded", "timeout"]):
if attempt == max_retries - 1:
raise HTTPException(status_code=503, detail="Hugging Face 集群繁忙,请稍后重试")
await asyncio.sleep(2 ** attempt)
else:
if attempt == max_retries - 1:
raise HTTPException(status_code=500, detail=f"云端翻译失败: {error_msg}")
await asyncio.sleep(1)
def _清理提取JSON(text: str) -> str:
"""从模型输出中提取 JSON"""
import re
# 移除 Qwen3 思考模式的 ... 内容
text = re.sub(r'[\s\S]*?', '', text).strip()
# 尝试提取 markdown 代码块
if "```" in text:
parts = text.split("```")
for i, part in enumerate(parts):
if i % 2 == 1: # 代码块内容
part = part.strip()
# 移除 json 标记
if part.startswith("json"):
part = part[4:].strip()
if part.startswith("{") and "}" in part:
return part[:part.rfind("}") + 1]
# 如果没有找到 json 块,使用最后一部分
last_part = parts[-1].strip()
if "{" in last_part and "}" in last_part:
return last_part[last_part.find("{"):last_part.rfind("}") + 1]
# 尝试直接提取 JSON 对象
if "{" in text:
start = text.find("{")
end = text.rfind("}") + 1
if end > start:
return text[start:end]
raise json.JSONDecodeError("无法从响应中提取 JSON", text, 0)
def _执行规则后处理(
data: Any,
target_language: str,
custom_glossary: Dict[str, str]
) -> Any:
"""翻译后处理,支持双向术语映射"""
是否为中文 = "中文" in target_language or "zh" in target_language.lower()
# 非中文目标:将模型可能残留的中文术语修正为英文
中文转英文映射 = {
"遮罩": "Mask", "蒙版": "Mask", "掩码": "Mask",
"图像": "Image", "图片": "Image",
"模型": "Model", "采样器": "Sampler",
"宽度": "Width", "高度": "Height",
"步数": "Steps", "种子": "Seed",
"批次": "Batch", "通道": "Channel",
"降噪": "Denoise", "标准化": "Normalized",
"提示词": "Prompt", "负面提示词": "Negative Prompt",
"加载": "Load", "保存": "Save",
"预览": "Preview", "输出": "Output", "输入": "Input",
"分类": "Category", "描述": "Description",
}
def 递归替换(obj):
if isinstance(obj, dict):
return {k: 递归替换(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [递归替换(i) for i in obj]
elif isinstance(obj, str):
val = obj
# 自定义词典(所有语言都生效)
if custom_glossary:
for 原词, 新词 in custom_glossary.items():
if 原词.lower() in val.lower():
val = val.replace(原词, 新词).replace(原词.capitalize(), 新词)
if 是否为中文:
# 中文目标:统一中文术语表达
术语映射 = {
"掩码": "遮罩",
"蒙版": "遮罩",
"Normalized": "标准化",
"normalized": "标准化",
"latent": "Latent",
"LATENT": "Latent",
}
for 原文, 译文 in 术语映射.items():
if 原文 in val and 译文 is not None:
val = val.replace(原文, 译文)
if val.lower() == "mask":
return "遮罩"
if val.lower() in ["image", "图片"]:
return "图像"
else:
# 非中文目标:清除模型残留的中文字符
含有中文 = any('\u4e00' <= c <= '\u9fff' for c in val)
if 含有中文:
if val in 中文转英文映射:
return 中文转英文映射[val]
for 中文, 英文 in 中文转英文映射.items():
if 中文 in val:
val = val.replace(中文, 英文)
return val
return obj
return 递归替换(data)
# ==========================================
# 错误处理器
# ==========================================
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""HTTP 异常处理"""
logger.warning(f"[HTTP {exc.status_code}] {exc.detail}")
return JSONResponse(
status_code=exc.status_code,
content={
"status": "error",
"error": exc.detail,
"code": exc.status_code
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""通用异常处理"""
logger.error(f"[UNHANDLED] {type(exc).__name__}: {str(exc)}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"error": "服务器内部错误",
"code": 500
}
)
# ==========================================
# 入口点
# ==========================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=7860,
log_level="info",
timeout_keep_alive=60
)