Spaces:
Sleeping
Sleeping
File size: 2,706 Bytes
d14cb3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from typing import Dict, List, Optional
import httpx
import os
# 假设 agent_manager 的 models.py 可以在这里被导入,或者我们重新定义需要的模型
# 为了避免循环依赖,这里直接定义需要的模型,或者从一个共享的包中导入
from pydantic import BaseModel
class AgentInfo(BaseModel):
id: str
agent_type: str
mcp_endpoint: str
status: str
created_at: str
last_heartbeat: str
metadata: Dict = {}
class CreateAgentRequest(BaseModel):
agent_type: str
image_name: str
env_vars: Dict[str, str] = {}
resource_limits: Dict = {}
config: Dict = {}
class AgentManagerClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient()
async def create_agent(self, request: CreateAgentRequest) -> AgentInfo:
"""
向 Agent Manager 请求创建一个新的 Agent 实例。
"""
response = await self.client.post(f"{self.base_url}/agents", json=request.dict())
response.raise_for_status()
return AgentInfo(**response.json())
async def get_agent_info(self, agent_id: str) -> AgentInfo:
"""
从 Agent Manager 获取指定 Agent 实例的详细信息。
"""
response = await self.client.get(f"{self.base_url}/agents/{agent_id}")
response.raise_for_status()
return AgentInfo(**response.json())
async def list_agents(self, agent_type: Optional[str] = None) -> List[AgentInfo]:
"""
从 Agent Manager 列出所有活跃的 Agent 实例。
"""
params = {"agent_type": agent_type} if agent_type else {}
response = await self.client.get(f"{self.base_url}/agents", params=params)
response.raise_for_status()
return [AgentInfo(**agent_data) for agent_data in response.json()]
async def destroy_agent(self, agent_id: str) -> bool:
"""
向 Agent Manager 请求销毁一个 Agent 实例。
"""
response = await self.client.delete(f"{self.base_url}/agents/{agent_id}")
response.raise_for_status()
return response.status_code == 204 # No Content
async def stop_agent(self, agent_id: str) -> bool:
"""
向 Agent Manager 请求停止一个 Agent 实例。
"""
response = await self.client.post(f"{self.base_url}/agents/{agent_id}/stop")
response.raise_for_status()
return response.status_code == 200 # OK
# 示例用法 (在实际 RootAgent 中使用)
# AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
# agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)
|