Spaces:
Sleeping
Sleeping
| """ | |
| MedVidBench Leaderboard - Interactive leaderboard for evaluating Video-Language Models | |
| on the MedVidBench benchmark across 8 medical video understanding tasks. | |
| """ | |
| import gradio as gr | |
| import pandas as pd | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Optional | |
| from collections import defaultdict | |
| from huggingface_hub import hf_hub_download, HfApi | |
def load_ground_truth():
    """
    Return the benchmark ground truth as parsed JSON.

    The primary source is ``ground_truth.json`` in the private HuggingFace
    dataset repo ``UIIAmerica/MedVidBench-GroundTruth``, fetched with the
    token stored in the ``HF_TOKEN`` environment variable. If the token is
    missing or the download/parse fails for any reason, the local file
    ``data/ground_truth.json`` is used instead.

    Raises:
        FileNotFoundError: neither the private repo nor the local file
            could provide the ground truth.
    """
    try:
        # The token is expected to be configured as a Space secret.
        hub_token = os.environ.get('HF_TOKEN')
        if not hub_token:
            print("β οΈ HF_TOKEN not found in environment, trying local file...")
            # Raising here routes execution to the local-file fallback below.
            raise ValueError("HF_TOKEN not found")

        print("β³ Downloading ground truth from private repository...")
        cached_path = hf_hub_download(
            repo_id="UIIAmerica/MedVidBench-GroundTruth",
            filename="ground_truth.json",
            repo_type="dataset",
            token=hub_token,
            cache_dir="./cache",  # keep a local copy so restarts skip the download
        )

        with open(cached_path) as handle:
            samples = json.load(handle)
        print(f"β Loaded ground truth from private repo: {len(samples)} samples")
        return samples
    except Exception as e:
        print(f"β οΈ Could not load from private repo: {e}")

        # Development fallback: a checked-out copy next to the app.
        fallback_path = Path("data/ground_truth.json")
        if not fallback_path.exists():
            raise FileNotFoundError(
                "Ground truth not found. Please set HF_TOKEN secret or provide local file."
            )

        with open(fallback_path) as handle:
            samples = json.load(handle)
        print(f"β Loaded ground truth from local file: {len(samples)} samples")
        return samples
# Configuration - storage locations.
# On HF Spaces, /data is persistent across app updates; when it is absent
# (e.g. local development) everything lives under the working directory.
PERSISTENT_DIR = Path("/data") if Path("/data").exists() else Path(".")
SUBMISSIONS_DIR = PERSISTENT_DIR / "submissions"
RESULTS_DIR = PERSISTENT_DIR / "results"
LEADERBOARD_FILE = PERSISTENT_DIR / "leaderboard.json"
EVAL_SCRIPT = Path("evaluation/evaluate_all_pai.py")  # Local copy in repo

# Make sure the per-submission storage directories are present.
for _storage_dir in (SUBMISSIONS_DIR, RESULTS_DIR):
    _storage_dir.mkdir(exist_ok=True)

# Load ground truth once at import time, framed by a banner in the logs.
_BANNER = "=" * 60
print(_BANNER)
print("LOADING GROUND TRUTH DATA")
print(_BANNER)
GROUND_TRUTH = load_ground_truth()
print(_BANNER)

# The evaluation subprocess reads ground truth from this relative path, so
# materialise it on disk when it is not already there.
GROUND_TRUTH_FILE = Path("data/ground_truth.json")
if not GROUND_TRUTH_FILE.exists():
    print(f"β οΈ Saving ground truth to {GROUND_TRUTH_FILE} for evaluation subprocess...")
    GROUND_TRUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
    with GROUND_TRUTH_FILE.open('w') as _gt_handle:
        json.dump(GROUND_TRUTH, _gt_handle)
    print(f"β Ground truth saved to {GROUND_TRUTH_FILE}")
# MedVidBench metric definitions (10 metrics from 8 tasks).
# Note: TAL has 2 metrics, DVC has 2 metrics, others have 1 metric each.
# Each row is (key, display name, full name, description); every metric is
# "higher is better".
_METRIC_ROWS = (
    ("cvs_acc", "CVS_acc", "CVS Assessment Accuracy",
     "Clinical variable scoring accuracy"),
    ("nap_acc", "NAP_acc", "Next Action Prediction Accuracy",
     "Accuracy in predicting next surgical step"),
    ("sa_acc", "SA_acc", "Skill Assessment Accuracy",
     "Surgical skill level evaluation accuracy"),
    ("stg_miou", "STG_mIoU", "Spatiotemporal Grounding mIoU",
     "Mean IoU for spatial+temporal localization"),
    ("tag_miou_03", "TAG_mIoU@0.3", "Temporal Action Grounding mIoU@0.3",
     "Mean IoU at threshold 0.3 for temporal localization"),
    ("tag_miou_05", "TAG_mIoU@0.5", "Temporal Action Grounding mIoU@0.5",
     "Mean IoU at threshold 0.5 for temporal localization"),
    ("dvc_llm", "DVC_llm", "Dense Video Captioning LLM Score",
     "Caption quality score (LLM judge or semantic similarity)"),
    ("dvc_f1", "DVC_F1", "Dense Video Captioning F1",
     "F1 score for temporal segment localization"),
    ("vs_llm", "VS_llm", "Video Summary LLM Score",
     "Video summary quality score"),
    ("rc_llm", "RC_llm", "Region Caption LLM Score",
     "Region caption quality score"),
)

METRICS = {
    metric_key: {
        "name": display_name,
        "full_name": full_name,
        "higher_better": True,
        "description": description,
    }
    for metric_key, display_name, full_name, description in _METRIC_ROWS
}
# Task descriptions for the Tasks & Metrics tab.
# Each row is (task key, display name, metric names shown for the task,
# one-line description). The "key" field of each entry repeats the dict key.
_TASK_ROWS = (
    ("tal", "Temporal Action Localization (TAL)", "TAG_mIoU@0.3, TAG_mIoU@0.5",
     "Identify and temporally localize surgical actions in video"),
    ("stg", "Spatiotemporal Grounding (STG)", "STG_mIoU",
     "Localize objects in both space (bbox) and time (temporal span)"),
    ("next_action", "Next Action Prediction (NAP)", "NAP_acc",
     "Predict the next surgical step given current video context"),
    ("dvc", "Dense Video Captioning (DVC)", "DVC_llm, DVC_F1",
     "Generate captions for multiple events with temporal localization"),
    ("vs", "Video Summary (VS)", "VS_llm",
     "Generate comprehensive summary of surgical procedure"),
    ("rc", "Region Caption (RC)", "RC_llm",
     "Describe specific spatial regions in surgical frames"),
    ("skill_assessment", "Skill Assessment (SA)", "SA_acc",
     "Evaluate surgeon skill level (novice/intermediate/expert)"),
    ("cvs_assessment", "CVS Assessment", "CVS_acc",
     "Score clinical variables in surgical performance"),
)

TASKS = {
    task_key: {
        "name": display_name,
        "key": task_key,
        "metrics": metric_names,
        "description": description,
    }
    for task_key, display_name, metric_names, description in _TASK_ROWS
}
# Test set statistics: sample count, source datasets, and frame count.
TEST_SET_STATS = dict(
    total_samples=6245,
    datasets=[
        "AVOS",
        "CholecT50",
        "CholecTrack20",
        "Cholec80_CVS",
        "CoPESD",
        "EgoSurgery",
        "NurViD",
        "JIGSAWS",
    ],
    video_frames=103742,
)
def _leaderboard_records_to_frame(records) -> pd.DataFrame:
    """Build a leaderboard DataFrame from stored records, normalised for display."""
    frame = pd.DataFrame(records)
    # Older leaderboard files carried an 'average' column that is no longer used.
    if 'average' in frame.columns:
        frame = frame.drop('average', axis=1)
    # There is no overall average; rows are ordered by the first metric (CVS_acc).
    if 'cvs_acc' in frame.columns:
        frame = frame.sort_values('cvs_acc', ascending=False).reset_index(drop=True)
    return frame


def load_leaderboard() -> pd.DataFrame:
    """
    Load the leaderboard, preferring the private HuggingFace repo.

    Lookup order:
      1. ``leaderboard.json`` in ``UIIAmerica/MedVidBench-GroundTruth``
         (only attempted when ``HF_TOKEN`` is set);
      2. the local ``LEADERBOARD_FILE``;
      3. an empty DataFrame with the expected columns.

    A source is skipped when it is unavailable or contains no entries.
    Failures of the remote download are logged and fall through to the
    local file; they are never raised to the caller.

    Returns:
        DataFrame without any legacy 'average' column, sorted by 'cvs_acc'
        descending when that column is present.
    """
    token = os.environ.get('HF_TOKEN')
    if token:
        print("β³ Downloading leaderboard from private repository...")
        try:
            leaderboard_file = hf_hub_download(
                repo_id="UIIAmerica/MedVidBench-GroundTruth",
                filename="leaderboard.json",
                repo_type="dataset",
                token=token,
                cache_dir="./cache"
            )
            with open(leaderboard_file, 'r') as f:
                data = json.load(f)
            if data:
                df = _leaderboard_records_to_frame(data)
                print(f"β Loaded leaderboard from private repo: {len(df)} entries")
                return df
        except Exception as e:
            # Best effort: any remote problem degrades to the local copy.
            print(f"β οΈ Could not load leaderboard from private repo: {e}")
            print(" Using local fallback...")

    # Fallback to local file
    if LEADERBOARD_FILE.exists():
        with open(LEADERBOARD_FILE, 'r') as f:
            data = json.load(f)
        if data:
            df = _leaderboard_records_to_frame(data)
            print(f"β Loaded leaderboard from local file: {len(df)} entries")
            return df

    # Nothing stored yet: return an empty frame with the correct structure
    # (no average column).
    print("π No existing leaderboard found, starting fresh")
    columns = ["rank", "model_name", "organization"] + list(METRICS.keys()) + ["date", "contact"]
    return pd.DataFrame(columns=columns)
