#!/usr/bin/env python3
"""
Hugging Face Spaces compatible version of the Multi-Agent System Dashboard
"""

import json
import os
import random
import sqlite3
import sys
import tempfile
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List

# Set environment variables for Hugging Face Spaces
os.environ['STREAMLIT_SERVER_HEADLESS'] = 'true'
os.environ['STREAMLIT_SERVER_PORT'] = '7860'
os.environ['STREAMLIT_BROWSER_GATHER_USAGE_STATS'] = 'false'

# Create a writable directory for Streamlit
streamlit_dir = Path(tempfile.gettempdir()) / '.streamlit'
streamlit_dir.mkdir(exist_ok=True)
os.environ['STREAMLIT_CONFIG_DIR'] = str(streamlit_dir)

# Now import streamlit and other modules. These imports are deliberately kept
# below the environment setup above rather than at the very top of the file.
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import numpy as np

# Set page config first
st.set_page_config(
    page_title="🤖 Multi-Agent System Dashboard",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)


class HuggingFaceDashboard:
    """Streamlit dashboard backed by a small SQLite database of demo data.

    The database lives in the system temp directory. If it cannot be created
    or read, the dashboard falls back to single-row placeholder frames so that
    every tab can still render.

    Attributes set by ``__init__``:
        db_path: Path of the SQLite file inside the temp directory.
        sample_data: Dict with the keys ``'agent_performance'``,
            ``'evaluations'`` and ``'system_metrics'``, each a DataFrame.
    """

    def __init__(self):
        # Use temp directory for database in Hugging Face Spaces
        temp_dir = tempfile.gettempdir()
        self.db_path = os.path.join(temp_dir, "evaluation_logs.db")

        try:
            self.setup_demo_data()
        except Exception as e:
            st.error(f"Error setting up demo data: {str(e)}")
            # Create empty data structures as fallback
            self.create_empty_data()

    def create_empty_data(self) -> Dict[str, pd.DataFrame]:
        """Install single-row placeholder frames as ``self.sample_data``.

        Returns:
            The same dict that was stored on ``self.sample_data``. Returning
            it lets callers use this method directly as a fallback value
            (``load_sample_data`` does exactly that).
        """
        now = datetime.now()
        self.sample_data = {
            'agent_performance': pd.DataFrame({
                'agent_name': ['Demo Agent'],
                'task_type': ['demo'],
                'success_rate': [0.0],
                'avg_response_time': [0.0],
                'timestamp': [now]
            }),
            'evaluations': pd.DataFrame({
                'test_name': ['Demo Test'],
                'agent': ['Demo Agent'],
                'score': [0.0],
                'metric_type': ['demo'],
                'timestamp': [now]
            }),
            'system_metrics': pd.DataFrame({
                'timestamp': [now],
                'cpu_usage': [0.0],
                'memory_usage': [0.0],
                'active_agents': [0]
            })
        }
        return self.sample_data

    def setup_demo_data(self) -> None:
        """Create the demo database and load it into ``self.sample_data``.

        Any failure is reported with ``st.warning`` and replaced by the
        placeholder frames from ``create_empty_data``.
        """
        try:
            self.create_demo_database()
            self.sample_data = self.load_sample_data()
        except Exception as e:
            st.warning(f"Using fallback data due to: {str(e)}")
            self.create_empty_data()

    def create_demo_database(self) -> None:
        """Create the three demo tables and fill them if they are empty.

        Raises:
            Exception: Whatever the filesystem or SQLite raised; the error is
                shown with ``st.error`` and then re-raised.
        """
        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

            # closing() guarantees the connection is released even when one of
            # the statements below raises.
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()

                # Create tables
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS agent_performance (
                        id INTEGER PRIMARY KEY,
                        agent_name TEXT,
                        task_type TEXT,
                        success_rate REAL,
                        avg_response_time REAL,
                        timestamp DATETIME
                    )
                ''')

                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS evaluations (
                        id INTEGER PRIMARY KEY,
                        test_name TEXT,
                        agent TEXT,
                        score REAL,
                        metric_type TEXT,
                        timestamp DATETIME
                    )
                ''')

                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS system_metrics (
                        id INTEGER PRIMARY KEY,
                        timestamp DATETIME,
                        cpu_usage REAL,
                        memory_usage REAL,
                        active_agents INTEGER
                    )
                ''')

                # Check if data already exists
                cursor.execute("SELECT COUNT(*) FROM agent_performance")
                if cursor.fetchone()[0] == 0:
                    self.populate_demo_data(cursor)

                conn.commit()

        except Exception as e:
            st.error(f"Database error: {str(e)}")
            raise

    @staticmethod
    def _format_timestamp(moment: datetime) -> str:
        """Render *moment* as ``YYYY-MM-DD HH:MM:SS.ffffff`` for storage.

        Binding the string explicitly avoids the implicit sqlite3 datetime
        adapter, which is deprecated as of Python 3.12. Microseconds are
        always written so every stored row has the same layout.
        """
        return moment.isoformat(sep=" ", timespec="microseconds")

    def populate_demo_data(self, cursor) -> None:
        """Insert randomly generated demo rows through *cursor*.

        Writes 50 agent-performance rows, 100 evaluation rows and 100 hourly
        system-metric rows. The caller is responsible for committing.

        Args:
            cursor: An open sqlite3 cursor on the demo database.
        """
        now = datetime.now()
        stamp = self._format_timestamp

        # Agent performance data
        agents = ['Research Agent', 'Analysis Agent', 'Writing Agent', 'Review Agent']
        tasks = ['research', 'analysis', 'writing', 'review']

        performance_rows = [
            (
                random.choice(agents),
                random.choice(tasks),
                random.uniform(0.7, 0.98),
                random.uniform(0.5, 3.0),
                stamp(now - timedelta(days=random.randint(0, 30))),
            )
            for _ in range(50)
        ]
        cursor.executemany('''
            INSERT INTO agent_performance (agent_name, task_type, success_rate, avg_response_time, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', performance_rows)

        # Evaluation data
        test_names = ['Accuracy Test', 'Speed Test', 'Quality Test', 'Consistency Test']
        metrics = ['accuracy', 'speed', 'quality', 'consistency']

        evaluation_rows = [
            (
                random.choice(test_names),
                random.choice(agents),
                random.uniform(0.6, 0.95),
                random.choice(metrics),
                stamp(now - timedelta(days=random.randint(0, 30))),
            )
            for _ in range(100)
        ]
        cursor.executemany('''
            INSERT INTO evaluations (test_name, agent, score, metric_type, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', evaluation_rows)

        # System metrics data: one sample per hour going back from now
        system_rows = [
            (
                stamp(now - timedelta(hours=hours_ago)),
                random.uniform(20, 80),
                random.uniform(30, 90),
                random.randint(1, 4),
            )
            for hours_ago in range(100)
        ]
        cursor.executemany('''
            INSERT INTO system_metrics (timestamp, cpu_usage, memory_usage, active_agents)
            VALUES (?, ?, ?, ?)
        ''', system_rows)

    def load_sample_data(self) -> Dict[str, pd.DataFrame]:
        """Read the three demo tables, newest rows first.

        Returns:
            Dict with the keys ``'agent_performance'``, ``'evaluations'`` and
            ``'system_metrics'``. The ``timestamp`` column of each frame is
            parsed to datetimes so it matches the fallback frames. If reading
            fails, the error is shown with ``st.error`` and the placeholder
            frames from ``create_empty_data`` are returned instead.
        """
        queries = {
            'agent_performance': "SELECT * FROM agent_performance ORDER BY timestamp DESC",
            'evaluations': "SELECT * FROM evaluations ORDER BY timestamp DESC",
            'system_metrics': "SELECT * FROM system_metrics ORDER BY timestamp DESC",
        }
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                return {
                    name: pd.read_sql_query(sql, conn, parse_dates=['timestamp'])
                    for name, sql in queries.items()
                }
        except Exception as e:
            st.error(f"Error loading data: {str(e)}")
            # create_empty_data() returns the dict it stores, so the caller
            # never ends up assigning None to self.sample_data.
            return self.create_empty_data()

    def render_overview_tab(self) -> None:
        """Render the overview tab: four headline metrics and two charts."""
        st.header("🎯 System Overview")

        # Key metrics
        col1, col2, col3, col4 = st.columns(4)

        try:
            performance = self.sample_data['agent_performance']
            system = self.sample_data['system_metrics']
            avg_success_rate = performance['success_rate'].mean()
            total_evaluations = len(self.sample_data['evaluations'])
            # Frames are ordered newest-first, so row 0 is the latest sample.
            active_agents = system['active_agents'].iloc[0] if len(system) > 0 else 0
            avg_response_time = performance['avg_response_time'].mean()
        except Exception:
            avg_success_rate = 0.0
            total_evaluations = 0
            active_agents = 0
            avg_response_time = 0.0

        with col1:
            st.metric(
                "Average Success Rate",
                f"{avg_success_rate:.1%}",
                delta="2.3%" if avg_success_rate > 0 else None
            )

        with col2:
            st.metric(
                "Total Evaluations",
                f"{total_evaluations:,}",
                delta="12" if total_evaluations > 0 else None
            )

        with col3:
            st.metric(
                "Active Agents",
                f"{active_agents}",
                delta="1" if active_agents > 0 else None
            )

        with col4:
            st.metric(
                "Avg Response Time",
                f"{avg_response_time:.2f}s",
                delta="-0.1s" if avg_response_time > 0 else None
            )

        st.divider()

        # Performance trends
        col1, col2 = st.columns(2)
        performance = self.sample_data['agent_performance']

        with col1:
            st.subheader("📈 Success Rate Trends")
            if len(performance) > 0:
                fig = px.line(
                    performance.head(20),
                    x='timestamp',
                    y='success_rate',
                    color='agent_name',
                    title="Agent Success Rates Over Time"
                )
                fig.update_layout(height=400)
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info("No performance data available")

        with col2:
            st.subheader("⚡ Response Time Distribution")
            if len(performance) > 0:
                fig = px.histogram(
                    performance,
                    x='avg_response_time',
                    nbins=20,
                    title="Response Time Distribution"
                )
                fig.update_layout(height=400)
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info("No response time data available")

    def render_agents_tab(self) -> None:
        """Render the agents tab: agent filter, two bar charts and a table."""
        st.header("🤖 Agent Performance")

        performance = self.sample_data['agent_performance']
        if len(performance) == 0:
            st.warning("No agent performance data available")
            return

        # Agent selector
        agents = performance['agent_name'].unique()
        selected_agent = st.selectbox("Select Agent", ["All Agents"] + list(agents))

        # Filter data
        if selected_agent != "All Agents":
            filtered_data = performance[performance['agent_name'] == selected_agent]
        else:
            filtered_data = performance

        # Performance metrics
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("🎯 Success Rate by Agent")
            agent_success = filtered_data.groupby('agent_name')['success_rate'].mean().reset_index()
            fig = px.bar(
                agent_success,
                x='agent_name',
                y='success_rate',
                title="Average Success Rate by Agent"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

        with col2:
            st.subheader("⏱️ Response Time by Task Type")
            task_response = filtered_data.groupby('task_type')['avg_response_time'].mean().reset_index()
            fig = px.bar(
                task_response,
                x='task_type',
                y='avg_response_time',
                title="Average Response Time by Task Type"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

        # Detailed performance table
        st.subheader("📊 Detailed Performance Data")
        st.dataframe(
            filtered_data.sort_values('timestamp', ascending=False),
            use_container_width=True
        )

    def render_evaluations_tab(self) -> None:
        """Render the evaluations tab: metric filter, charts and a table."""
        st.header("📋 Evaluation Results")

        evaluations = self.sample_data['evaluations']
        if len(evaluations) == 0:
            st.warning("No evaluation data available")
            return

        # Metric type selector
        metrics = evaluations['metric_type'].unique()
        selected_metric = st.selectbox("Select Metric Type", ["All Metrics"] + list(metrics))

        # Filter data
        if selected_metric != "All Metrics":
            filtered_evals = evaluations[evaluations['metric_type'] == selected_metric]
        else:
            filtered_evals = evaluations

        # Evaluation charts
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("📊 Score Distribution")
            fig = px.histogram(
                filtered_evals,
                x='score',
                nbins=20,
                title="Evaluation Score Distribution"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

        with col2:
            st.subheader("🏆 Agent Comparison")
            agent_scores = filtered_evals.groupby('agent')['score'].mean().reset_index()
            fig = px.bar(
                agent_scores,
                x='agent',
                y='score',
                title="Average Scores by Agent"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

        # Score trends over time
        st.subheader("📈 Score Trends")
        fig = px.line(
            filtered_evals.head(50),
            x='timestamp',
            y='score',
            color='agent',
            title="Evaluation Scores Over Time"
        )
        fig.update_layout(height=400)
        st.plotly_chart(fig, use_container_width=True)

        # Detailed evaluation table
        st.subheader("📋 Detailed Evaluation Results")
        st.dataframe(
            filtered_evals.sort_values('timestamp', ascending=False),
            use_container_width=True
        )

    def render_system_tab(self) -> None:
        """Render the system tab: latest readings and two time-series charts.

        The deltas shown next to the three metrics are random demo values and
        change on every rerun.
        """
        st.header("💻 System Metrics")

        system = self.sample_data['system_metrics']
        if len(system) == 0:
            st.warning("No system metrics data available")
            return

        # Current system status (row 0 is the first row of the frame, which is
        # the newest sample when the data came from load_sample_data)
        latest_metrics = system.iloc[0]

        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric(
                "CPU Usage",
                f"{latest_metrics['cpu_usage']:.1f}%",
                delta=f"{random.uniform(-5, 5):.1f}%"
            )

        with col2:
            st.metric(
                "Memory Usage",
                f"{latest_metrics['memory_usage']:.1f}%",
                delta=f"{random.uniform(-3, 3):.1f}%"
            )

        with col3:
            st.metric(
                "Active Agents",
                f"{latest_metrics['active_agents']}",
                delta=random.choice([-1, 0, 1])
            )

        st.divider()

        # System metrics over time
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("💾 Resource Usage")
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=system['timestamp'],
                y=system['cpu_usage'],
                mode='lines',
                name='CPU Usage',
                line=dict(color='#FF6B6B')
            ))
            fig.add_trace(go.Scatter(
                x=system['timestamp'],
                y=system['memory_usage'],
                mode='lines',
                name='Memory Usage',
                line=dict(color='#4ECDC4')
            ))
            fig.update_layout(
                title="System Resource Usage Over Time",
                xaxis_title="Time",
                yaxis_title="Usage (%)",
                height=400
            )
            st.plotly_chart(fig, use_container_width=True)

        with col2:
            st.subheader("🤖 Agent Activity")
            fig = px.line(
                system,
                x='timestamp',
                y='active_agents',
                title="Active Agents Over Time"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

    def _render_export_button(self, label: str, data_key: str, file_name: str) -> None:
        """Render one CSV download button for ``self.sample_data[data_key]``."""
        st.download_button(
            label=label,
            data=self.sample_data[data_key].to_csv(index=False),
            file_name=file_name,
            mime="text/csv"
        )

    def render_insights_tab(self) -> None:
        """Render the insights tab: computed insights, tips and CSV exports."""
        st.header("💡 Insights & Recommendations")

        # Performance insights
        st.subheader("🎯 Performance Insights")

        try:
            # Calculate insights
            performance = self.sample_data['agent_performance']
            best_agent = performance.groupby('agent_name')['success_rate'].mean().idxmax()
            worst_task = performance.groupby('task_type')['success_rate'].mean().idxmin()
            avg_response_time = performance['avg_response_time'].mean()

            col1, col2 = st.columns(2)

            with col1:
                st.success(f"🏆 **Best Performing Agent**: {best_agent}")
                st.info(f"⚡ **Average Response Time**: {avg_response_time:.2f}s")

            with col2:
                st.warning(f"⚠️ **Task Needing Improvement**: {worst_task}")
                st.info(f"📊 **Total Evaluations**: {len(self.sample_data['evaluations'])}")
        except Exception:
            st.info("Insufficient data for insights generation")

        st.divider()

        # Recommendations
        st.subheader("🚀 Recommendations")

        recommendations = [
            "🔧 **Optimize Response Time**: Consider implementing caching for frequently requested tasks",
            "📈 **Scale High-Performing Agents**: Increase resources for agents with >90% success rates",
            "🎯 **Focus on Weak Areas**: Provide additional training data for underperforming task types",
            "⚡ **Monitor System Resources**: Set up alerts for CPU/Memory usage above 80%",
            "🔄 **Regular Evaluations**: Schedule automated evaluations every 24 hours",
            "📊 **Data Quality**: Implement data validation checks for better evaluation accuracy"
        ]

        for rec in recommendations:
            st.markdown(f"- {rec}")

        st.divider()

        # Export options
        st.subheader("📤 Export Options")

        # The download buttons are rendered directly. Nesting them under
        # st.button would show each one for a single rerun only, because
        # st.button is True only on the run triggered by its own click.
        exports = [
            ("📊 Export Performance Data", 'agent_performance', "agent_performance.csv"),
            ("📋 Export Evaluations", 'evaluations', "evaluations.csv"),
            ("💻 Export System Metrics", 'system_metrics', "system_metrics.csv"),
        ]
        for column, (label, data_key, file_name) in zip(st.columns(3), exports):
            with column:
                self._render_export_button(label, data_key, file_name)

    def run(self) -> None:
        """Main dashboard application: sidebar, quick stats and selected tab."""
        # Sidebar
        st.sidebar.title("🤖 Multi-Agent Dashboard")
        st.sidebar.markdown("---")

        # Navigation: sidebar label -> renderer for that tab
        tab_renderers = {
            "🎯 Overview": self.render_overview_tab,
            "🤖 Agents": self.render_agents_tab,
            "📋 Evaluations": self.render_evaluations_tab,
            "💻 System": self.render_system_tab,
            "💡 Insights": self.render_insights_tab,
        }
        tab_names = list(tab_renderers)
        selected_tab = st.sidebar.radio("Navigate to:", tab_names)

        st.sidebar.markdown("---")
        st.sidebar.markdown("### 📊 Quick Stats")

        try:
            total_agents = len(self.sample_data['agent_performance']['agent_name'].unique())
            total_tests = len(self.sample_data['evaluations'])
            avg_score = self.sample_data['evaluations']['score'].mean()

            st.sidebar.metric("Total Agents", total_agents)
            st.sidebar.metric("Total Tests", total_tests)
            st.sidebar.metric("Avg Score", f"{avg_score:.2f}")
        except Exception:
            st.sidebar.info("Loading stats...")

        # Main content
        render_selected = tab_renderers.get(selected_tab)
        if render_selected is not None:
            render_selected()

        # Footer
        st.sidebar.markdown("---")
        st.sidebar.markdown(
            "🚀 **Multi-Agent System Dashboard**\n\n"
            "Monitor and evaluate your AI agents in real-time."
        )


# Initialize and run the dashboard
if __name__ == "__main__":
    try:
        dashboard = HuggingFaceDashboard()
        dashboard.run()
    except Exception as e:
        st.error(f"Application Error: {str(e)}")
        st.info("Please refresh the page or contact support if the issue persists.")