# multiagent/src/app_hf.py
#!/usr/bin/env python3
"""
Hugging Face Spaces compatible version of the Multi-Agent System Dashboard
"""
import os
import sys
import tempfile
import sqlite3
from pathlib import Path
# Configure Streamlit for the Hugging Face Spaces runtime before it is
# imported: headless mode, the port Spaces exposes, and no telemetry.
os.environ.update({
    'STREAMLIT_SERVER_HEADLESS': 'true',
    'STREAMLIT_SERVER_PORT': '7860',
    'STREAMLIT_BROWSER_GATHER_USAGE_STATS': 'false',
})

# Point Streamlit's config at a directory that is guaranteed writable.
streamlit_dir = Path(tempfile.gettempdir()) / '.streamlit'
streamlit_dir.mkdir(exist_ok=True)
os.environ['STREAMLIT_CONFIG_DIR'] = str(streamlit_dir)
# Now import streamlit and other modules
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import json
import random
import numpy as np
from typing import Dict, List, Any
# Set page config first — Streamlit requires set_page_config to be the very
# first st.* command executed in the script, before any widget/output call.
st.set_page_config(
page_title="πŸ€– Multi-Agent System Dashboard",
page_icon="πŸ€–",
layout="wide",
initial_sidebar_state="expanded"
)
class HuggingFaceDashboard:
    """Streamlit dashboard for monitoring a demo multi-agent system.

    Built for Hugging Face Spaces: all state lives in a SQLite database
    under the system temp directory (the only reliably writable location
    there), and the data itself is randomly generated demo content.
    ``self.sample_data`` always holds a dict of three DataFrames
    ('agent_performance', 'evaluations', 'system_metrics'), falling back
    to single-row placeholders when database setup or loading fails.
    """

    def __init__(self):
        # Use the temp directory for the database: on Hugging Face Spaces
        # the app directory may be read-only.
        temp_dir = tempfile.gettempdir()
        self.db_path = os.path.join(temp_dir, "evaluation_logs.db")
        try:
            self.setup_demo_data()
        except Exception as e:
            st.error(f"Error setting up demo data: {str(e)}")
            # Fall back to minimal placeholder data so the UI still renders.
            self.create_empty_data()

    def create_empty_data(self):
        """Create placeholder data structures if database setup fails.

        Sets ``self.sample_data`` and returns the same dict, so error paths
        that do ``return self.create_empty_data()`` (see
        ``load_sample_data``) hand callers a usable value instead of None.
        """
        self.sample_data = {
            'agent_performance': pd.DataFrame({
                'agent_name': ['Demo Agent'],
                'task_type': ['demo'],
                'success_rate': [0.0],
                'avg_response_time': [0.0],
                'timestamp': [datetime.now()]
            }),
            'evaluations': pd.DataFrame({
                'test_name': ['Demo Test'],
                'agent': ['Demo Agent'],
                'score': [0.0],
                'metric_type': ['demo'],
                'timestamp': [datetime.now()]
            }),
            'system_metrics': pd.DataFrame({
                'timestamp': [datetime.now()],
                'cpu_usage': [0.0],
                'memory_usage': [0.0],
                'active_agents': [0]
            })
        }
        return self.sample_data

    def setup_demo_data(self):
        """Initialize demo data for the dashboard (DB + in-memory frames)."""
        try:
            self.create_demo_database()
            self.sample_data = self.load_sample_data()
        except Exception as e:
            st.warning(f"Using fallback data due to: {str(e)}")
            self.create_empty_data()

    def create_demo_database(self):
        """Create the demo database schema and populate it once.

        Idempotent: tables use CREATE TABLE IF NOT EXISTS and demo rows are
        only inserted when agent_performance is empty. Re-raises on failure
        so ``setup_demo_data`` can switch to fallback data.
        """
        try:
            # Ensure the parent directory exists before connecting.
            os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                # Create tables
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS agent_performance (
                        id INTEGER PRIMARY KEY,
                        agent_name TEXT,
                        task_type TEXT,
                        success_rate REAL,
                        avg_response_time REAL,
                        timestamp DATETIME
                    )
                ''')
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS evaluations (
                        id INTEGER PRIMARY KEY,
                        test_name TEXT,
                        agent TEXT,
                        score REAL,
                        metric_type TEXT,
                        timestamp DATETIME
                    )
                ''')
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS system_metrics (
                        id INTEGER PRIMARY KEY,
                        timestamp DATETIME,
                        cpu_usage REAL,
                        memory_usage REAL,
                        active_agents INTEGER
                    )
                ''')
                # Only populate on first run (empty table).
                cursor.execute("SELECT COUNT(*) FROM agent_performance")
                if cursor.fetchone()[0] == 0:
                    self.populate_demo_data(cursor)
                conn.commit()
            finally:
                # Close the connection even if DDL/population raised,
                # so the sqlite handle is never leaked.
                conn.close()
        except Exception as e:
            st.error(f"Database error: {str(e)}")
            raise

    def populate_demo_data(self, cursor):
        """Insert randomized demo rows via the given sqlite cursor.

        Writes 50 agent_performance rows, 100 evaluations, and 100 hourly
        system_metrics samples. The caller owns the commit.
        """
        # Agent performance data: random agent/task pairs over the last 30 days.
        agents = ['Research Agent', 'Analysis Agent', 'Writing Agent', 'Review Agent']
        tasks = ['research', 'analysis', 'writing', 'review']
        for _ in range(50):
            agent = random.choice(agents)
            task = random.choice(tasks)
            success_rate = random.uniform(0.7, 0.98)
            response_time = random.uniform(0.5, 3.0)
            timestamp = datetime.now() - timedelta(days=random.randint(0, 30))
            cursor.execute('''
                INSERT INTO agent_performance
                (agent_name, task_type, success_rate, avg_response_time, timestamp)
                VALUES (?, ?, ?, ?, ?)
            ''', (agent, task, success_rate, response_time, timestamp))
        # Evaluation data: random test/agent/metric combinations.
        test_names = ['Accuracy Test', 'Speed Test', 'Quality Test', 'Consistency Test']
        metrics = ['accuracy', 'speed', 'quality', 'consistency']
        for _ in range(100):
            test = random.choice(test_names)
            agent = random.choice(agents)
            score = random.uniform(0.6, 0.95)
            metric = random.choice(metrics)
            timestamp = datetime.now() - timedelta(days=random.randint(0, 30))
            cursor.execute('''
                INSERT INTO evaluations
                (test_name, agent, score, metric_type, timestamp)
                VALUES (?, ?, ?, ?, ?)
            ''', (test, agent, score, metric, timestamp))
        # System metrics data: one sample per hour going back 100 hours.
        for i in range(100):
            timestamp = datetime.now() - timedelta(hours=i)
            cpu_usage = random.uniform(20, 80)
            memory_usage = random.uniform(30, 90)
            active_agents = random.randint(1, 4)
            cursor.execute('''
                INSERT INTO system_metrics
                (timestamp, cpu_usage, memory_usage, active_agents)
                VALUES (?, ?, ?, ?)
            ''', (timestamp, cpu_usage, memory_usage, active_agents))

    def load_sample_data(self):
        """Load all three tables from the database, newest rows first.

        Returns the dict of DataFrames; on failure, reports the error and
        returns the placeholder dict from ``create_empty_data``.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                agent_performance = pd.read_sql_query(
                    "SELECT * FROM agent_performance ORDER BY timestamp DESC",
                    conn
                )
                evaluations = pd.read_sql_query(
                    "SELECT * FROM evaluations ORDER BY timestamp DESC",
                    conn
                )
                system_metrics = pd.read_sql_query(
                    "SELECT * FROM system_metrics ORDER BY timestamp DESC",
                    conn
                )
            finally:
                # Always release the connection, even if a read fails.
                conn.close()
            return {
                'agent_performance': agent_performance,
                'evaluations': evaluations,
                'system_metrics': system_metrics
            }
        except Exception as e:
            st.error(f"Error loading data: {str(e)}")
            # create_empty_data returns the fallback dict (and also sets
            # self.sample_data), so callers never receive None.
            return self.create_empty_data()

    def render_overview_tab(self):
        """Render the overview tab: headline metrics plus two trend charts."""
        st.header("🎯 System Overview")
        # Key metrics
        col1, col2, col3, col4 = st.columns(4)
        try:
            avg_success_rate = self.sample_data['agent_performance']['success_rate'].mean()
            total_evaluations = len(self.sample_data['evaluations'])
            # Rows are ordered newest-first, so iloc[0] is the latest sample.
            active_agents = self.sample_data['system_metrics']['active_agents'].iloc[0] if len(self.sample_data['system_metrics']) > 0 else 0
            avg_response_time = self.sample_data['agent_performance']['avg_response_time'].mean()
        except Exception:
            # Any data problem degrades to zeroed metrics rather than crashing.
            avg_success_rate = 0.0
            total_evaluations = 0
            active_agents = 0
            avg_response_time = 0.0
        # NOTE(review): the deltas below are hard-coded demo values, not
        # computed from the data.
        with col1:
            st.metric(
                "Average Success Rate",
                f"{avg_success_rate:.1%}",
                delta="2.3%" if avg_success_rate > 0 else None
            )
        with col2:
            st.metric(
                "Total Evaluations",
                f"{total_evaluations:,}",
                delta="12" if total_evaluations > 0 else None
            )
        with col3:
            st.metric(
                "Active Agents",
                f"{active_agents}",
                delta="1" if active_agents > 0 else None
            )
        with col4:
            st.metric(
                "Avg Response Time",
                f"{avg_response_time:.2f}s",
                delta="-0.1s" if avg_response_time > 0 else None
            )
        st.divider()
        # Performance trends
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("πŸ“ˆ Success Rate Trends")
            if len(self.sample_data['agent_performance']) > 0:
                fig = px.line(
                    self.sample_data['agent_performance'].head(20),
                    x='timestamp',
                    y='success_rate',
                    color='agent_name',
                    title="Agent Success Rates Over Time"
                )
                fig.update_layout(height=400)
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info("No performance data available")
        with col2:
            st.subheader("⚑ Response Time Distribution")
            if len(self.sample_data['agent_performance']) > 0:
                fig = px.histogram(
                    self.sample_data['agent_performance'],
                    x='avg_response_time',
                    nbins=20,
                    title="Response Time Distribution"
                )
                fig.update_layout(height=400)
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info("No response time data available")

    def render_agents_tab(self):
        """Render the agents tab: per-agent filters, charts, and raw table."""
        st.header("πŸ€– Agent Performance")
        if len(self.sample_data['agent_performance']) == 0:
            st.warning("No agent performance data available")
            return
        # Agent selector
        agents = self.sample_data['agent_performance']['agent_name'].unique()
        selected_agent = st.selectbox("Select Agent", ["All Agents"] + list(agents))
        # Filter data to the chosen agent (or keep everything).
        if selected_agent != "All Agents":
            filtered_data = self.sample_data['agent_performance'][
                self.sample_data['agent_performance']['agent_name'] == selected_agent
            ]
        else:
            filtered_data = self.sample_data['agent_performance']
        # Performance metrics
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("🎯 Success Rate by Agent")
            agent_success = filtered_data.groupby('agent_name')['success_rate'].mean().reset_index()
            fig = px.bar(
                agent_success,
                x='agent_name',
                y='success_rate',
                title="Average Success Rate by Agent"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)
        with col2:
            st.subheader("⏱️ Response Time by Task Type")
            task_response = filtered_data.groupby('task_type')['avg_response_time'].mean().reset_index()
            fig = px.bar(
                task_response,
                x='task_type',
                y='avg_response_time',
                title="Average Response Time by Task Type"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)
        # Detailed performance table
        st.subheader("πŸ“Š Detailed Performance Data")
        st.dataframe(
            filtered_data.sort_values('timestamp', ascending=False),
            use_container_width=True
        )

    def render_evaluations_tab(self):
        """Render the evaluations tab: metric filter, charts, and raw table."""
        st.header("πŸ“‹ Evaluation Results")
        if len(self.sample_data['evaluations']) == 0:
            st.warning("No evaluation data available")
            return
        # Metric type selector
        metrics = self.sample_data['evaluations']['metric_type'].unique()
        selected_metric = st.selectbox("Select Metric Type", ["All Metrics"] + list(metrics))
        # Filter data to the chosen metric (or keep everything).
        if selected_metric != "All Metrics":
            filtered_evals = self.sample_data['evaluations'][
                self.sample_data['evaluations']['metric_type'] == selected_metric
            ]
        else:
            filtered_evals = self.sample_data['evaluations']
        # Evaluation charts
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("πŸ“Š Score Distribution")
            fig = px.histogram(
                filtered_evals,
                x='score',
                nbins=20,
                title="Evaluation Score Distribution"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)
        with col2:
            st.subheader("πŸ† Agent Comparison")
            agent_scores = filtered_evals.groupby('agent')['score'].mean().reset_index()
            fig = px.bar(
                agent_scores,
                x='agent',
                y='score',
                title="Average Scores by Agent"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)
        # Score trends over time (most recent 50 rows only).
        st.subheader("πŸ“ˆ Score Trends")
        fig = px.line(
            filtered_evals.head(50),
            x='timestamp',
            y='score',
            color='agent',
            title="Evaluation Scores Over Time"
        )
        fig.update_layout(height=400)
        st.plotly_chart(fig, use_container_width=True)
        # Detailed evaluation table
        st.subheader("πŸ“‹ Detailed Evaluation Results")
        st.dataframe(
            filtered_evals.sort_values('timestamp', ascending=False),
            use_container_width=True
        )

    def render_system_tab(self):
        """Render the system tab: latest resource metrics and time series."""
        st.header("πŸ’» System Metrics")
        if len(self.sample_data['system_metrics']) == 0:
            st.warning("No system metrics data available")
            return
        # Current system status — rows are newest-first, so iloc[0] is latest.
        latest_metrics = self.sample_data['system_metrics'].iloc[0]
        col1, col2, col3 = st.columns(3)
        # NOTE(review): deltas are random demo values regenerated each rerun.
        with col1:
            st.metric(
                "CPU Usage",
                f"{latest_metrics['cpu_usage']:.1f}%",
                delta=f"{random.uniform(-5, 5):.1f}%"
            )
        with col2:
            st.metric(
                "Memory Usage",
                f"{latest_metrics['memory_usage']:.1f}%",
                delta=f"{random.uniform(-3, 3):.1f}%"
            )
        with col3:
            st.metric(
                "Active Agents",
                f"{latest_metrics['active_agents']}",
                delta=random.choice([-1, 0, 1])
            )
        st.divider()
        # System metrics over time
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("πŸ’Ύ Resource Usage")
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=self.sample_data['system_metrics']['timestamp'],
                y=self.sample_data['system_metrics']['cpu_usage'],
                mode='lines',
                name='CPU Usage',
                line=dict(color='#FF6B6B')
            ))
            fig.add_trace(go.Scatter(
                x=self.sample_data['system_metrics']['timestamp'],
                y=self.sample_data['system_metrics']['memory_usage'],
                mode='lines',
                name='Memory Usage',
                line=dict(color='#4ECDC4')
            ))
            fig.update_layout(
                title="System Resource Usage Over Time",
                xaxis_title="Time",
                yaxis_title="Usage (%)",
                height=400
            )
            st.plotly_chart(fig, use_container_width=True)
        with col2:
            st.subheader("πŸ€– Agent Activity")
            fig = px.line(
                self.sample_data['system_metrics'],
                x='timestamp',
                y='active_agents',
                title="Active Agents Over Time"
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)

    def render_insights_tab(self):
        """Render the insights tab: derived highlights, tips, and exports."""
        st.header("πŸ’‘ Insights & Recommendations")
        # Performance insights
        st.subheader("🎯 Performance Insights")
        try:
            # Calculate insights from the aggregated performance data.
            best_agent = self.sample_data['agent_performance'].groupby('agent_name')['success_rate'].mean().idxmax()
            worst_task = self.sample_data['agent_performance'].groupby('task_type')['success_rate'].mean().idxmin()
            avg_response_time = self.sample_data['agent_performance']['avg_response_time'].mean()
            col1, col2 = st.columns(2)
            with col1:
                st.success(f"πŸ† **Best Performing Agent**: {best_agent}")
                st.info(f"⚑ **Average Response Time**: {avg_response_time:.2f}s")
            with col2:
                st.warning(f"⚠️ **Task Needing Improvement**: {worst_task}")
                st.info(f"πŸ“Š **Total Evaluations**: {len(self.sample_data['evaluations'])}")
        except Exception:
            st.info("Insufficient data for insights generation")
        st.divider()
        # Recommendations
        st.subheader("πŸš€ Recommendations")
        recommendations = [
            "πŸ”§ **Optimize Response Time**: Consider implementing caching for frequently requested tasks",
            "πŸ“ˆ **Scale High-Performing Agents**: Increase resources for agents with >90% success rates",
            "🎯 **Focus on Weak Areas**: Provide additional training data for underperforming task types",
            "⚑ **Monitor System Resources**: Set up alerts for CPU/Memory usage above 80%",
            "πŸ”„ **Regular Evaluations**: Schedule automated evaluations every 24 hours",
            "πŸ“Š **Data Quality**: Implement data validation checks for better evaluation accuracy"
        ]
        for rec in recommendations:
            st.markdown(f"- {rec}")
        st.divider()
        # Export options
        # NOTE(review): a download_button nested under a regular button only
        # survives one rerun in Streamlit — the download link disappears after
        # the next interaction. Kept as-is; consider rendering the
        # download_buttons unconditionally.
        st.subheader("πŸ“€ Export Options")
        col1, col2, col3 = st.columns(3)
        with col1:
            if st.button("πŸ“Š Export Performance Data"):
                csv = self.sample_data['agent_performance'].to_csv(index=False)
                st.download_button(
                    label="Download CSV",
                    data=csv,
                    file_name="agent_performance.csv",
                    mime="text/csv"
                )
        with col2:
            if st.button("πŸ“‹ Export Evaluations"):
                csv = self.sample_data['evaluations'].to_csv(index=False)
                st.download_button(
                    label="Download CSV",
                    data=csv,
                    file_name="evaluations.csv",
                    mime="text/csv"
                )
        with col3:
            if st.button("πŸ’» Export System Metrics"):
                csv = self.sample_data['system_metrics'].to_csv(index=False)
                st.download_button(
                    label="Download CSV",
                    data=csv,
                    file_name="system_metrics.csv",
                    mime="text/csv"
                )

    def run(self):
        """Main dashboard application: sidebar navigation plus selected tab."""
        # Sidebar
        st.sidebar.title("πŸ€– Multi-Agent Dashboard")
        st.sidebar.markdown("---")
        # Navigation
        tab_names = ["🎯 Overview", "πŸ€– Agents", "πŸ“‹ Evaluations", "πŸ’» System", "πŸ’‘ Insights"]
        selected_tab = st.sidebar.radio("Navigate to:", tab_names)
        st.sidebar.markdown("---")
        st.sidebar.markdown("### πŸ“Š Quick Stats")
        try:
            total_agents = len(self.sample_data['agent_performance']['agent_name'].unique())
            total_tests = len(self.sample_data['evaluations'])
            avg_score = self.sample_data['evaluations']['score'].mean()
            st.sidebar.metric("Total Agents", total_agents)
            st.sidebar.metric("Total Tests", total_tests)
            st.sidebar.metric("Avg Score", f"{avg_score:.2f}")
        except Exception:
            st.sidebar.info("Loading stats...")
        # Main content: dispatch to the renderer for the selected tab.
        if selected_tab == "🎯 Overview":
            self.render_overview_tab()
        elif selected_tab == "πŸ€– Agents":
            self.render_agents_tab()
        elif selected_tab == "πŸ“‹ Evaluations":
            self.render_evaluations_tab()
        elif selected_tab == "πŸ’» System":
            self.render_system_tab()
        elif selected_tab == "πŸ’‘ Insights":
            self.render_insights_tab()
        # Footer
        st.sidebar.markdown("---")
        st.sidebar.markdown(
            "πŸš€ **Multi-Agent System Dashboard**\n\n"
            "Monitor and evaluate your AI agents in real-time."
        )
# Initialize and run the dashboard when executed as a script.
if __name__ == "__main__":
    try:
        HuggingFaceDashboard().run()
    except Exception as exc:
        st.error(f"Application Error: {str(exc)}")
        st.info("Please refresh the page or contact support if the issue persists.")