""" Simple Gradio Test Interface for OFP Floor Manager This is a minimal working version for testing the UI """ import gradio as gr from datetime import datetime from typing import List, Tuple, Optional import uuid import asyncio from src.floor_manager import FloorManager, FloorSession from src.agent_manager import AgentManager, AgentInfo # Simple mock classes for testing class MockAgent: def __init__(self, agent_id: str, name: str): self.agent_id = agent_id self.name = name self.has_floor = False self.joined_at = datetime.now() def to_display(self): status = "π€ Has Floor" if self.has_floor else "βΈοΈ Listening" return [self.name, self.agent_id[:8], status] class MockFloorSession: def __init__(self, session_id: str): self.session_id = session_id self.agents = {} self.floor_holder = None self.messages = [] self.created_at = datetime.now() def add_agent(self, name: str) -> str: agent_id = f"agent_{uuid.uuid4().hex[:8]}" agent = MockAgent(agent_id, name) self.agents[agent_id] = agent # Add system message self.messages.append({ "role": "system", "content": f"π {name} joined the session", "timestamp": datetime.now().strftime("%H:%M:%S") }) # Auto-grant floor if first agent if len(self.agents) == 1: self.grant_floor(agent_id) return agent_id def grant_floor(self, agent_id: str): # Revoke from current holder if self.floor_holder and self.floor_holder in self.agents: self.agents[self.floor_holder].has_floor = False # Grant to new holder if agent_id in self.agents: self.agents[agent_id].has_floor = True self.floor_holder = agent_id self.messages.append({ "role": "system", "content": f"π€ Floor granted to {self.agents[agent_id].name}", "timestamp": datetime.now().strftime("%H:%M:%S") }) def revoke_floor(self): if self.floor_holder and self.floor_holder in self.agents: agent_name = self.agents[self.floor_holder].name self.agents[self.floor_holder].has_floor = False self.floor_holder = None self.messages.append({ "role": "system", "content": f"βΈοΈ Floor revoked from {agent_name}", "timestamp": datetime.now().strftime("%H:%M:%S") }) def add_message(self, agent_id: str, content: str): if agent_id in self.agents: agent = self.agents[agent_id] self.messages.append({ "role": agent.name, "content": content, "timestamp": datetime.now().strftime("%H:%M:%S") }) def get_agent_list(self): return [agent.to_display() for agent in self.agents.values()] def get_floor_status(self): if self.floor_holder and self.floor_holder in self.agents: agent = self.agents[self.floor_holder] return f"π€ Current Speaker: {agent.name} ({agent.agent_id[:8]})" return "βΈοΈ No active speaker" def format_chat_history(self): """Format messages for Gradio Chatbot with messages format""" formatted = [] for msg in self.messages: if msg["role"] == "system": # System messages as assistant role formatted.append({ "role": "assistant", "content": f"_{msg['content']}_" }) else: # Agent messages as user role with name prefix formatted.append({ "role": "user", "content": f"**{msg['role']}** [{msg['timestamp']}]: {msg['content']}" }) return formatted # Global session (in production, this would be managed differently) current_session: Optional[MockFloorSession] = None # Agent Manager for OFP agent interactions agent_manager: Optional[AgentManager] = None # Floor Manager for handling OFP envelope broadcasting floor_manager: FloorManager = FloorManager() floor_session: Optional[FloorSession] = None def create_new_session(): """Create a new floor session""" global current_session, agent_manager, floor_session session_id = f"session_{uuid.uuid4().hex[:8]}" current_session = MockFloorSession(session_id) # Initialize agent manager and floor session floor_manager_uri = f"tag:floormanager.local,2025:{session_id}" floor_manager_url = "http://localhost:7860/ofp" agent_manager = AgentManager(floor_manager_uri, floor_manager_url) # Create OFP floor session for broadcasting floor_session = floor_manager.create_session( session_id=session_id, floor_manager_uri=floor_manager_uri, floor_manager_url=floor_manager_url ) print(f"β Created floor session: {session_id}") return ( session_id, current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.update(interactive=True), gr.update(interactive=True) # Enable OFP agent URL input ) def add_agent(agent_name: str): """Add an agent to the session""" if not current_session: return "β Create a session first!", "", [], "No session", [], gr.Dropdown(choices=[]) if not agent_name.strip(): return "β Agent name cannot be empty!", agent_name, current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=[]) agent_id = current_session.add_agent(agent_name.strip()) agent_choices = get_agent_dropdown_choices() return ( f"β Added agent: {agent_name}", "", current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=agent_choices) ) def add_ofp_agent(agent_url: str): """Add an OFP agent by URL""" global floor_session if not current_session or not agent_manager or not floor_session: return "β Create a session first!", agent_url, current_session.format_chat_history() if current_session else [], current_session.get_floor_status() if current_session else "No session", current_session.get_agent_list() if current_session else [], gr.Dropdown(choices=[]) if not agent_url.strip(): return "β Agent URL cannot be empty!", agent_url, current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=[]) try: # Run async function in event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) agent_info = loop.run_until_complete( agent_manager.add_agent(agent_url.strip(), current_session.session_id) ) loop.close() if not agent_info: return f"β Failed to add OFP agent from {agent_url}", agent_url, current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=[]) # Add agent to floor session for envelope broadcasting floor_session.add_participant(agent_info) print(f"β Added {agent_info.speaker_uri} to floor session participants") # Add agent to mock session for UI agent_name = agent_info.manifest.get("conversationalName", agent_info.speaker_uri.split(":")[-1]) current_session.add_agent(agent_name) agent_choices = get_agent_dropdown_choices() return ( f"β Added OFP agent: {agent_name} ({agent_info.speaker_uri})", "", current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=agent_choices) ) except Exception as e: return f"β Error adding OFP agent: {str(e)}", agent_url, current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list(), gr.Dropdown(choices=[]) def send_message(agent_name: str, message: str): """Send a message from an agent and broadcast to OFP participants""" global floor_session if not current_session: return "β Create a session first!", [], "No session" if not message.strip(): return current_session.format_chat_history(), "", current_session.get_floor_status() # Find agent by name agent_id = None for aid, agent in current_session.agents.items(): if agent.name == agent_name: agent_id = aid break if not agent_id: return current_session.format_chat_history(), message, current_session.get_floor_status() # Add message to UI current_session.add_message(agent_id, message.strip()) # Create and broadcast OFP envelope to all participants if floor_session: try: from src.utils.config import settings # Create sender speaker URI for the user sender_uri = f"tag:user.floormanager.local,2025:{agent_name}" # Create OFP envelope with utterance event from datetime import datetime, timezone envelope_dict = { "openFloor": { "schema": { "version": settings.OFP_VERSION }, "conversation": { "id": current_session.session_id }, "sender": { "speakerUri": sender_uri, "serviceUrl": floor_session.floor_manager_url }, "events": [ { "eventType": "utterance", "parameters": { "dialogEvent": { "id": f"de:{uuid.uuid4()}", "speakerUri": sender_uri, "span": { "startTime": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') }, "features": { "text": { "mimeType": "text/plain", "tokens": [{"value": message.strip(), "confidence": 1.0}] } } } } } ] } } # Broadcast to all OFP agents print(f"π€ Broadcasting message to {len(floor_session.participants)} OFP agent(s)") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) results = loop.run_until_complete( floor_manager.broadcast_envelope( session_id=current_session.session_id, envelope_dict=envelope_dict, sender_uri=sender_uri ) ) loop.close() print(f"β Broadcast complete: {sum(1 for r in results.values() if r)}/{len(results)} successful") # Check for agent responses agent_responses = floor_manager.get_agent_responses(current_session.session_id) html_content = None if agent_responses: print(f"π₯ Received {len(agent_responses)} agent response(s)") for response in agent_responses: # Add agent response to UI agent_name = response["agent_name"] agent_msg = response["message"] print(f" π¬ {agent_name}: {agent_msg}") # Add to mock session as a system message current_session.messages.append({ "role": agent_name, "content": agent_msg, "timestamp": response["timestamp"].strftime("%H:%M:%S") }) # Check for HTML content if response.get("html_content"): html_content = response["html_content"] print(f" π {agent_name} sent HTML content ({len(html_content)} chars)") # Clear responses after displaying floor_manager.clear_agent_responses(current_session.session_id) except Exception as e: print(f"β Error broadcasting message: {e}") import traceback traceback.print_exc() # Display HTML if available, otherwise show default html_display = html_content if html_content else "
No HTML content yet. Agents can send rich HTML responses that will be displayed here.
" return ( current_session.format_chat_history(), "", current_session.get_floor_status(), html_display ) def grant_floor_to_agent(agent_name: str): """Grant floor to selected agent""" if not current_session or not agent_name: return [], "No session", [] # Find agent by name for aid, agent in current_session.agents.items(): if agent.name == agent_name: current_session.grant_floor(aid) break return ( current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list() ) def revoke_floor_action(): """Revoke floor from current holder""" if not current_session: return [], "No session", [] current_session.revoke_floor() return ( current_session.format_chat_history(), current_session.get_floor_status(), current_session.get_agent_list() ) def get_agent_dropdown_choices(): """Get list of agent names for dropdown""" if not current_session: return [] return [agent.name for agent in current_session.agents.values()] # Create Gradio interface with gr.Blocks(title="OFP Floor Manager Test") as demo: gr.Markdown("# π€ OpenFloor Protocol - Floor Manager Test") gr.Markdown("Test interface for managing multi-agent floor conversations") with gr.Row(): # Left Panel: Session Control with gr.Column(scale=1): gr.Markdown("### Session Management") session_id = gr.Textbox( label="Session ID", interactive=False, placeholder="No active session" ) create_btn = gr.Button("π Create New Session", variant="primary") gr.Markdown("---") gr.Markdown("### Agent Management") with gr.Tabs(): with gr.Tab("Manual Agent"): agent_name_input = gr.Textbox( label="Agent Name", placeholder="Enter agent name..." ) add_agent_btn = gr.Button("β Add Manual Agent") with gr.Tab("OFP Agent"): ofp_agent_url = gr.Textbox( label="Agent Service URL", placeholder="https://agent.example.com/ofp", interactive=False ) add_ofp_agent_btn = gr.Button("π Add OFP Agent") add_agent_status = gr.Textbox( label="Status", interactive=False ) gr.Markdown("---") gr.Markdown("### Connected Agents") agent_table = gr.Dataframe( headers=["Name", "ID", "Status"], datatype=["str", "str", "str"], label="Agents", interactive=False ) gr.Markdown("---") gr.Markdown("### Floor Control") agent_selector = gr.Dropdown( label="Select Agent", choices=[], interactive=True, allow_custom_value=False ) with gr.Row(): grant_btn = gr.Button("π€ Grant Floor", variant="primary") revoke_btn = gr.Button("βΈοΈ Revoke Floor", variant="stop") # Right Panel: Conversation with gr.Column(scale=2): gr.Markdown("### Floor Conversation") floor_status = gr.Textbox( label="Current Floor Status", interactive=False, value="βΈοΈ No active speaker" ) chatbot = gr.Chatbot( label="Conversation", height=400 ) html_output = gr.HTML( label="Agent HTML Response", value="No HTML content yet. Agents can send rich HTML responses that will be displayed here.
" ) gr.Markdown("### Send Message") with gr.Row(): message_agent = gr.Dropdown( label="Speaking As", choices=[], scale=2, interactive=True, allow_custom_value=False ) message_input = gr.Textbox( label="Message", placeholder="Type your message...", scale=4, interactive=True ) send_btn = gr.Button("π€ Send", scale=1, variant="primary", interactive=False) # Event Handlers create_btn.click( fn=create_new_session, outputs=[session_id, chatbot, floor_status, agent_table, send_btn, ofp_agent_url] ) add_agent_btn.click( fn=add_agent, inputs=[agent_name_input], outputs=[add_agent_status, agent_name_input, chatbot, floor_status, agent_table, agent_selector] ).then( fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()), outputs=[message_agent] ) add_ofp_agent_btn.click( fn=add_ofp_agent, inputs=[ofp_agent_url], outputs=[add_agent_status, ofp_agent_url, chatbot, floor_status, agent_table, agent_selector] ).then( fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()), outputs=[message_agent] ) send_btn.click( fn=send_message, inputs=[message_agent, message_input], outputs=[chatbot, message_input, floor_status, html_output] ) grant_btn.click( fn=grant_floor_to_agent, inputs=[agent_selector], outputs=[chatbot, floor_status, agent_table] ) revoke_btn.click( fn=revoke_floor_action, outputs=[chatbot, floor_status, agent_table] ) if __name__ == "__main__": print("π Starting OFP Floor Manager Test Interface...") print("π± Open your browser to test the interface") print("=" * 50) demo.launch(server_name="0.0.0.0", server_port=7860, share=False, theme=gr.themes.Soft())