# FloorManager / test_app.py
# BolyosCsaba
# html render
# aa54aeb
"""
Simple Gradio Test Interface for OFP Floor Manager
This is a minimal working version for testing the UI
"""
import gradio as gr
from datetime import datetime
from typing import List, Tuple, Optional
import uuid
import asyncio
from src.floor_manager import FloorManager, FloorSession
from src.agent_manager import AgentManager, AgentInfo
# Simple mock classes for testing
class MockAgent:
    """In-memory stand-in for a session participant, used by the test UI.

    Attributes:
        agent_id: Identifier handed in by the creator of the agent.
        name: Display name shown in the UI.
        has_floor: True while this agent is marked as the floor holder.
        joined_at: Local time at which the instance was constructed.
    """

    def __init__(self, agent_id: str, name: str):
        self.agent_id = agent_id
        self.name = name
        self.has_floor = False
        self.joined_at = datetime.now()

    def to_display(self):
        """Return ``[name, first 8 chars of the id, status label]`` as a table row."""
        if self.has_floor:
            status_label = "🎤 Has Floor"
        else:
            status_label = "⏸️ Listening"
        short_id = self.agent_id[:8]
        return [self.name, short_id, status_label]
class MockFloorSession:
    """In-memory floor session backing the test UI.

    Keeps the participating agents, the id of the agent that currently holds
    the floor, and a chronological message log.

    Attributes:
        session_id: Identifier supplied by the caller.
        agents: Mapping of agent id to agent object.
        floor_holder: Id of the agent holding the floor, or None.
        messages: List of dicts with "role", "content" and "timestamp" keys;
            "role" is "system" for session events, otherwise the agent name.
        created_at: Local time at which the session was constructed.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.agents = {}
        self.floor_holder = None
        self.messages = []
        self.created_at = datetime.now()

    def _log(self, role: str, content: str) -> None:
        """Append one message stamped with the current local time (HH:MM:SS)."""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })

    def add_agent(self, name: str) -> str:
        """Register a new agent called *name* and return its generated id.

        A join notice is logged, and the very first agent of the session is
        granted the floor automatically.
        """
        agent_id = f"agent_{uuid.uuid4().hex[:8]}"
        self.agents[agent_id] = MockAgent(agent_id, name)
        self._log("system", f"🔔 {name} joined the session")
        # Auto-grant floor if first agent
        if len(self.agents) == 1:
            self.grant_floor(agent_id)
        return agent_id

    def grant_floor(self, agent_id: str):
        """Move the floor to *agent_id*; do nothing if that id is unknown.

        The target is validated before the current holder is touched, so a
        bad id can no longer leave ``floor_holder`` pointing at an agent whose
        ``has_floor`` flag has already been cleared.
        """
        if agent_id not in self.agents:
            return
        # Revoke from current holder
        if self.floor_holder and self.floor_holder in self.agents:
            self.agents[self.floor_holder].has_floor = False
        # Grant to new holder
        new_holder = self.agents[agent_id]
        new_holder.has_floor = True
        self.floor_holder = agent_id
        self._log("system", f"🎤 Floor granted to {new_holder.name}")

    def revoke_floor(self):
        """Take the floor away from the current holder, if there is one."""
        if self.floor_holder and self.floor_holder in self.agents:
            holder = self.agents[self.floor_holder]
            holder.has_floor = False
            self.floor_holder = None
            self._log("system", f"⏸️ Floor revoked from {holder.name}")

    def add_message(self, agent_id: str, content: str):
        """Log *content* under the agent's name; unknown ids are ignored."""
        if agent_id in self.agents:
            self._log(self.agents[agent_id].name, content)

    def get_agent_list(self):
        """Return one display row per agent (see ``to_display`` on the agent)."""
        return [agent.to_display() for agent in self.agents.values()]

    def get_floor_status(self):
        """Return a one-line description of who currently holds the floor."""
        if self.floor_holder and self.floor_holder in self.agents:
            agent = self.agents[self.floor_holder]
            return f"🎤 Current Speaker: {agent.name} ({agent.agent_id[:8]})"
        return "⏸️ No active speaker"

    def format_chat_history(self):
        """Format messages for Gradio Chatbot with messages format.

        System messages become italic "assistant" entries; agent messages
        become "user" entries prefixed with the bold agent name and timestamp.
        """
        formatted = []
        for msg in self.messages:
            if msg["role"] == "system":
                # System messages as assistant role
                formatted.append({
                    "role": "assistant",
                    "content": f"_{msg['content']}_",
                })
            else:
                # Agent messages as user role with name prefix
                formatted.append({
                    "role": "user",
                    "content": f"**{msg['role']}** [{msg['timestamp']}]: {msg['content']}",
                })
        return formatted
