"""Grok2API""" import os import sys from contextlib import asynccontextmanager from pathlib import Path from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from app.core.logger import logger from app.core.exception import register_exception_handlers from app.core.storage import storage_manager from app.core.config import setting from app.services.grok.token import token_manager from app.api.v1.chat import router as chat_router from app.api.v1.models import router as models_router from app.api.v1.images import router as images_router from app.api.admin.manage import router as admin_router from app.services.mcp import mcp # 0. 兼容性检测 try: if sys.platform != 'win32': import uvloop uvloop.install() logger.info("[Grok2API] 启用uvloop高性能事件循环") else: logger.info("[Grok2API] Windows系统,使用默认asyncio事件循环") except ImportError: logger.info("[Grok2API] uvloop未安装,使用默认asyncio事件循环") # 1. 创建MCP的FastAPI应用实例 mcp_app = mcp.http_app(stateless_http=True, transport="streamable-http") # 2. 定义应用生命周期 @asynccontextmanager async def lifespan(app: FastAPI): """ 启动顺序: 1. 初始化核心服务 (storage, settings, token_manager) 2. 异步加载 token 数据 3. 启动批量保存任务 4. 启动MCP服务生命周期 关闭顺序 (LIFO): 1. 关闭MCP服务生命周期 2. 关闭批量保存任务并刷新数据 3. 关闭核心服务 """ # --- 启动过程 --- # 1. 初始化核心服务 await storage_manager.init() # 设置存储到配置和token管理器 storage = storage_manager.get_storage() setting.set_storage(storage) token_manager.set_storage(storage) # 2. 重新加载配置 await setting.reload() logger.info("[Grok2API] 核心服务初始化完成") # 2.5. 初始化代理池 from app.core.proxy_pool import proxy_pool proxy_url = setting.grok_config.get("proxy_url", "") proxy_pool_url = setting.grok_config.get("proxy_pool_url", "") proxy_pool_interval = setting.grok_config.get("proxy_pool_interval", 300) proxy_pool.configure(proxy_url, proxy_pool_url, proxy_pool_interval) # 3. 异步加载 token 数据 await token_manager._load_data() logger.info("[Grok2API] Token数据加载完成") # 4. 启动批量保存任务 await token_manager.start_batch_save() # 5. 管理MCP服务的生命周期 mcp_lifespan_context = mcp_app.lifespan(app) await mcp_lifespan_context.__aenter__() logger.info("[MCP] MCP服务初始化完成") logger.info("[Grok2API] 应用启动成功") try: yield finally: # --- 关闭过程 --- # 1. 退出MCP服务的生命周期 await mcp_lifespan_context.__aexit__(None, None, None) logger.info("[MCP] MCP服务已关闭") # 2. 关闭批量保存任务并刷新数据 await token_manager.shutdown() logger.info("[Token] Token管理器已关闭") # 3. 关闭核心服务 await storage_manager.close() logger.info("[Grok2API] 应用关闭成功") # 初始化日志 logger.info("[Grok2API] 应用正在启动...") # 创建FastAPI应用 app = FastAPI( title="Grok2API", description="Grok API 转换服务", version="1.3.1", lifespan=lifespan ) # 注册全局异常处理器 register_exception_handlers(app) # 注册路由 app.include_router(chat_router, prefix="/v1") app.include_router(models_router, prefix="/v1") app.include_router(images_router) app.include_router(admin_router) # 挂载静态文件 app.mount("/static", StaticFiles(directory="app/template"), name="template") @app.get("/") async def root(): """根路径""" from fastapi.responses import RedirectResponse return RedirectResponse(url="/login") @app.get("/health") async def health_check(): """健康检查接口""" return { "status": "healthy", "service": "Grok2API", "version": "1.0.3" } # 挂载MCP服务器 app.mount("", mcp_app) if __name__ == "__main__": import uvicorn import os # 读取 worker 数量,默认为 1 workers = int(os.getenv("WORKERS", "1")) # 提示多进程模式 if workers > 1: logger.info( f"[Grok2API] 多进程模式已启用 (workers={workers})。" f"建议使用 Redis/MySQL 存储以获得最佳性能。" ) # 确定事件循环类型 loop_type = "auto" if workers == 1 and sys.platform != 'win32': try: import uvloop loop_type = "uvloop" except ImportError: pass uvicorn.run( "main:app", host="0.0.0.0", port=8001, workers=workers, loop=loop_type )