def save_leaderboard(df: pd.DataFrame):
    """
    Persist the leaderboard locally and mirror it to the private HF repo.

    The frame's 'rank' column is (re)assigned in place as 1..N in current row
    order, the records are written to ``LEADERBOARD_FILE`` as JSON, and the
    file is then uploaded as ``leaderboard.json`` to
    ``UIIAmerica/MedVidBench-GroundTruth``. Without ``HF_TOKEN`` only the
    local write happens. Upload errors are reported on stdout together with
    a hint chosen from the error text; they are not raised.
    """
    # Ranks follow the current row order.
    df['rank'] = range(1, len(df) + 1)

    # Local JSON is written first so it exists even if the upload fails.
    with open(LEADERBOARD_FILE, 'w') as out:
        json.dump(df.to_dict('records'), out, indent=2)
    print(f"β Saved leaderboard locally: {len(df)} entries")

    # (markers looked for in the error text, hint lines to print)
    upload_hints = (
        (("401", "Unauthorized"),
         (" β Issue: Invalid or expired token",
          " β Fix: Regenerate HF_TOKEN with write permission")),
        (("404", "Not Found"),
         (" β Issue: Repository not found",
          " β Fix: Create UIIAmerica/MedVidBench-GroundTruth repo")),
        (("403", "Forbidden"),
         (" β Issue: Token lacks write permission",
          " β Fix: Use token with write access to dataset")),
    )

    try:
        hub_token = os.environ.get('HF_TOKEN')
        if not hub_token:
            print("β οΈ HF_TOKEN not found in environment")
            print(" Set HF_TOKEN secret in Space settings to enable repo sync")
            print(" Leaderboard saved locally only (will not persist across restarts)")
            return

        print("β³ Uploading leaderboard to private repository...")
        print(" Target: UIIAmerica/MedVidBench-GroundTruth/leaderboard.json")
        print(f" Entries: {len(df)}")

        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        client = HfApi()
        upload_result = client.upload_file(
            path_or_fileobj=str(LEADERBOARD_FILE),
            path_in_repo="leaderboard.json",
            repo_id="UIIAmerica/MedVidBench-GroundTruth",
            repo_type="dataset",
            token=hub_token,
            commit_message=f"Update leaderboard: {len(df)} entries ({timestamp})",
        )
        print("β Successfully uploaded leaderboard to private repo")
        print(f" Commit URL: {upload_result}")
    except Exception as e:
        error_msg = str(e)
        print("β Failed to upload leaderboard to private repo")
        print(f" Error: {error_msg}")

        # Print the first hint whose marker appears in the error text.
        for markers, hint_lines in upload_hints:
            if any(marker in error_msg for marker in markers):
                for hint in hint_lines:
                    print(hint)
                break
        else:
            print(" β Check HuggingFace status and repo permissions")
        print(" β οΈ Leaderboard saved locally only (will not persist)")
def backup_results_to_repo(model_name: str, results_dir: Path):
    """
    Copy a submission's evaluation artefacts to the private HuggingFace repo.

    Uploads ``eval_output.txt`` (evaluation log) and ``input.json`` (the
    submitted predictions) from ``results_dir`` to
    ``results/<model_name>/`` in ``UIIAmerica/MedVidBench-GroundTruth``,
    skipping whichever file is absent. Does nothing when ``HF_TOKEN`` is
    unset or ``results_dir`` does not exist. Any failure is printed and
    swallowed.
    """
    # (file name, word used in the commit message)
    artefacts = (
        ("eval_output.txt", "results"),
        ("input.json", "predictions"),
    )
    try:
        hub_token = os.environ.get('HF_TOKEN')
        if not hub_token or not results_dir.exists():
            return

        print(f"β³ Backing up results for {model_name} to private repository...")
        client = HfApi()
        for file_name, label in artefacts:
            local_path = results_dir / file_name
            if not local_path.exists():
                continue
            client.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=f"results/{model_name}/{file_name}",
                repo_id="UIIAmerica/MedVidBench-GroundTruth",
                repo_type="dataset",
                token=hub_token,
                commit_message=f"Backup {label} for {model_name}",
            )
        print(f"β Backed up results for {model_name}")
    except Exception as e:
        print(f"β οΈ Failed to backup results: {e}")
| # ============================================================================ | |
| # Admin Functions | |
| # ============================================================================ | |
def check_admin_password(password: str) -> bool:
    """
    Return True only if ``password`` matches the configured admin password.

    The admin password is read from the ``ADMIN_PASSWORD`` environment
    variable. When it is unset or empty, admin access is disabled and every
    password is rejected: there is deliberately no built-in default, because
    a default baked into the source would be known to anyone who can read it.

    Args:
        password: Password supplied by the user (untrusted input).

    Returns:
        True on an exact match, False otherwise (including non-string input).
    """
    import hmac  # local import: only needed for the constant-time comparison

    admin_password = os.environ.get('ADMIN_PASSWORD', '')
    if not admin_password:
        print("ADMIN_PASSWORD is not set; admin access is disabled")
        return False
    if not isinstance(password, str):
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and a
    # constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(password.encode('utf-8'), admin_password.encode('utf-8'))
def delete_model_submission(model_name: str) -> Tuple[bool, str]:
    """
    Delete a model submission from the leaderboard and clean up its files.

    The model's row is removed and the leaderboard saved, then the model's
    directories under ``RESULTS_DIR`` and ``SUBMISSIONS_DIR`` are deleted.
    Directory names are derived from the model name (spaces replaced by
    underscores). Because model names originate from user submissions, a
    directory is only removed when it resolves to a direct child of its base
    directory; anything else (e.g. a name such as ``..``) is left on disk and
    reported as skipped.

    Args:
        model_name: Name of the model to delete

    Returns:
        (success, message)
    """
    try:
        # Load current leaderboard
        df = load_leaderboard()
        if df.empty:
            return False, "Leaderboard is empty"

        # Check if model exists
        if model_name not in df['model_name'].values:
            return False, f"Model '{model_name}' not found in leaderboard"

        # Capture model info before deletion so it can be echoed in the message
        model_row = df[df['model_name'] == model_name].iloc[0]
        organization = model_row.get('organization', 'Unknown')
        date = model_row.get('date', 'Unknown')

        # Remove from leaderboard
        df = df[df['model_name'] != model_name].reset_index(drop=True)
        save_leaderboard(df)

        # Cleanup associated files
        model_dir_name = model_name.replace(" ", "_")
        cleanup_info = []
        for label, base_dir in (("results", RESULTS_DIR), ("submissions", SUBMISSIONS_DIR)):
            target = base_dir / model_dir_name
            if not target.exists():
                continue
            # Never delete outside the storage area: the target must be a
            # direct child of its base directory once fully resolved.
            if target.resolve().parent != base_dir.resolve():
                cleanup_info.append(f"Skipped unsafe {label} path: {target}")
                continue
            shutil.rmtree(target)
            cleanup_info.append(f"Removed {label}: {target}")

        message = f"β Successfully deleted model '{model_name}'\n"
        message += f" Organization: {organization}\n"
        message += f" Date: {date}\n\n"
        if cleanup_info:
            message += "Cleaned up:\n" + "\n".join(f" - {info}" for info in cleanup_info)
        return True, message
    except Exception as e:
        return False, f"Error deleting model: {str(e)}"
def get_leaderboard_for_admin() -> pd.DataFrame:
    """
    Return the leaderboard reduced to the columns shown in the admin view.

    The admin view uses rank, model_name, organization, date and contact.
    Columns missing from the stored leaderboard are simply left out; an
    empty leaderboard yields an empty frame that has all five columns.
    """
    admin_cols = ["rank", "model_name", "organization", "date", "contact"]
    board = load_leaderboard()
    if board.empty:
        return pd.DataFrame(columns=admin_cols)
    present = [name for name in admin_cols if name in board.columns]
    return board[present]
def detect_evaluation_output_format(file_path: str) -> Tuple[bool, str]:
    """
    Tell whether an uploaded JSON file is pre-processed evaluation output.

    A file counts as evaluation output when it is a JSON object holding the
    keys "model_name", "evaluated_samples" and "aggregated_results", and the
    first entry of "evaluated_samples" has an "evaluations" dict, e.g.:

        {
          "model_name": "...",
          "evaluated_samples": [
            {
              "sample_id": "...",
              "dataset": "...",
              "evaluations": {
                "0.3": [{"prediction": "...", "ground_truth": "...",
                         "llm_judge": {"R2": 4, "R3": 2, ...}}],
                "0.5": [...],
                "0.7": [...]
              }
            }
          ],
          "aggregated_results": {...}
        }

    Only the first evaluated sample is inspected.

    Returns:
        (is_evaluation_output, message). Read or parse problems are reported
        as (False, "Error detecting format: ...") rather than raised.

    Note: Pre-processed files typically only have LLM judge scores for captioning tasks (DVC, VS, RC).
    Other metrics (TAL, STG, NAP, SA, CVS) still need to be calculated from raw inference results.
    """
    required_keys = ("model_name", "evaluated_samples", "aggregated_results")
    try:
        with open(file_path, 'r') as handle:
            payload = json.load(handle)

        if isinstance(payload, dict) and all(key in payload for key in required_keys):
            samples = payload["evaluated_samples"]
            if len(samples) > 0:
                # Spot-check the structure using the first sample only.
                first = samples[0]
                if "evaluations" in first and isinstance(first["evaluations"], dict):
                    return True, "β Detected pre-processed evaluation output with LLM judge scores (captioning tasks only)"

        return False, "Not an evaluation output file"
    except Exception as e:
        return False, f"Error detecting format: {str(e)}"
