# root_agent/agents/agent_manager_client.py
# (repository page header: uploaded by airsltd, commit "update", d14cb3f)
from typing import Dict, List, Optional
import httpx
import os
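# Ideally the Agent Manager's models.py would be importable here; otherwise the
# models this client needs have to be redefined.
# To avoid a circular dependency, the required models are defined directly in
# this module (alternatively they could be imported from a shared package).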
from pydantic import BaseModel
class AgentInfo(BaseModel):
    """Description of one agent instance, as parsed from Agent Manager responses.

    Declared locally in this module instead of being imported from the Agent
    Manager package. All fields except ``metadata`` are required strings.
    """

    # Identifier of the agent instance; presumably the value passed as
    # ``agent_id`` to the client methods -- TODO confirm against the server.
    id: str
    # Kind of agent; the client can filter listings by this value.
    agent_type: str
    # Where the agent's MCP service is reachable; presumably a URL -- TODO confirm.
    mcp_endpoint: str
    # Lifecycle state reported by the manager; allowed values are not visible here.
    status: str
    # Creation time as a string; the exact format is defined by the server.
    created_at: str
    # Time of the last heartbeat as a string; format defined by the server.
    last_heartbeat: str
    # Free-form extra data; defaults to an empty dict. pydantic copies mutable
    # defaults per instance, so the literal ``{}`` is not shared between models.
    metadata: Dict = {}
class CreateAgentRequest(BaseModel):
    """Payload describing a new agent instance to be created.

    The client serializes this model to a dict and sends it as the JSON body
    of the create-agent request. Only ``agent_type`` and ``image_name`` are
    required; the remaining fields default to empty dicts.
    """

    # Kind of agent to create.
    agent_type: str
    # Name of the image to run; presumably a container image -- TODO confirm.
    image_name: str
    # Environment variables for the new agent, as name -> value strings.
    env_vars: Dict[str, str] = {}
    # Resource limits for the new agent; the expected keys are not visible here.
    resource_limits: Dict = {}
    # Additional agent configuration; the schema is defined by the server.
    config: Dict = {}
class AgentManagerClient:
    """Async HTTP client for the Agent Manager REST API.

    A single ``httpx.AsyncClient`` is created in ``__init__`` and reused for
    every request. Release it when done, either explicitly with
    ``await client.aclose()`` or by using the instance as an async context
    manager::

        async with AgentManagerClient(base_url) as manager:
            agents = await manager.list_agents()

    Every request method calls ``raise_for_status()``, so 4xx/5xx responses
    raise ``httpx.HTTPStatusError`` instead of returning a value.
    """

    def __init__(self, base_url: str):
        """Store the service base URL and create the shared HTTP client.

        Args:
            base_url: Root URL of the Agent Manager, for example
                ``http://localhost:7860``. A trailing slash is tolerated.
        """
        self.base_url = base_url
        self.client = httpx.AsyncClient()

    async def __aenter__(self) -> "AgentManagerClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Close the underlying HTTP client when the ``async with`` block exits."""
        await self.aclose()

    async def aclose(self) -> None:
        """Close the underlying ``httpx.AsyncClient`` and release its connections."""
        await self.client.aclose()

    def _url(self, path: str) -> str:
        """Join ``base_url`` and an absolute ``path`` without doubling the slash."""
        # base_url is left untouched on the instance; only the built URL is
        # normalized, so "http://host/" + "/agents" does not become "//agents".
        return f"{self.base_url.rstrip('/')}{path}"

    async def create_agent(self, request: CreateAgentRequest) -> AgentInfo:
        """Ask the Agent Manager to create a new agent instance.

        Args:
            request: Description of the agent to create; sent as the JSON body.

        Returns:
            The ``AgentInfo`` parsed from the response body.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        # pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and deprecated the
        # old name; prefer the new one when it exists, fall back otherwise.
        dump = getattr(request, "model_dump", None) or request.dict
        response = await self.client.post(self._url("/agents"), json=dump())
        response.raise_for_status()
        return AgentInfo(**response.json())

    async def get_agent_info(self, agent_id: str) -> AgentInfo:
        """Fetch the details of one agent instance from the Agent Manager.

        Args:
            agent_id: Identifier of the agent, used as the URL path segment.

        Returns:
            The ``AgentInfo`` parsed from the response body.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        response = await self.client.get(self._url(f"/agents/{agent_id}"))
        response.raise_for_status()
        return AgentInfo(**response.json())

    async def list_agents(self, agent_type: Optional[str] = None) -> List[AgentInfo]:
        """List the active agent instances known to the Agent Manager.

        Args:
            agent_type: When given (and non-empty), sent as the ``agent_type``
                query parameter to filter the listing.

        Returns:
            One ``AgentInfo`` per element of the JSON array in the response.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        # An empty string is treated like None: no filter is sent.
        params = {"agent_type": agent_type} if agent_type else {}
        response = await self.client.get(self._url("/agents"), params=params)
        response.raise_for_status()
        return [AgentInfo(**agent_data) for agent_data in response.json()]

    async def destroy_agent(self, agent_id: str) -> bool:
        """Ask the Agent Manager to destroy an agent instance.

        Args:
            agent_id: Identifier of the agent, used as the URL path segment.

        Returns:
            True only when the server answers exactly 204 No Content; any
            other non-error status yields False.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        response = await self.client.delete(self._url(f"/agents/{agent_id}"))
        response.raise_for_status()
        return response.status_code == 204  # No Content

    async def stop_agent(self, agent_id: str) -> bool:
        """Ask the Agent Manager to stop an agent instance.

        Args:
            agent_id: Identifier of the agent, used as the URL path segment.

        Returns:
            True only when the server answers exactly 200 OK; any other
            non-error status yields False.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        response = await self.client.post(self._url(f"/agents/{agent_id}/stop"))
        response.raise_for_status()
        return response.status_code == 200  # OK
# Example usage (as it would appear in the actual RootAgent)
# AGENT_MANAGER_BASE_URL = os.getenv("AGENT_MANAGER_BASE_URL", "http://localhost:7860")
# agent_manager_client = AgentManagerClient(base_url=AGENT_MANAGER_BASE_URL)