File size: 3,682 Bytes
d14cb3f
8abdadf
d14cb3f
811184f
8abdadf
 
 
 
811184f
1eca6b5
2fa50f6
8abdadf
 
 
 
 
 
 
 
 
 
2fa50f6
d14cb3f
2fa50f6
8abdadf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d14cb3f
8abdadf
 
 
 
 
 
 
c202f7c
8abdadf
 
 
d14cb3f
 
8abdadf
 
 
c202f7c
d14cb3f
 
 
c202f7c
 
8abdadf
c202f7c
d14cb3f
c202f7c
d14cb3f
 
 
 
8abdadf
 
d14cb3f
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from fastapi import FastAPI, HTTPException, status
from typing import Dict, Any, Optional, List
import asyncio
import os
import requests
from dotenv import load_dotenv
import contextlib
from pydantic import BaseModel

from agents.root_agent import RootAgent

# 定义任务子项的模型
class Subtask(BaseModel):
    agent_type: str
    payload: Dict[str, Any]

# 定义 process_task endpoint 的请求体模型
class ProcessTaskRequest(BaseModel):
    subtasks: List[Subtask]
    global_config: Optional[Dict[str, Any]] = None # 可选的全局配置

app = FastAPI()
root_agent_instance: Optional[RootAgent] = None

# Upstash Redis 相关的全局变量
UPSTASH_REDIS_REST_URL: Optional[str] = None
UPSTASH_REDIS_REST_TOKEN: Optional[str] = None

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    global root_agent_instance, UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN

    # 加载 .env 文件中的环境变量
    load_dotenv()
    UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
    UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")

    if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
        print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
        # 可以在这里选择抛出异常或继续,取决于应用程序对 Redis 的依赖程度
        # 为了演示,我们选择继续,但会在 Redis 操作时检查这些变量
    else:
        print("Upstash Redis 环境变量已加载。")
        # 可以在这里添加一个简单的 Redis 连接测试
        headers = {
            "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
            "Content-Type": "application/json"
        }
        try:
            # 尝试执行一个简单的 PING 命令来测试连接
            ping_url = f"{UPSTASH_REDIS_REST_URL}/ping"
            response = requests.post(ping_url, headers=headers)
            response.raise_for_status()
            ping_result = response.json()
            if ping_result and ping_result.get('result') == "PONG":
                print("Upstash Redis 连接测试成功!")
            else:
                print(f"Upstash Redis 连接测试失败: {ping_result}")
        except Exception as e:
            print(f"Upstash Redis 连接测试发生错误: {e}")

    # 初始化 RootAgent
    root_agent_instance = RootAgent()
    try:
        await root_agent_instance.initialize()
        print("RootAgent 已初始化。")
    except Exception as e:
        print(f"RootAgent 初始化失败: {e}")
        # 可以在这里选择抛出异常或以降级模式运行
        raise # 重新抛出异常以阻止应用程序启动,直到问题解决

    yield # 应用程序在此处启动

    # 应用程序关闭时执行的清理工作
    if root_agent_instance:
        await root_agent_instance.shutdown()
        print("RootAgent 已关闭。")

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def read_root():
    return {"message": "Worker service with RootAgent is running!"}

@app.post("/process_task")
async def process_task_endpoint(request: ProcessTaskRequest):
    """
    接收外部任务请求,并交由 RootAgent 处理。
    """
    if not root_agent_instance:
        raise HTTPException(status_code=500, detail="RootAgent not initialized.")
    
    try:
        # 将 Pydantic 模型转换为 RootAgent 期望的 Dict 格式
        task_data = request.dict()
        result = await root_agent_instance.process_task(task_data)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing task: {e}")