Spaces:
Sleeping
Sleeping
File size: 12,417 Bytes
eac355f 4d7b8a6 eac355f 4d7b8a6 eac355f 4d7b8a6 eac355f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
"""
Agent Manager for OpenFloor Protocol
Handles agent discovery, invitation, and manifest management
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Any

import requests

from .protocol.envelope import create_envelope, create_inter_agent_message
from .protocol.events import create_invite_event, create_get_manifest_event
from .utils.helpers import generate_message_id
from .utils.config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@dataclass
class AgentInfo:
    """
    Record describing a single agent.

    Only ``speaker_uri`` and ``service_url`` are required; every other
    field defaults to ``None`` until a value is assigned.
    """
    # Speaker URI identifying the agent
    speaker_uri: str
    # URL of the agent's service endpoint
    service_url: str
    # Manifest dictionary for the agent, when one has been attached
    manifest: Optional[Dict[str, Any]] = None
    # Identifier of the conversation associated with the agent
    conversation_id: Optional[str] = None
    # Timestamp associated with the invitation
    invited_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the record as a plain dictionary.

        ``invited_at`` is rendered through ``isoformat()`` when it is set
        and as ``None`` otherwise; the other fields are passed through
        unchanged (the manifest is not copied).
        """
        if self.invited_at:
            invited_stamp = self.invited_at.isoformat()
        else:
            invited_stamp = None

        serialized: Dict[str, Any] = {}
        serialized["speaker_uri"] = self.speaker_uri
        serialized["service_url"] = self.service_url
        serialized["manifest"] = self.manifest
        serialized["conversation_id"] = self.conversation_id
        serialized["invited_at"] = invited_stamp
        return serialized
