# FloorManager / src / floor_manager.py
# BolyosCsaba
# html render
# aa54aeb
"""
Floor Manager - Handles conversation sessions and envelope broadcasting
"""
import asyncio
import functools
import json
import logging
from typing import Dict, Optional, List, Any
from datetime import datetime
from dataclasses import dataclass

import requests

from .agent_manager import AgentManager, AgentInfo
from .utils.helpers import generate_message_id
from .utils.config import settings
logger = logging.getLogger(__name__)
@dataclass
class FloorSession:
    """State of one active floor conversation.

    Holds the identifiers of the conversation and of the floor manager that
    owns it, the participants keyed by speaker URI, and the speaker URI of
    whoever currently holds the floor (``None`` when nobody does).
    """

    session_id: str
    floor_manager_uri: str
    floor_manager_url: str
    participants: Dict[str, "AgentInfo"]  # speaker_uri -> AgentInfo
    created_at: datetime
    floor_holder: Optional[str] = None  # speaker_uri of current floor holder

    def add_participant(self, agent_info: "AgentInfo"):
        """Register a participant, replacing any entry with the same speaker URI."""
        key = agent_info.speaker_uri
        self.participants[key] = agent_info

    def remove_participant(self, speaker_uri: str):
        """Drop a participant; if it held the floor, the floor becomes free.

        Unknown speaker URIs are ignored.
        """
        if speaker_uri not in self.participants:
            return
        del self.participants[speaker_uri]
        if self.floor_holder == speaker_uri:
            self.floor_holder = None

    def get_participant_urls(self, exclude_speaker: Optional[str] = None) -> List[str]:
        """
        Collect the service URLs of the participants.

        Participants without a service URL are left out.

        Args:
            exclude_speaker: Optional speaker URI to leave out (e.g., the sender)

        Returns:
            List of service URLs, in participant insertion order
        """
        return [
            info.service_url
            for uri, info in self.participants.items()
            if not (exclude_speaker and uri == exclude_speaker) and info.service_url
        ]