# Global session (in production, this would be managed differently)
# Holds the UI-side mock session; None until create_new_session() runs.
current_session: Optional[MockFloorSession] = None
# Agent Manager for OFP agent interactions
# Rebuilt by create_new_session() with that session's floor manager URI/URL.
agent_manager: Optional[AgentManager] = None
# Floor Manager for handling OFP envelope broadcasting
# Instantiated once at import time and shared by every session in this process.
floor_manager: FloorManager = FloorManager()
# OFP-side session object returned by floor_manager.create_session(); None
# until a session has been created.
floor_session: Optional[FloorSession] = None
def create_new_session():
    """Start a fresh session and return the initial values for the UI.

    Rebinds the module-level ``current_session``, ``agent_manager`` and
    ``floor_session`` to objects built around a newly generated session id.

    Returns:
        A 6-tuple: the session id, the chat history, the floor status text,
        the agent table rows, and two ``gr.update(interactive=True)`` values.
    """
    global current_session, agent_manager, floor_session

    new_id = f"session_{uuid.uuid4().hex[:8]}"
    manager_uri = f"tag:floormanager.local,2025:{new_id}"
    manager_url = "http://localhost:7860/ofp"

    # UI-side mock session first, then the OFP-side collaborators.
    current_session = MockFloorSession(new_id)
    agent_manager = AgentManager(manager_uri, manager_url)
    floor_session = floor_manager.create_session(
        session_id=new_id,
        floor_manager_uri=manager_uri,
        floor_manager_url=manager_url,
    )
    print(f"✅ Created floor session: {new_id}")

    history = current_session.format_chat_history()
    status = current_session.get_floor_status()
    rows = current_session.get_agent_list()
    enable_first = gr.update(interactive=True)
    enable_url_input = gr.update(interactive=True)  # Enable OFP agent URL input
    return (new_id, history, status, rows, enable_first, enable_url_input)
def add_agent(agent_name: str):
    """Add a manually named agent to the current session.

    Args:
        agent_name: Name typed into the UI; surrounding whitespace is ignored.

    Returns:
        A 6-tuple: status text, the new value for the name input, the chat
        history, the floor status text, the agent table rows, and a
        ``gr.Dropdown`` carrying the agent choices.
    """
    if not current_session:
        return "❌ Create a session first!", "", [], "No session", [], gr.Dropdown(choices=[])

    clean_name = (agent_name or "").strip()
    if not clean_name:
        # Keep the existing choices: an invalid name must not empty the
        # selector of agents that were already added.
        return (
            "❌ Agent name cannot be empty!",
            agent_name,
            current_session.format_chat_history(),
            current_session.get_floor_status(),
            current_session.get_agent_list(),
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )

    current_session.add_agent(clean_name)
    return (
        f"✅ Added agent: {clean_name}",
        "",
        current_session.format_chat_history(),
        current_session.get_floor_status(),
        current_session.get_agent_list(),
        gr.Dropdown(choices=get_agent_dropdown_choices()),
    )
