Spaces:
Sleeping
Sleeping
| """ | |
| Floor Manager - Handles conversation sessions and envelope broadcasting | |
| """ | |
| import requests | |
| import logging | |
| from typing import Dict, Optional, List, Any | |
| from datetime import datetime | |
| from dataclasses import dataclass | |
| from .agent_manager import AgentManager, AgentInfo | |
| from .utils.helpers import generate_message_id | |
| from .utils.config import settings | |
| logger = logging.getLogger(__name__) | |
| class FloorSession: | |
| """Represents an active floor conversation session""" | |
| session_id: str | |
| floor_manager_uri: str | |
| floor_manager_url: str | |
| participants: Dict[str, AgentInfo] # speaker_uri -> AgentInfo | |
| created_at: datetime | |
| floor_holder: Optional[str] = None # speaker_uri of current floor holder | |
| def add_participant(self, agent_info: AgentInfo): | |
| """Add a participant to the session""" | |
| self.participants[agent_info.speaker_uri] = agent_info | |
| def remove_participant(self, speaker_uri: str): | |
| """Remove a participant from the session""" | |
| if speaker_uri in self.participants: | |
| del self.participants[speaker_uri] | |
| if self.floor_holder == speaker_uri: | |
| self.floor_holder = None | |
| def get_participant_urls(self, exclude_speaker: Optional[str] = None) -> List[str]: | |
| """ | |
| Get list of all participant service URLs | |
| Args: | |
| exclude_speaker: Optional speaker URI to exclude (e.g., the sender) | |
| Returns: | |
| List of service URLs | |
| """ | |
| urls = [] | |
| for speaker_uri, agent_info in self.participants.items(): | |
| if exclude_speaker and speaker_uri == exclude_speaker: | |
| continue | |
| if agent_info.service_url: | |
| urls.append(agent_info.service_url) | |
| return urls | |
| class FloorManager: | |
| """ | |
| Floor Manager for OpenFloor Protocol | |
| Responsibilities: | |
| - Manage floor sessions | |
| - Handle agent discovery and invitation | |
| - Broadcast envelopes to all participants | |
| - Manage floor control (grant/revoke) | |
| """ | |
| def __init__(self): | |
| """Initialize Floor Manager""" | |
| self.sessions: Dict[str, FloorSession] = {} | |
| logger.info("Floor Manager initialized") | |
| def _log_envelope_details(self, envelope_dict: Dict[str, Any], direction: str = "📤"): | |
| """ | |
| Log detailed envelope information in a structured format | |
| Args: | |
| envelope_dict: The OFP envelope dictionary | |
| direction: Direction indicator (📤 for outgoing, 📥 for incoming, 📨 for received) | |
| """ | |
| import json | |
| try: | |
| openfloor = envelope_dict.get("openFloor", {}) | |
| schema = openfloor.get("schema", {}) | |
| conversation = openfloor.get("conversation", {}) | |
| sender = openfloor.get("sender", {}) | |
| events = openfloor.get("events", []) | |
| logger.info(f"{direction} ENVELOPE COMMUNICATION") | |
| logger.info(f" Schema Version: {schema.get('version', 'N/A')}") | |
| logger.info(f" Conversation ID: {conversation.get('id', 'N/A')}") | |
| logger.info(f" Sender URI: {sender.get('speakerUri', 'N/A')}") | |
| logger.info(f" Sender Service URL: {sender.get('serviceUrl', 'N/A')}") | |
| logger.info(f" Number of Events: {len(events)}") | |
| for idx, event in enumerate(events, 1): | |
| event_type = event.get("eventType", "unknown") | |
| to_info = event.get("to", {}) | |
| logger.info(f" Event #{idx}:") | |
| logger.info(f" Type: {event_type}") | |
| if to_info: | |
| target_uri = to_info.get("speakerUri", "all") | |
| is_private = to_info.get("private", False) | |
| logger.info(f" To: {target_uri}") | |
| logger.info(f" Private: {is_private}") | |
| # Log utterance content with detailed dialog event info | |
| if event_type == "utterance": | |
| params = event.get("parameters", {}) | |
| dialog_event = params.get("dialogEvent", {}) | |
| if dialog_event: | |
| de_id = dialog_event.get("id", "N/A") | |
| de_speaker = dialog_event.get("speakerUri", "N/A") | |
| span = dialog_event.get("span", {}) | |
| start_time = span.get("startTime", "N/A") | |
| logger.info(f" Dialog Event ID: {de_id}") | |
| logger.info(f" Dialog Event Speaker: {de_speaker}") | |
| logger.info(f" Start Time: {start_time}") | |
| features = dialog_event.get("features", {}) | |
| text_feature = features.get("text", {}) | |
| mime_type = text_feature.get("mimeType", "N/A") | |
| tokens = text_feature.get("tokens", []) | |
| logger.info(f" Text MIME Type: {mime_type}") | |
| if tokens: | |
| for token_idx, token in enumerate(tokens): | |
| value = token.get("value", "") | |
| confidence = token.get("confidence", 1.0) | |
| logger.info(f" Token #{token_idx + 1}:") | |
| logger.info(f" Value: {value}") | |
| logger.info(f" Confidence: {confidence}") | |
| # Log full JSON structure | |
| logger.debug(f"{direction} COMPLETE ENVELOPE JSON:") | |
| logger.debug(json.dumps(envelope_dict, indent=4)) | |
| except Exception as e: | |
| logger.error(f"Error logging envelope details: {e}") | |
| def create_session( | |
| self, | |
| session_id: str, | |
| floor_manager_uri: str, | |
| floor_manager_url: str | |
| ) -> FloorSession: | |
| """ | |
| Create a new floor session | |
| Args: | |
| session_id: Unique session identifier | |
| floor_manager_uri: Speaker URI of the floor manager | |
| floor_manager_url: Service URL of the floor manager | |
| Returns: | |
| FloorSession object | |
| """ | |
| session = FloorSession( | |
| session_id=session_id, | |
| floor_manager_uri=floor_manager_uri, | |
| floor_manager_url=floor_manager_url, | |
| participants={}, | |
| created_at=datetime.now() | |
| ) | |
| self.sessions[session_id] = session | |
| logger.info(f"Created floor session: {session_id}") | |
| return session | |
| def get_session(self, session_id: str) -> Optional[FloorSession]: | |
| """Get a session by ID""" | |
| return self.sessions.get(session_id) | |
| def get_agent_manager(self, session_id: str) -> Optional[AgentManager]: | |
| """ | |
| Get an AgentManager for a specific session | |
| Args: | |
| session_id: Session identifier | |
| Returns: | |
| AgentManager instance or None | |
| """ | |
| session = self.get_session(session_id) | |
| if not session: | |
| return None | |
| return AgentManager( | |
| floor_manager_uri=session.floor_manager_uri, | |
| floor_manager_url=session.floor_manager_url | |
| ) | |
| async def broadcast_envelope( | |
| self, | |
| session_id: str, | |
| envelope_dict: Dict[str, Any], | |
| sender_uri: Optional[str] = None, | |
| timeout: int = 10 | |
| ) -> Dict[str, bool]: | |
| """ | |
| Broadcast an envelope to all participants in a session | |
| Per OFP specification, when an agent sends a message to the floor, | |
| the floor manager should forward it to all other participants. | |
| Args: | |
| session_id: Session identifier | |
| envelope_dict: OFP envelope dictionary to broadcast | |
| sender_uri: Speaker URI of the sender (to exclude from broadcast) | |
| timeout: Request timeout in seconds | |
| Returns: | |
| Dict mapping participant URIs to success status | |
| """ | |
| session = self.get_session(session_id) | |
| if not session: | |
| logger.error(f"Session not found: {session_id}") | |
| return {} | |
| results = {} | |
| participant_urls = session.get_participant_urls(exclude_speaker=sender_uri) | |
| if not participant_urls: | |
| logger.info(f"No participants to broadcast to in session {session_id}") | |
| return results | |
| logger.info(f"Broadcasting envelope to {len(participant_urls)} participants") | |
| # ENHANCED: Log the outgoing envelope we're about to send | |
| import json | |
| print(f"\n{'='*80}") | |
| print(f"📤 OUTGOING ENVELOPE TO BROADCAST:") | |
| print(f"{'='*80}") | |
| print(json.dumps(envelope_dict, indent=2)) | |
| print(f"{'='*80}\n") | |
| logger.info(f"📤 OUTGOING ENVELOPE:") | |
| logger.info(json.dumps(envelope_dict, indent=2)) | |
| for speaker_uri, agent_info in session.participants.items(): | |
| # Skip the sender | |
| if sender_uri and speaker_uri == sender_uri: | |
| continue | |
| if not agent_info.service_url: | |
| logger.warning(f"No service URL for participant: {speaker_uri}") | |
| results[speaker_uri] = False | |
| continue | |
| try: | |
| logger.info(f"📤 Sending envelope to {speaker_uri} at {agent_info.service_url}") | |
| logger.debug(f"📤 Envelope content: {envelope_dict}") | |
| response = requests.post( | |
| agent_info.service_url, | |
| json=envelope_dict, | |
| headers={ | |
| 'Content-Type': 'application/json', | |
| 'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}' | |
| }, | |
| timeout=timeout | |
| ) | |
| response.raise_for_status() | |
| results[speaker_uri] = True | |
| # Log the response from the agent | |
| logger.info(f"✓ Envelope delivered to {speaker_uri} - Status: {response.status_code}") | |
| # Try to parse and log agent's response | |
| try: | |
| response_data = response.json() | |
| # ENHANCED: Log complete raw response first | |
| import json | |
| print(f"\n{'='*80}") | |
| print(f"📥 AGENT RESPONSE FROM: {speaker_uri}") | |
| print(f"{'='*80}") | |
| # Print AND log the RAW JSON response for debugging | |
| print(f"📥 RAW RESPONSE JSON:") | |
| print(json.dumps(response_data, indent=2)) | |
| print(f"-" * 80) | |
| logger.info(f"📥 Response from {speaker_uri}") | |
| logger.info(f"📥 RAW RESPONSE JSON:") | |
| logger.info(json.dumps(response_data, indent=2)) | |
| # Use the enhanced logging method | |
| self._log_envelope_details(response_data, direction="📥 INCOMING") | |
| # Check if agent sent any events back | |
| if "openFloor" in response_data: | |
| response_envelope = response_data.get("openFloor", {}) | |
| response_sender = response_envelope.get("sender", {}) | |
| events = response_envelope.get("events", []) | |
| print(f"Response Sender: {response_sender.get('speakerUri')}") | |
| print(f"Number of Events: {len(events)}") | |
| print(f"-" * 80) | |
| if events: | |
| logger.info(f"🔔 Agent {speaker_uri} sent {len(events)} event(s) in response") | |
| for idx, event in enumerate(events, 1): | |
| print(f"\n📋 Event #{idx}:") | |
| event_type = event.get("eventType") | |
| to_info = event.get("to", {}) | |
| is_private = to_info.get("private", False) | |
| target_uri = to_info.get("speakerUri") | |
| print(f" Type: {event_type}") | |
| if to_info: | |
| print(f" To: {target_uri or 'all'}") | |
| print(f" Private: {is_private}") | |
| logger.info(f" - Event type: {event_type}") | |
| if is_private and target_uri: | |
| logger.info(f" - Private message to: {target_uri}") | |
| # Extract utterance content if present | |
| if event_type == "utterance": | |
| params = event.get("parameters", {}) | |
| dialog_event = params.get("dialogEvent", {}) | |
| features = dialog_event.get("features", {}) | |
| # Extract text feature | |
| text_feature = features.get("text", {}) | |
| tokens = text_feature.get("tokens", []) | |
| text_content = None | |
| if tokens: | |
| # Try both "value" and "token" fields (different agents use different formats) | |
| text_content = " ".join([ | |
| t.get("value", t.get("token", "")) for t in tokens | |
| ]) | |
| if is_private: | |
| logger.info(f" 🔒 Private message: {text_content}") | |
| else: | |
| logger.info(f" 💬 Agent says: {text_content}") | |
| # Extract HTML feature if present | |
| html_feature = features.get("html", {}) | |
| html_tokens = html_feature.get("tokens", []) | |
| html_content = None | |
| if html_tokens: | |
| html_content = "".join([ | |
| t.get("value", "") for t in html_tokens | |
| ]) | |
| logger.info(f" 🌐 Agent included HTML content ({len(html_content)} chars)") | |
| # Store response for retrieval (include private messages and HTML) | |
| if text_content or html_content: | |
| if not hasattr(self, '_agent_responses'): | |
| self._agent_responses = {} | |
| if session_id not in self._agent_responses: | |
| self._agent_responses[session_id] = [] | |
| agent_name = agent_info.manifest.get("conversationalName", speaker_uri) if agent_info.manifest else speaker_uri | |
| if is_private and target_uri: | |
| agent_name = f"{agent_name} (private to {target_uri})" | |
| self._agent_responses[session_id].append({ | |
| "agent_uri": speaker_uri, | |
| "agent_name": agent_name, | |
| "message": text_content or "", | |
| "html_content": html_content, | |
| "timestamp": datetime.now(), | |
| "is_private": is_private, | |
| "target_uri": target_uri | |
| }) | |
| except ValueError: | |
| logger.debug(f"Response from {speaker_uri} is not JSON: {response.text[:200]}") | |
| except requests.exceptions.Timeout: | |
| logger.error(f"✗ Timeout sending to {speaker_uri}") | |
| results[speaker_uri] = False | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"✗ Failed to send to {speaker_uri}: {e}") | |
| results[speaker_uri] = False | |
| except Exception as e: | |
| logger.error(f"✗ Unexpected error sending to {speaker_uri}: {e}") | |
| results[speaker_uri] = False | |
| successful = sum(1 for success in results.values() if success) | |
| logger.info(f"Broadcast complete: {successful}/{len(results)} successful") | |
| return results | |
| async def handle_incoming_envelope( | |
| self, | |
| session_id: str, | |
| envelope_dict: Dict[str, Any] | |
| ) -> bool: | |
| """ | |
| Handle an incoming envelope and broadcast to participants | |
| This is the main entry point for envelopes received by the floor. | |
| The floor manager will: | |
| 1. Validate the envelope | |
| 2. Process any floor control events | |
| 3. Broadcast to all other participants | |
| Args: | |
| session_id: Session identifier | |
| envelope_dict: OFP envelope dictionary | |
| Returns: | |
| True if handled successfully | |
| """ | |
| try: | |
| # Extract sender information | |
| sender_info = envelope_dict.get("openFloor", {}).get("sender", {}) | |
| sender_uri = sender_info.get("speakerUri") | |
| if not sender_uri: | |
| logger.error("Envelope missing sender speakerUri") | |
| return False | |
| logger.info(f"Received envelope from {sender_uri} for session {session_id}") | |
| # Process floor control events (grant, revoke, request, etc.) | |
| await self._process_floor_events(session_id, envelope_dict) | |
| # Broadcast to all participants except sender | |
| await self.broadcast_envelope( | |
| session_id=session_id, | |
| envelope_dict=envelope_dict, | |
| sender_uri=sender_uri | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error handling envelope: {e}") | |
| return False | |
| async def _process_floor_events( | |
| self, | |
| session_id: str, | |
| envelope_dict: Dict[str, Any] | |
| ): | |
| """ | |
| Process floor control events (grantFloor, revokeFloor, requestFloor, etc.) | |
| Args: | |
| session_id: Session identifier | |
| envelope_dict: OFP envelope dictionary | |
| """ | |
| session = self.get_session(session_id) | |
| if not session: | |
| return | |
| events = envelope_dict.get("openFloor", {}).get("events", []) | |
| sender_uri = envelope_dict.get("openFloor", {}).get("sender", {}).get("speakerUri") | |
| for event in events: | |
| event_type = event.get("eventType") | |
| if event_type == "requestFloor": | |
| logger.info(f"Floor requested by {sender_uri}") | |
| # In a full implementation, this would trigger convener logic | |
| elif event_type == "grantFloor": | |
| to_uri = event.get("to", {}).get("speakerUri") | |
| if to_uri: | |
| session.floor_holder = to_uri | |
| logger.info(f"Floor granted to {to_uri}") | |
| elif event_type == "revokeFloor": | |
| logger.info(f"Floor revoked from {session.floor_holder}") | |
| session.floor_holder = None | |
| elif event_type == "yieldFloor": | |
| if sender_uri == session.floor_holder: | |
| session.floor_holder = None | |
| logger.info(f"Floor yielded by {sender_uri}") | |
| async def send_floor_notification( | |
| self, | |
| session_id: str, | |
| event_type: str, | |
| target_uri: Optional[str] = None, | |
| reason: Optional[str] = None | |
| ) -> bool: | |
| """ | |
| Send a floor control notification to all participants | |
| Args: | |
| session_id: Session identifier | |
| event_type: Type of floor event (grantFloor, revokeFloor, etc.) | |
| target_uri: Optional target speaker URI | |
| reason: Optional reason for the event | |
| Returns: | |
| True if sent successfully | |
| """ | |
| session = self.get_session(session_id) | |
| if not session: | |
| return False | |
| # Create floor notification envelope | |
| event = { | |
| "eventType": event_type | |
| } | |
| if target_uri: | |
| event["to"] = { | |
| "speakerUri": target_uri | |
| } | |
| if reason: | |
| event["reason"] = reason | |
| envelope_dict = { | |
| "openFloor": { | |
| "schema": { | |
| "version": settings.OFP_VERSION | |
| }, | |
| "conversation": { | |
| "id": session_id | |
| }, | |
| "sender": { | |
| "speakerUri": session.floor_manager_uri, | |
| "serviceUrl": session.floor_manager_url | |
| }, | |
| "events": [event] | |
| } | |
| } | |
| # Broadcast to all participants | |
| results = await self.broadcast_envelope( | |
| session_id=session_id, | |
| envelope_dict=envelope_dict | |
| ) | |
| return any(results.values()) | |
| def get_agent_responses(self, session_id: str) -> List[Dict[str, Any]]: | |
| """ | |
| Get agent responses for a session | |
| Args: | |
| session_id: Session identifier | |
| Returns: | |
| List of agent response dictionaries | |
| """ | |
| if not hasattr(self, '_agent_responses'): | |
| return [] | |
| return self._agent_responses.get(session_id, []) | |
| def clear_agent_responses(self, session_id: str): | |
| """Clear agent responses for a session""" | |
| if hasattr(self, '_agent_responses') and session_id in self._agent_responses: | |
| self._agent_responses[session_id] = [] | |