""" Floor Manager - Handles conversation sessions and envelope broadcasting """ import requests import logging from typing import Dict, Optional, List, Any from datetime import datetime from dataclasses import dataclass from .agent_manager import AgentManager, AgentInfo from .utils.helpers import generate_message_id from .utils.config import settings logger = logging.getLogger(__name__) @dataclass class FloorSession: """Represents an active floor conversation session""" session_id: str floor_manager_uri: str floor_manager_url: str participants: Dict[str, AgentInfo] # speaker_uri -> AgentInfo created_at: datetime floor_holder: Optional[str] = None # speaker_uri of current floor holder def add_participant(self, agent_info: AgentInfo): """Add a participant to the session""" self.participants[agent_info.speaker_uri] = agent_info def remove_participant(self, speaker_uri: str): """Remove a participant from the session""" if speaker_uri in self.participants: del self.participants[speaker_uri] if self.floor_holder == speaker_uri: self.floor_holder = None def get_participant_urls(self, exclude_speaker: Optional[str] = None) -> List[str]: """ Get list of all participant service URLs Args: exclude_speaker: Optional speaker URI to exclude (e.g., the sender) Returns: List of service URLs """ urls = [] for speaker_uri, agent_info in self.participants.items(): if exclude_speaker and speaker_uri == exclude_speaker: continue if agent_info.service_url: urls.append(agent_info.service_url) return urls class FloorManager: """ Floor Manager for OpenFloor Protocol Responsibilities: - Manage floor sessions - Handle agent discovery and invitation - Broadcast envelopes to all participants - Manage floor control (grant/revoke) """ def __init__(self): """Initialize Floor Manager""" self.sessions: Dict[str, FloorSession] = {} logger.info("Floor Manager initialized") def _log_envelope_details(self, envelope_dict: Dict[str, Any], direction: str = "šŸ“¤"): """ Log detailed envelope information in a structured format Args: envelope_dict: The OFP envelope dictionary direction: Direction indicator (šŸ“¤ for outgoing, šŸ“„ for incoming, šŸ“Ø for received) """ import json try: openfloor = envelope_dict.get("openFloor", {}) schema = openfloor.get("schema", {}) conversation = openfloor.get("conversation", {}) sender = openfloor.get("sender", {}) events = openfloor.get("events", []) logger.info(f"{direction} ENVELOPE COMMUNICATION") logger.info(f" Schema Version: {schema.get('version', 'N/A')}") logger.info(f" Conversation ID: {conversation.get('id', 'N/A')}") logger.info(f" Sender URI: {sender.get('speakerUri', 'N/A')}") logger.info(f" Sender Service URL: {sender.get('serviceUrl', 'N/A')}") logger.info(f" Number of Events: {len(events)}") for idx, event in enumerate(events, 1): event_type = event.get("eventType", "unknown") to_info = event.get("to", {}) logger.info(f" Event #{idx}:") logger.info(f" Type: {event_type}") if to_info: target_uri = to_info.get("speakerUri", "all") is_private = to_info.get("private", False) logger.info(f" To: {target_uri}") logger.info(f" Private: {is_private}") # Log utterance content with detailed dialog event info if event_type == "utterance": params = event.get("parameters", {}) dialog_event = params.get("dialogEvent", {}) if dialog_event: de_id = dialog_event.get("id", "N/A") de_speaker = dialog_event.get("speakerUri", "N/A") span = dialog_event.get("span", {}) start_time = span.get("startTime", "N/A") logger.info(f" Dialog Event ID: {de_id}") logger.info(f" Dialog Event Speaker: {de_speaker}") logger.info(f" Start Time: {start_time}") features = dialog_event.get("features", {}) text_feature = features.get("text", {}) mime_type = text_feature.get("mimeType", "N/A") tokens = text_feature.get("tokens", []) logger.info(f" Text MIME Type: {mime_type}") if tokens: for token_idx, token in enumerate(tokens): value = token.get("value", "") confidence = token.get("confidence", 1.0) logger.info(f" Token #{token_idx + 1}:") logger.info(f" Value: {value}") logger.info(f" Confidence: {confidence}") # Log full JSON structure logger.debug(f"{direction} COMPLETE ENVELOPE JSON:") logger.debug(json.dumps(envelope_dict, indent=4)) except Exception as e: logger.error(f"Error logging envelope details: {e}") def create_session( self, session_id: str, floor_manager_uri: str, floor_manager_url: str ) -> FloorSession: """ Create a new floor session Args: session_id: Unique session identifier floor_manager_uri: Speaker URI of the floor manager floor_manager_url: Service URL of the floor manager Returns: FloorSession object """ session = FloorSession( session_id=session_id, floor_manager_uri=floor_manager_uri, floor_manager_url=floor_manager_url, participants={}, created_at=datetime.now() ) self.sessions[session_id] = session logger.info(f"Created floor session: {session_id}") return session def get_session(self, session_id: str) -> Optional[FloorSession]: """Get a session by ID""" return self.sessions.get(session_id) def get_agent_manager(self, session_id: str) -> Optional[AgentManager]: """ Get an AgentManager for a specific session Args: session_id: Session identifier Returns: AgentManager instance or None """ session = self.get_session(session_id) if not session: return None return AgentManager( floor_manager_uri=session.floor_manager_uri, floor_manager_url=session.floor_manager_url ) async def broadcast_envelope( self, session_id: str, envelope_dict: Dict[str, Any], sender_uri: Optional[str] = None, timeout: int = 10 ) -> Dict[str, bool]: """ Broadcast an envelope to all participants in a session Per OFP specification, when an agent sends a message to the floor, the floor manager should forward it to all other participants. Args: session_id: Session identifier envelope_dict: OFP envelope dictionary to broadcast sender_uri: Speaker URI of the sender (to exclude from broadcast) timeout: Request timeout in seconds Returns: Dict mapping participant URIs to success status """ session = self.get_session(session_id) if not session: logger.error(f"Session not found: {session_id}") return {} results = {} participant_urls = session.get_participant_urls(exclude_speaker=sender_uri) if not participant_urls: logger.info(f"No participants to broadcast to in session {session_id}") return results logger.info(f"Broadcasting envelope to {len(participant_urls)} participants") # ENHANCED: Log the outgoing envelope we're about to send import json print(f"\n{'='*80}") print(f"šŸ“¤ OUTGOING ENVELOPE TO BROADCAST:") print(f"{'='*80}") print(json.dumps(envelope_dict, indent=2)) print(f"{'='*80}\n") logger.info(f"šŸ“¤ OUTGOING ENVELOPE:") logger.info(json.dumps(envelope_dict, indent=2)) for speaker_uri, agent_info in session.participants.items(): # Skip the sender if sender_uri and speaker_uri == sender_uri: continue if not agent_info.service_url: logger.warning(f"No service URL for participant: {speaker_uri}") results[speaker_uri] = False continue try: logger.info(f"šŸ“¤ Sending envelope to {speaker_uri} at {agent_info.service_url}") logger.debug(f"šŸ“¤ Envelope content: {envelope_dict}") response = requests.post( agent_info.service_url, json=envelope_dict, headers={ 'Content-Type': 'application/json', 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' }, timeout=timeout ) response.raise_for_status() results[speaker_uri] = True # Log the response from the agent logger.info(f"āœ“ Envelope delivered to {speaker_uri} - Status: {response.status_code}") # Try to parse and log agent's response try: response_data = response.json() # ENHANCED: Log complete raw response first import json print(f"\n{'='*80}") print(f"šŸ“„ AGENT RESPONSE FROM: {speaker_uri}") print(f"{'='*80}") # Print AND log the RAW JSON response for debugging print(f"šŸ“„ RAW RESPONSE JSON:") print(json.dumps(response_data, indent=2)) print(f"-" * 80) logger.info(f"šŸ“„ Response from {speaker_uri}") logger.info(f"šŸ“„ RAW RESPONSE JSON:") logger.info(json.dumps(response_data, indent=2)) # Use the enhanced logging method self._log_envelope_details(response_data, direction="šŸ“„ INCOMING") # Check if agent sent any events back if "openFloor" in response_data: response_envelope = response_data.get("openFloor", {}) response_sender = response_envelope.get("sender", {}) events = response_envelope.get("events", []) print(f"Response Sender: {response_sender.get('speakerUri')}") print(f"Number of Events: {len(events)}") print(f"-" * 80) if events: logger.info(f"šŸ”” Agent {speaker_uri} sent {len(events)} event(s) in response") for idx, event in enumerate(events, 1): print(f"\nšŸ“‹ Event #{idx}:") event_type = event.get("eventType") to_info = event.get("to", {}) is_private = to_info.get("private", False) target_uri = to_info.get("speakerUri") print(f" Type: {event_type}") if to_info: print(f" To: {target_uri or 'all'}") print(f" Private: {is_private}") logger.info(f" - Event type: {event_type}") if is_private and target_uri: logger.info(f" - Private message to: {target_uri}") # Extract utterance content if present if event_type == "utterance": params = event.get("parameters", {}) dialog_event = params.get("dialogEvent", {}) features = dialog_event.get("features", {}) # Extract text feature text_feature = features.get("text", {}) tokens = text_feature.get("tokens", []) text_content = None if tokens: # Try both "value" and "token" fields (different agents use different formats) text_content = " ".join([ t.get("value", t.get("token", "")) for t in tokens ]) if is_private: logger.info(f" šŸ”’ Private message: {text_content}") else: logger.info(f" šŸ’¬ Agent says: {text_content}") # Extract HTML feature if present html_feature = features.get("html", {}) html_tokens = html_feature.get("tokens", []) html_content = None if html_tokens: html_content = "".join([ t.get("value", "") for t in html_tokens ]) logger.info(f" 🌐 Agent included HTML content ({len(html_content)} chars)") # Store response for retrieval (include private messages and HTML) if text_content or html_content: if not hasattr(self, '_agent_responses'): self._agent_responses = {} if session_id not in self._agent_responses: self._agent_responses[session_id] = [] agent_name = agent_info.manifest.get("conversationalName", speaker_uri) if agent_info.manifest else speaker_uri if is_private and target_uri: agent_name = f"{agent_name} (private to {target_uri})" self._agent_responses[session_id].append({ "agent_uri": speaker_uri, "agent_name": agent_name, "message": text_content or "", "html_content": html_content, "timestamp": datetime.now(), "is_private": is_private, "target_uri": target_uri }) except ValueError: logger.debug(f"Response from {speaker_uri} is not JSON: {response.text[:200]}") except requests.exceptions.Timeout: logger.error(f"āœ— Timeout sending to {speaker_uri}") results[speaker_uri] = False except requests.exceptions.RequestException as e: logger.error(f"āœ— Failed to send to {speaker_uri}: {e}") results[speaker_uri] = False except Exception as e: logger.error(f"āœ— Unexpected error sending to {speaker_uri}: {e}") results[speaker_uri] = False successful = sum(1 for success in results.values() if success) logger.info(f"Broadcast complete: {successful}/{len(results)} successful") return results async def handle_incoming_envelope( self, session_id: str, envelope_dict: Dict[str, Any] ) -> bool: """ Handle an incoming envelope and broadcast to participants This is the main entry point for envelopes received by the floor. The floor manager will: 1. Validate the envelope 2. Process any floor control events 3. Broadcast to all other participants Args: session_id: Session identifier envelope_dict: OFP envelope dictionary Returns: True if handled successfully """ try: # Extract sender information sender_info = envelope_dict.get("openFloor", {}).get("sender", {}) sender_uri = sender_info.get("speakerUri") if not sender_uri: logger.error("Envelope missing sender speakerUri") return False logger.info(f"Received envelope from {sender_uri} for session {session_id}") # Process floor control events (grant, revoke, request, etc.) await self._process_floor_events(session_id, envelope_dict) # Broadcast to all participants except sender await self.broadcast_envelope( session_id=session_id, envelope_dict=envelope_dict, sender_uri=sender_uri ) return True except Exception as e: logger.error(f"Error handling envelope: {e}") return False async def _process_floor_events( self, session_id: str, envelope_dict: Dict[str, Any] ): """ Process floor control events (grantFloor, revokeFloor, requestFloor, etc.) Args: session_id: Session identifier envelope_dict: OFP envelope dictionary """ session = self.get_session(session_id) if not session: return events = envelope_dict.get("openFloor", {}).get("events", []) sender_uri = envelope_dict.get("openFloor", {}).get("sender", {}).get("speakerUri") for event in events: event_type = event.get("eventType") if event_type == "requestFloor": logger.info(f"Floor requested by {sender_uri}") # In a full implementation, this would trigger convener logic elif event_type == "grantFloor": to_uri = event.get("to", {}).get("speakerUri") if to_uri: session.floor_holder = to_uri logger.info(f"Floor granted to {to_uri}") elif event_type == "revokeFloor": logger.info(f"Floor revoked from {session.floor_holder}") session.floor_holder = None elif event_type == "yieldFloor": if sender_uri == session.floor_holder: session.floor_holder = None logger.info(f"Floor yielded by {sender_uri}") async def send_floor_notification( self, session_id: str, event_type: str, target_uri: Optional[str] = None, reason: Optional[str] = None ) -> bool: """ Send a floor control notification to all participants Args: session_id: Session identifier event_type: Type of floor event (grantFloor, revokeFloor, etc.) target_uri: Optional target speaker URI reason: Optional reason for the event Returns: True if sent successfully """ session = self.get_session(session_id) if not session: return False # Create floor notification envelope event = { "eventType": event_type } if target_uri: event["to"] = { "speakerUri": target_uri } if reason: event["reason"] = reason envelope_dict = { "openFloor": { "schema": { "version": settings.OFP_VERSION }, "conversation": { "id": session_id }, "sender": { "speakerUri": session.floor_manager_uri, "serviceUrl": session.floor_manager_url }, "events": [event] } } # Broadcast to all participants results = await self.broadcast_envelope( session_id=session_id, envelope_dict=envelope_dict ) return any(results.values()) def get_agent_responses(self, session_id: str) -> List[Dict[str, Any]]: """ Get agent responses for a session Args: session_id: Session identifier Returns: List of agent response dictionaries """ if not hasattr(self, '_agent_responses'): return [] return self._agent_responses.get(session_id, []) def clear_agent_responses(self, session_id: str): """Clear agent responses for a session""" if hasattr(self, '_agent_responses') and session_id in self._agent_responses: self._agent_responses[session_id] = []