import gradio as gr from gradio_leaderboard import Leaderboard import json import os import time import requests from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.errors import HfHubHTTPError import backoff from dotenv import load_dotenv import pandas as pd import random import plotly.graph_objects as go from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger # Load environment variables load_dotenv(override=True) # ============================================================================= # CONFIGURATION # ============================================================================= AGENTS_REPO = "SWE-Arena/bot_data" # HuggingFace dataset for assistant metadata LEADERBOARD_FILENAME = f"{os.getenv('COMPOSE_PROJECT_NAME')}.json" LEADERBOARD_REPO = "SWE-Arena/leaderboard_data" # HuggingFace dataset for leaderboard data MAX_RETRIES = 5 LEADERBOARD_COLUMNS = [ ("Assistant", "string"), ("Website", "string"), ("Total Wiki Edits", "number"), ("Total Membership Events", "number"), ] # ============================================================================= # HUGGINGFACE API WRAPPERS WITH BACKOFF # ============================================================================= def is_rate_limit_error(e): """Check if exception is a HuggingFace rate limit error (429).""" if isinstance(e, HfHubHTTPError): return e.response.status_code == 429 return False @backoff.on_exception( backoff.expo, HfHubHTTPError, max_tries=MAX_RETRIES, base=300, max_value=3600, giveup=lambda e: not is_rate_limit_error(e), on_backoff=lambda details: print( f"Rate limited. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..." ) ) def list_repo_files_with_backoff(api, **kwargs): """Wrapper for api.list_repo_files() with exponential backoff for rate limits.""" return api.list_repo_files(**kwargs) @backoff.on_exception( backoff.expo, HfHubHTTPError, max_tries=MAX_RETRIES, base=300, max_value=3600, giveup=lambda e: not is_rate_limit_error(e), on_backoff=lambda details: print( f"Rate limited. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..." ) ) def hf_hub_download_with_backoff(**kwargs): """Wrapper for hf_hub_download() with exponential backoff for rate limits.""" return hf_hub_download(**kwargs) # ============================================================================= # GITHUB USERNAME VALIDATION # ============================================================================= def validate_github_username(identifier): """Verify that a GitHub identifier exists.""" try: response = requests.get(f'https://api.github.com/users/{identifier}', timeout=10) return (True, "Username is valid") if response.status_code == 200 else (False, "GitHub identifier not found" if response.status_code == 404 else f"Validation error: HTTP {response.status_code}") except Exception as e: return False, f"Validation error: {str(e)}" # ============================================================================= # HUGGINGFACE DATASET OPERATIONS # ============================================================================= def load_agents_from_hf(): """Load all assistant metadata JSON files from HuggingFace dataset.""" try: api = HfApi() assistants = [] # List all files in the repository files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset") # Filter for JSON files only json_files = [f for f in files if f.endswith('.json')] # Download and parse each JSON file for json_file in json_files: try: file_path = hf_hub_download_with_backoff( repo_id=AGENTS_REPO, filename=json_file, repo_type="dataset" ) with open(file_path, 'r') as f: agent_data = json.load(f) # Only process assistants with status == "active" if agent_data.get('status') != 'active': continue # Extract github_identifier from filename (e.g., "assistant[bot].json" -> "assistant[bot]") filename_identifier = json_file.replace('.json', '') # Add or override github_identifier to match filename agent_data['github_identifier'] = filename_identifier assistants.append(agent_data) except Exception as e: print(f"Warning: Could not load {json_file}: {str(e)}") continue print(f"Loaded {len(assistants)} assistants from HuggingFace") return assistants except Exception as e: print(f"Could not load assistants from HuggingFace: {str(e)}") return None def get_hf_token(): """Get HuggingFace token from environment variables.""" token = os.getenv('HF_TOKEN') if not token: print("Warning: HF_TOKEN not found in environment variables") return token def upload_with_retry(api, path_or_fileobj, path_in_repo, repo_id, repo_type, token, max_retries=5): """Upload file to HuggingFace with exponential backoff retry logic.""" delay = 2.0 for attempt in range(max_retries): try: api.upload_file( path_or_fileobj=path_or_fileobj, path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, token=token ) if attempt > 0: print(f" Upload succeeded on attempt {attempt + 1}/{max_retries}") return True except Exception as e: if attempt < max_retries - 1: wait_time = delay + random.uniform(0, 1.0) print(f" Upload failed (attempt {attempt + 1}/{max_retries}): {str(e)}") print(f" Retrying in {wait_time:.1f} seconds...") time.sleep(wait_time) delay = min(delay * 2, 60.0) else: print(f" Upload failed after {max_retries} attempts: {str(e)}") raise def save_agent_to_hf(data): """Save a new assistant to HuggingFace dataset as {identifier}.json in root.""" try: api = HfApi() token = get_hf_token() if not token: raise Exception("No HuggingFace token found. Please set HF_TOKEN in your Space settings.") identifier = data['github_identifier'] filename = f"{identifier}.json" # Save locally first with open(filename, 'w') as f: json.dump(data, f, indent=2) try: # Upload to HuggingFace (root directory) upload_with_retry( api=api, path_or_fileobj=filename, path_in_repo=filename, repo_id=AGENTS_REPO, repo_type="dataset", token=token ) print(f"Saved assistant to HuggingFace: {filename}") return True finally: # Always clean up local file, even if upload fails if os.path.exists(filename): os.remove(filename) except Exception as e: print(f"Error saving assistant: {str(e)}") return False def load_leaderboard_data_from_hf(): """Load leaderboard data and monthly metrics from HuggingFace dataset.""" try: token = get_hf_token() # Download file file_path = hf_hub_download_with_backoff( repo_id=LEADERBOARD_REPO, filename=LEADERBOARD_FILENAME, repo_type="dataset", token=token ) # Load JSON data with open(file_path, 'r') as f: data = json.load(f) last_updated = data.get('metadata', {}).get('last_updated', 'Unknown') print(f"Loaded leaderboard data from HuggingFace (last updated: {last_updated})") return data except Exception as e: print(f"Could not load leaderboard data from HuggingFace: {str(e)}") return None # ============================================================================= # UI FUNCTIONS # ============================================================================= def _empty_plot(message="No data available for visualization"): """Return an empty Plotly figure with a message.""" fig = go.Figure() fig.add_annotation( text=message, xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=16) ) fig.update_layout(title=None, xaxis_title=None, height=500) return fig def _generate_color(index, total): """Generate distinct colors using HSL color space for better distribution.""" hue = (index * 360 / total) % 360 saturation = 70 + (index % 3) * 10 lightness = 45 + (index % 2) * 10 return f'hsl({hue}, {saturation}%, {lightness}%)' def create_monthly_wiki_plot(top_n=5): """Create a Plotly figure showing monthly wiki edits as bar charts.""" saved_data = load_leaderboard_data_from_hf() if not saved_data or 'monthly_metrics' not in saved_data: return _empty_plot() metrics = saved_data['monthly_metrics'] # Apply top_n filter if top_n is not None and top_n > 0 and metrics.get('assistants'): agent_totals = [] for agent_name in metrics['assistants']: agent_data = metrics['data'].get(agent_name, {}) wiki_edits = sum(agent_data.get('total_wiki_edits', [])) agent_totals.append((agent_name, wiki_edits)) agent_totals.sort(key=lambda x: x[1], reverse=True) top_agents = [name for name, _ in agent_totals[:top_n]] metrics = { 'assistants': top_agents, 'months': metrics['months'], 'data': {a: metrics['data'][a] for a in top_agents if a in metrics['data']} } if not metrics['assistants'] or not metrics['months']: return _empty_plot() fig = go.Figure() assistants = metrics['assistants'] months = metrics['months'] data = metrics['data'] for idx, agent_name in enumerate(assistants): color = _generate_color(idx, len(assistants)) agent_data = data[agent_name] x_bars = [] y_bars = [] for month, count in zip(months, agent_data.get('total_wiki_edits', [])): if count > 0: x_bars.append(month) y_bars.append(count) if x_bars and y_bars: fig.add_trace( go.Bar( x=x_bars, y=y_bars, name=agent_name, marker=dict(color=color, opacity=0.7), hovertemplate='%{fullData.name}
Month: %{x}
Wiki Edits: %{y}', offsetgroup=agent_name ) ) fig.update_xaxes(title_text=None) fig.update_yaxes(title_text="Wiki Edits") show_legend = (top_n is not None and top_n <= 10) fig.update_layout( title=None, hovermode='closest', barmode='group', height=600, showlegend=show_legend, margin=dict(l=50, r=150 if show_legend else 50, t=50, b=50) ) return fig def create_monthly_members_plot(top_n=5): """Create a Plotly figure showing monthly membership events as bar charts.""" saved_data = load_leaderboard_data_from_hf() if not saved_data or 'monthly_metrics' not in saved_data: return _empty_plot() metrics = saved_data['monthly_metrics'] # Apply top_n filter if top_n is not None and top_n > 0 and metrics.get('assistants'): agent_totals = [] for agent_name in metrics['assistants']: agent_data = metrics['data'].get(agent_name, {}) total_members = sum(agent_data.get('total_members', [])) agent_totals.append((agent_name, total_members)) agent_totals.sort(key=lambda x: x[1], reverse=True) top_agents = [name for name, _ in agent_totals[:top_n]] metrics = { 'assistants': top_agents, 'months': metrics['months'], 'data': {a: metrics['data'][a] for a in top_agents if a in metrics['data']} } if not metrics['assistants'] or not metrics['months']: return _empty_plot() fig = go.Figure() assistants = metrics['assistants'] months = metrics['months'] data = metrics['data'] for idx, agent_name in enumerate(assistants): color = _generate_color(idx, len(assistants)) agent_data = data[agent_name] x_bars = [] y_bars = [] for month, count in zip(months, agent_data.get('total_members', [])): if count > 0: x_bars.append(month) y_bars.append(count) if x_bars and y_bars: fig.add_trace( go.Bar( x=x_bars, y=y_bars, name=agent_name, marker=dict(color=color, opacity=0.7), hovertemplate='%{fullData.name}
Month: %{x}
Membership Events: %{y}', offsetgroup=agent_name ) ) fig.update_xaxes(title_text=None) fig.update_yaxes(title_text="Membership Events") show_legend = (top_n is not None and top_n <= 10) fig.update_layout( title=None, hovermode='closest', barmode='group', height=600, showlegend=show_legend, margin=dict(l=50, r=150 if show_legend else 50, t=50, b=50) ) return fig def get_leaderboard_dataframe(): """Load leaderboard from saved dataset and convert to pandas DataFrame for display.""" saved_data = load_leaderboard_data_from_hf() if not saved_data or 'leaderboard' not in saved_data: print(f"No leaderboard data available") column_names = [col[0] for col in LEADERBOARD_COLUMNS] return pd.DataFrame(columns=column_names) cache_dict = saved_data['leaderboard'] last_updated = saved_data.get('metadata', {}).get('last_updated', 'Unknown') print(f"Loaded leaderboard from saved dataset (last updated: {last_updated})") print(f"Cache dict size: {len(cache_dict)}") if not cache_dict: print("WARNING: cache_dict is empty!") column_names = [col[0] for col in LEADERBOARD_COLUMNS] return pd.DataFrame(columns=column_names) rows = [] filtered_count = 0 for identifier, data in cache_dict.items(): wiki_edits = data.get('total_wiki_edits', 0) total_members = data.get('total_members', 0) # Filter out assistants with zero activity across both metrics if wiki_edits == 0 and total_members == 0: filtered_count += 1 continue rows.append([ data.get('name', 'Unknown'), data.get('website', 'N/A'), wiki_edits, total_members, ]) print(f"Filtered out {filtered_count} assistants with 0 activity") print(f"Leaderboard will show {len(rows)} assistants") # Create DataFrame column_names = [col[0] for col in LEADERBOARD_COLUMNS] df = pd.DataFrame(rows, columns=column_names) # Ensure numeric types numeric_cols = ["Total Wiki Edits", "Total Membership Events"] for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) # Sort by combined activity descending if not df.empty: df = df.sort_values(by=["Total Wiki Edits", "Total Membership Events"], ascending=False).reset_index(drop=True) # Workaround for gradio_leaderboard bug: single-row tables don't render properly if len(df) == 1: placeholder_row = pd.DataFrame([[ "Submit yours to join!", "—", 0, 0 ]], columns=df.columns) df = pd.concat([df, placeholder_row], ignore_index=True) print("Added placeholder row for single-record workaround") print(f"Final DataFrame shape: {df.shape}") print("="*60 + "\n") return df def submit_agent(identifier, agent_name, organization, website): """Submit a new assistant to the leaderboard.""" # Validate required fields if not identifier or not identifier.strip(): return "ERROR: GitHub identifier is required", gr.update() if not agent_name or not agent_name.strip(): return "ERROR: Assistant name is required", gr.update() if not organization or not organization.strip(): return "ERROR: Organization name is required", gr.update() if not website or not website.strip(): return "ERROR: Website URL is required", gr.update() # Clean inputs identifier = identifier.strip() agent_name = agent_name.strip() organization = organization.strip() website = website.strip() # Validate GitHub identifier is_valid, message = validate_github_username(identifier) if not is_valid: return f"ERROR: {message}", gr.update() # Check for duplicates by loading assistants from HuggingFace assistants = load_agents_from_hf() if assistants: existing_names = {assistant['github_identifier'] for assistant in assistants} if identifier in existing_names: return f"WARNING: Assistant with identifier '{identifier}' already exists", gr.update() # Create submission submission = { 'name': agent_name, 'organization': organization, 'github_identifier': identifier, 'website': website, 'status': 'active' } # Save to HuggingFace if not save_agent_to_hf(submission): return "ERROR: Failed to save submission", gr.update() return f"SUCCESS: Successfully submitted {agent_name}! Community data will be automatically populated by the backend system via the maintainers.", gr.update() # ============================================================================= # DATA RELOAD FUNCTION # ============================================================================= def reload_leaderboard_data(): """Reload leaderboard data from HuggingFace. Called by scheduler daily.""" print(f"\n{'='*80}") print(f"Reloading leaderboard data from HuggingFace...") print(f"{'='*80}\n") try: data = load_leaderboard_data_from_hf() if data: print(f"Successfully reloaded leaderboard data") print(f" Last updated: {data.get('metadata', {}).get('last_updated', 'Unknown')}") print(f" Agents: {len(data.get('leaderboard', {}))}") else: print(f"No data available") except Exception as e: print(f"Error reloading leaderboard data: {str(e)}") print(f"{'='*80}\n") # ============================================================================= # GRADIO APPLICATION # ============================================================================= print(f"\nStarting SWE Assistant Community Leaderboard") print(f" Data source: {LEADERBOARD_REPO}") print(f" Reload frequency: Daily at 12:00 AM UTC\n") # Start APScheduler for daily data reload at 12:00 AM UTC scheduler = BackgroundScheduler(timezone="UTC") scheduler.add_job( reload_leaderboard_data, trigger=CronTrigger(hour=0, minute=0), id='daily_data_reload', name='Daily Data Reload', replace_existing=True ) scheduler.start() print(f"\n{'='*80}") print(f"Scheduler initialized successfully") print(f"Reload schedule: Daily at 12:00 AM UTC") print(f"On startup: Loads cached data from HuggingFace on demand") print(f"{'='*80}\n") # Create Gradio interface with gr.Blocks(title="SWE Assistant Community Leaderboard", theme=gr.themes.Soft()) as app: gr.Markdown("# SWE Assistant Community Leaderboard") gr.Markdown(f"Track and compare community activity (wiki edits & membership events) by SWE assistants") with gr.Tabs(): # Leaderboard Tab with gr.Tab("Leaderboard"): gr.Markdown("*Statistics are based on wiki edits and membership events by assistants*") leaderboard_table = Leaderboard( value=pd.DataFrame(columns=[col[0] for col in LEADERBOARD_COLUMNS]), datatype=LEADERBOARD_COLUMNS, search_columns=["Assistant", "Website"], filter_columns=[] ) # Load leaderboard data when app starts app.load( fn=get_leaderboard_dataframe, inputs=[], outputs=[leaderboard_table] ) # Monthly Performance Metrics gr.Markdown("---") gr.Markdown("## Monthly Performance Metrics - Top 5 Assistants") with gr.Row(): with gr.Column(): gr.Markdown("*Wiki edit volume over time*") wiki_plot = gr.Plot() with gr.Column(): gr.Markdown("*Membership event volume over time*") members_plot = gr.Plot() app.load( fn=lambda: create_monthly_wiki_plot(), inputs=[], outputs=[wiki_plot] ) app.load( fn=lambda: create_monthly_members_plot(), inputs=[], outputs=[members_plot] ) # Submit Assistant Tab with gr.Tab("Submit Your Assistant"): gr.Markdown("Fill in the details below to add your assistant to the leaderboard.") with gr.Row(): with gr.Column(): github_input = gr.Textbox( label="GitHub Identifier*", placeholder="Your assistant username (e.g., my-assistant[bot])" ) name_input = gr.Textbox( label="Assistant Name*", placeholder="Your assistant's display name" ) with gr.Column(): organization_input = gr.Textbox( label="Organization*", placeholder="Your organization or team name" ) website_input = gr.Textbox( label="Website*", placeholder="https://your-assistant-website.com" ) submit_button = gr.Button( "Submit Assistant", variant="primary" ) submission_status = gr.Textbox( label="Submission Status", interactive=False ) # Event handler submit_button.click( fn=submit_agent, inputs=[github_input, name_input, organization_input, website_input], outputs=[submission_status, leaderboard_table] ) # Launch application if __name__ == "__main__": app.launch()