def add_ofp_agent(agent_url: str):
    """Add an OFP agent by URL.

    The agent is registered through ``agent_manager.add_agent``, added to the
    OFP floor session as a participant, and mirrored into the mock session so
    it shows up in the UI.

    Args:
        agent_url: Service URL typed into the UI; surrounding whitespace is
            ignored.

    Returns:
        A 6-tuple: status text, the new value for the URL input, the chat
        history, the floor status text, the agent table rows, and a
        ``gr.Dropdown`` carrying the agent choices. Any exception raised while
        adding the agent is reported in the status text instead of propagating.
    """
    if not current_session or not agent_manager or not floor_session:
        return (
            "❌ Create a session first!",
            agent_url,
            current_session.format_chat_history() if current_session else [],
            current_session.get_floor_status() if current_session else "No session",
            current_session.get_agent_list() if current_session else [],
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )

    def _unchanged(status: str):
        # Failure result: keep the URL in the input and keep the existing
        # dropdown choices instead of emptying them.
        return (
            status,
            agent_url,
            current_session.format_chat_history(),
            current_session.get_floor_status(),
            current_session.get_agent_list(),
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )

    url = (agent_url or "").strip()
    if not url:
        return _unchanged("❌ Agent URL cannot be empty!")

    try:
        # Run async function in a private event loop; always close it, even
        # when the call raises, so failed attempts do not leak loops.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            agent_info = loop.run_until_complete(
                agent_manager.add_agent(url, current_session.session_id)
            )
        finally:
            loop.close()
            asyncio.set_event_loop(None)

        if not agent_info:
            return _unchanged(f"❌ Failed to add OFP agent from {agent_url}")

        # Add agent to floor session for envelope broadcasting
        floor_session.add_participant(agent_info)
        print(f"✅ Added {agent_info.speaker_uri} to floor session participants")

        # Add agent to mock session for UI; fall back to the last ':'-separated
        # part of the speaker URI when the manifest has no conversational name.
        agent_name = agent_info.manifest.get(
            "conversationalName", agent_info.speaker_uri.split(":")[-1]
        )
        current_session.add_agent(agent_name)
        return (
            f"✅ Added OFP agent: {agent_name} ({agent_info.speaker_uri})",
            "",
            current_session.format_chat_history(),
            current_session.get_floor_status(),
            current_session.get_agent_list(),
            gr.Dropdown(choices=get_agent_dropdown_choices()),
        )
    except Exception as e:
        # UI boundary: surface the failure in the status box.
        return _unchanged(f"❌ Error adding OFP agent: {str(e)}")
def _build_utterance_envelope(conversation_id, sender_uri, service_url, text, ofp_version):
    """Return an OFP envelope dict carrying a single plain-text utterance event."""
    from datetime import datetime, timezone

    start_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    return {
        "openFloor": {
            "schema": {
                "version": ofp_version
            },
            "conversation": {
                "id": conversation_id
            },
            "sender": {
                "speakerUri": sender_uri,
                "serviceUrl": service_url
            },
            "events": [
                {
                    "eventType": "utterance",
                    "parameters": {
                        "dialogEvent": {
                            "id": f"de:{uuid.uuid4()}",
                            "speakerUri": sender_uri,
                            "span": {
                                "startTime": start_time
                            },
                            "features": {
                                "text": {
                                    "mimeType": "text/plain",
                                    "tokens": [{"value": text, "confidence": 1.0}]
                                }
                            }
                        }
                    }
                }
            ]
        }
    }


def send_message(agent_name: str, message: str):
    """Send a message from an agent and broadcast to OFP participants.

    The message is appended to the mock session, wrapped in an OFP envelope,
    broadcast through ``floor_manager``, and any agent responses collected by
    the floor manager are appended to the conversation.

    Args:
        agent_name: Name of the agent selected in the "Speaking As" dropdown.
        message: Text typed by the user; surrounding whitespace is ignored.

    Returns:
        Always a 4-tuple: chat history, new value for the message input,
        floor status text, and the value for the HTML panel. Early exits leave
        the HTML panel untouched via ``gr.update()``.
    """
    if not current_session:
        return [], message, "No session", gr.update()

    text = (message or "").strip()
    if not text:
        return (
            current_session.format_chat_history(),
            "",
            current_session.get_floor_status(),
            gr.update(),
        )

    # Find agent by name (first match wins if names are duplicated)
    agent_id = None
    for aid, agent in current_session.agents.items():
        if agent.name == agent_name:
            agent_id = aid
            break
    if not agent_id:
        return (
            current_session.format_chat_history(),
            message,
            current_session.get_floor_status(),
            gr.update(),
        )

    # Add message to UI
    current_session.add_message(agent_id, text)

    # Bound before the broadcast so the final return works even when there is
    # no floor session or the broadcast fails part-way.
    html_content = None

    # Create and broadcast OFP envelope to all participants
    if floor_session:
        try:
            from src.utils.config import settings

            # Create sender speaker URI for the user
            sender_uri = f"tag:user.floormanager.local,2025:{agent_name}"
            envelope_dict = _build_utterance_envelope(
                current_session.session_id,
                sender_uri,
                floor_session.floor_manager_url,
                text,
                settings.OFP_VERSION,
            )

            # Broadcast to all OFP agents
            print(f"📤 Broadcasting message to {len(floor_session.participants)} OFP agent(s)")
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                results = loop.run_until_complete(
                    floor_manager.broadcast_envelope(
                        session_id=current_session.session_id,
                        envelope_dict=envelope_dict,
                        sender_uri=sender_uri
                    )
                )
            finally:
                # Close the private loop even if the broadcast raised.
                loop.close()
                asyncio.set_event_loop(None)
            print(f"✅ Broadcast complete: {sum(1 for r in results.values() if r)}/{len(results)} successful")

            # Check for agent responses
            agent_responses = floor_manager.get_agent_responses(current_session.session_id)
            if agent_responses:
                print(f"📥 Received {len(agent_responses)} agent response(s)")
                for response in agent_responses:
                    # Separate local names so the agent_name parameter is not
                    # overwritten by the responder's name.
                    responder = response["agent_name"]
                    reply_text = response["message"]
                    print(f" 💬 {responder}: {reply_text}")
                    # Add to mock session under the responding agent's name
                    current_session.messages.append({
                        "role": responder,
                        "content": reply_text,
                        "timestamp": response["timestamp"].strftime("%H:%M:%S")
                    })
                    # Check for HTML content (the last response carrying HTML wins)
                    if response.get("html_content"):
                        html_content = response["html_content"]
                        print(f" 🌐 {responder} sent HTML content ({len(html_content)} chars)")
                # Clear responses after displaying
                floor_manager.clear_agent_responses(current_session.session_id)
        except Exception as e:
            # Best effort: the message is already in the UI log, so report the
            # broadcast failure on the console and carry on.
            print(f"❌ Error broadcasting message: {e}")
            import traceback
            traceback.print_exc()

    # Display HTML if available, otherwise show default
    html_display = html_content if html_content else "<p style='color: #666;'>No HTML content yet. Agents can send rich HTML responses that will be displayed here.</p>"
    return (
        current_session.format_chat_history(),
        "",
        current_session.get_floor_status(),
        html_display
    )
