# Hugging Face Spaces page status at capture time: Sleeping
"""
Simple Gradio test interface for the OFP (OpenFloor Protocol) Floor Manager.

This is a minimal working version intended for testing the UI.
"""
import asyncio
import traceback
import uuid
from datetime import datetime, timezone
from typing import List, Optional, Tuple

import gradio as gr

from src.agent_manager import AgentInfo, AgentManager
from src.floor_manager import FloorManager, FloorSession
| # Simple mock classes for testing | |
class MockAgent:
    """In-memory stand-in for a floor participant, used only by the test UI."""

    def __init__(self, agent_id: str, name: str):
        """Store the agent's identity; a new agent starts without the floor."""
        self.name = name
        self.agent_id = agent_id
        self.joined_at = datetime.now()
        self.has_floor = False

    def to_display(self):
        """Return ``[name, first 8 characters of the id, status label]``."""
        if self.has_floor:
            status_label = "🎤 Has Floor"
        else:
            status_label = "⏸️ Listening"
        short_id = self.agent_id[:8]
        return [self.name, short_id, status_label]
class MockFloorSession:
    """In-memory floor session backing the test UI.

    Tracks the participating agents (keyed by agent id), which agent
    currently holds the floor, and a chronological list of message dicts
    with the keys ``role``, ``content`` and ``timestamp``.
    """

    def __init__(self, session_id: str):
        """Create an empty session with no agents, messages or floor holder."""
        self.session_id = session_id
        self.agents = {}
        self.floor_holder = None
        self.messages = []
        self.created_at = datetime.now()

    def _record(self, role: str, content: str) -> None:
        """Append a message stamped with the current local ``HH:MM:SS`` time."""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

    def add_agent(self, name: str) -> str:
        """Register a new agent called *name* and return its generated id.

        A system message announces the join. The first agent to join is
        granted the floor automatically.
        """
        agent_id = f"agent_{uuid.uuid4().hex[:8]}"
        self.agents[agent_id] = MockAgent(agent_id, name)
        self._record("system", f"🔔 {name} joined the session")
        # Auto-grant floor if first agent
        if len(self.agents) == 1:
            self.grant_floor(agent_id)
        return agent_id

    def grant_floor(self, agent_id: str):
        """Move the floor to *agent_id*; do nothing if that id is unknown.

        The target is validated before the current holder is touched, so a
        bad id can never leave ``floor_holder`` pointing at an agent whose
        ``has_floor`` flag has already been cleared.
        """
        if agent_id not in self.agents:
            return
        previous = self.agents.get(self.floor_holder)
        if previous is not None:
            previous.has_floor = False
        target = self.agents[agent_id]
        target.has_floor = True
        self.floor_holder = agent_id
        self._record("system", f"🎤 Floor granted to {target.name}")

    def revoke_floor(self):
        """Take the floor away from the current holder, if there is one."""
        holder = self.agents.get(self.floor_holder) if self.floor_holder else None
        if holder is None:
            return
        holder.has_floor = False
        self.floor_holder = None
        self._record("system", f"⏸️ Floor revoked from {holder.name}")

    def add_message(self, agent_id: str, content: str):
        """Record *content* as spoken by *agent_id*; unknown ids are ignored."""
        agent = self.agents.get(agent_id)
        if agent is not None:
            self._record(agent.name, content)

    def get_agent_list(self):
        """Return one display row per agent, in insertion order."""
        return [agent.to_display() for agent in self.agents.values()]

    def get_floor_status(self):
        """Return a one-line description of who currently holds the floor."""
        if self.floor_holder and self.floor_holder in self.agents:
            agent = self.agents[self.floor_holder]
            return f"🎤 Current Speaker: {agent.name} ({agent.agent_id[:8]})"
        return "⏸️ No active speaker"

    def format_chat_history(self):
        """Format messages for Gradio Chatbot with messages format.

        System messages become italic ``assistant`` entries; agent messages
        become ``user`` entries prefixed with the bold agent name and the
        timestamp.
        """
        formatted = []
        for msg in self.messages:
            if msg["role"] == "system":
                formatted.append({
                    "role": "assistant",
                    "content": f"_{msg['content']}_",
                })
            else:
                formatted.append({
                    "role": "user",
                    "content": f"**{msg['role']}** [{msg['timestamp']}]: {msg['content']}",
                })
        return formatted