def check_for_precomputed_llm_scores(file_path: str) -> Tuple[bool, Optional[Dict]]:
    """
    Report whether a results file already carries LLM judge scores.

    The file may be a JSON list of records or a JSON object whose values are
    the records. Only the first 10 records are examined; a record counts
    when its "struc_info" is a non-empty list containing at least one dict
    with an "llm_judge" key.

    Returns:
        (has_precomputed_scores, None). The second element is always None:
        the scores themselves are obtained later from the evaluation run.
        Any error while reading or inspecting the file yields (False, None).
    """
    def _record_has_judge_scores(record) -> bool:
        if "struc_info" not in record:
            return False
        entries = record.get("struc_info", [])
        if not (isinstance(entries, list) and len(entries) > 0):
            return False
        return any(isinstance(entry, dict) and "llm_judge" in entry for entry in entries)

    try:
        with open(file_path, 'r') as handle:
            payload = json.load(handle)

        # Accept both the list layout and the dict-of-records layout.
        if isinstance(payload, dict):
            records = list(payload.values())
        elif isinstance(payload, list):
            records = payload
        else:
            return False, None

        if len(records) == 0:
            return False, None

        found = any(_record_has_judge_scores(record) for record in records[:10])
        return found, None
    except Exception:
        return False, None
def validate_results_file(file_path: str) -> Tuple[bool, str, bool]:
    """
    Validate uploaded file - accepts both prediction-only and merged formats.

    Expected format for predictions (preferred):
    [
        {
            "id": "video_id&&start&&end&&fps",
            "qa_type": "tal/stg/next_action/dvc/vs/rc/skill_assessment/cvs_assessment",
            "prediction": "Model's answer..."
        },
        ...
    ]

    Also accepts merged format (for testing):
    {
        "0": {
            "metadata": {...},
            "qa_type": "tal",
            "question": "...",
            "answer": "...",
            "gnd": "...",
            "struc_info": [...]
        },
        ...
    }

    Only the first record is inspected to decide the format and check its
    fields. For the prediction-only format the record's qa_type must either
    contain one of the long task names (tal, stg, next_action,
    dense_captioning, video_summary, region_caption, skill_assessment,
    cvs_assessment) or be exactly one of the short aliases dvc / vs / rc.
    Both formats need at least 100 records.

    Returns:
        (valid, message, has_precomputed_llm_scores). The third element is
        always False here. Problems are reported through the message; this
        function does not raise.
    """
    min_samples = 100
    # Long names are matched as substrings, as before.
    substring_qa_types = ("tal", "stg", "next_action", "dense_captioning", "video_summary",
                          "region_caption", "skill_assessment", "cvs_assessment")
    # Short aliases (documented above) must match exactly: as substrings,
    # "vs" and "rc" would accept nearly arbitrary strings.
    exact_qa_aliases = ("dvc", "vs", "rc")
    bad_format_msg = ("Invalid format: Must be either prediction-only (id, qa_type, prediction) "
                      "or merged format (metadata, question, answer)")

    try:
        with open(file_path, 'r') as f:
            data = json.load(f)

        # Handle both list and dict formats
        if isinstance(data, dict):
            records = list(data.values())
        elif isinstance(data, list):
            records = data
        else:
            return False, f"Invalid format: expected list or dict, got {type(data)}", False

        if not records:
            return False, "Empty predictions file", False

        # The first record decides which format the file is in.
        sample = records[0]
        if not isinstance(sample, dict):
            return False, bad_format_msg, False

        # Format 1: Prediction-only (id, qa_type, prediction)
        is_prediction_only = "id" in sample and "prediction" in sample
        # Format 2: Merged (metadata, question, answer, gnd, struc_info)
        is_merged = "metadata" in sample and "question" in sample and "answer" in sample

        if is_prediction_only:
            format_label = "prediction-only"
        elif is_merged:
            format_label = "merged"
        else:
            return False, bad_format_msg, False

        if "qa_type" not in sample:
            return False, "Missing required field: 'qa_type'", False

        if is_prediction_only:
            qa_type = sample.get("qa_type", "")
            known = isinstance(qa_type, str) and (
                qa_type in exact_qa_aliases
                or any(valid in qa_type for valid in substring_qa_types)
            )
            if not known:
                return False, f"Invalid qa_type: {qa_type}", False

        # Check if file has reasonable number of samples (relaxed requirement)
        if len(records) < min_samples:
            return False, f"Too few samples ({len(records)}). Need at least {min_samples} samples.", False

        return True, f"β Valid predictions file ({format_label} format) with {len(records)} samples", False
    except json.JSONDecodeError as e:
        return False, f"Invalid JSON: {str(e)}", False
    except Exception as e:
        return False, f"Error validating file: {str(e)}", False
def extract_metrics_from_evaluation_output(file_path: str) -> Tuple[bool, Dict, str]:
    """
    Read leaderboard metrics straight from pre-processed evaluation output.

    The file must be a JSON object with an "aggregated_results" entry. For
    each task, the results are looked up under the first alias present
    (e.g. "dvc", "DVC" or "dense_captioning"), and metrics are taken from
    that task's "overall" section. A metric whose field is absent from
    "overall" is recorded as 0.0; a task that is missing, or has no
    "overall" section, contributes no metrics.

    Returns:
        (success, metrics_dict, message). success is False, with the
        partially filled dict, when fewer than 10 metrics were found, and
        False with an empty dict when the file is unreadable or malformed.
    """
    # (aliases in lookup order, ((metric key, candidate fields in priority order), ...))
    task_specs = (
        (("tal", "TAL"),
         (("tag_miou_03", ("meanIoU@0.3",)), ("tag_miou_05", ("meanIoU@0.5",)))),
        (("stg", "STG"),
         (("stg_miou", ("mean_iou",)),)),
        (("next_action", "NEXT_ACTION"),
         (("nap_acc", ("accuracy",)),)),
        (("dvc", "DVC", "dense_captioning"),
         (("dvc_llm", ("caption_score",)), ("dvc_f1", ("temporal_f1",)))),
        (("vs", "VS", "video_summary"),
         (("vs_llm", ("score", "average_score", "caption_score")),)),
        (("rc", "RC", "region_caption"),
         (("rc_llm", ("score", "average_score", "caption_score")),)),
        (("skill_assessment", "SKILL"),
         (("sa_acc", ("accuracy",)),)),
        (("cvs_assessment", "CVS"),
         (("cvs_acc", ("accuracy",)),)),
    )

    def _first_present(mapping, keys, default):
        # Same result as chained .get(k1, .get(k2, ... default)) calls:
        # fold from the lowest-priority key up to the highest.
        value = default
        for key in reversed(keys):
            value = mapping.get(key, value)
        return value

    try:
        with open(file_path, 'r') as handle:
            payload = json.load(handle)

        if not isinstance(payload, dict) or "aggregated_results" not in payload:
            return False, {}, "Invalid evaluation output structure"

        aggregated = payload["aggregated_results"]
        metrics = {}

        for aliases, metric_fields in task_specs:
            if not any(alias in aggregated for alias in aliases):
                continue
            task_results = _first_present(aggregated, aliases, {})
            if "overall" not in task_results:
                continue
            overall = task_results["overall"]
            for metric_key, field_names in metric_fields:
                metrics[metric_key] = _first_present(overall, field_names, 0.0)

        # All 10 leaderboard metrics are required.
        if len(metrics) < 10:
            missing = [m for m in METRICS.keys() if m not in metrics]
            return False, metrics, f"Incomplete metrics extracted. Missing: {missing}"

        return True, metrics, "β Metrics extracted from pre-processed evaluation output"
    except Exception as e:
        return False, {}, f"Error extracting metrics: {str(e)}"
def run_evaluation(results_file: str, model_name: str, has_precomputed_llm: bool = False,
                   log_callback=None) -> Tuple[bool, Dict, str]:
    """
    Run evaluation using the evaluate_predictions.py wrapper.

    The uploaded file is copied to ``RESULTS_DIR/<model dir>/input.json``
    and evaluated in a subprocess against ``data/ground_truth.json``. The
    subprocess output is streamed line by line to the server log and to
    ``log_callback``, saved as ``eval_output.txt`` in the same directory
    (also when the run fails or times out), and parsed into metrics with
    ``parse_evaluation_output``.

    A watchdog timer kills the subprocess after 10 minutes. The limit has to
    be enforced from a separate thread because reading the child's output
    blocks until the child exits.

    Args:
        results_file: Path to predictions JSON (either format)
        model_name: Name of the model; spaces are replaced by underscores to
            form the directory name, which must be a direct child of
            ``RESULTS_DIR``
        has_precomputed_llm: Not used (kept for compatibility)
        log_callback: Optional callback function(line) to stream logs

    Returns:
        (success, metrics_dict, message). Errors are reported through the
        message; this function does not raise.
    """
    import threading  # local import: only the watchdog below needs it

    timeout_seconds = 600

    try:
        # Create output directory for this submission. Model names are user
        # input, so refuse any name that would land outside RESULTS_DIR.
        output_dir = RESULTS_DIR / model_name.replace(" ", "_")
        if output_dir.resolve().parent != RESULTS_DIR.resolve():
            return False, {}, f"Error running evaluation: invalid model name '{model_name}'"
        output_dir.mkdir(exist_ok=True)

        # Save user file
        input_file = output_dir / "input.json"
        shutil.copy(results_file, input_file)

        # Use evaluate_predictions.py wrapper which handles both formats
        eval_wrapper = Path("evaluation/evaluate_predictions.py")
        cmd = [
            sys.executable,
            str(eval_wrapper),
            str(input_file),
            "--grouping", "overall",
            "--ground-truth", "data/ground_truth.json"
        ]

        print("=" * 60)
        print("RUNNING EVALUATION")
        print("=" * 60)
        print(f"Command: {' '.join(cmd)}")

        # Run with real-time output streaming (stderr merged into stdout)
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1  # Line buffered
        )

        timed_out = threading.Event()

        def _kill_if_still_running():
            # Only flag a timeout when the child is really still alive.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        watchdog = threading.Timer(timeout_seconds, _kill_if_still_running)
        watchdog.daemon = True
        watchdog.start()

        # Capture output while streaming to callback
        output_lines = []
        try:
            for line in process.stdout:
                line = line.rstrip()
                output_lines.append(line)
                print(line)  # Print to server logs
                # Stream to callback if provided
                if log_callback and line.strip():
                    log_callback(line)
            process.wait()
        finally:
            watchdog.cancel()
            # Reached with a live child only if streaming failed (e.g. the
            # callback raised); do not leave the evaluator running.
            if process.poll() is None:
                process.kill()
                process.wait()
            process.stdout.close()

        # Keep the log for every outcome so failed runs can be diagnosed.
        full_output = '\n'.join(output_lines)
        with open(output_dir / "eval_output.txt", 'w') as f:
            f.write(full_output)

        if timed_out.is_set():
            return False, {}, "Evaluation timed out (>10 minutes)"

        if process.returncode != 0:
            return False, {}, f"Evaluation failed (exit code {process.returncode})"

        # Parse output to extract metrics
        metrics = parse_evaluation_output(full_output)

        print("β Evaluation completed")
        print("=" * 60)
        return True, metrics, "β Evaluation completed successfully"
    except Exception as e:
        return False, {}, f"Error running evaluation: {str(e)}"
