root_agent / app.py
airsltd's picture
update
8abdadf
from fastapi import FastAPI, HTTPException, status
from typing import Dict, Any, Optional, List
import asyncio
import os
import requests
from dotenv import load_dotenv
import contextlib
from pydantic import BaseModel
from agents.root_agent import RootAgent
# 定义任务子项的模型
class Subtask(BaseModel):
agent_type: str
payload: Dict[str, Any]
# 定义 process_task endpoint 的请求体模型
class ProcessTaskRequest(BaseModel):
subtasks: List[Subtask]
global_config: Optional[Dict[str, Any]] = None # 可选的全局配置
app = FastAPI()
root_agent_instance: Optional[RootAgent] = None
# Upstash Redis 相关的全局变量
UPSTASH_REDIS_REST_URL: Optional[str] = None
UPSTASH_REDIS_REST_TOKEN: Optional[str] = None
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN
# 加载 .env 文件中的环境变量
load_dotenv()
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
# 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度
# 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量
else:
print("Upstash Redis 环境变量已加载。")
# 可以在这里添加一个简单的 Redis 连接测试
headers = {
"Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
"Content-Type": "application/json"
}
try:
# 尝试执行一个简单的 PING 命令来测试连接
ping_url = f"{UPSTASH_REDIS_REST_URL}/ping"
response = requests.post(ping_url, headers=headers)
response.raise_for_status()
ping_result = response.json()
if ping_result and ping_result.get('result') == "PONG":
print("Upstash Redis 连接测试成功!")
else:
print(f"Upstash Redis 连接测试失败: {ping_result}")
except Exception as e:
print(f"Upstash Redis 连接测试发生错误: {e}")
# 初始化 RootAgent
root_agent_instance = RootAgent()
try:
await root_agent_instance.initialize()
print("RootAgent 已初始化。")
except Exception as e:
print(f"RootAgent 初始化失败: {e}")
# 可以在这里选择抛出异常或以降级模式运行
raise # 重新抛出异常以阻止应用程序启动,直到问题解决
yield # 应用程序在此处启动
# 应用程序关闭时执行的清理工作
if root_agent_instance:
await root_agent_instance.shutdown()
print("RootAgent 已关闭。")
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def read_root():
return {"message": "Worker service with RootAgent is running!"}
@app.post("/process_task")
async def process_task_endpoint(request: ProcessTaskRequest):
"""
接收外部任务请求,并交由 RootAgent 处理。
"""
if not root_agent_instance:
raise HTTPException(status_code=500, detail="RootAgent not initialized.")
try:
# 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式
task_data = request.dict()
result = await root_agent_instance.process_task(task_data)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing task: {e}")