# Module-level state shared by the Gradio callbacks in this file.
# (In production this would be managed differently, e.g. per user.)

# Floor manager that creates OFP floor sessions and broadcasts envelopes.
floor_manager: FloorManager = FloorManager()
# OFP floor session for the active conversation; None until one is created.
floor_session: Optional[FloorSession] = None
# Agent manager for OFP agent interactions; None until a session is created.
agent_manager: Optional[AgentManager] = None
# Mock session that drives what the UI shows; None until one is created.
current_session: Optional[MockFloorSession] = None
def create_new_session():
    """Start a fresh floor session and replace the module-level state.

    Returns the values for the six outputs wired to the "create" button:
    session id, chat history, floor status, agent table rows, and two
    updates that make the send button and the OFP agent URL box interactive.
    """
    global current_session, agent_manager, floor_session

    new_id = f"session_{uuid.uuid4().hex[:8]}"
    manager_uri = f"tag:floormanager.local,2025:{new_id}"
    manager_url = "http://localhost:7860/ofp"

    # UI-facing mock session first, then the OFP-side objects.
    current_session = MockFloorSession(new_id)
    agent_manager = AgentManager(manager_uri, manager_url)
    floor_session = floor_manager.create_session(
        session_id=new_id,
        floor_manager_uri=manager_uri,
        floor_manager_url=manager_url,
    )
    print(f"✅ Created floor session: {new_id}")

    history = current_session.format_chat_history()
    status = current_session.get_floor_status()
    rows = current_session.get_agent_list()
    enable_send = gr.update(interactive=True)
    enable_ofp_url = gr.update(interactive=True)  # Enable OFP agent URL input
    return (new_id, history, status, rows, enable_send, enable_ofp_url)
def add_agent(agent_name: str):
    """Add a manually named agent to the current session.

    Returns the six outputs wired to the "Add Manual Agent" button: status
    text, the new value of the name box, chat history, floor status, agent
    table rows and the refreshed floor-control dropdown.

    On a validation error the dropdown keeps the agents that already exist,
    rather than being reset to an empty list.
    """
    if not current_session:
        return "❌ Create a session first!", "", [], "No session", [], gr.Dropdown(choices=[])

    clean_name = (agent_name or "").strip()
    if not clean_name:
        return (
            "❌ Agent name cannot be empty!",
            agent_name,
            current_session.format_chat_history(),
            current_session.get_floor_status(),
            current_session.get_agent_list(),
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )

    current_session.add_agent(clean_name)
    return (
        # Report the name actually stored, not the raw input.
        f"✅ Added agent: {clean_name}",
        "",
        current_session.format_chat_history(),
        current_session.get_floor_status(),
        current_session.get_agent_list(),
        gr.Dropdown(choices=get_agent_dropdown_choices()),
    )