def _contains_acronym(line: str, token: str) -> bool:
    """Return True if ``token`` occurs in ``line`` with no letter directly before or after it."""
    start = line.find(token)
    while start != -1:
        end = start + len(token)
        preceded_by_letter = start > 0 and line[start - 1].isalpha()
        followed_by_letter = end < len(line) and line[end].isalpha()
        if not preceded_by_letter and not followed_by_letter:
            return True
        start = line.find(token, start + 1)
    return False


def _trailing_float(line: str) -> Optional[float]:
    """Return the number after the last ':' in ``line``, or None if it is not a number."""
    try:
        return float(line.split(":")[-1].strip())
    except ValueError:
        return None


def parse_evaluation_output(output: str) -> Dict[str, float]:
    """
    Parse evaluation output to extract 10 metrics.

    The text is scanned line by line. A header line selects the current
    task: it must contain "Overall" together with the task acronym (TAL,
    STG, NEXT_ACTION, DVC, RC, VS, SKILL, CVS), or contain the task's long
    title (e.g. "Video Summary"). Acronyms only count when they are not
    embedded in a longer alphabetic word, so a "CVS ... Overall" header is
    not mistaken for "VS". Subsequent lines are interpreted according to the
    current task, and the value is the number after the last colon. When a
    metric appears more than once, the last occurrence wins; lines whose
    value is not a number are ignored.

    TAL supports two layouts: "meanIoU@0.3: x" / "mIoU@0.3: x" lines, or
    "IoU_0.3:" / "IoU_0.5:" section markers followed by "meanIoU: x".

    Returns dict with keys (only those found in the output):
        cvs_acc, nap_acc, sa_acc, stg_miou,
        tag_miou_03, tag_miou_05, dvc_llm, dvc_f1, vs_llm, rc_llm
    """
    # (task, acronym that must appear with "Overall", optional long title),
    # checked in this order; the first match wins.
    task_headers = (
        ("tal", "TAL", None),
        ("stg", "STG", None),
        ("next_action", "NEXT_ACTION", "Next Action"),
        ("dvc", "DVC", "Dense Video Captioning"),
        ("rc", "RC", "Region Caption"),
        ("vs", "VS", "Video Summary"),
        ("skill_assessment", "SKILL", "Skill Assessment"),
        ("cvs_assessment", "CVS", "CVS Assessment"),
    )
    tal_section_metric = {"0.3": "tag_miou_03", "0.5": "tag_miou_05"}

    metrics = {}
    current_task = None
    current_iou_section = None  # Track IoU_0.3 or IoU_0.5 sections for TAL

    for raw_line in output.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()

        # Detect task headers
        has_overall = "Overall" in line
        for task, acronym, title in task_headers:
            if (has_overall and _contains_acronym(line, acronym)) or (title is not None and title in line):
                current_task = task
                break

        if not current_task:
            continue

        value = _trailing_float(line)

        if current_task == "tal":
            # Detect IoU sections for TAL (new format)
            if "IoU_0.3:" in line:
                current_iou_section = "0.3"
            elif "IoU_0.5:" in line:
                current_iou_section = "0.5"
            if value is None:
                continue
            # Old format: meanIoU@0.3 or mIoU@0.3
            if "meanIoU@0.3" in line or "mIoU@0.3" in line:
                metrics["tag_miou_03"] = value
            if "meanIoU@0.5" in line or "mIoU@0.5" in line:
                metrics["tag_miou_05"] = value
            # New format: IoU_0.3: section with meanIoU:
            if current_iou_section and "meanIoU:" in line and "meanIoU@" not in line:
                metrics[tal_section_metric[current_iou_section]] = value

        elif current_task == "stg":
            if value is not None and ("mean_iou" in lowered or "miou" in lowered):
                metrics["stg_miou"] = value

        elif current_task == "next_action":
            if value is not None and "accuracy" in lowered:
                metrics["nap_acc"] = value

        elif current_task == "dvc":
            if value is None:
                continue
            if "caption_score" in lowered or "caption score" in lowered:
                metrics["dvc_llm"] = value
            if "temporal_f1" in lowered or "temporal f1" in lowered:
                metrics["dvc_f1"] = value

        elif current_task == "vs":
            if value is not None and ("score" in lowered or "average" in lowered):
                metrics["vs_llm"] = value

        elif current_task == "rc":
            if value is not None and ("score" in lowered or "average" in lowered):
                metrics["rc_llm"] = value

        elif current_task == "skill_assessment":
            # Overall Accuracy only (not Aspect Balanced Accuracy); the line
            # looks like "Overall Accuracy: 0.2437 (39/160)".
            if "overall accuracy:" in lowered and "aspect" not in lowered:
                try:
                    metrics["sa_acc"] = float(line.split(":")[1].split("(")[0].strip())
                except ValueError:
                    pass  # value is not a number; skip this line

        elif current_task == "cvs_assessment":
            # accuracy only (not component_balanced_accuracy)
            if value is not None and "accuracy:" in line and "component_balanced" not in line:
                metrics["cvs_acc"] = value

    return metrics