def grant_floor_to_agent(agent_name: str):
    """Grant floor to selected agent.

    Args:
        agent_name: Name chosen in the agent selector; may be empty or None
            when nothing is selected.

    Returns:
        A 3-tuple: chat history, floor status text, agent table rows. Without
        a session this is ``([], "No session", [])``. With a session but no
        (or an unknown) agent name, the current state is returned unchanged
        rather than blanking the conversation.
    """
    if not current_session:
        return [], "No session", []

    if agent_name:
        # Find agent by name (first match wins if names are duplicated)
        for aid, agent in current_session.agents.items():
            if agent.name == agent_name:
                current_session.grant_floor(aid)
                break

    return (
        current_session.format_chat_history(),
        current_session.get_floor_status(),
        current_session.get_agent_list()
    )
def revoke_floor_action():
    """Take the floor away from whoever holds it and return the refreshed UI state.

    Returns:
        A 3-tuple: chat history, floor status text, agent table rows; or
        ``([], "No session", [])`` when no session exists.
    """
    session = current_session
    if session:
        session.revoke_floor()
        history = session.format_chat_history()
        status = session.get_floor_status()
        rows = session.get_agent_list()
        return (history, status, rows)
    return [], "No session", []
def get_agent_dropdown_choices():
    """Return the names of all agents in the current session, or [] without one."""
    names = []
    if current_session:
        for participant in current_session.agents.values():
            names.append(participant.name)
    return names
