Spaces:
Sleeping
Sleeping
File size: 3,682 Bytes
d14cb3f 8abdadf d14cb3f 811184f 8abdadf 811184f 1eca6b5 2fa50f6 8abdadf 2fa50f6 d14cb3f 2fa50f6 8abdadf d14cb3f 8abdadf c202f7c 8abdadf d14cb3f 8abdadf c202f7c d14cb3f c202f7c 8abdadf c202f7c d14cb3f c202f7c d14cb3f 8abdadf d14cb3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
from fastapi import FastAPI, HTTPException, status
from typing import Dict, Any, Optional, List
import asyncio
import os
import requests
from dotenv import load_dotenv
import contextlib
from pydantic import BaseModel
from agents.root_agent import RootAgent
# 定义任务子项的模型
class Subtask(BaseModel):
agent_type: str
payload: Dict[str, Any]
# 定义 process_task endpoint 的请求体模型
class ProcessTaskRequest(BaseModel):
subtasks: List[Subtask]
global_config: Optional[Dict[str, Any]] = None # 可选的全局配置
app = FastAPI()
root_agent_instance: Optional[RootAgent] = None
# Upstash Redis 相关的全局变量
UPSTASH_REDIS_REST_URL: Optional[str] = None
UPSTASH_REDIS_REST_TOKEN: Optional[str] = None
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN
# 加载 .env 文件中的环境变量
load_dotenv()
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
# 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度
# 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量
else:
print("Upstash Redis 环境变量已加载。")
# 可以在这里添加一个简单的 Redis 连接测试
headers = {
"Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
"Content-Type": "application/json"
}
try:
# 尝试执行一个简单的 PING 命令来测试连接
ping_url = f"{UPSTASH_REDIS_REST_URL}/ping"
response = requests.post(ping_url, headers=headers)
response.raise_for_status()
ping_result = response.json()
if ping_result and ping_result.get('result') == "PONG":
print("Upstash Redis 连接测试成功!")
else:
print(f"Upstash Redis 连接测试失败: {ping_result}")
except Exception as e:
print(f"Upstash Redis 连接测试发生错误: {e}")
# 初始化 RootAgent
root_agent_instance = RootAgent()
try:
await root_agent_instance.initialize()
print("RootAgent 已初始化。")
except Exception as e:
print(f"RootAgent 初始化失败: {e}")
# 可以在这里选择抛出异常或以降级模式运行
raise # 重新抛出异常以阻止应用程序启动,直到问题解决
yield # 应用程序在此处启动
# 应用程序关闭时执行的清理工作
if root_agent_instance:
await root_agent_instance.shutdown()
print("RootAgent 已关闭。")
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def read_root():
return {"message": "Worker service with RootAgent is running!"}
@app.post("/process_task")
async def process_task_endpoint(request: ProcessTaskRequest):
"""
接收外部任务请求,并交由 RootAgent 处理。
"""
if not root_agent_instance:
raise HTTPException(status_code=500, detail="RootAgent not initialized.")
try:
# 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式
task_data = request.dict()
result = await root_agent_instance.process_task(task_data)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing task: {e}")
|