def submit_model(file, model_name: str, organization: str, contact: str = "", progress=gr.Progress()):
    """
    Process a model submission: validate, evaluate, and add it to the leaderboard.

    This is a generator: each yielded value is a markdown status string for
    the UI, and the last yielded value is either an error message or the
    final success summary. Nothing meaningful is returned.

    Args:
        file: Uploaded predictions file; only its ``.name`` (a filesystem
            path) is read here.
        model_name: Leaderboard name for the model. It is also used, with
            spaces replaced by underscores, as a directory name under
            RESULTS_DIR, so path separators are rejected.
        organization: Organization / author shown on the leaderboard.
        contact: Optional contact string stored with the entry.
        progress: Gradio progress tracker, called with a fraction and a
            description at each stage.
    """
    import select
    import time

    # Validation
    if not file:
        yield "β Please upload a results file"
        return
    if not model_name or not organization:
        yield "β Please provide both model name and organization"
        return

    # The name is user input and becomes a path component below; a value with
    # separators (or an absolute path) would make the results directory land
    # outside RESULTS_DIR.
    dir_name = model_name.replace(" ", "_")
    if dir_name in (".", "..") or "/" in dir_name or "\\" in dir_name:
        yield "β Model name must not contain '/' or '\\' and must not be '.' or '..'"
        return

    # Step 1: Check if model exists
    progress(0.05, desc="Checking model name...")
    yield "π **Step 1/6**: Checking if model name is available..."
    df = load_leaderboard()
    if model_name in df['model_name'].values:
        yield f"β Model '{model_name}' already exists in leaderboard. Please use a different name."
        return

    # Step 2: Validate file format
    progress(0.15, desc="Validating file format...")
    yield "π **Step 2/6**: Validating predictions file format..."
    valid, msg, has_precomputed_llm = validate_results_file(file.name)
    if not valid:
        yield f"β Invalid results file: {msg}"
        return
    yield f"β {msg}"

    # Step 3: Run evaluation with real-time log streaming
    progress(0.25, desc="Running evaluation...")
    eval_wrapper = Path("evaluation/evaluate_predictions.py")
    output_dir = RESULTS_DIR / dir_name
    output_dir.mkdir(parents=True, exist_ok=True)
    input_file = output_dir / "input.json"
    shutil.copy(file.name, input_file)
    cmd = [
        sys.executable,
        "-u",  # Unbuffered output
        str(eval_wrapper),
        str(input_file),
        "--grouping", "overall",
        "--ground-truth", str(GROUND_TRUTH_FILE),
        "--skip-llm-judge",  # Skip DVC/VS/RC for faster testing
    ]
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        env={**os.environ, "PYTHONUNBUFFERED": "1"},  # Force unbuffered
    )

    noise_marker = 'WARNING: All log messages'
    log_buffer = []
    line_count = 0
    start_time = time.time()
    last_update = start_time
    try:
        yield "βοΈ **Step 3/6**: Running evaluation (streaming logs)...\n\n```\nStarting evaluation subprocess...\n```"
        while True:
            # Once the process has exited, drain whatever is left and stop.
            if process.poll() is not None:
                remaining = process.stdout.read()
                if remaining:
                    for line in remaining.split('\n'):
                        line = line.rstrip()
                        if line.strip() and noise_marker not in line:
                            log_buffer.append(line)
                break

            # Wait up to 0.5s for output so the heartbeat below keeps firing
            # even while the evaluator is silent.
            ready, _, _ = select.select([process.stdout], [], [], 0.5)
            if ready:
                line = process.stdout.readline()
                if not line:
                    break  # EOF
                line = line.rstrip()
                if not line.strip() or noise_marker in line:
                    continue
                log_buffer.append(line)
                line_count += 1

            # Update UI at most every 0.5 seconds
            if time.time() - last_update > 0.5:
                if not log_buffer:
                    elapsed = int(time.time() - start_time)
                    yield f"βοΈ **Step 3/6**: Running evaluation...\n\n```\nWaiting for evaluation output... ({elapsed}s elapsed)\n```"
                else:
                    # Show last 25 lines
                    log_text = "βοΈ **Step 3/6**: Running evaluation...\n\n```\n"
                    log_text += '\n'.join(log_buffer[-25:])
                    log_text += "\n```"
                    yield log_text
                last_update = time.time()
                # Move progress from 25% towards 75%, assuming ~500 log lines.
                progress_increment = min(0.75, 0.25 + (line_count / 500) * 0.50)
                progress(progress_increment, desc="Running evaluation...")

        # Wait for process to fully complete
        process.wait()
    finally:
        # Reached on normal completion and when the generator is closed
        # early: do not leave an orphaned evaluator or an open pipe behind.
        if process.poll() is None:
            process.kill()
            process.wait()
        if process.stdout is not None:
            process.stdout.close()

    # Final log display
    if log_buffer:
        log_text = "βοΈ **Step 3/6**: Evaluation completed\n\n```\n"
        log_text += '\n'.join(log_buffer[-30:])
        log_text += "\n```"
        yield log_text

    # Save output
    full_output = '\n'.join(log_buffer)
    with open(output_dir / "eval_output.txt", 'w') as f:
        f.write(full_output)

    # Check if evaluation succeeded
    if process.returncode != 0:
        yield f"\nβ Evaluation failed (exit code {process.returncode})"
        return

    # Parse metrics from output
    metrics = parse_evaluation_output(full_output)
    if not metrics:
        yield f"\nβ Failed to parse evaluation metrics"
        return

    # Step 4: Check metrics
    progress(0.80, desc="Validating metrics...")
    yield "β Evaluation completed!"
    yield "π **Step 4/6**: Validating extracted metrics..."

    # Fill missing metrics with 0.0 (partial submissions are allowed),
    # reporting caption metrics separately from the rest.
    caption_metrics = ['dvc_llm', 'dvc_f1', 'vs_llm', 'rc_llm']
    missing_metrics = [m for m in METRICS if m not in metrics]
    missing_caption = [m for m in missing_metrics if m in caption_metrics]
    missing_other = [m for m in missing_metrics if m not in caption_metrics]
    for metric in missing_metrics:
        metrics[metric] = 0.0
    if missing_caption:
        yield f"β οΈ Skipped caption tasks, setting to 0: {missing_caption}"
    if missing_other:
        yield f"β οΈ Missing tasks (setting to 0): {missing_other}"
        yield f" Note: Partial submissions are allowed. Missing tasks will show as 0.0."
    computed_metrics = [m for m in METRICS if metrics.get(m, 0.0) > 0]
    yield f"β Computed {len(computed_metrics)}/{len(METRICS)} metrics (remaining set to 0.0)"

    # Step 5: Add to leaderboard
    progress(0.90, desc="Adding to leaderboard...")
    yield "π **Step 5/6**: Adding model to leaderboard..."
    # Reload: the evaluation above can take a long time, and saving the frame
    # loaded in step 1 would discard any entry written in the meantime.
    df = load_leaderboard()
    if model_name in df['model_name'].values:
        yield f"β Model '{model_name}' already exists in leaderboard. Please use a different name."
        return
    new_entry = {
        "model_name": model_name,
        "organization": organization,
        **{metric: round(metrics.get(metric, 0.0), 4) for metric in METRICS},
        "date": datetime.now().strftime("%Y-%m-%d"),
        "contact": contact,
    }
    df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)
    # Sort by first metric (CVS_acc)
    df = df.sort_values('cvs_acc', ascending=False).reset_index(drop=True)
    save_leaderboard(df)
    yield "β Leaderboard updated!"

    # Backup results to private repo
    backup_results_to_repo(dir_name, output_dir)

    # Step 6: Build success message
    progress(1.0, desc="Complete!")
    yield "β **Step 6/6**: Submission complete!"
    success_msg = f"""
---
## β Submission Successful!
**Model**: {model_name}
**Organization**: {organization}
"""
    if has_precomputed_llm:
        success_msg += "\nπ **Note**: Used pre-computed LLM judge scores from struc_info (skipped re-evaluation of DVC/VS/RC)\n"
    else:
        # The command above always passes --skip-llm-judge, so no LLM judge
        # ran during this submission.
        success_msg += "\nβοΈ **Note**: LLM judge was skipped for DVC/VS/RC; run the LLM judge evaluation from the Leaderboard tab to compute those scores\n"
    success_msg += "\n### π Metric Scores\n"
    for metric_key, metric_info in METRICS.items():
        score = metrics.get(metric_key, 0.0)
        success_msg += f"- **{metric_info['name']}**: {score:.4f}\n"
    rank = df[df['model_name'] == model_name].index[0] + 1
    success_msg += f"\n### π Ranking\n**Rank**: #{rank} out of {len(df)} models\n"
    success_msg += "\nRefresh the Leaderboard tab to see your model's position!"
    yield success_msg
def format_leaderboard_display(df: pd.DataFrame) -> pd.DataFrame:
    """Return a display copy of the leaderboard with human-readable headers.

    An empty frame is returned as-is. Otherwise a legacy 'average' column is
    dropped, the columns are restricted to rank / model / organization, the
    metric columns in METRICS order, then date / contact (only those that are
    actually present), and the headers are replaced by display labels.
    """
    if df.empty:
        return df

    # Old-format leaderboards carried an 'average' column; it is not shown.
    if 'average' in df.columns:
        df = df.drop(columns='average')

    # Desired column order; anything the frame does not have is skipped.
    wanted = ["rank", "model_name", "organization", *METRICS.keys(), "date", "contact"]
    present = [name for name in wanted if name in df.columns]

    fixed_labels = {
        "rank": "Rank",
        "model_name": "Model",
        "organization": "Organization",
        "date": "Date",
        "contact": "Contact",
    }

    def label_for(name):
        # Fixed labels win over metric names; unknown columns keep their name.
        if name in fixed_labels:
            return fixed_labels[name]
        if name in METRICS:
            return METRICS[name]["name"]
        return name

    display_df = df[present].copy()
    display_df.columns = [label_for(name) for name in present]
    return display_df
def check_needs_llm_judge(model_name: str) -> Tuple[bool, str]:
    """
    Decide whether a leaderboard model is eligible for an LLM judge run.

    A model is eligible only when it exists in the leaderboard and its
    dvc_llm, vs_llm and rc_llm values are all 0.0 (a missing value counts
    as 0.0).

    Returns:
        (needs_llm_judge, message)
    """
    board = load_leaderboard()
    if model_name not in board['model_name'].values:
        return False, f"Model '{model_name}' not found"

    # First matching row for this model name.
    entry = board[board['model_name'] == model_name].iloc[0]

    caption_keys = ('dvc_llm', 'vs_llm', 'rc_llm')
    all_zero = all(entry.get(key, 0.0) == 0.0 for key in caption_keys)
    if not all_zero:
        return False, "Caption metrics already computed"
    return True, "All caption metrics are 0.0, can run LLM judge"
def check_llm_judge_status(model_name: str) -> Tuple[str, str]:
    """
    Read the LLM judge status file for a model and summarize it.

    The file is ``llm_judge_status.json`` in the model's results directory
    (model name with spaces replaced by underscores).

    Returns:
        (status, message)
        status: 'not_started', 'running', 'completed', 'failed'
        A missing file, an unreadable file, or an unrecognized status value
        all report 'not_started'.
    """
    status_path = RESULTS_DIR / model_name.replace(" ", "_") / "llm_judge_status.json"
    if not status_path.exists():
        return 'not_started', 'No LLM judge evaluation in progress'

    try:
        with open(status_path, 'r') as handle:
            record = json.load(handle)
        state = record.get('status', 'not_started')
        detail = record.get('progress', '')
        stamp = record.get('timestamp', '')

        if state == 'running':
            return 'running', f"Evaluation in progress: {detail}\nStarted: {stamp}"
        if state == 'completed':
            return 'completed', f"Evaluation completed: {stamp}"
        if state == 'failed':
            reason = record.get('error', 'Unknown error')
            return 'failed', f"Evaluation failed: {reason}"
        return 'not_started', 'No evaluation in progress'
    except Exception as e:
        # Any read/parse problem is reported as "not started" with the reason.
        return 'not_started', f"Error reading status: {e}"
