File size: 7,654 Bytes
621645b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Warp Protobuf编解码服务器启动文件
纯protobuf编解码服务器,提供JSON<->Protobuf转换、WebSocket监控和静态文件服务。
"""
import os
import sys
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List
# 添加common模块到Python路径
sys.path.insert(0, str(Path(__file__).parent))
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi import Query, HTTPException
from fastapi.responses import Response
# 使用统一的共享模块
from common.config import config
from common.schema_utils import sanitize_json_schema
from common.message_codec import decode_server_message_data, encode_server_message_data
from warp2protobuf.api.protobuf_routes import app as protobuf_app
from warp2protobuf.core.logging import logger, set_log_file
from warp2protobuf.api.protobuf_routes import EncodeRequest, _encode_smd_inplace
from warp2protobuf.core.protobuf_utils import dict_to_protobuf_bytes
from warp2protobuf.core.schema_sanitizer import sanitize_mcp_input_schema_in_packet
from warp2protobuf.core.auth import acquire_anonymous_access_token
from warp2protobuf.config.models import get_all_unique_models
def create_app() -> FastAPI:
"""创建FastAPI应用"""
# 将服务器日志重定向到专用文件
try:
set_log_file('warp_server.log')
except Exception:
pass
# 使用protobuf路由的应用作为主应用
app = protobuf_app
# ============= 新增接口:返回protobuf编码后的AI请求字节 =============
@app.post("/api/warp/encode_raw")
async def encode_ai_request_raw(
request: EncodeRequest,
output: str = Query("raw", description="输出格式:raw(默认,返回application/x-protobuf字节) 或 base64", pattern=r"^(raw|base64)$"),
):
try:
# 获取实际数据并验证
actual_data = request.get_data()
if not actual_data:
raise HTTPException(400, "数据包不能为空")
# 在 encode 之前,对 mcp_context.tools[*].input_schema 做一次安全清理
if isinstance(actual_data, dict):
wrapped = {"json_data": actual_data}
wrapped = sanitize_mcp_input_schema_in_packet(wrapped)
actual_data = wrapped.get("json_data", actual_data)
# 将 server_message_data 对象(如有)编码为 Base64URL 字符串
actual_data = _encode_smd_inplace(actual_data)
# 编码为protobuf字节
protobuf_bytes = dict_to_protobuf_bytes(actual_data, request.message_type)
logger.debug(f"✅ AI请求编码为protobuf成功: {len(protobuf_bytes)} 字节")
if output == "raw":
# 直接返回二进制 protobuf 内容
return Response(
content=protobuf_bytes,
media_type="application/x-protobuf",
headers={"Content-Length": str(len(protobuf_bytes))},
)
else:
# 返回base64文本,便于在JSON中传输/调试
import base64
return {
"protobuf_base64": base64.b64encode(protobuf_bytes).decode("utf-8"),
"size": len(protobuf_bytes),
"message_type": request.message_type,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ AI请求编码失败: {e}")
raise HTTPException(500, f"编码失败: {str(e)}")
# ============= OpenAI 兼容:模型列表接口 =============
@app.get("/v1/models")
async def list_models():
"""OpenAI-compatible endpoint that lists available models."""
try:
models = get_all_unique_models()
return {"object": "list", "data": models}
except Exception as e:
logger.error(f"❌ 获取模型列表失败: {e}")
raise HTTPException(500, f"获取模型列表失败: {str(e)}")
return app
async def startup_tasks():
"""启动时执行的任务"""
logger.info("="*60)
logger.info("Warp Protobuf编解码服务器启动")
logger.info("="*60)
# 检查端口冲突
if config.WARP_SERVER_PORT == config.OPENAI_COMPAT_PORT:
logger.warning("⚠️ 警告:Warp服务器端口和OpenAI兼容层端口相同,可能导致冲突!")
logger.warning(f"当前配置:WARP_SERVER_PORT={config.WARP_SERVER_PORT}, OPENAI_COMPAT_PORT={config.OPENAI_COMPAT_PORT}")
logger.warning("请在.env文件中设置不同的端口,或使用--port参数指定不同的端口")
# 检查protobuf运行时
try:
from warp2protobuf.core.protobuf import ensure_proto_runtime
ensure_proto_runtime()
logger.info("✅ Protobuf运行时初始化成功")
except Exception as e:
logger.error(f"❌ Protobuf运行时初始化失败: {e}")
raise
# 检查JWT token
try:
from warp2protobuf.core.auth import get_jwt_token, is_token_expired
token = get_jwt_token()
if token and not is_token_expired(token):
logger.info("✅ JWT token有效")
elif not token:
logger.warning("⚠️ 未找到JWT token,尝试申请匿名访问token用于额度初始化…")
try:
new_token = await acquire_anonymous_access_token()
if new_token:
logger.info("✅ 匿名访问token申请成功")
else:
logger.warning("⚠️ 匿名访问token申请失败")
except Exception as e2:
logger.warning(f"⚠️ 匿名访问token申请异常: {e2}")
else:
logger.warning("⚠️ JWT token无效或已过期,建议运行: uv run refresh_jwt.py")
except Exception as e:
logger.warning(f"⚠️ JWT检查失败: {e}")
# 显示可用端点(简化版)
logger.info("📍 API端点: http://localhost:%s", config.WARP_SERVER_PORT)
def main():
"""主函数"""
import argparse
from contextlib import asynccontextmanager
# 解析命令行参数
parser = argparse.ArgumentParser(description="Warp Protobuf编解码服务器")
parser.add_argument("--port", type=int, default=config.WARP_SERVER_PORT,
help=f"服务器监听端口 (默认: {config.WARP_SERVER_PORT})")
args = parser.parse_args()
# 使用lifespan上下文管理器替代on_event
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
await startup_tasks()
yield
# 关闭时执行(如果需要的话)
# 创建应用
app = create_app()
app.router.lifespan_context = lifespan
# 启动服务器
try:
logger.info(f"🌐 服务器监听端口: {args.port}")
uvicorn.run(
app,
host=config.HOST,
port=args.port,
log_level="info",
access_log=True
)
except KeyboardInterrupt:
logger.info("服务器被用户停止")
except Exception as e:
logger.error(f"服务器启动失败: {e}")
raise
if __name__ == "__main__":
main()
|