def add_ofp_agent(agent_url: str):
    """Add an OFP agent by URL.

    The agent is registered through ``agent_manager``, added to the OFP
    floor session for envelope broadcasting, and mirrored into the mock
    session so it appears in the UI.

    Returns the six outputs wired to the "Add OFP Agent" button: status
    text, the new value of the URL box, chat history, floor status, agent
    table rows and the refreshed floor-control dropdown. Failures are
    reported in the status text rather than raised, and leave the dropdown
    showing the agents that already exist.
    """

    def _result(status: str, url_value: str):
        # Build the 6-tuple from whatever session state exists right now.
        if not current_session:
            return status, url_value, [], "No session", [], gr.Dropdown(choices=[])
        return (
            status,
            url_value,
            current_session.format_chat_history(),
            current_session.get_floor_status(),
            current_session.get_agent_list(),
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )

    if not current_session or not agent_manager or not floor_session:
        return _result("❌ Create a session first!", agent_url)

    url = (agent_url or "").strip()
    if not url:
        return _result("❌ Agent URL cannot be empty!", agent_url)

    try:
        # asyncio.run creates a private event loop and always closes it,
        # including when the coroutine raises.
        agent_info = asyncio.run(
            agent_manager.add_agent(url, current_session.session_id)
        )
        if not agent_info:
            return _result(f"❌ Failed to add OFP agent from {agent_url}", agent_url)

        # Add agent to floor session for envelope broadcasting
        floor_session.add_participant(agent_info)
        print(f"✅ Added {agent_info.speaker_uri} to floor session participants")

        # Add agent to mock session for UI; fall back to the last segment of
        # the speaker URI when the manifest carries no conversational name.
        agent_name = agent_info.manifest.get(
            "conversationalName", agent_info.speaker_uri.split(":")[-1]
        )
        current_session.add_agent(agent_name)
        return _result(
            f"✅ Added OFP agent: {agent_name} ({agent_info.speaker_uri})", ""
        )
    except Exception as e:
        # UI boundary: surface the error in the status box instead of crashing.
        return _result(f"❌ Error adding OFP agent: {str(e)}", agent_url)
def _build_utterance_envelope(conversation_id, sender_uri, service_url, text, schema_version):
    """Build an OFP envelope dict carrying one plain-text utterance event."""
    start_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    return {
        "openFloor": {
            "schema": {
                "version": schema_version
            },
            "conversation": {
                "id": conversation_id
            },
            "sender": {
                "speakerUri": sender_uri,
                "serviceUrl": service_url
            },
            "events": [
                {
                    "eventType": "utterance",
                    "parameters": {
                        "dialogEvent": {
                            "id": f"de:{uuid.uuid4()}",
                            "speakerUri": sender_uri,
                            "span": {
                                "startTime": start_time
                            },
                            "features": {
                                "text": {
                                    "mimeType": "text/plain",
                                    "tokens": [{"value": text, "confidence": 1.0}]
                                }
                            }
                        }
                    }
                }
            ]
        }
    }


def send_message(agent_name: str, message: str):
    """Send a message from an agent and broadcast it to OFP participants.

    Always returns four values, matching the outputs wired to the send
    button: chat history, the new value of the message box, floor status
    and the HTML panel content. Paths that do not send anything return
    ``gr.update()`` for the HTML panel so its content is left untouched.

    Broadcast errors are printed and swallowed so that the locally recorded
    message is still shown in the UI.
    """
    if not current_session:
        return [], message, "No session", gr.update()

    text = (message or "").strip()
    if not text:
        return (
            current_session.format_chat_history(),
            "",
            current_session.get_floor_status(),
            gr.update(),
        )

    # Find agent by name (first match wins)
    agent_id = next(
        (aid for aid, agent in current_session.agents.items() if agent.name == agent_name),
        None,
    )
    if not agent_id:
        # Unknown speaker: keep the typed text so the user can retry.
        return (
            current_session.format_chat_history(),
            message,
            current_session.get_floor_status(),
            gr.update(),
        )

    # Add message to UI
    current_session.add_message(agent_id, text)

    # Bound before the broadcast so the final return can never hit an
    # unbound name when there is no floor session or the broadcast fails.
    html_content = None

    # Create and broadcast OFP envelope to all participants
    if floor_session:
        try:
            from src.utils.config import settings

            # Create sender speaker URI for the user
            sender_uri = f"tag:user.floormanager.local,2025:{agent_name}"
            envelope_dict = _build_utterance_envelope(
                conversation_id=current_session.session_id,
                sender_uri=sender_uri,
                service_url=floor_session.floor_manager_url,
                text=text,
                schema_version=settings.OFP_VERSION,
            )

            # Broadcast to all OFP agents
            print(f"📤 Broadcasting message to {len(floor_session.participants)} OFP agent(s)")
            # asyncio.run closes its private loop even if the broadcast raises.
            results = asyncio.run(
                floor_manager.broadcast_envelope(
                    session_id=current_session.session_id,
                    envelope_dict=envelope_dict,
                    sender_uri=sender_uri
                )
            )
            print(f"✅ Broadcast complete: {sum(1 for r in results.values() if r)}/{len(results)} successful")

            # Check for agent responses
            agent_responses = floor_manager.get_agent_responses(current_session.session_id)
            if agent_responses:
                print(f"📥 Received {len(agent_responses)} agent response(s)")
                for response in agent_responses:
                    responder = response["agent_name"]
                    agent_msg = response["message"]
                    print(f"  💬 {responder}: {agent_msg}")
                    # Shown in the chat under the responding agent's name
                    current_session.messages.append({
                        "role": responder,
                        "content": agent_msg,
                        "timestamp": response["timestamp"].strftime("%H:%M:%S")
                    })
                    # The last response that carries HTML wins
                    if response.get("html_content"):
                        html_content = response["html_content"]
                        print(f"  🌐 {responder} sent HTML content ({len(html_content)} chars)")
                # Clear responses after displaying
                floor_manager.clear_agent_responses(current_session.session_id)
        except Exception as e:
            print(f"❌ Error broadcasting message: {e}")
            traceback.print_exc()

    # Display HTML if available, otherwise show default
    html_display = html_content or "<p style='color: #666;'>No HTML content yet. Agents can send rich HTML responses that will be displayed here.</p>"
    return (
        current_session.format_chat_history(),
        "",
        current_session.get_floor_status(),
        html_display
    )