def update_llm_judge_status(model_name: str, status: str, progress: str = "", error: str = ""):
    """
    Write the LLM judge status file for a model.

    The file is ``llm_judge_status.json`` in the model's results directory
    (model name with spaces replaced by underscores). It always holds
    'status', 'progress' and a 'timestamp' of the current local time;
    'error' is included only when a non-empty error is given.

    Args:
        model_name: Model whose status is being recorded.
        status: Status value to store (callers use 'running', 'completed',
            'failed').
        progress: Free-form progress text.
        error: Error description; omitted from the file when empty.
    """
    model_dir = RESULTS_DIR / model_name.replace(" ", "_")
    # This is also called from error handlers that may run before the results
    # directory was created; without this the write would raise
    # FileNotFoundError and hide the error being reported.
    model_dir.mkdir(parents=True, exist_ok=True)
    status_file = model_dir / "llm_judge_status.json"

    status_data = {
        'status': status,
        'progress': progress,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    if error:
        status_data['error'] = error

    with open(status_file, 'w') as f:
        json.dump(status_data, f, indent=2)
def run_llm_judge_evaluation(model_name: str, progress=gr.Progress()):
    """
    Start LLM judge evaluation in the background for DVC/VS/RC tasks.

    This is a generator: every outcome, including the early "not needed",
    "already running" and "already completed" exits, is delivered by
    yielding a markdown string. (A ``return <value>`` inside a generator is
    never seen by the consumer, so all messages must be yielded.)

    Steps:
    1. Check that the model exists, needs the LLM judge, and has no run that
       is already running or completed.
    2. Locate ``input.json`` in the model's results directory, downloading it
       from the private HuggingFace repository when it is missing locally.
    3. Launch the evaluation script as a detached subprocess logging to
       ``eval_llm_judge_log.txt``, record its PID, and start a daemon thread
       that updates the leaderboard and status file when the process exits.

    Args:
        model_name: Name of the model to re-evaluate
        progress: Gradio progress tracker

    Yields:
        Status messages (markdown)
    """
    import threading

    try:
        # Check if model exists and needs LLM judge
        needs_llm, msg = check_needs_llm_judge(model_name)
        if not needs_llm:
            yield f"β {msg}"
            return

        # Check if evaluation is already running
        status, status_msg = check_llm_judge_status(model_name)
        if status == 'running':
            yield f"β³ **Evaluation Already Running**\n\n{status_msg}\n\nCheck status by refreshing or clicking 'Check Status' button."
            return
        if status == 'completed':
            yield f"β **Already Completed**\n\n{status_msg}\n\nRefresh the leaderboard to see results."
            return

        progress(0.1, desc="Validating...")
        yield f"π **Validation**: Checking model predictions...\n\n"

        # Find the predictions file - try local first, then download from repo
        model_dir = RESULTS_DIR / model_name.replace(" ", "_")
        model_dir.mkdir(parents=True, exist_ok=True)
        input_file = model_dir / "input.json"
        if input_file.exists():
            yield f"β Found predictions file locally\n\n"
        else:
            token = os.environ.get('HF_TOKEN')
            if not token:
                yield f"β Predictions file not found locally and HF_TOKEN not available\n"
                yield f" Looked for: {input_file}\n"
                return
            try:
                yield f"β³ Downloading predictions from private repository...\n\n"
                predictions_path = hf_hub_download(
                    repo_id="UIIAmerica/MedVidBench-GroundTruth",
                    filename=f"results/{model_name.replace(' ', '_')}/input.json",
                    repo_type="dataset",
                    token=token,
                    cache_dir="./cache",
                )
                # Copy to local results directory
                shutil.copy(predictions_path, input_file)
                yield f"β Downloaded predictions from repository\n\n"
            except Exception as e:
                yield f"β Predictions file not found: {input_file}\n"
                yield f" Also failed to download from repository: {e}\n"
                return

        progress(0.2, desc="Starting background evaluation...")
        yield f"π **Starting Background Evaluation**\n\n"
        yield f"β³ This will take 10-20 minutes depending on API rate limits\n\n"
        yield f"β **You can close this browser tab** - evaluation runs in background\n\n"

        eval_wrapper = Path("evaluation/evaluate_predictions.py")
        log_file = model_dir / "eval_llm_judge_log.txt"
        # Build command for background execution
        cmd = [
            sys.executable,
            "-u",
            str(eval_wrapper),
            str(input_file),
            "--grouping", "overall",
            "--ground-truth", str(GROUND_TRUTH_FILE),
            # NOTE: No --skip-llm-judge flag, so LLM judge will run
        ]

        # From here until the monitor thread is started there is no yield, so
        # a closed generator cannot leave the status stuck at 'running'
        # without anything watching the process.
        update_llm_judge_status(model_name, 'running', 'Starting evaluation...')

        with open(log_file, 'w') as log_f:
            log_f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            log_f.write(f"Command: {' '.join(cmd)}\n")
            log_f.write("=" * 60 + "\n\n")

        # The child receives its own copy of the descriptor, so this handle
        # can be closed as soon as Popen returns.
        with open(log_file, 'a') as log_out:
            process = subprocess.Popen(
                cmd,
                stdout=log_out,
                stderr=subprocess.STDOUT,
                text=True,
                env={**os.environ, "PYTHONUNBUFFERED": "1"},
                start_new_session=True,  # Detach from parent process
            )

        # Save PID for tracking
        pid_file = model_dir / "llm_judge_pid.txt"
        with open(pid_file, 'w') as f:
            f.write(str(process.pid))

        def monitor_and_update():
            """Wait for the background process, then update leaderboard and status."""
            try:
                process.wait()
                with open(log_file, 'r') as f:
                    full_output = f.read()
                if process.returncode != 0:
                    update_llm_judge_status(model_name, 'failed', f'Exit code {process.returncode}')
                    return

                metrics = parse_evaluation_output(full_output)
                dvc_llm = metrics.get('dvc_llm', 0.0)
                vs_llm = metrics.get('vs_llm', 0.0)
                rc_llm = metrics.get('rc_llm', 0.0)
                if not (dvc_llm > 0.0 or vs_llm > 0.0 or rc_llm > 0.0):
                    update_llm_judge_status(model_name, 'failed', 'Failed to extract metrics')
                    return

                # Update leaderboard
                df = load_leaderboard()
                row_mask = df['model_name'] == model_name
                df.loc[row_mask, 'dvc_llm'] = round(dvc_llm, 4)
                df.loc[row_mask, 'vs_llm'] = round(vs_llm, 4)
                df.loc[row_mask, 'rc_llm'] = round(rc_llm, 4)
                df = df.sort_values('cvs_acc', ascending=False).reset_index(drop=True)
                save_leaderboard(df)
                update_llm_judge_status(
                    model_name,
                    'completed',
                    f"DVC: {dvc_llm:.4f}, VS: {vs_llm:.4f}, RC: {rc_llm:.4f}"
                )
            except Exception as e:
                update_llm_judge_status(model_name, 'failed', str(e))

        # Start monitor thread (daemon so it doesn't block app shutdown)
        monitor_thread = threading.Thread(target=monitor_and_update, daemon=True)
        monitor_thread.start()

        progress(0.5, desc="Background process started...")
        success_msg = f"""
---
## β Background Evaluation Started!
**Model**: {model_name}
**Process ID**: {process.pid}
**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### β³ Evaluation Progress
The evaluation is now running in the background. This will take approximately 10-20 minutes.
### π What's Happening
1. βοΈ Running LLM judge on DVC/VS/RC tasks
2. π Using GPT-4 with retry logic (up to 5 attempts per sample)
3. π Will automatically update leaderboard when complete
### β You Can Now:
- β **Close this browser tab** - evaluation continues running
- β Come back later and check status using "Check Status" button
- β Refresh the leaderboard in 10-20 minutes to see results
### π Check Status Later
1. Enter the same model name: `{model_name}`
2. Click "Check Status" button
3. Or refresh the leaderboard to see if metrics are updated
### π Logs
Evaluation logs are being written to:
`{log_file}`
"""
        yield success_msg
    except Exception as e:
        update_llm_judge_status(model_name, 'failed', str(e))
        yield f"β Error starting LLM judge evaluation: {str(e)}"
def check_llm_judge_evaluation_status(model_name: str) -> str:
    """
    Build a markdown report on a model's LLM judge evaluation.

    The model name is stripped of surrounding whitespace once and that same
    value is used for the status lookup, the log directory and the
    leaderboard lookup, so all three refer to the same model.

    Args:
        model_name: Model name as typed by the user; empty, whitespace-only
            or None yields a prompt to enter a name.

    Returns:
        Markdown text: a "no evaluation" note, the last 30 log lines of a
        running evaluation, the caption metrics of a completed one, or the
        error of a failed one.
    """
    if not model_name or not model_name.strip():
        return "β Please enter a model name"
    model_name = model_name.strip()

    status, msg = check_llm_judge_status(model_name)

    if status == 'not_started':
        return f"βΉοΈ **No Evaluation Running**\n\n{msg}"

    if status == 'running':
        log_file = RESULTS_DIR / model_name.replace(" ", "_") / "eval_llm_judge_log.txt"
        # Read last 30 lines of log
        try:
            with open(log_file, 'r') as f:
                log_preview = ''.join(f.readlines()[-30:])
        except (OSError, UnicodeError) as e:
            return f"β³ **Evaluation Running**\n\n{msg}\n\nβ οΈ Unable to read logs: {e}"
        return f"""
## β³ Evaluation Running
**Model**: {model_name}
**Status**: {msg}
### π Recent Logs (last 30 lines)
```
{log_preview}
```
**Note**: Refresh this page or click "Check Status" again for updates.
"""

    if status == 'completed':
        # Check if leaderboard was updated
        df = load_leaderboard()
        if model_name not in df['model_name'].values:
            return f"β **Evaluation Complete**\n\n{msg}\n\nβ οΈ Model not found in leaderboard"
        row = df[df['model_name'] == model_name].iloc[0]
        dvc = row.get('dvc_llm', 0.0)
        vs = row.get('vs_llm', 0.0)
        rc = row.get('rc_llm', 0.0)
        return f"""
## β Evaluation Complete!
**Model**: {model_name}
**Completed**: {msg}
### π Caption Metrics
- **DVC_llm**: {dvc:.4f}
- **VS_llm**: {vs:.4f}
- **RC_llm**: {rc:.4f}
β Leaderboard has been updated!
Refresh the Leaderboard tab to see updated rankings.
"""

    if status == 'failed':
        return f"""
## β Evaluation Failed
**Model**: {model_name}
**Error**: {msg}
Please check the logs or try running the evaluation again.
"""

    return f"βΉοΈ **Status**: {status}\n\n{msg}"
| # Create Gradio interface | |
| with gr.Blocks(title="MedVidBench Leaderboard", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # π₯ MedVidBench Leaderboard | |
| Interactive leaderboard for evaluating **Video-Language Models** on the **MedVidBench benchmark** - | |
| 8 medical video understanding tasks across 8 surgical datasets. | |
| π **Paper**: [MedGRPO: Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding](https://arxiv.org/abs/2512.06581) | |
| π **Project**: [yuhaosu.github.io/MedGRPO](https://yuhaosu.github.io/MedGRPO/) | |
| πΎ **Dataset**: [huggingface.co/datasets/UIIAmerica/MedVidBench](https://huggingface.co/datasets/UIIAmerica/MedVidBench) | |
| π» **GitHub**: [github.com/YuhaoSu/MedGRPO](https://github.com/YuhaoSu/MedGRPO) | |
| """) | |
| with gr.Tabs(): | |
| # Tab 1: Leaderboard | |
| with gr.Tab("π Leaderboard"): | |
| gr.Markdown(""" | |
| ### Current Rankings | |
| The leaderboard displays all submitted models ranked by their performance across 10 metrics on 8 medical video understanding tasks. | |
| **Note**: Models with all caption metrics (DVC_llm, VS_llm, RC_llm) at 0.0 can be re-evaluated with LLM judge using the section below. | |
| """) | |
| def load_and_format_leaderboard(): | |
| """Load and format leaderboard with empty state handling.""" | |
| df = load_leaderboard() | |
| if df.empty: | |
| # Return empty dataframe with column headers | |
| columns = ["rank", "model_name", "organization"] + list(METRICS.keys()) + ["date", "contact"] | |
| return pd.DataFrame(columns=columns) | |
| return format_leaderboard_display(df) | |
| leaderboard_table = gr.Dataframe( | |
| value=load_and_format_leaderboard(), | |
| interactive=False, | |
| wrap=True, | |
| label="Leaderboard Rankings" | |
| ) | |
| with gr.Row(): | |
| refresh_btn = gr.Button("π Refresh Leaderboard", size="sm") | |
| status_text = gr.Markdown("", elem_classes="status-text") | |
| def refresh_leaderboard(): | |
| """Refresh leaderboard and return status message.""" | |
| df = load_and_format_leaderboard() | |
| num_models = len(df) if not df.empty else 0 | |
| status = f"β Loaded {num_models} model(s)" if num_models > 0 else "No submissions yet" | |
| return df, status | |
| refresh_btn.click( | |
| fn=refresh_leaderboard, | |
| outputs=[leaderboard_table, status_text] | |
| ) | |
| # Auto-load on page load | |
| demo.load( | |
| fn=refresh_leaderboard, | |
| outputs=[leaderboard_table, status_text] | |
| ) | |
| # LLM Judge Evaluation Section | |
| gr.Markdown(""" | |
| --- | |
| ### π€ Run LLM Judge Evaluation | |
| If a model was submitted with `--skip-llm-judge` (caption metrics are 0.0), you can run LLM judge evaluation here. | |
| This will compute DVC_llm, VS_llm, and RC_llm scores using GPT-4.1/Gemini. | |
| **β Background Execution**: The evaluation runs in the background - you can close the browser and come back later! | |
| **Note**: This feature is only available when ALL three caption metrics (DVC_llm, VS_llm, RC_llm) are 0.0. | |
| """) | |
| with gr.Row(): | |
| llm_judge_model_input = gr.Textbox( | |
| label="Model Name", | |
| placeholder="Enter exact model name from leaderboard", | |
| scale=3 | |
| ) | |
| with gr.Column(scale=1): | |
| run_llm_judge_btn = gr.Button("π Start Evaluation", variant="primary") | |
| check_status_btn = gr.Button("π Check Status", variant="secondary") | |
| llm_judge_output = gr.Markdown(label="Evaluation Status") | |
| # Wire up LLM judge evaluation | |
| run_llm_judge_btn.click( | |
| fn=run_llm_judge_evaluation, | |
| inputs=[llm_judge_model_input], | |
| outputs=llm_judge_output | |
| ) | |
| # Wire up status check | |
| check_status_btn.click( | |
| fn=check_llm_judge_evaluation_status, | |
| inputs=[llm_judge_model_input], | |
| outputs=llm_judge_output | |
| ) | |
| # Tab 2: Submit | |
| with gr.Tab("π€ Submit Results"): | |
| gr.Markdown(""" | |
| ### Submit Your Model Results | |
| Upload your model's **predictions only** on the **MedVidBench test set (6,245 samples)** to be added to the leaderboard. | |
| #### π Requirements | |
| 1. **Run inference** on the full test set (download from [HuggingFace](https://huggingface.co/datasets/UIIAmerica/MedVidBench)) | |
| 2. **Upload predictions JSON** in the format below (NO ground truth needed) | |
| 3. **Provide model info** (name, organization) | |
| #### π Expected File Format | |
| Your predictions JSON should contain **6,245 samples** with this structure: | |
| ```json | |
| [ | |
| { | |
| "id": "video_id&&start&&end&&fps", | |
| "qa_type": "tal", | |
| "prediction": "Your model's answer here" | |
| }, | |
| { | |
| "id": "another_video&&0&&10&&1.0", | |
| "qa_type": "video_summary", | |
| "prediction": "The surgeon performs..." | |
| } | |
| ] | |
| ``` | |
| **Required fields**: | |
| - `id`: Sample identifier (matches test data from HuggingFace dataset) | |
| - `qa_type`: Task type (tal/stg/next_action/dense_captioning/video_summary/region_caption/skill_assessment/cvs_assessment) | |
| - `prediction`: Your model's answer (text output) | |
| **Important**: | |
| - β Submit **predictions only** (no ground truth needed) | |
| - β Must include all 6,245 test samples | |
| - β Format can be list or dict (dict values will be extracted) | |
| - β Do NOT include ground truth fields (server handles this securely) | |
| #### βοΈ Evaluation Process | |
| After upload, the system will: | |
| 1. **Validate** your predictions file format | |
| 2. **Merge** your predictions with server-side ground truth (private) | |
| 3. **Run evaluation** for all 8 tasks across 10 metrics | |
| 4. **Add to leaderboard** if successful | |
| **Evaluation takes**: ~5-10 minutes (includes LLM judge for caption quality assessment) | |
| **Security**: Ground truth data is stored privately and never exposed to users. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| model_name_input = gr.Textbox( | |
| label="Model Name", | |
| placeholder="e.g., Qwen2.5-VL-7B-MedGRPO", | |
| info="Unique identifier for your model" | |
| ) | |
| org_input = gr.Textbox( | |
| label="Organization / Author", | |
| placeholder="e.g., University Name or Your Name", | |
| info="Who developed this model?" | |
| ) | |
| contact_input = gr.Textbox( | |
| label="Contact (Optional)", | |
| placeholder="email@example.com or github.com/username", | |
| info="For follow-up questions" | |
| ) | |
| with gr.Column(): | |
| results_file_input = gr.File( | |
| label="Upload Results JSON", | |
| file_types=[".json"], | |
| file_count="single" | |
| ) | |
| submit_btn = gr.Button("π Submit to Leaderboard", variant="primary", size="lg") | |
| submission_output = gr.Markdown(label="Submission Status") | |
| # Wire up submission with progress tracking | |
| submit_btn.click( | |
| fn=submit_model, | |
| inputs=[results_file_input, model_name_input, org_input, contact_input], | |
| outputs=submission_output | |
| ) | |
| # Tab 3: Tasks & Metrics | |
| with gr.Tab("π Tasks & Metrics"): | |
| gr.Markdown(""" | |
| ### MedVidBench Benchmark Tasks | |
| The benchmark evaluates models across **8 diverse tasks** spanning video, segment, and frame-level understanding: | |
| """) | |
| # Create tasks table | |
| tasks_data = [] | |
| for task_key, task_info in TASKS.items(): | |
| tasks_data.append({ | |
| "Task": task_info["name"], | |
| "Key": task_info["key"], | |
| "Metrics": task_info["metrics"], | |
| "Description": task_info["description"] | |
| }) | |
| tasks_df = pd.DataFrame(tasks_data) | |
| gr.Dataframe(value=tasks_df, interactive=False) | |
| gr.Markdown(""" | |
| ### Evaluation Metrics | |
| - **TAL** (Temporal Action Localization): **mAP@0.5** - mean Average Precision at IoU threshold 0.5 | |
| - **STG** (Spatiotemporal Grounding): **mIoU** - mean Intersection over Union (spatial + temporal) | |
| - **Next Action**: **Accuracy** - Classification accuracy | |
| - **DVC** (Dense Video Captioning): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) | |
| - **VS** (Video Summary): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) | |
| - **RC** (Region Caption): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) | |
| - **Skill Assessment**: **Accuracy** - Surgical skill level classification (JIGSAWS) | |
| - **CVS Assessment**: **Accuracy** - Clinical variable scoring | |
| #### LLM Judge Details | |
| Caption tasks (DVC, VS, RC) use GPT-4.1 or Gemini-Pro with rubric-based scoring (1-5 scale) across 5 key aspects: | |
| - **R2**: Relevance & Medical Terminology | |
| - **R4**: Actionable Surgical Actions | |
| - **R5**: Comprehensive Detail Level | |
| - **R7**: Anatomical & Instrument Precision | |
| - **R8**: Clinical Context & Coherence | |
| The **final score** is the average across these 5 aspects. | |
| ### Test Set Statistics | |
| - **Total samples**: 6,245 | |
| - **Source datasets**: 8 (AVOS, CholecT50, CholecTrack20, Cholec80_CVS, CoPESD, EgoSurgery, NurViD, JIGSAWS) | |
| - **Video frames**: ~103,742 | |
| - **Task distribution**: | |
| - TAL: ~800 samples | |
| - STG: ~900 samples | |
| - Next Action: ~700 samples | |
| - DVC: ~800 samples | |
| - VS: ~900 samples | |
| - RC: ~1000 samples | |
| - Skill Assessment: ~600 samples | |
| - CVS Assessment: ~545 samples | |
| """) | |
# Tab 4: About
with gr.Tab("ℹ️ About"):
    # Static project description: benchmark summary, citation, links,
    # dataset size, licenses and contact pointers. The tab label and the
    # link bullets carry emoji; they are written as real Unicode characters
    # so they render as icons instead of mis-decoded byte sequences.
    gr.Markdown("""
    ### About MedVidBench
    **MedVidBench** is a comprehensive benchmark for evaluating Video-Language Models on medical and surgical video understanding.
    It was introduced in the **MedGRPO** paper (Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding).
    #### Key Features
    - **8 diverse tasks** covering multiple levels of video understanding
    - **8 source datasets** from various surgical procedures
    - **6,245 test samples** with high-quality annotations
    - **Automatic evaluation** with standardized metrics
    - **LLM-based judging** for caption quality assessment
    #### Paper
    ```bibtex
    @article{su2024medgrpo,
    title={MedGRPO: Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding},
    author={Su, Yuhao and Choudhuri, Anwesa and Gao, Zhongpai and Planche, Benjamin and Nguyen, Van Nguyen and Zheng, Meng and Shen, Yuhan and Innanje, Arun and Chen, Terrence and Elhamifar, Ehsan and Wu, Ziyan},
    journal={arXiv preprint arXiv:2512.06581},
    year={2025}
    }
    ```
    #### Links
    - 📄 **Paper**: [https://arxiv.org/abs/2512.06581](https://arxiv.org/abs/2512.06581)
    - 🌐 **Project Page**: [https://yuhaosu.github.io/MedGRPO/](https://yuhaosu.github.io/MedGRPO/)
    - 💾 **Dataset**: [https://huggingface.co/datasets/UIIAmerica/MedVidBench](https://huggingface.co/datasets/UIIAmerica/MedVidBench)
    - 💻 **GitHub**: [https://github.com/YuhaoSu/MedGRPO](https://github.com/YuhaoSu/MedGRPO)
    - 🏆 **Leaderboard**: [https://huggingface.co/spaces/UIIAmerica/MedVidBench-Leaderboard](https://huggingface.co/spaces/UIIAmerica/MedVidBench-Leaderboard)
    #### Dataset
    The MedVidBench benchmark includes:
    - 21,060 training samples
    - 6,245 test samples
    - Multi-modal annotations (video, text, temporal spans, bounding boxes)
    - 8 source datasets covering various medical procedures
    #### License
    - **Dataset**: CC BY-NC-SA 4.0 (Non-commercial, Share-alike)
    - **Leaderboard Code**: Apache 2.0
    - **Evaluation Scripts**: MIT
    #### Contact
    For questions or issues:
    - Open an issue on [GitHub](https://github.com/YuhaoSu/MedGRPO)
    - Visit the [project page](https://yuhaosu.github.io/MedGRPO/)
    - Email: [Contact via GitHub](https://github.com/YuhaoSu)
    """)
| # Tab 5: Admin Panel (Password Protected) | |
| with gr.Tab("π Admin"): | |
| gr.Markdown(""" | |
| ### Admin Panel | |
| This panel allows administrators to manage leaderboard submissions. | |
| **Features**: | |
| - View all submissions | |
| - Delete individual models | |
| - Cleanup test/dummy data | |
| **Note**: Admin password is set via `ADMIN_PASSWORD` environment variable in HuggingFace Spaces settings. | |
| """) | |
# Password authentication
with gr.Row():
    # The password box and the login button share one row; the scale values
    # give the box three times the width of the button.
    admin_password_input = gr.Textbox(
        label="Admin Password",
        type="password",  # mask the typed characters
        placeholder="Enter admin password",
        scale=3,
    )
    # Button label uses a real Unicode emoji so it renders as an icon.
    login_btn = gr.Button("🔓 Login", variant="primary", scale=1)
# Status line that the login handler fills with a success or failure message.
login_status = gr.Markdown("", visible=True)
# Admin panel (hidden by default, shown after successful login)
with gr.Column(visible=False) as admin_panel:
    gr.Markdown("### 📋 Current Submissions")
    # Table showing all submissions.
    # It is created empty on purpose: visible=False only hides the column in
    # the browser, while a value assigned at build time is loaded for (and
    # served to) every visitor. The login handler writes the table content
    # into this component instead.
    admin_table = gr.Dataframe(
        interactive=False,
        label="Leaderboard Entries",
        wrap=True,
    )
    with gr.Row():
        refresh_admin_btn = gr.Button("🔄 Refresh List", size="sm")
        delete_model_input = gr.Textbox(
            label="Model Name to Delete",
            placeholder="Enter exact model name",
            scale=2,
        )
        # variant="stop" marks the button as a destructive action.
        delete_btn = gr.Button("🗑️ Delete Model", variant="stop", scale=1)
    # Receives the result message of a delete attempt.
    delete_status = gr.Markdown("")
| gr.Markdown(""" | |
| --- | |
| ### ⚠️ Deletion Instructions | |
| 1. **Find the model** in the table above (check the `model_name` column) | |
| 2. **Copy the exact model name** (case-sensitive) | |
| 3. **Paste it** in the "Model Name to Delete" field | |
| 4. **Click "Delete Model"** - this will: | |
| - Remove the entry from leaderboard | |
| - Delete all associated files (`/data/results/{model_name}/`, `/data/submissions/{model_name}/`) | |
| - Update rankings | |
| **⚠️ Warning**: Deletion is permanent and cannot be undone! | |
| ### 🔒 Security Notes | |
| - Set `ADMIN_PASSWORD` in HuggingFace Spaces → Settings → Secrets | |
| - Default password (if not set): `medvidbench_admin_2025` (change this!) | |
| - Admin actions are not logged (consider adding audit trail) | |
| """) | |
# Login handler
def handle_login(password):
    """Check the admin password and unlock the admin panel on success.

    Args:
        password: Text entered in the admin password box.

    Returns:
        A 3-tuple matching the click outputs
        ``[login_status, admin_panel, admin_table]``: the status message,
        a visibility update for the panel, and the table content. On a
        rejected password the table is left untouched.
    """
    if not check_admin_password(password):
        # Do not load or return submission data for a rejected password:
        # the handler's return values are delivered to the caller even
        # though the panel itself stays hidden.
        return (
            "❌ Invalid password. Please try again.",
            gr.update(visible=False),  # Keep admin panel hidden
            gr.update(),  # No change to the table
        )
    return (
        "✅ Login successful! Admin panel unlocked.",
        gr.update(visible=True),  # Show admin panel
        get_leaderboard_for_admin(),  # Load initial data
    )


login_btn.click(
    fn=handle_login,
    inputs=[admin_password_input],
    outputs=[login_status, admin_panel, admin_table],
)
# Refresh handler
def refresh_admin_table(password=None):
    """Reload the admin table for an authenticated caller.

    Args:
        password: Current content of the admin password box. Defaults to
            ``None`` so the function can still be called without
            arguments; such a call is treated as unauthenticated.

    Returns:
        The fresh result of ``get_leaderboard_for_admin()`` when the
        password is accepted, otherwise a no-op update that leaves the
        table unchanged.
    """
    # The refresh button is hidden until login, but hiding a component does
    # not protect the event behind it, so the password is verified here too.
    if password is None or not check_admin_password(password):
        return gr.update()
    return get_leaderboard_for_admin()


refresh_admin_btn.click(
    fn=refresh_admin_table,
    inputs=[admin_password_input],
    outputs=[admin_table],
)
# Delete handler
def handle_delete(model_name, password=None):
    """Delete one model submission after verifying the admin password.

    Args:
        model_name: Name typed into the "Model Name to Delete" box.
        password: Current content of the admin password box. Defaults to
            ``None`` for backward compatibility; a call without a password
            is rejected.

    Returns:
        A 2-tuple matching the click outputs
        ``[delete_status, admin_table]``: a Markdown status message and the
        table content (left unchanged when the caller is not authorized).
    """
    # Deletion is permanent, and hiding the admin panel is only a UI measure:
    # the event can still be triggered without going through the login form.
    # Verify the password on every call before touching any data.
    if password is None or not check_admin_password(password):
        return (
            "❌ Unauthorized. Please log in with the admin password first.",
            gr.update(),
        )

    if not model_name or not model_name.strip():
        return "❌ Please enter a model name", get_leaderboard_for_admin()

    # NOTE(review): the admin instructions say deletion removes
    # /data/results/{model_name}/ and /data/submissions/{model_name}/ -
    # confirm that delete_model_submission rejects names containing path
    # separators or "..".
    success, message = delete_model_submission(model_name.strip())
    heading = "## ✅ Deletion Successful" if success else "## ❌ Deletion Failed"
    # The table is re-read in both cases so it always reflects the stored state.
    return f"{heading}\n\n{message}", get_leaderboard_for_admin()


delete_btn.click(
    fn=handle_delete,
    inputs=[delete_model_input, admin_password_input],
    outputs=[delete_status, admin_table],
)
if __name__ == "__main__":
    # Launch with queue for better concurrency
    demo.queue(default_concurrency_limit=5)
    # Listen on all interfaces so the hosting container can reach the app.
    # share=True is intentionally not passed: Gradio ignores share links on
    # Hugging Face Spaces (with a warning), and on a local run it would open
    # a public tunnel to an app that exposes an admin/delete panel. A share
    # link can still be requested explicitly via the GRADIO_SHARE
    # environment variable.
    demo.launch(server_name="0.0.0.0")