Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, status | |
| from typing import Dict, Any, Optional, List | |
| import asyncio | |
| import os | |
| import requests | |
| from dotenv import load_dotenv | |
| import contextlib | |
| from pydantic import BaseModel | |
| from agents.root_agent import RootAgent | |
| # 定义任务子项的模型 | |
| class Subtask(BaseModel): | |
| agent_type: str | |
| payload: Dict[str, Any] | |
| # 定义 process_task endpoint 的请求体模型 | |
| class ProcessTaskRequest(BaseModel): | |
| subtasks: List[Subtask] | |
| global_config: Optional[Dict[str, Any]] = None # 可选的全局配置 | |
| app = FastAPI() | |
| root_agent_instance: Optional[RootAgent] = None | |
| # Upstash Redis 相关的全局变量 | |
| UPSTASH_REDIS_REST_URL: Optional[str] = None | |
| UPSTASH_REDIS_REST_TOKEN: Optional[str] = None | |
| async def lifespan(app: FastAPI): | |
| global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN | |
| # 加载 .env 文件中的环境变量 | |
| load_dotenv() | |
| UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL") | |
| UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN") | |
| if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN: | |
| print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。") | |
| # 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度 | |
| # 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量 | |
| else: | |
| print("Upstash Redis 环境变量已加载。") | |
| # 可以在这里添加一个简单的 Redis 连接测试 | |
| headers = { | |
| "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}", | |
| "Content-Type": "application/json" | |
| } | |
| try: | |
| # 尝试执行一个简单的 PING 命令来测试连接 | |
| ping_url = f"{UPSTASH_REDIS_REST_URL}/ping" | |
| response = requests.post(ping_url, headers=headers) | |
| response.raise_for_status() | |
| ping_result = response.json() | |
| if ping_result and ping_result.get('result') == "PONG": | |
| print("Upstash Redis 连接测试成功!") | |
| else: | |
| print(f"Upstash Redis 连接测试失败: {ping_result}") | |
| except Exception as e: | |
| print(f"Upstash Redis 连接测试发生错误: {e}") | |
| # 初始化 RootAgent | |
| root_agent_instance = RootAgent() | |
| try: | |
| await root_agent_instance.initialize() | |
| print("RootAgent 已初始化。") | |
| except Exception as e: | |
| print(f"RootAgent 初始化失败: {e}") | |
| # 可以在这里选择抛出异常或以降级模式运行 | |
| raise # 重新抛出异常以阻止应用程序启动,直到问题解决 | |
| yield # 应用程序在此处启动 | |
| # 应用程序关闭时执行的清理工作 | |
| if root_agent_instance: | |
| await root_agent_instance.shutdown() | |
| print("RootAgent 已关闭。") | |
| app = FastAPI(lifespan=lifespan) | |
| async def read_root(): | |
| return {"message": "Worker service with RootAgent is running!"} | |
| async def process_task_endpoint(request: ProcessTaskRequest): | |
| """ | |
| 接收外部任务请求,并交由 RootAgent 处理。 | |
| """ | |
| if not root_agent_instance: | |
| raise HTTPException(status_code=500, detail="RootAgent not initialized.") | |
| try: | |
| # 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式 | |
| task_data = request.dict() | |
| result = await root_agent_instance.process_task(task_data) | |
| return result | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error processing task: {e}") | |