from fastapi import FastAPI, HTTPException, status from typing import Dict, Any, Optional, List import asyncio import os import requests from dotenv import load_dotenv import contextlib from pydantic import BaseModel from agents.root_agent import RootAgent # 定义任务子项的模型 class Subtask(BaseModel): agent_type: str payload: Dict[str, Any] # 定义 process_task endpoint 的请求体模型 class ProcessTaskRequest(BaseModel): subtasks: List[Subtask] global_config: Optional[Dict[str, Any]] = None # 可选的全局配置 app = FastAPI() root_agent_instance: Optional[RootAgent] = None # Upstash Redis 相关的全局变量 UPSTASH_REDIS_REST_URL: Optional[str] = None UPSTASH_REDIS_REST_TOKEN: Optional[str] = None @contextlib.asynccontextmanager async def lifespan(app: FastAPI): global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN # 加载 .env 文件中的环境变量 load_dotenv() UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL") UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN") if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN: print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。") # 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度 # 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量 else: print("Upstash Redis 环境变量已加载。") # 可以在这里添加一个简单的 Redis 连接测试 headers = { "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}", "Content-Type": "application/json" } try: # 尝试执行一个简单的 PING 命令来测试连接 ping_url = f"{UPSTASH_REDIS_REST_URL}/ping" response = requests.post(ping_url, headers=headers) response.raise_for_status() ping_result = response.json() if ping_result and ping_result.get('result') == "PONG": print("Upstash Redis 连接测试成功!") else: print(f"Upstash Redis 连接测试失败: {ping_result}") except Exception as e: print(f"Upstash Redis 连接测试发生错误: {e}") # 初始化 RootAgent root_agent_instance = RootAgent() try: await root_agent_instance.initialize() print("RootAgent 已初始化。") except Exception as e: print(f"RootAgent 初始化失败: {e}") # 可以在这里选择抛出异常或以降级模式运行 raise # 重新抛出异常以阻止应用程序启动,直到问题解决 yield # 应用程序在此处启动 # 应用程序关闭时执行的清理工作 if root_agent_instance: await root_agent_instance.shutdown() print("RootAgent 已关闭。") app = FastAPI(lifespan=lifespan) @app.get("/") async def read_root(): return {"message": "Worker service with RootAgent is running!"} @app.post("/process_task") async def process_task_endpoint(request: ProcessTaskRequest): """ 接收外部任务请求,并交由 RootAgent 处理。 """ if not root_agent_instance: raise HTTPException(status_code=500, detail="RootAgent not initialized.") try: # 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式 task_data = request.dict() result = await root_agent_instance.process_task(task_data) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing task: {e}")