Spaces:
Sleeping
Sleeping
| """ | |
| Agent Manager for OpenFloor Protocol | |
| Handles agent discovery, invitation, and manifest management | |
| """ | |
| import requests | |
| import logging | |
| from typing import Dict, Optional, Any | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from .protocol.envelope import create_envelope, create_inter_agent_message | |
| from .protocol.events import create_invite_event, create_get_manifest_event | |
| from .utils.helpers import generate_message_id | |
| from .utils.config import settings | |
| logger = logging.getLogger(__name__) | |
| class AgentInfo: | |
| """Information about an agent""" | |
| speaker_uri: str | |
| service_url: str | |
| manifest: Optional[Dict[str, Any]] = None | |
| conversation_id: Optional[str] = None | |
| invited_at: Optional[datetime] = None | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "speaker_uri": self.speaker_uri, | |
| "service_url": self.service_url, | |
| "manifest": self.manifest, | |
| "conversation_id": self.conversation_id, | |
| "invited_at": self.invited_at.isoformat() if self.invited_at else None | |
| } | |
| class AgentManager: | |
| """ | |
| Manages agent discovery and invitation for OpenFloor Protocol | |
| Based on OFP 1.0.0 specification: | |
| - getManifests event: Discover agent capabilities | |
| - invite event: Invite agents to join conversations | |
| """ | |
| def __init__(self, floor_manager_uri: str, floor_manager_url: str): | |
| """ | |
| Initialize Agent Manager | |
| Args: | |
| floor_manager_uri: Speaker URI of the floor manager | |
| floor_manager_url: Service URL of the floor manager | |
| """ | |
| self.floor_manager_uri = floor_manager_uri | |
| self.floor_manager_url = floor_manager_url | |
| self.agents: Dict[str, AgentInfo] = {} | |
| logger.info(f"Agent Manager initialized: {floor_manager_uri}") | |
| async def get_agent_manifest( | |
| self, | |
| agent_service_url: str, | |
| timeout: int = 10 | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get an agent's manifest by sending getManifests event | |
| Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-117-getmanifests-event | |
| Args: | |
| agent_service_url: URL of the agent's service | |
| timeout: Request timeout in seconds | |
| Returns: | |
| Agent manifest dictionary or None if failed | |
| """ | |
| try: | |
| logger.info(f"Requesting manifest from agent: {agent_service_url}") | |
| # Create getManifests event envelope | |
| conversation_id = f"manifest_request_{generate_message_id()}" | |
| # Create OFP envelope with getManifests event | |
| envelope_dict = { | |
| "openFloor": { | |
| "schema": { | |
| "version": settings.OFP_VERSION | |
| }, | |
| "conversation": { | |
| "id": conversation_id | |
| }, | |
| "sender": { | |
| "speakerUri": self.floor_manager_uri, | |
| "serviceUrl": self.floor_manager_url | |
| }, | |
| "events": [ | |
| { | |
| "eventType": "getManifests" | |
| } | |
| ] | |
| } | |
| } | |
| # Send HTTPS POST request to agent | |
| response = requests.post( | |
| agent_service_url, | |
| json=envelope_dict, | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' | |
| }, | |
| timeout=timeout | |
| ) | |
| response.raise_for_status() | |
| # Parse response - should contain publishManifests or publishManifest event | |
| response_data = response.json() | |
| if "openFloor" in response_data: | |
| events = response_data["openFloor"].get("events", []) | |
| for event in events: | |
| event_type = event.get("eventType") | |
| # Handle both publishManifests (plural) and publishManifest (singular) | |
| if event_type in ["publishManifests", "publishManifest"]: | |
| parameters = event.get("parameters", {}) | |
| # Try direct manifest field | |
| manifest = parameters.get("manifest") | |
| if manifest: | |
| logger.info(f"β Received manifest from {agent_service_url}") | |
| return manifest | |
| # Try servicingManifests array (OFP spec format) | |
| servicing_manifests = parameters.get("servicingManifests", []) | |
| if servicing_manifests and len(servicing_manifests) > 0: | |
| # Get the first manifest and extract identification | |
| first_manifest = servicing_manifests[0] | |
| identification = first_manifest.get("identification", {}) | |
| if identification: | |
| logger.info(f"β Received manifest from {agent_service_url}") | |
| # Return the identification as the manifest | |
| return identification | |
| logger.warning(f"No manifest found in response from {agent_service_url}") | |
| return None | |
| except requests.exceptions.Timeout: | |
| logger.error(f"β Timeout getting manifest from {agent_service_url}") | |
| return None | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"β Failed to get manifest from {agent_service_url}: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"β Unexpected error getting manifest: {e}") | |
| return None | |
| async def invite_agent( | |
| self, | |
| agent_speaker_uri: str, | |
| agent_service_url: str, | |
| conversation_id: str, | |
| timeout: int = 10 | |
| ) -> bool: | |
| """ | |
| Invite an agent to join a conversation | |
| Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-113-invite-event | |
| Args: | |
| agent_speaker_uri: Speaker URI of the agent to invite | |
| agent_service_url: Service URL of the agent | |
| conversation_id: Conversation ID to invite agent to | |
| timeout: Request timeout in seconds | |
| Returns: | |
| True if invite was sent successfully, False otherwise | |
| """ | |
| try: | |
| logger.info(f"Inviting agent {agent_speaker_uri} to conversation {conversation_id}") | |
| # Create OFP envelope with invite event | |
| envelope_dict = { | |
| "openFloor": { | |
| "schema": { | |
| "version": settings.OFP_VERSION | |
| }, | |
| "conversation": { | |
| "id": conversation_id | |
| }, | |
| "sender": { | |
| "speakerUri": self.floor_manager_uri, | |
| "serviceUrl": self.floor_manager_url | |
| }, | |
| "events": [ | |
| { | |
| "eventType": "invite", | |
| "to": { | |
| "serviceUrl": agent_service_url, | |
| "speakerUri": agent_speaker_uri | |
| } | |
| } | |
| ] | |
| } | |
| } | |
| # Send HTTPS POST request to agent | |
| response = requests.post( | |
| agent_service_url, | |
| json=envelope_dict, | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' | |
| }, | |
| timeout=timeout | |
| ) | |
| response.raise_for_status() | |
| # Store agent info | |
| agent_info = AgentInfo( | |
| speaker_uri=agent_speaker_uri, | |
| service_url=agent_service_url, | |
| conversation_id=conversation_id, | |
| invited_at=datetime.now() | |
| ) | |
| self.agents[agent_speaker_uri] = agent_info | |
| logger.info(f"β Successfully invited agent {agent_speaker_uri}") | |
| return True | |
| except requests.exceptions.Timeout: | |
| logger.error(f"β Timeout inviting agent {agent_speaker_uri}") | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"β Failed to invite agent {agent_speaker_uri}: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Unexpected error inviting agent: {e}") | |
| return False | |
| async def add_agent( | |
| self, | |
| agent_service_url: str, | |
| conversation_id: str, | |
| timeout: int = 10, | |
| session_participants: Optional[Dict[str, AgentInfo]] = None | |
| ) -> Optional[AgentInfo]: | |
| """ | |
| Add an agent by getting their manifest and inviting them to a conversation | |
| This is a convenience method that combines: | |
| 1. Getting the agent's manifest (to discover their speaker URI) | |
| 2. Inviting the agent to the conversation | |
| 3. Optionally adding to session participants | |
| Args: | |
| agent_service_url: Service URL of the agent | |
| conversation_id: Conversation ID to invite agent to | |
| timeout: Request timeout in seconds | |
| session_participants: Optional dict to add the agent to | |
| Returns: | |
| AgentInfo object if successful, None otherwise | |
| """ | |
| try: | |
| # Step 1: Get agent's manifest | |
| manifest = await self.get_agent_manifest(agent_service_url, timeout) | |
| if not manifest: | |
| logger.error(f"Failed to get manifest for agent: {agent_service_url}") | |
| return None | |
| # Extract speaker URI from manifest | |
| agent_speaker_uri = manifest.get("speakerUri") | |
| if not agent_speaker_uri: | |
| logger.error(f"Manifest missing speakerUri: {agent_service_url}") | |
| return None | |
| # Step 2: Invite the agent | |
| success = await self.invite_agent( | |
| agent_speaker_uri, | |
| agent_service_url, | |
| conversation_id, | |
| timeout | |
| ) | |
| if not success: | |
| logger.error(f"Failed to invite agent: {agent_speaker_uri}") | |
| return None | |
| # Update agent info with manifest | |
| agent_info = self.agents[agent_speaker_uri] | |
| agent_info.manifest = manifest | |
| # Add to session participants if provided | |
| if session_participants is not None: | |
| session_participants[agent_speaker_uri] = agent_info | |
| logger.info(f"Added {agent_speaker_uri} to session participants") | |
| logger.info(f"β Successfully added agent {agent_speaker_uri} to conversation {conversation_id}") | |
| return agent_info | |
| except Exception as e: | |
| logger.error(f"β Error adding agent: {e}") | |
| return None | |
| def get_agent_info(self, agent_speaker_uri: str) -> Optional[AgentInfo]: | |
| """Get information about an agent""" | |
| return self.agents.get(agent_speaker_uri) | |
| def list_agents(self) -> Dict[str, AgentInfo]: | |
| """List all known agents""" | |
| return self.agents.copy() | |
| def remove_agent(self, agent_speaker_uri: str) -> bool: | |
| """Remove an agent from the manager""" | |
| if agent_speaker_uri in self.agents: | |
| del self.agents[agent_speaker_uri] | |
| logger.info(f"Removed agent: {agent_speaker_uri}") | |
| return True | |
| return False | |