""" Agent Manager for OpenFloor Protocol Handles agent discovery, invitation, and manifest management """ import requests import logging from typing import Dict, Optional, Any from dataclasses import dataclass from datetime import datetime from .protocol.envelope import create_envelope, create_inter_agent_message from .protocol.events import create_invite_event, create_get_manifest_event from .utils.helpers import generate_message_id from .utils.config import settings logger = logging.getLogger(__name__) @dataclass class AgentInfo: """Information about an agent""" speaker_uri: str service_url: str manifest: Optional[Dict[str, Any]] = None conversation_id: Optional[str] = None invited_at: Optional[datetime] = None def to_dict(self) -> Dict[str, Any]: return { "speaker_uri": self.speaker_uri, "service_url": self.service_url, "manifest": self.manifest, "conversation_id": self.conversation_id, "invited_at": self.invited_at.isoformat() if self.invited_at else None } class AgentManager: """ Manages agent discovery and invitation for OpenFloor Protocol Based on OFP 1.0.0 specification: - getManifests event: Discover agent capabilities - invite event: Invite agents to join conversations """ def __init__(self, floor_manager_uri: str, floor_manager_url: str): """ Initialize Agent Manager Args: floor_manager_uri: Speaker URI of the floor manager floor_manager_url: Service URL of the floor manager """ self.floor_manager_uri = floor_manager_uri self.floor_manager_url = floor_manager_url self.agents: Dict[str, AgentInfo] = {} logger.info(f"Agent Manager initialized: {floor_manager_uri}") async def get_agent_manifest( self, agent_service_url: str, timeout: int = 10 ) -> Optional[Dict[str, Any]]: """ Get an agent's manifest by sending getManifests event Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-117-getmanifests-event Args: agent_service_url: URL of the agent's service timeout: Request timeout in seconds Returns: Agent manifest dictionary or None if failed """ try: logger.info(f"Requesting manifest from agent: {agent_service_url}") # Create getManifests event envelope conversation_id = f"manifest_request_{generate_message_id()}" # Create OFP envelope with getManifests event envelope_dict = { "openFloor": { "schema": { "version": settings.OFP_VERSION }, "conversation": { "id": conversation_id }, "sender": { "speakerUri": self.floor_manager_uri, "serviceUrl": self.floor_manager_url }, "events": [ { "eventType": "getManifests" } ] } } # Send HTTPS POST request to agent response = requests.post( agent_service_url, json=envelope_dict, headers={ 'Content-Type': 'application/json', 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' }, timeout=timeout ) response.raise_for_status() # Parse response - should contain publishManifests or publishManifest event response_data = response.json() if "openFloor" in response_data: events = response_data["openFloor"].get("events", []) for event in events: event_type = event.get("eventType") # Handle both publishManifests (plural) and publishManifest (singular) if event_type in ["publishManifests", "publishManifest"]: parameters = event.get("parameters", {}) # Try direct manifest field manifest = parameters.get("manifest") if manifest: logger.info(f"✓ Received manifest from {agent_service_url}") return manifest # Try servicingManifests array (OFP spec format) servicing_manifests = parameters.get("servicingManifests", []) if servicing_manifests and len(servicing_manifests) > 0: # Get the first manifest and extract identification first_manifest = servicing_manifests[0] identification = first_manifest.get("identification", {}) if identification: logger.info(f"✓ Received manifest from {agent_service_url}") # Return the identification as the manifest return identification logger.warning(f"No manifest found in response from {agent_service_url}") return None except requests.exceptions.Timeout: logger.error(f"✗ Timeout getting manifest from {agent_service_url}") return None except requests.exceptions.RequestException as e: logger.error(f"✗ Failed to get manifest from {agent_service_url}: {e}") return None except Exception as e: logger.error(f"✗ Unexpected error getting manifest: {e}") return None async def invite_agent( self, agent_speaker_uri: str, agent_service_url: str, conversation_id: str, timeout: int = 10 ) -> bool: """ Invite an agent to join a conversation Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-113-invite-event Args: agent_speaker_uri: Speaker URI of the agent to invite agent_service_url: Service URL of the agent conversation_id: Conversation ID to invite agent to timeout: Request timeout in seconds Returns: True if invite was sent successfully, False otherwise """ try: logger.info(f"Inviting agent {agent_speaker_uri} to conversation {conversation_id}") # Create OFP envelope with invite event envelope_dict = { "openFloor": { "schema": { "version": settings.OFP_VERSION }, "conversation": { "id": conversation_id }, "sender": { "speakerUri": self.floor_manager_uri, "serviceUrl": self.floor_manager_url }, "events": [ { "eventType": "invite", "to": { "serviceUrl": agent_service_url, "speakerUri": agent_speaker_uri } } ] } } # Send HTTPS POST request to agent response = requests.post( agent_service_url, json=envelope_dict, headers={ 'Content-Type': 'application/json', 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' }, timeout=timeout ) response.raise_for_status() # Store agent info agent_info = AgentInfo( speaker_uri=agent_speaker_uri, service_url=agent_service_url, conversation_id=conversation_id, invited_at=datetime.now() ) self.agents[agent_speaker_uri] = agent_info logger.info(f"✓ Successfully invited agent {agent_speaker_uri}") return True except requests.exceptions.Timeout: logger.error(f"✗ Timeout inviting agent {agent_speaker_uri}") return False except requests.exceptions.RequestException as e: logger.error(f"✗ Failed to invite agent {agent_speaker_uri}: {e}") return False except Exception as e: logger.error(f"✗ Unexpected error inviting agent: {e}") return False async def add_agent( self, agent_service_url: str, conversation_id: str, timeout: int = 10, session_participants: Optional[Dict[str, AgentInfo]] = None ) -> Optional[AgentInfo]: """ Add an agent by getting their manifest and inviting them to a conversation This is a convenience method that combines: 1. Getting the agent's manifest (to discover their speaker URI) 2. Inviting the agent to the conversation 3. Optionally adding to session participants Args: agent_service_url: Service URL of the agent conversation_id: Conversation ID to invite agent to timeout: Request timeout in seconds session_participants: Optional dict to add the agent to Returns: AgentInfo object if successful, None otherwise """ try: # Step 1: Get agent's manifest manifest = await self.get_agent_manifest(agent_service_url, timeout) if not manifest: logger.error(f"Failed to get manifest for agent: {agent_service_url}") return None # Extract speaker URI from manifest agent_speaker_uri = manifest.get("speakerUri") if not agent_speaker_uri: logger.error(f"Manifest missing speakerUri: {agent_service_url}") return None # Step 2: Invite the agent success = await self.invite_agent( agent_speaker_uri, agent_service_url, conversation_id, timeout ) if not success: logger.error(f"Failed to invite agent: {agent_speaker_uri}") return None # Update agent info with manifest agent_info = self.agents[agent_speaker_uri] agent_info.manifest = manifest # Add to session participants if provided if session_participants is not None: session_participants[agent_speaker_uri] = agent_info logger.info(f"Added {agent_speaker_uri} to session participants") logger.info(f"✓ Successfully added agent {agent_speaker_uri} to conversation {conversation_id}") return agent_info except Exception as e: logger.error(f"✗ Error adding agent: {e}") return None def get_agent_info(self, agent_speaker_uri: str) -> Optional[AgentInfo]: """Get information about an agent""" return self.agents.get(agent_speaker_uri) def list_agents(self) -> Dict[str, AgentInfo]: """List all known agents""" return self.agents.copy() def remove_agent(self, agent_speaker_uri: str) -> bool: """Remove an agent from the manager""" if agent_speaker_uri in self.agents: del self.agents[agent_speaker_uri] logger.info(f"Removed agent: {agent_speaker_uri}") return True return False