def grant_floor_to_agent(agent_name: str):
    """Grant the floor to the agent selected in the dropdown.

    Returns chat history, floor status and agent table rows. If no agent is
    selected, or the name matches no agent, the current session state is
    returned unchanged rather than blanking the chat and agent table.
    """
    if not current_session:
        return [], "No session", []

    if agent_name:
        # Find agent by name (first match wins)
        for aid, agent in current_session.agents.items():
            if agent.name == agent_name:
                current_session.grant_floor(aid)
                break

    return (
        current_session.format_chat_history(),
        current_session.get_floor_status(),
        current_session.get_agent_list(),
    )
def revoke_floor_action():
    """Take the floor from whoever holds it and return the refreshed UI state.

    Returns chat history, floor status and agent table rows; placeholder
    values are returned when no session exists.
    """
    session = current_session
    if session:
        session.revoke_floor()
        history = session.format_chat_history()
        status = session.get_floor_status()
        rows = session.get_agent_list()
        return (history, status, rows)
    return [], "No session", []
def get_agent_dropdown_choices():
    """Return the names of all agents in the current session (empty if none)."""
    if current_session:
        names = []
        for agent in current_session.agents.values():
            names.append(agent.name)
        return names
    return []
# Create Gradio interface
# NOTE(review): the nesting below is reconstructed from flattened text.
# Placement of `add_agent_status` (after the tabs rather than inside the
# "OFP Agent" tab) is an assumption - confirm against the deployed layout.
with gr.Blocks(title="OFP Floor Manager Test") as demo:
    gr.Markdown("# 🎤 OpenFloor Protocol - Floor Manager Test")
    gr.Markdown("Test interface for managing multi-agent floor conversations")
    with gr.Row():
        # Left Panel: Session Control
        with gr.Column(scale=1):
            gr.Markdown("### Session Management")
            # Read-only; filled in by create_new_session.
            session_id = gr.Textbox(
                label="Session ID",
                interactive=False,
                placeholder="No active session"
            )
            create_btn = gr.Button("🆕 Create New Session", variant="primary")
            gr.Markdown("---")
            gr.Markdown("### Agent Management")
            with gr.Tabs():
                with gr.Tab("Manual Agent"):
                    agent_name_input = gr.Textbox(
                        label="Agent Name",
                        placeholder="Enter agent name..."
                    )
                    add_agent_btn = gr.Button("➕ Add Manual Agent")
                with gr.Tab("OFP Agent"):
                    # Starts disabled; create_new_session makes it interactive.
                    ofp_agent_url = gr.Textbox(
                        label="Agent Service URL",
                        placeholder="https://agent.example.com/ofp",
                        interactive=False
                    )
                    add_ofp_agent_btn = gr.Button("🌐 Add OFP Agent")
            # Status line written by both add_agent and add_ofp_agent.
            add_agent_status = gr.Textbox(
                label="Status",
                interactive=False
            )
            gr.Markdown("---")
            gr.Markdown("### Connected Agents")
            agent_table = gr.Dataframe(
                headers=["Name", "ID", "Status"],
                datatype=["str", "str", "str"],
                label="Agents",
                interactive=False
            )
            gr.Markdown("---")
            gr.Markdown("### Floor Control")
            # Agent to grant the floor to; choices come from the add handlers.
            agent_selector = gr.Dropdown(
                label="Select Agent",
                choices=[],
                interactive=True,
                allow_custom_value=False
            )
            with gr.Row():
                grant_btn = gr.Button("🎤 Grant Floor", variant="primary")
                revoke_btn = gr.Button("⏸️ Revoke Floor", variant="stop")
        # Right Panel: Conversation
        with gr.Column(scale=2):
            gr.Markdown("### Floor Conversation")
            floor_status = gr.Textbox(
                label="Current Floor Status",
                interactive=False,
                value="⏸️ No active speaker"
            )
            chatbot = gr.Chatbot(
                label="Conversation",
                height=400
            )
            # Shows HTML returned by send_message; starts with a placeholder.
            html_output = gr.HTML(
                label="Agent HTML Response",
                value="<p style='color: #666;'>No HTML content yet. Agents can send rich HTML responses that will be displayed here.</p>"
            )
            gr.Markdown("### Send Message")
            with gr.Row():
                # Which agent the typed message is sent as.
                message_agent = gr.Dropdown(
                    label="Speaking As",
                    choices=[],
                    scale=2,
                    interactive=True,
                    allow_custom_value=False
                )
                message_input = gr.Textbox(
                    label="Message",
                    placeholder="Type your message...",
                    scale=4,
                    interactive=True
                )
                # Starts disabled; create_new_session makes it interactive.
                send_btn = gr.Button("📤 Send", scale=1, variant="primary", interactive=False)
    # Event Handlers
    # Each `outputs` list must match, in order, the tuple its handler returns.
    create_btn.click(
        fn=create_new_session,
        outputs=[session_id, chatbot, floor_status, agent_table, send_btn, ofp_agent_url]
    )
    # The handler refreshes `agent_selector`; the chained step then refreshes
    # the "Speaking As" dropdown from the same list of agent names.
    add_agent_btn.click(
        fn=add_agent,
        inputs=[agent_name_input],
        outputs=[add_agent_status, agent_name_input, chatbot, floor_status, agent_table, agent_selector]
    ).then(
        fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()),
        outputs=[message_agent]
    )
    # Same two-step pattern for OFP agents added by URL.
    add_ofp_agent_btn.click(
        fn=add_ofp_agent,
        inputs=[ofp_agent_url],
        outputs=[add_agent_status, ofp_agent_url, chatbot, floor_status, agent_table, agent_selector]
    ).then(
        fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()),
        outputs=[message_agent]
    )
    # Four outputs are wired here, so send_message must return four values.
    send_btn.click(
        fn=send_message,
        inputs=[message_agent, message_input],
        outputs=[chatbot, message_input, floor_status, html_output]
    )
    grant_btn.click(
        fn=grant_floor_to_agent,
        inputs=[agent_selector],
        outputs=[chatbot, floor_status, agent_table]
    )
    revoke_btn.click(
        fn=revoke_floor_action,
        outputs=[chatbot, floor_status, agent_table]
    )
if __name__ == "__main__":
    # Print the startup banner, then serve on all interfaces at port 7860.
    banner_lines = (
        "🚀 Starting OFP Floor Manager Test Interface...",
        "📱 Open your browser to test the interface",
        "=" * 50,
    )
    for banner_line in banner_lines:
        print(banner_line)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        theme=gr.themes.Soft(),
    )