class AgentManager:
    """
    Manages agent discovery and invitation for OpenFloor Protocol

    Based on OFP 1.0.0 specification:
    - getManifests event: Discover agent capabilities
    - invite event: Invite agents to join conversations

    Successfully invited agents are kept in ``self.agents``, keyed by
    speaker URI. HTTP traffic uses the blocking ``requests`` library, so
    every request is handed to the event loop's default executor; the
    coroutine methods therefore do not stall the loop while a request is
    in flight.
    """

    # NOTE(review): several log messages below start with a literal "β".
    # That looks like a mis-decoded status glyph; the strings are kept
    # exactly as found - confirm the intended character before changing.

    # Event types accepted as a manifest reply (plural and singular forms).
    _MANIFEST_EVENT_TYPES = ("publishManifests", "publishManifest")

    def __init__(self, floor_manager_uri: str, floor_manager_url: str):
        """
        Initialize Agent Manager

        Args:
            floor_manager_uri: Speaker URI of the floor manager
            floor_manager_url: Service URL of the floor manager
        """
        self.floor_manager_uri = floor_manager_uri
        self.floor_manager_url = floor_manager_url
        self.agents: Dict[str, AgentInfo] = {}
        logger.info(f"Agent Manager initialized: {floor_manager_uri}")

    def _build_envelope(
        self,
        conversation_id: str,
        event: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Wrap a single event in an OFP envelope with the floor manager as sender."""
        return {
            "openFloor": {
                "schema": {
                    "version": settings.OFP_VERSION
                },
                "conversation": {
                    "id": conversation_id
                },
                "sender": {
                    "speakerUri": self.floor_manager_uri,
                    "serviceUrl": self.floor_manager_url
                },
                "events": [event]
            }
        }

    async def _post_envelope(
        self,
        service_url: str,
        envelope: Dict[str, Any],
        timeout: int
    ) -> "requests.Response":
        """
        POST an envelope as JSON to ``service_url`` without blocking the event loop.

        Raises:
            requests.exceptions.RequestException: if the request fails, times
                out, or the response status is rejected by ``raise_for_status()``.
        """
        def _send():
            return requests.post(
                service_url,
                json=envelope,
                headers={
                    'Content-Type': 'application/json',
                    'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}'
                },
                timeout=timeout
            )

        # requests is synchronous; run it on a worker thread so other
        # coroutines keep running while this request waits on the network.
        response = await asyncio.get_running_loop().run_in_executor(None, _send)
        response.raise_for_status()
        return response

    @staticmethod
    def _extract_manifest(response_data: Any) -> Optional[Dict[str, Any]]:
        """
        Pull a manifest out of a decoded reply to a getManifests request.

        For each publishManifests / publishManifest event, in order, the
        ``parameters.manifest`` value is returned if it is truthy; otherwise
        the ``identification`` of the first ``parameters.servicingManifests``
        entry is returned if it is truthy. Entries that do not have the
        expected shape are skipped. Returns None when nothing usable is found.
        """
        if not isinstance(response_data, dict):
            return None
        open_floor = response_data.get("openFloor")
        if not isinstance(open_floor, dict):
            return None
        events = open_floor.get("events")
        if not isinstance(events, list):
            return None

        for event in events:
            if not isinstance(event, dict):
                continue
            if event.get("eventType") not in AgentManager._MANIFEST_EVENT_TYPES:
                continue
            parameters = event.get("parameters")
            if not isinstance(parameters, dict):
                continue

            # Try direct manifest field
            manifest = parameters.get("manifest")
            if manifest:
                return manifest

            # Try servicingManifests array (OFP spec format); only the first
            # entry is consulted and its identification stands in as the manifest.
            servicing_manifests = parameters.get("servicingManifests")
            if isinstance(servicing_manifests, list) and servicing_manifests:
                first_manifest = servicing_manifests[0]
                if isinstance(first_manifest, dict):
                    identification = first_manifest.get("identification")
                    if identification:
                        return identification
        return None

    @staticmethod
    def _speaker_uri_from_manifest(manifest: Dict[str, Any]) -> Optional[str]:
        """
        Return the speaker URI carried by a manifest, or None if absent.

        A top-level ``speakerUri`` wins; otherwise ``identification.speakerUri``
        is used, which covers a full manifest returned through the direct
        ``manifest`` field.
        """
        speaker_uri = manifest.get("speakerUri")
        if speaker_uri:
            return speaker_uri
        identification = manifest.get("identification")
        if isinstance(identification, dict):
            return identification.get("speakerUri") or None
        return None

    async def get_agent_manifest(
        self,
        agent_service_url: str,
        timeout: int = 10
    ) -> Optional[Dict[str, Any]]:
        """
        Get an agent's manifest by sending getManifests event

        Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-117-getmanifests-event

        Args:
            agent_service_url: URL of the agent's service
            timeout: Request timeout in seconds

        Returns:
            Agent manifest dictionary or None if failed. All errors are
            logged and reported as None; nothing is raised to the caller.
        """
        try:
            logger.info(f"Requesting manifest from agent: {agent_service_url}")

            # Each manifest request gets its own throwaway conversation id
            conversation_id = f"manifest_request_{generate_message_id()}"
            envelope_dict = self._build_envelope(
                conversation_id,
                {"eventType": "getManifests"}
            )

            response = await self._post_envelope(agent_service_url, envelope_dict, timeout)

            # Parse response - should contain publishManifests or publishManifest event
            manifest = self._extract_manifest(response.json())
            if manifest:
                logger.info(f"β Received manifest from {agent_service_url}")
                return manifest

            logger.warning(f"No manifest found in response from {agent_service_url}")
            return None
        except requests.exceptions.Timeout:
            logger.error(f"β Timeout getting manifest from {agent_service_url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"β Failed to get manifest from {agent_service_url}: {e}")
            return None
        except Exception as e:
            # Keep the traceback: this branch only fires for unforeseen failures
            logger.exception(f"β Unexpected error getting manifest: {e}")
            return None

    async def invite_agent(
        self,
        agent_speaker_uri: str,
        agent_service_url: str,
        conversation_id: str,
        timeout: int = 10
    ) -> bool:
        """
        Invite an agent to join a conversation

        Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-113-invite-event

        On success the agent is recorded in ``self.agents`` under its speaker
        URI, replacing any earlier entry for the same URI.

        Args:
            agent_speaker_uri: Speaker URI of the agent to invite
            agent_service_url: Service URL of the agent
            conversation_id: Conversation ID to invite agent to
            timeout: Request timeout in seconds

        Returns:
            True if invite was sent successfully, False otherwise
        """
        try:
            logger.info(f"Inviting agent {agent_speaker_uri} to conversation {conversation_id}")

            # Create OFP envelope with invite event
            envelope_dict = self._build_envelope(
                conversation_id,
                {
                    "eventType": "invite",
                    "to": {
                        "serviceUrl": agent_service_url,
                        "speakerUri": agent_speaker_uri
                    }
                }
            )

            await self._post_envelope(agent_service_url, envelope_dict, timeout)

            # Store agent info; invited_at is a naive local timestamp
            self.agents[agent_speaker_uri] = AgentInfo(
                speaker_uri=agent_speaker_uri,
                service_url=agent_service_url,
                conversation_id=conversation_id,
                invited_at=datetime.now()
            )
            logger.info(f"β Successfully invited agent {agent_speaker_uri}")
            return True
        except requests.exceptions.Timeout:
            logger.error(f"β Timeout inviting agent {agent_speaker_uri}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"β Failed to invite agent {agent_speaker_uri}: {e}")
            return False
        except Exception as e:
            # Keep the traceback: this branch only fires for unforeseen failures
            logger.exception(f"β Unexpected error inviting agent: {e}")
            return False

    async def add_agent(
        self,
        agent_service_url: str,
        conversation_id: str,
        timeout: int = 10,
        session_participants: Optional[Dict[str, "AgentInfo"]] = None
    ) -> Optional["AgentInfo"]:
        """
        Add an agent by getting their manifest and inviting them to a conversation

        This is a convenience method that combines:
        1. Getting the agent's manifest (to discover their speaker URI)
        2. Inviting the agent to the conversation
        3. Optionally adding to session participants

        Args:
            agent_service_url: Service URL of the agent
            conversation_id: Conversation ID to invite agent to
            timeout: Request timeout in seconds
            session_participants: Optional dict to add the agent to; it is
                mutated in place, keyed by the agent's speaker URI

        Returns:
            AgentInfo object if successful, None otherwise
        """
        try:
            # Step 1: Get agent's manifest
            manifest = await self.get_agent_manifest(agent_service_url, timeout)
            if not manifest:
                logger.error(f"Failed to get manifest for agent: {agent_service_url}")
                return None

            # Extract speaker URI from manifest (top level or under identification)
            agent_speaker_uri = self._speaker_uri_from_manifest(manifest)
            if not agent_speaker_uri:
                logger.error(f"Manifest missing speakerUri: {agent_service_url}")
                return None

            # Step 2: Invite the agent
            success = await self.invite_agent(
                agent_speaker_uri,
                agent_service_url,
                conversation_id,
                timeout
            )
            if not success:
                logger.error(f"Failed to invite agent: {agent_speaker_uri}")
                return None

            # invite_agent registered the agent; attach the manifest to that record
            agent_info = self.agents[agent_speaker_uri]
            agent_info.manifest = manifest

            # Add to session participants if provided
            if session_participants is not None:
                session_participants[agent_speaker_uri] = agent_info
                logger.info(f"Added {agent_speaker_uri} to session participants")

            logger.info(f"β Successfully added agent {agent_speaker_uri} to conversation {conversation_id}")
            return agent_info
        except Exception as e:
            # Keep the traceback: the expected failures are handled above
            logger.exception(f"β Error adding agent: {e}")
            return None

    def get_agent_info(self, agent_speaker_uri: str) -> Optional["AgentInfo"]:
        """Get information about an agent, or None if the speaker URI is unknown"""
        return self.agents.get(agent_speaker_uri)

    def list_agents(self) -> Dict[str, "AgentInfo"]:
        """List all known agents as a shallow copy of the internal mapping"""
        return self.agents.copy()

    def remove_agent(self, agent_speaker_uri: str) -> bool:
        """Remove an agent from the manager; returns False if it was not registered"""
        if agent_speaker_uri in self.agents:
            del self.agents[agent_speaker_uri]
            logger.info(f"Removed agent: {agent_speaker_uri}")
            return True
        return False
|