# Create Gradio interface
# Two-column layout: session/agent/floor controls on the left, the
# conversation and message composer on the right. Event wiring follows the
# layout, still inside the Blocks context.
with gr.Blocks(title="OFP Floor Manager Test") as demo:
    gr.Markdown("# 🎤 OpenFloor Protocol - Floor Manager Test")
    gr.Markdown("Test interface for managing multi-agent floor conversations")
    with gr.Row():
        # Left Panel: Session Control
        with gr.Column(scale=1):
            gr.Markdown("### Session Management")
            # Read-only; filled by create_new_session().
            session_id = gr.Textbox(
                label="Session ID",
                interactive=False,
                placeholder="No active session"
            )
            create_btn = gr.Button("🆕 Create New Session", variant="primary")
            gr.Markdown("---")
            gr.Markdown("### Agent Management")
            with gr.Tabs():
                with gr.Tab("Manual Agent"):
                    agent_name_input = gr.Textbox(
                        label="Agent Name",
                        placeholder="Enter agent name..."
                    )
                    add_agent_btn = gr.Button("➕ Add Manual Agent")
                with gr.Tab("OFP Agent"):
                    # Starts disabled; create_new_session() returns an update
                    # that makes it interactive.
                    ofp_agent_url = gr.Textbox(
                        label="Agent Service URL",
                        placeholder="https://agent.example.com/ofp",
                        interactive=False
                    )
                    add_ofp_agent_btn = gr.Button("🌐 Add OFP Agent")
            # Shared status line for both "add agent" buttons.
            add_agent_status = gr.Textbox(
                label="Status",
                interactive=False
            )
            gr.Markdown("---")
            gr.Markdown("### Connected Agents")
            # Rows come from MockFloorSession.get_agent_list().
            agent_table = gr.Dataframe(
                headers=["Name", "ID", "Status"],
                datatype=["str", "str", "str"],
                label="Agents",
                interactive=False
            )
            gr.Markdown("---")
            gr.Markdown("### Floor Control")
            # Target of the Grant Floor button; choices are replaced by the
            # add-agent handlers.
            agent_selector = gr.Dropdown(
                label="Select Agent",
                choices=[],
                interactive=True,
                allow_custom_value=False
            )
            with gr.Row():
                grant_btn = gr.Button("🎤 Grant Floor", variant="primary")
                revoke_btn = gr.Button("⏸️ Revoke Floor", variant="stop")
        # Right Panel: Conversation
        with gr.Column(scale=2):
            gr.Markdown("### Floor Conversation")
            floor_status = gr.Textbox(
                label="Current Floor Status",
                interactive=False,
                value="⏸️ No active speaker"
            )
            chatbot = gr.Chatbot(
                label="Conversation",
                height=400
            )
            # Shows the HTML returned by send_message(); the initial value is
            # the same placeholder text that send_message() falls back to.
            html_output = gr.HTML(
                label="Agent HTML Response",
                value="<p style='color: #666;'>No HTML content yet. Agents can send rich HTML responses that will be displayed here.</p>"
            )
            gr.Markdown("### Send Message")
            with gr.Row():
                # Which agent the typed message is attributed to.
                message_agent = gr.Dropdown(
                    label="Speaking As",
                    choices=[],
                    scale=2,
                    interactive=True,
                    allow_custom_value=False
                )
                message_input = gr.Textbox(
                    label="Message",
                    placeholder="Type your message...",
                    scale=4,
                    interactive=True
                )
                # Starts disabled; create_new_session() returns an update
                # that makes it interactive.
                send_btn = gr.Button("📤 Send", scale=1, variant="primary", interactive=False)
    # Event Handlers
    # The last two outputs receive the gr.update(interactive=True) values.
    create_btn.click(
        fn=create_new_session,
        outputs=[session_id, chatbot, floor_status, agent_table, send_btn, ofp_agent_url]
    )
    # After either add handler finishes, a follow-up step refreshes the
    # "Speaking As" dropdown, which is not among the handler's own outputs.
    add_agent_btn.click(
        fn=add_agent,
        inputs=[agent_name_input],
        outputs=[add_agent_status, agent_name_input, chatbot, floor_status, agent_table, agent_selector]
    ).then(
        fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()),
        outputs=[message_agent]
    )
    add_ofp_agent_btn.click(
        fn=add_ofp_agent,
        inputs=[ofp_agent_url],
        outputs=[add_agent_status, ofp_agent_url, chatbot, floor_status, agent_table, agent_selector]
    ).then(
        fn=lambda: gr.Dropdown(choices=get_agent_dropdown_choices()),
        outputs=[message_agent]
    )
    # send_message is wired to four outputs.
    send_btn.click(
        fn=send_message,
        inputs=[message_agent, message_input],
        outputs=[chatbot, message_input, floor_status, html_output]
    )
    grant_btn.click(
        fn=grant_floor_to_agent,
        inputs=[agent_selector],
        outputs=[chatbot, floor_status, agent_table]
    )
    revoke_btn.click(
        fn=revoke_floor_action,
        outputs=[chatbot, floor_status, agent_table]
    )
if __name__ == "__main__":
    # Print the startup banner, then serve the UI on all interfaces at port
    # 7860 without a public share link.
    startup_banner = (
        "🚀 Starting OFP Floor Manager Test Interface...",
        "📱 Open your browser to test the interface",
        "=" * 50,
    )
    print("\n".join(startup_banner))
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False, theme=gr.themes.Soft())