class FloorManager:
    """
    Floor Manager for OpenFloor Protocol

    Responsibilities:
    - Manage floor sessions
    - Handle agent discovery and invitation
    - Broadcast envelopes to all participants
    - Manage floor control (grant/revoke)
    """

    def __init__(self):
        """Initialize Floor Manager with no sessions and no stored responses"""
        self.sessions: Dict[str, "FloorSession"] = {}
        # session_id -> response records captured while broadcasting. Created
        # eagerly so the accessors never have to probe with hasattr().
        self._agent_responses: Dict[str, List[Dict[str, Any]]] = {}
        logger.info("Floor Manager initialized")

    def _log_envelope_details(self, envelope_dict: Dict[str, Any], direction: str = "📤"):
        """
        Log detailed envelope information in a structured format

        Logging is best-effort: any error raised while walking the envelope is
        logged and swallowed so that it can never affect message handling.

        Args:
            envelope_dict: The OFP envelope dictionary
            direction: Direction indicator (📤 for outgoing, 📥 for incoming, 📨 for received)
        """
        try:
            openfloor = envelope_dict.get("openFloor", {})
            schema = openfloor.get("schema", {})
            conversation = openfloor.get("conversation", {})
            sender = openfloor.get("sender", {})
            events = openfloor.get("events", [])

            logger.info(f"{direction} ENVELOPE COMMUNICATION")
            logger.info(f" Schema Version: {schema.get('version', 'N/A')}")
            logger.info(f" Conversation ID: {conversation.get('id', 'N/A')}")
            logger.info(f" Sender URI: {sender.get('speakerUri', 'N/A')}")
            logger.info(f" Sender Service URL: {sender.get('serviceUrl', 'N/A')}")
            logger.info(f" Number of Events: {len(events)}")

            for idx, event in enumerate(events, 1):
                event_type = event.get("eventType", "unknown")
                to_info = event.get("to", {})
                logger.info(f" Event #{idx}:")
                logger.info(f" Type: {event_type}")
                if to_info:
                    target_uri = to_info.get("speakerUri", "all")
                    is_private = to_info.get("private", False)
                    logger.info(f" To: {target_uri}")
                    logger.info(f" Private: {is_private}")

                # Utterances additionally carry a dialog event worth detailing
                if event_type != "utterance":
                    continue
                params = event.get("parameters", {})
                dialog_event = params.get("dialogEvent", {})
                if not dialog_event:
                    continue

                de_id = dialog_event.get("id", "N/A")
                de_speaker = dialog_event.get("speakerUri", "N/A")
                span = dialog_event.get("span", {})
                start_time = span.get("startTime", "N/A")
                logger.info(f" Dialog Event ID: {de_id}")
                logger.info(f" Dialog Event Speaker: {de_speaker}")
                logger.info(f" Start Time: {start_time}")

                features = dialog_event.get("features", {})
                text_feature = features.get("text", {})
                mime_type = text_feature.get("mimeType", "N/A")
                tokens = text_feature.get("tokens", [])
                logger.info(f" Text MIME Type: {mime_type}")
                for token_idx, token in enumerate(tokens):
                    value = token.get("value", "")
                    confidence = token.get("confidence", 1.0)
                    logger.info(f" Token #{token_idx + 1}:")
                    logger.info(f" Value: {value}")
                    logger.info(f" Confidence: {confidence}")

            # Serialising the whole envelope is only worth it when DEBUG is on
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"{direction} COMPLETE ENVELOPE JSON:")
                logger.debug(json.dumps(envelope_dict, indent=4))
        except Exception as e:
            logger.error(f"Error logging envelope details: {e}")

    def create_session(
        self,
        session_id: str,
        floor_manager_uri: str,
        floor_manager_url: str
    ) -> "FloorSession":
        """
        Create a new floor session

        The session starts with no participants and is stored under
        ``session_id``; an existing session with the same ID is replaced.

        Args:
            session_id: Unique session identifier
            floor_manager_uri: Speaker URI of the floor manager
            floor_manager_url: Service URL of the floor manager

        Returns:
            FloorSession object
        """
        session = FloorSession(
            session_id=session_id,
            floor_manager_uri=floor_manager_uri,
            floor_manager_url=floor_manager_url,
            participants={},
            created_at=datetime.now()
        )
        self.sessions[session_id] = session
        logger.info(f"Created floor session: {session_id}")
        return session

    def get_session(self, session_id: str) -> Optional["FloorSession"]:
        """Get a session by ID, or None when the ID is unknown"""
        return self.sessions.get(session_id)

    def get_agent_manager(self, session_id: str) -> Optional["AgentManager"]:
        """
        Get an AgentManager for a specific session

        A new AgentManager is built on every call from the session's floor
        manager URI and URL.

        Args:
            session_id: Session identifier

        Returns:
            AgentManager instance or None when the session does not exist
        """
        session = self.get_session(session_id)
        if not session:
            return None
        return AgentManager(
            floor_manager_uri=session.floor_manager_uri,
            floor_manager_url=session.floor_manager_url
        )

    @staticmethod
    def _post_envelope(service_url: str, envelope_dict: Dict[str, Any], timeout: int):
        """Blocking HTTP POST of one envelope; raises on transport or HTTP errors."""
        response = requests.post(
            service_url,
            json=envelope_dict,
            headers={
                'Content-Type': 'application/json',
                'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}'
            },
            timeout=timeout
        )
        response.raise_for_status()
        return response

    @staticmethod
    def _tokens_to_text(feature: Any, separator: str, keys: tuple) -> Optional[str]:
        """Join a feature's token values with ``separator``; None when there are no tokens.

        For each token the first key of ``keys`` that is present supplies the
        value. Missing or ``None`` values become empty strings so a sloppy
        agent cannot break the join.
        """
        tokens = feature.get("tokens") if isinstance(feature, dict) else None
        if not tokens:
            return None
        parts = []
        for token in tokens:
            if not isinstance(token, dict):
                continue
            value = ""
            for key in keys:
                if key in token:
                    value = token[key]
                    break
            parts.append("" if value is None else str(value))
        return separator.join(parts)

    def _record_agent_events(
        self,
        session_id: str,
        speaker_uri: str,
        agent_info: Any,
        response_data: Dict[str, Any]
    ):
        """Log the events an agent returned and store its utterances for the session.

        An utterance is stored when it carries text and/or HTML content; private
        utterances are stored too and flagged as such.
        """
        envelope = response_data.get("openFloor")
        if not isinstance(envelope, dict):
            return
        events = envelope.get("events") or []
        if not events:
            return

        logger.info(f"🔔 Agent {speaker_uri} sent {len(events)} event(s) in response")
        for event in events:
            if not isinstance(event, dict):
                continue
            event_type = event.get("eventType")
            to_info = event.get("to")
            if not isinstance(to_info, dict):
                # "to" may be absent or null: treat as addressed to everyone
                to_info = {}
            is_private = to_info.get("private", False)
            target_uri = to_info.get("speakerUri")

            logger.info(f" - Event type: {event_type}")
            if is_private and target_uri:
                logger.info(f" - Private message to: {target_uri}")
            if event_type != "utterance":
                continue

            params = event.get("parameters") or {}
            dialog_event = params.get("dialogEvent") or {}
            features = dialog_event.get("features") or {}

            # Try both "value" and "token" fields (different agents use different formats)
            text_content = self._tokens_to_text(features.get("text"), " ", ("value", "token"))
            if text_content is not None:
                if is_private:
                    logger.info(f" 🔒 Private message: {text_content}")
                else:
                    logger.info(f" 💬 Agent says: {text_content}")

            html_content = self._tokens_to_text(features.get("html"), "", ("value",))
            if html_content is not None:
                logger.info(f" 🌐 Agent included HTML content ({len(html_content)} chars)")

            if not (text_content or html_content):
                continue

            manifest = agent_info.manifest
            agent_name = manifest.get("conversationalName", speaker_uri) if manifest else speaker_uri
            if is_private and target_uri:
                agent_name = f"{agent_name} (private to {target_uri})"
            self._agent_responses.setdefault(session_id, []).append({
                "agent_uri": speaker_uri,
                "agent_name": agent_name,
                "message": text_content or "",
                "html_content": html_content,
                "timestamp": datetime.now(),
                "is_private": is_private,
                "target_uri": target_uri
            })

    def _handle_agent_reply(self, session_id: str, speaker_uri: str, agent_info: Any, response: Any):
        """Parse, log and record an agent's HTTP reply without ever raising.

        The envelope has already been delivered when this runs, so a malformed
        reply must not be reported as a delivery failure.
        """
        try:
            response_data = response.json()
        except ValueError:
            logger.debug(f"Response from {speaker_uri} is not JSON: {response.text[:200]}")
            return

        try:
            logger.info(f"📥 Response from {speaker_uri}")
            logger.info("📥 RAW RESPONSE JSON:")
            logger.info(json.dumps(response_data, indent=2))
            if not isinstance(response_data, dict):
                logger.debug(f"Response from {speaker_uri} is not an envelope object")
                return
            self._log_envelope_details(response_data, direction="📥 INCOMING")
            self._record_agent_events(session_id, speaker_uri, agent_info, response_data)
        except Exception as e:
            logger.error(f"Error processing response from {speaker_uri}: {e}")

    async def broadcast_envelope(
        self,
        session_id: str,
        envelope_dict: Dict[str, Any],
        sender_uri: Optional[str] = None,
        timeout: int = 10
    ) -> Dict[str, bool]:
        """
        Broadcast an envelope to all participants in a session

        Per OFP specification, when an agent sends a message to the floor,
        the floor manager should forward it to all other participants.

        Each HTTP POST runs in the event loop's default executor so that a slow
        participant does not block other coroutines. Participants are contacted
        one after another. Utterances found in a participant's reply are stored
        and can be read back with get_agent_responses().

        Args:
            session_id: Session identifier
            envelope_dict: OFP envelope dictionary to broadcast
            sender_uri: Speaker URI of the sender (to exclude from broadcast)
            timeout: Request timeout in seconds

        Returns:
            Dict mapping participant URIs to success status. Empty when the
            session is unknown or no recipient has a service URL.
        """
        session = self.get_session(session_id)
        if not session:
            logger.error(f"Session not found: {session_id}")
            return {}

        # Snapshot the recipients: the loop below awaits, so the participant
        # dict may be modified by other coroutines while we iterate.
        recipients = [
            (speaker_uri, agent_info)
            for speaker_uri, agent_info in list(session.participants.items())
            if not (sender_uri and speaker_uri == sender_uri)
        ]
        reachable = sum(1 for _, agent_info in recipients if agent_info.service_url)
        if not reachable:
            logger.info(f"No participants to broadcast to in session {session_id}")
            return {}

        logger.info(f"Broadcasting envelope to {reachable} participants")
        logger.info("📤 OUTGOING ENVELOPE:")
        logger.info(json.dumps(envelope_dict, indent=2))

        results: Dict[str, bool] = {}
        loop = asyncio.get_running_loop()
        for speaker_uri, agent_info in recipients:
            if not agent_info.service_url:
                logger.warning(f"No service URL for participant: {speaker_uri}")
                results[speaker_uri] = False
                continue

            try:
                logger.info(f"📤 Sending envelope to {speaker_uri} at {agent_info.service_url}")
                logger.debug(f"📤 Envelope content: {envelope_dict}")
                # requests is blocking; hand it to a worker thread
                response = await loop.run_in_executor(
                    None,
                    functools.partial(
                        self._post_envelope,
                        agent_info.service_url,
                        envelope_dict,
                        timeout
                    )
                )
            except requests.exceptions.Timeout:
                logger.error(f"✗ Timeout sending to {speaker_uri}")
                results[speaker_uri] = False
            except requests.exceptions.RequestException as e:
                logger.error(f"✗ Failed to send to {speaker_uri}: {e}")
                results[speaker_uri] = False
            except Exception as e:
                logger.error(f"✗ Unexpected error sending to {speaker_uri}: {e}")
                results[speaker_uri] = False
            else:
                results[speaker_uri] = True
                logger.info(f"✓ Envelope delivered to {speaker_uri} - Status: {response.status_code}")
                self._handle_agent_reply(session_id, speaker_uri, agent_info, response)

        successful = sum(1 for success in results.values() if success)
        logger.info(f"Broadcast complete: {successful}/{len(results)} successful")
        return results

    async def handle_incoming_envelope(
        self,
        session_id: str,
        envelope_dict: Dict[str, Any]
    ) -> bool:
        """
        Handle an incoming envelope and broadcast to participants

        This is the main entry point for envelopes received by the floor.
        The floor manager will:
        1. Check that the envelope names a sender and the session exists
        2. Process any floor control events
        3. Broadcast to all other participants

        Args:
            session_id: Session identifier
            envelope_dict: OFP envelope dictionary

        Returns:
            True if handled successfully; False when the sender is missing,
            the session is unknown, or an unexpected error occurred
        """
        try:
            openfloor = envelope_dict.get("openFloor") or {}
            sender_uri = (openfloor.get("sender") or {}).get("speakerUri")
            if not sender_uri:
                logger.error("Envelope missing sender speakerUri")
                return False

            # An unknown session cannot be "handled": nothing would be processed
            # or forwarded, so report it instead of claiming success.
            if self.get_session(session_id) is None:
                logger.error(f"Session not found: {session_id}")
                return False

            logger.info(f"Received envelope from {sender_uri} for session {session_id}")

            # Process floor control events (grant, revoke, request, etc.)
            await self._process_floor_events(session_id, envelope_dict)

            # Broadcast to all participants except sender
            await self.broadcast_envelope(
                session_id=session_id,
                envelope_dict=envelope_dict,
                sender_uri=sender_uri
            )
            return True
        except Exception as e:
            logger.error(f"Error handling envelope: {e}")
            return False

    async def _process_floor_events(
        self,
        session_id: str,
        envelope_dict: Dict[str, Any]
    ):
        """
        Process floor control events (grantFloor, revokeFloor, requestFloor, etc.)

        Updates the session's floor holder: grantFloor sets it to the event's
        target, revokeFloor clears it, yieldFloor clears it only when the
        sender is the current holder. Malformed events are skipped.

        Args:
            session_id: Session identifier
            envelope_dict: OFP envelope dictionary
        """
        session = self.get_session(session_id)
        if not session:
            return

        openfloor = envelope_dict.get("openFloor") or {}
        events = openfloor.get("events") or []
        sender_uri = (openfloor.get("sender") or {}).get("speakerUri")

        for event in events:
            if not isinstance(event, dict):
                logger.warning(f"Ignoring malformed floor event: {event!r}")
                continue

            event_type = event.get("eventType")
            if event_type == "requestFloor":
                logger.info(f"Floor requested by {sender_uri}")
                # In a full implementation, this would trigger convener logic
            elif event_type == "grantFloor":
                # "to" may be absent, null or malformed; only a dict can name a target
                to_info = event.get("to")
                to_uri = to_info.get("speakerUri") if isinstance(to_info, dict) else None
                if to_uri:
                    session.floor_holder = to_uri
                    logger.info(f"Floor granted to {to_uri}")
            elif event_type == "revokeFloor":
                logger.info(f"Floor revoked from {session.floor_holder}")
                session.floor_holder = None
            elif event_type == "yieldFloor":
                if sender_uri == session.floor_holder:
                    session.floor_holder = None
                    logger.info(f"Floor yielded by {sender_uri}")

    async def send_floor_notification(
        self,
        session_id: str,
        event_type: str,
        target_uri: Optional[str] = None,
        reason: Optional[str] = None
    ) -> bool:
        """
        Send a floor control notification to all participants

        Builds a single-event envelope with the floor manager as sender and
        broadcasts it to every participant of the session.

        Args:
            session_id: Session identifier
            event_type: Type of floor event (grantFloor, revokeFloor, etc.)
            target_uri: Optional target speaker URI
            reason: Optional reason for the event

        Returns:
            True if at least one participant received the notification
        """
        session = self.get_session(session_id)
        if not session:
            return False

        # Create floor notification envelope
        event: Dict[str, Any] = {
            "eventType": event_type
        }
        if target_uri:
            event["to"] = {
                "speakerUri": target_uri
            }
        if reason:
            event["reason"] = reason

        envelope_dict = {
            "openFloor": {
                "schema": {
                    "version": settings.OFP_VERSION
                },
                "conversation": {
                    "id": session_id
                },
                "sender": {
                    "speakerUri": session.floor_manager_uri,
                    "serviceUrl": session.floor_manager_url
                },
                "events": [event]
            }
        }

        # Broadcast to all participants
        results = await self.broadcast_envelope(
            session_id=session_id,
            envelope_dict=envelope_dict
        )
        return any(results.values())

    def get_agent_responses(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Get agent responses for a session

        Args:
            session_id: Session identifier

        Returns:
            List of agent response dictionaries (empty when none were stored)
        """
        return self._agent_responses.get(session_id, [])

    def clear_agent_responses(self, session_id: str):
        """Clear agent responses for a session"""
        if session_id in self._agent_responses:
            # Rebind instead of clearing in place so that a list handed out
            # earlier by get_agent_responses() is left untouched.
            self._agent_responses[session_id] = []