""" MADQA Leaderboard - Streamlit Version Benchmark for evaluating AI systems on document collection question answering. Based on the paper: "Strategic Navigation or Stochastic Search? How Agents and Humans Reason Over Document Collections" Color palette: Snowflake colors - SNOWFLAKE BLUE: #29B5E8 - MID-BLUE: #11567F - MIDNIGHT: #000000 - MEDIUM GRAY: #5B5B5B - STAR BLUE: #75CDD7 - VALENCIA ORANGE: #FF9F36 - FIRST LIGHT: #D45B90 - PURPLE MOON: #7254A3 """ import base64 import json import os import secrets import shutil import sys from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlencode, quote, unquote # Parallelization config for LLM evaluation MAX_EVAL_WORKERS = 24 import pandas as pd import plotly.graph_objects as go import requests import streamlit as st from huggingface_hub import snapshot_download, HfApi, hf_hub_download # Add eval module to path sys.path.insert(0, str(Path(__file__).parent / "eval")) try: from metrics import ( anls_star, anls_star_llm, aggregate_anls_star_llm, standard_error, confidence_interval, citation_f1, kuiper_statistic, get_effort_value, LLM_JUDGE_SPECIFICITY, LLM_JUDGE_SENSITIVITY ) from datasets import load_dataset EVAL_AVAILABLE = True except ImportError: EVAL_AVAILABLE = False # Fallback values for constants LLM_JUDGE_SPECIFICITY = 1.0 LLM_JUDGE_SENSITIVITY = 0.98 # Page configuration st.set_page_config( page_title="MADQA Leaderboard", page_icon="📄", layout="wide", initial_sidebar_state="collapsed", ) # HuggingFace Hub configuration TOKEN = os.environ.get("HF_TOKEN") QUEUE_REPO = "agentic-document-ai/backend-requests" RESULTS_REPO = "agentic-document-ai/backend-results" CACHE_PATH = os.getenv("HF_HOME", ".") # Submission rate limiting SUBMISSION_LIMITS_FILE = "submission_limits.json" SUBMISSION_LIMIT_HOURS = float(os.environ.get("SUBMISSION_LIMIT_HOURS", 24)) # Configurable, default 24 hours 
NEWS_FILE = "news.json" NEWS_MAX_DISPLAY = 5 def get_submission_limits() -> dict: """Download submission limits from HF Hub.""" try: # Try to download the limits file file_path = hf_hub_download( repo_id=RESULTS_REPO, filename=SUBMISSION_LIMITS_FILE, repo_type="dataset", token=TOKEN, ) with open(file_path) as f: return json.load(f) except Exception: return {} # File doesn't exist yet def can_user_submit(username: str) -> tuple[bool, str, float]: """Check if user can submit based on rate limit. Returns: (can_submit, message, hours_remaining) """ limits = get_submission_limits() if username not in limits: return True, "", 0 last_submission_str = limits[username] last_submission = datetime.fromisoformat(last_submission_str) now = datetime.now(timezone.utc) time_since = now - last_submission hours_since = time_since.total_seconds() / 3600 if hours_since < SUBMISSION_LIMIT_HOURS: hours_remaining = SUBMISSION_LIMIT_HOURS - hours_since hours = int(hours_remaining) minutes = int((hours_remaining - hours) * 60) return False, f"Please wait {hours}h {minutes}m before your next test set submission.", hours_remaining return True, "", 0 def record_submission(username: str): """Record a new submission timestamp for the user.""" import tempfile # Get current limits (fresh, not cached) limits = get_submission_limits() # Update with new timestamp limits[username] = datetime.now(timezone.utc).isoformat() # Upload updated file try: api = HfApi(token=TOKEN) # Create temp file with updated limits with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(limits, f, indent=2) temp_path = f.name api.upload_file( path_or_fileobj=temp_path, path_in_repo=SUBMISSION_LIMITS_FILE, repo_id=RESULTS_REPO, repo_type="dataset", token=TOKEN, ) os.unlink(temp_path) # Clean up except Exception as e: st.warning(f"Could not record submission time: {e}") def get_news() -> list: """Load news items from HF Hub.""" try: file_path = hf_hub_download( repo_id=RESULTS_REPO, 
filename=NEWS_FILE, repo_type="dataset", token=TOKEN, ) with open(file_path) as f: news = json.load(f) # Sort by date descending news.sort(key=lambda x: x.get('date', ''), reverse=True) return news except Exception: # Return default news if file doesn't exist return [ {"date": "2025-01-04", "text": "Leaderboard launched! Submit your results to appear on the board."} ] def save_news(news: list) -> tuple[bool, str]: """Save news items to HF Hub.""" import tempfile try: # Sort by date descending before saving news.sort(key=lambda x: x.get('date', ''), reverse=True) with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(news, f, indent=2) temp_path = f.name api = HfApi(token=TOKEN) api.upload_file( path_or_fileobj=temp_path, path_in_repo=NEWS_FILE, repo_id=RESULTS_REPO, repo_type="dataset", token=TOKEN, commit_message="Update news" ) os.unlink(temp_path) return True, "News updated successfully" except Exception as e: return False, f"Failed to save news: {str(e)}" def get_oauth_config() -> dict | None: """Get HuggingFace OAuth configuration from environment variables. These are automatically set by HuggingFace Spaces when hf_oauth: true is in README.md. See: https://huggingface.co/docs/hub/en/spaces-oauth """ client_id = os.environ.get("OAUTH_CLIENT_ID") client_secret = os.environ.get("OAUTH_CLIENT_SECRET") if client_id and client_secret: return { "client_id": client_id, "client_secret": client_secret, "scopes": os.environ.get("OAUTH_SCOPES", "openid profile"), "provider_url": os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co"), } return None def get_hf_user() -> dict | None: """Get the logged-in HuggingFace user info from OAuth. Returns dict with 'username', 'name', 'picture' if logged in, None otherwise. 
Works on HuggingFace Spaces with hf_oauth: true in README.md For local testing, set environment variable: TEST_HF_USER=your_username """ # Check for test user (local development) test_user = os.environ.get("TEST_HF_USER") if test_user: return { 'username': test_user, 'name': test_user, 'picture': '', } # Check session state for logged in user (from OAuth callback) if 'hf_user' in st.session_state and st.session_state.hf_user: return st.session_state.hf_user return None def handle_oauth_callback(): """Handle OAuth callback from HuggingFace. After user authorizes, HF redirects back with 'code' and 'state' query params. We exchange the code for tokens and store user info in session state. Note: We don't strictly validate state because Streamlit session state is lost during the redirect flow. The OAuth is still secure because: 1. The code can only be used once 2. The code is tied to our client_id 3. We're on HTTPS in production """ try: query_params = st.query_params except Exception: # SessionInfo not yet initialized - skip OAuth handling on this run return False # Check if this is an OAuth callback code = query_params.get("code") if not code: return False # If user is already logged in, just clear the query params try: if 'hf_user' in st.session_state and st.session_state.hf_user: st.query_params.clear() return True except Exception: pass oauth_config = get_oauth_config() if not oauth_config: st.query_params.clear() return False # Get redirect URI - must match what HuggingFace expects (.hf.space domain) space_host = os.environ.get("SPACE_HOST", "") if space_host: redirect_uri = f"https://{space_host}" else: redirect_uri = "http://localhost:8501" # Exchange code for tokens token_url = f"{oauth_config['provider_url']}/oauth/token" try: # Prepare auth header credentials = f"{oauth_config['client_id']}:{oauth_config['client_secret']}" auth_header = base64.b64encode(credentials.encode()).decode() response = requests.post( token_url, data={ "grant_type": 
"authorization_code", "code": code, "redirect_uri": redirect_uri, "client_id": oauth_config["client_id"], }, headers={ "Authorization": f"Basic {auth_header}", "Content-Type": "application/x-www-form-urlencoded", }, timeout=10, ) if response.status_code != 200: # Code might have been used already or expired - clear and let user retry st.query_params.clear() return False tokens = response.json() access_token = tokens.get("access_token") # Get user info userinfo_url = f"{oauth_config['provider_url']}/oauth/userinfo" userinfo_response = requests.get( userinfo_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=10, ) if userinfo_response.status_code == 200: userinfo = userinfo_response.json() st.session_state.hf_user = { 'username': userinfo.get('preferred_username', userinfo.get('name', '')), 'name': userinfo.get('name', ''), 'picture': userinfo.get('picture', ''), } # Clean up query params st.query_params.clear() return True except Exception as e: # Silent failure - user can retry login pass st.query_params.clear() return False def is_running_on_hf_spaces() -> bool: """Check if the app is running on HuggingFace Spaces.""" return os.environ.get("SPACE_ID") is not None def get_login_url() -> str | None: """Generate the HuggingFace OAuth login URL.""" oauth_config = get_oauth_config() if not oauth_config: return None # Get redirect URI - must use .hf.space domain (required by HuggingFace OAuth) space_host = os.environ.get("SPACE_HOST", "") if space_host: redirect_uri = f"https://{space_host}" else: redirect_uri = "http://localhost:8501" # Generate a random state (required by OAuth spec, but we can't validate it # reliably due to Streamlit session loss during redirect) state = secrets.token_urlsafe(16) # Build authorization URL params = { "client_id": oauth_config["client_id"], "redirect_uri": redirect_uri, "scope": oauth_config["scopes"], "state": state, "response_type": "code", } return f"{oauth_config['provider_url']}/oauth/authorize?{urlencode(params)}" 
def show_login_button():
    """Show the HuggingFace login button.

    Renders a styled sign-in link when OAuth is configured; returns True if
    the button was shown, False when no login URL is available.
    """
    login_url = get_login_url()
    if login_url:
        # Use custom HTML styled like Streamlit's default button, aligned left
        # NOTE(review): the surrounding markup appears to have been stripped by
        # extraction — only the link text survives here.
        st.markdown(f''' Sign in with Hugging Face ''', unsafe_allow_html=True)
        return True
    return False


def logout():
    """Log out the current user by dropping the cached OAuth user info."""
    if 'hf_user' in st.session_state:
        del st.session_state.hf_user


# Colors (Snowflake brand palette, see module docstring)
SNOWFLAKE_BLUE = "#29B5E8"
MID_BLUE = "#11567F"
VALENCIA_ORANGE = "#FF9F36"
STAR_BLUE = "#75CDD7"
FIRST_LIGHT = "#D45B90"
PURPLE_MOON = "#7254A3"
MEDIUM_GRAY = "#5B5B5B"

# Available tags for filtering - can be extended
AVAILABLE_TAGS = [
    "Agentic",
    "Conventional RAG",
    "Sparse Search Tool",
    "Semantic Search Tool",
    "Vision and Language",
    "Text-only",
]

# Tag colors for visual distinction (cycling through Snowflake secondary colors)
TAG_COLORS = {
    "Agentic": SNOWFLAKE_BLUE,
    "Conventional RAG": STAR_BLUE,
    "Sparse Search Tool": VALENCIA_ORANGE,
    "Semantic Search Tool": FIRST_LIGHT,
    "Vision and Language": PURPLE_MOON,
    "Text-only": SNOWFLAKE_BLUE,
}

# Custom CSS following Snowflake Brand Color Guide
# Primary: MID-BLUE (#11567F) for accents/sections, SNOWFLAKE BLUE (#29B5E8) sparingly
# Use white text on dark backgrounds per accessibility guidelines
# NOTE(review): the CSS body of this f-string appears to have been stripped by
# extraction; only the empty template remains here.
st.markdown(f""" """, unsafe_allow_html=True)

# Data paths: local mirrors of the queue/results dataset repos
EVAL_RESULTS_PATH = Path(CACHE_PATH) / "eval-results"
EVAL_REQUESTS_PATH = Path(CACHE_PATH) / "eval-queue"


@st.cache_data(ttl=300)  # Cache for 5 minutes
def download_data():
    """Download queue and results data from HuggingFace Hub (best-effort).

    Each snapshot is downloaded independently; a failure in one only emits a
    Streamlit warning and does not block the other.
    """
    try:
        snapshot_download(
            repo_id=QUEUE_REPO,
            local_dir=str(EVAL_REQUESTS_PATH),
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
            token=TOKEN,
        )
    except Exception as e:
        st.warning(f"Could not download queue data: {e}")
    try:
        snapshot_download(
            repo_id=RESULTS_REPO,
            local_dir=str(EVAL_RESULTS_PATH),
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
            token=TOKEN,
        )
    except Exception as e:
        st.warning(f"Could not download results data: {e}")


class ModelType:
    # String constants matching the "model_type" field in result metadata.
    API = "api"
    OPEN_WEIGHT = "open-weight"

    @staticmethod
    def get_color(model_type: str) -> str:
        # Map a model-type string to its display color; gray for unknown types.
        if model_type == ModelType.API:
            return VALENCIA_ORANGE
        elif model_type == ModelType.OPEN_WEIGHT:
            return STAR_BLUE
        return MEDIUM_GRAY


# Load SVG icons from local assets folder
ASSETS_PATH = Path(__file__).resolve().parent / "assets"


def load_svg_icon(icon_name: str, fill_color: str | None = None) -> str:
    """Load SVG icon and return as data URI with optional color replacement.

    This matches the Gradio app's load_svg_data_uri function.
    Returns "" when the icon is missing or unreadable.
    """
    svg_file = ASSETS_PATH / f"{icon_name}.svg"
    if not svg_file.exists():
        return ""
    try:
        with open(svg_file, "r", encoding="utf-8") as f:
            svg_content = f.read()
        # Replace black fill with specified color for visibility on dark background
        if fill_color:
            svg_content = svg_content.replace('fill="black"', f'fill="{fill_color}"')
            svg_content = svg_content.replace('stroke="black"', f'stroke="{fill_color}"')
        b64 = base64.b64encode(svg_content.encode()).decode()
        return f"data:image/svg+xml;base64,{b64}"
    except Exception:
        return ""


def load_png_icon(icon_name: str) -> str:
    """Load PNG icon and return as data URI; "" when missing or unreadable."""
    png_file = ASSETS_PATH / f"{icon_name}.png"
    if not png_file.exists():
        return ""
    try:
        with open(png_file, "rb") as f:
            png_bytes = f.read()
        b64 = base64.b64encode(png_bytes).decode()
        return f"data:image/png;base64,{b64}"
    except Exception:
        return ""


# Preload icons with Snowflake colors (matching Gradio app)
ICON_CLOUD = load_svg_icon("snow_cloud2", VALENCIA_ORANGE)  # Orange cloud for API (same as Gradio)
ICON_CODE = load_svg_icon("snow_code", STAR_BLUE)  # Blue code for open-weight (same as Gradio)
ICON_HUMAN = load_png_icon("human_performance")
# Tab header icons - use white to match header text color
HEADER_ICON_COLOR = "#FFFFFF"
ICON_MEDAL = load_svg_icon("snow_medal", HEADER_ICON_COLOR)  # Leaderboard header icon
ICON_EYE = load_svg_icon("snow_eye", HEADER_ICON_COLOR)  # Analysis header icon
ICON_DOCS = load_svg_icon("snow_docs", HEADER_ICON_COLOR)  # About header
ICON_WRITE = load_svg_icon("snow_write", HEADER_ICON_COLOR)  # Submit header icon


def generate_placeholder_description(model_name: str, tags: list, model_type: str) -> str:
    """Generate a placeholder description based on model metadata.

    Used when a submission carries no human-written description; composes a
    short phrase from the model type and tags. Returns "" when nothing is known.
    """
    parts = []
    # Describe model type
    if model_type == "api":
        parts.append("API-based")
    elif model_type == "open-weight":
        parts.append("Open-weight")
    # Describe approach based on tags
    if tags:
        if "Agentic" in tags:
            parts.append("agentic system")
        elif "Conventional RAG" in tags:
            parts.append("RAG pipeline")
        else:
            parts.append("model")
        # Add tool/capability info
        capabilities = []
        # NOTE(review): exact membership test against "Sparse Search" can never
        # match — the tag list elsewhere uses "Sparse Search Tool". Confirm the
        # intended tag name.
        if "Sparse Search" in tags:
            capabilities.append("sparse search")
        if "Semantic Search Tool" in tags:
            capabilities.append("semantic search")
        if "Vision and Language" in tags:
            capabilities.append("vision")
        if "Text-only" in tags:
            capabilities.append("text-only")
        if capabilities:
            parts.append(f"with {', '.join(capabilities)}")
    else:
        parts.append("model")
    return " ".join(parts) if parts else ""


def get_model_type_html(model_type: str) -> str:
    """Get HTML for model type with icon and colored text.

    NOTE(review): the HTML markup inside the f-strings appears to have been
    stripped by extraction; only interpolation placeholders survive here.
    """
    color = ModelType.get_color(model_type)
    icon_uri = ICON_CLOUD if model_type == ModelType.API else ICON_CODE
    # Fallback emoji if icon doesn't load
    fallback_emoji = "☁️" if model_type == ModelType.API else ""
    if icon_uri:
        return f'''
{model_type}
'''
    # Fallback without icon
    return f'{fallback_emoji} {model_type}'


def _extract_timestamp_from_filename(filename: str) -> str:
    """Extract timestamp from filename like 'Model_results_20260109_152104.json'.

    Returns the sortable "YYYYMMDD_HHMMSS" portion, or an all-zero sentinel so
    un-timestamped files always lose the "most recent" comparison.
    """
    import re
    match = re.search(r'_(\d{8}_\d{6})\.json$', filename)
    return match.group(1) if match else "00000000_000000"


def _detect_effort_uniform(result_file: Path, data: dict) -> bool:
    """Check if all predictions in the companion JSONL have the same effort value.

    The predictions file is either named explicitly in the result JSON
    ("source_predictions_file") or derived from the results filename.
    Returns False when the file is missing, unreadable, or efforts vary.
    """
    pred_rel = data.get("source_predictions_file")
    if pred_rel:
        pred_path = Path(EVAL_RESULTS_PATH) / pred_rel
    else:
        # Derive the companion predictions path from the results path.
        pred_path = Path(str(result_file).replace("_results_", "_predictions_").replace(".json", ".jsonl"))
    if not pred_path.exists():
        return False
    try:
        effort_values = set()
        with open(pred_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                pred = json.loads(line)
                # Effort proxy #1: number of search steps taken
                search_history = pred.get('search_history', [])
                steps = len(search_history) if isinstance(search_history, list) and search_history else 0
                if steps == 0:
                    steps = pred.get('iterations', 0)
                try:
                    steps = float(steps) if steps else 0
                except (TypeError, ValueError):
                    steps = 0
                effort_dict = {
                    'steps': steps,
                    'llm_calls': pred.get('llm_calls') or (pred.get('trajectory', {}) or {}).get('llm_calls'),
                    'effort': pred.get('effort') or (pred.get('trajectory', {}) or {}).get('effort'),
                }
                val = get_effort_value(effort_dict)
                if val > 0:
                    effort_values.add(val)
                # Early exit: two distinct effort values means non-uniform.
                if len(effort_values) > 1:
                    return False
        return len(effort_values) == 1
    except Exception:
        return False


@st.cache_data(ttl=300)  # Cache for 5 minutes
def load_eval_results() -> pd.DataFrame:
    """Load evaluation results from JSON files, keeping only the most recent per model.

    Scans every org directory under EVAL_RESULTS_PATH for *_results_*.json,
    dedupes by model name using the filename timestamp, and returns a DataFrame
    sorted by "Accuracy (LLM judge)" descending.
    """
    seen_models = {}  # Track: model_name -> (timestamp, result_dict, filepath)
    results_path = Path(EVAL_RESULTS_PATH)
    if not results_path.exists():
        return pd.DataFrame()
    for org_dir in results_path.iterdir():
        if org_dir.is_dir() and not org_dir.name.startswith('.'):
            for result_file in org_dir.glob("*_results_*.json"):
                try:
                    with open(result_file) as f:
                        data = json.load(f)
                    # Extract data
                    model_name = data.get("model_name", "Unknown")
                    metadata = data.get("metadata", {})
                    result_scores = data.get("results", {})
                    # Get tags - default to ["Agentic"] if not specified
                    tags = data.get("tags", metadata.get("tags", ["Agentic"]))
                    if isinstance(tags, str):
                        tags = [tags]  # Convert single tag to list
                    # Get per-domain scores if available
                    by_domain = result_scores.get("by_domain", {})
                    # Use semantic accuracy if available, otherwise fall back to ANLS*
                    overall = result_scores.get("overall", {})
                    single_ev = result_scores.get("single_evidence", {})
                    multi_page = result_scores.get("multi_evidence_same_doc", {})
                    multi_doc = result_scores.get("multi_evidence_multi_doc", {})
                    # Primary metric: semantic (ANLS* + LLM) if available, otherwise ANLS*
                    semantic_acc = overall.get("semantic", overall.get("anls", 0.0))
                    semantic_ci = overall.get("semantic_ci")  # 95% CI tuple
                    semantic_se = None
                    # Calculate CI/SE on-the-fly using bias correction if not stored
                    if semantic_acc > 0:
                        try:
                            from metrics import confidence_interval, standard_error
                            n = result_scores.get("single_evidence", {}).get("n", 500)
                            p = semantic_acc / 100.0  # Convert to proportion
                            if not semantic_ci:
                                ci = confidence_interval(p, n)  # Uses calibrated q0, q1, m0, m1
                                semantic_ci = (ci[0] * 100, ci[1] * 100)
                            if semantic_se is None:
                                semantic_se = standard_error(p, n) * 100  # SE in percentage points
                        except Exception:
                            # Keep whatever was stored; leave None otherwise.
                            semantic_ci = semantic_ci if semantic_ci else None
                            semantic_se = semantic_se if semantic_se is not None else None
                    anls_acc = overall.get("anls", 0.0)
                    # Detect effort uniformity for Agentic models with Kuiper
                    kuiper_val = overall.get("kuiper", 0.0)
                    is_agentic = "Agentic" in tags if isinstance(tags, list) else False
                    effort_uniform = False
                    if is_agentic and kuiper_val and EVAL_AVAILABLE:
                        effort_uniform = _detect_effort_uniform(result_file, data)
                    result_dict = {
                        "Model": model_name,
                        "Organization": data.get("organization", data.get("submitted_by", org_dir.name)),
                        "Model Type": metadata.get("model_type", "unknown"),
                        "Tags": tags,  # Store as list
                        # Primary: Accuracy with LLM judge (ANLS* + LLM with bias correction)
                        "Accuracy (LLM judge)": semantic_acc,
                        "_Accuracy_SE": semantic_se,  # Hidden: for ±SE display
                        "_Accuracy_CI": semantic_ci,  # Hidden: for tooltip display
                        "Acc. Single-Hop": single_ev.get("semantic", single_ev.get("anls", 0.0)),
                        "Acc. Cross-Page": multi_page.get("semantic", multi_page.get("anls", 0.0)),
                        "Acc. Cross-Doc": multi_doc.get("semantic", multi_doc.get("anls", 0.0)),
                        # Secondary: Pure string-based ANLS* (hidden by default)
                        "ANLS* (string)": anls_acc,
                        # Attribution metrics
                        "Attribution (Page F1)": overall.get("page_f1", 0.0),
                        "Attribution (Doc F1)": overall.get("doc_f1", 0.0),
                        # Calibration metric
                        "Effort (Kuiper)": kuiper_val,
                        "_effort_uniform": effort_uniform,
                        "Submission Date": data.get("submission_date", ""),
                        "Link": data.get("link", ""),
                        "Description": data.get("description", metadata.get("description", "")) or generate_placeholder_description(model_name, tags, metadata.get("model_type", "")),
                        # Per-domain scores (stored as JSON string for DataFrame compatibility)
                        "_by_domain": json.dumps(by_domain) if by_domain else "{}",
                    }
                    # Extract timestamp from filename
                    file_timestamp = _extract_timestamp_from_filename(result_file.name)
                    # Keep only the most recent result per model
                    if model_name not in seen_models or file_timestamp > seen_models[model_name][0]:
                        seen_models[model_name] = (file_timestamp, result_dict)
                except Exception as e:
                    st.warning(f"Error loading {result_file}: {e}")
    if not seen_models:
        return pd.DataFrame()
    # Build results list from deduplicated models
    results = [result_dict for _, result_dict in seen_models.values()]
    df = pd.DataFrame(results)
    df = df.sort_values("Accuracy (LLM judge)", ascending=False).reset_index(drop=True)
    return df


def get_all_tags_from_df(df: pd.DataFrame) -> list:
    """Extract all unique tags from the
DataFrame."""
    all_tags = set()
    if "Tags" in df.columns:
        for tags in df["Tags"]:
            # Tags are stored as lists; skip malformed entries.
            if isinstance(tags, list):
                all_tags.update(tags)
    return sorted(list(all_tags))


def filter_df_by_tags(df: pd.DataFrame, selected_tags: list) -> pd.DataFrame:
    """Filter DataFrame to show only rows that have at least one of the selected tags.

    An empty selection means "no filter" and returns the DataFrame unchanged.
    """
    if not selected_tags:
        return df

    def has_any_tag(row_tags):
        # Rows with malformed (non-list) tags are excluded from the result.
        if not isinstance(row_tags, list):
            return False
        return any(tag in row_tags for tag in selected_tags)

    return df[df["Tags"].apply(has_any_tag)]


def render_tags_html(tags: list) -> str:
    """Render tags as styled badges.

    NOTE(review): the badge HTML markup inside the f-string appears to have
    been stripped by extraction; only the tag text placeholder survives.
    """
    if not tags or not isinstance(tags, list):
        return ""
    badges = []
    for tag in tags:
        color = TAG_COLORS.get(tag, MID_BLUE)
        # Use lighter background with colored border for better readability
        badge = f'''{tag}'''
        badges.append(badge)
    return "".join(badges)


def format_model_name(row) -> str:
    """Format model name with optional link."""
    model_name = row["Model"]
    link = row.get("Link", "")
    if link and link.strip():
        return f'{model_name}'
    return model_name


def format_model_type(model_type: str) -> str:
    """Format model type with icon and color.

    NOTE(review): ModelType defines get_color but no get_icon in this file —
    calling this function would raise AttributeError. Confirm whether this is
    dead code (the table renderer uses get_model_type_html instead).
    """
    icon = ModelType.get_icon(model_type)
    color = ModelType.get_color(model_type)
    return f'{icon} {model_type}'


# Metric tooltips for table headers
METRIC_TOOLTIPS = {
    "Accuracy (LLM judge)": "Answer accuracy using ANLS* + LLM judge with bias correction. Captures semantic correctness beyond string matching. Higher is better.",
    "ANLS* (string)": "String-based accuracy using ANLS* (Average Normalized Levenshtein Similarity). Stricter than semantic. Higher is better.",
    "Acc. Single-Hop": "Accuracy on questions requiring evidence from a single page.",
    "Acc. Cross-Page": "Accuracy on multi-hop questions requiring evidence from multiple pages within the same document.",
    "Acc. Cross-Doc": "Accuracy on multi-hop questions requiring evidence from multiple documents.",
    "Attribution (Page F1)": "F1 score for page-level attribution. Measures overlap between cited pages and gold evidence. Higher is better.",
    "Attribution (Doc F1)": "F1 score for document-level attribution. Measures whether the correct documents were identified. Higher is better.",
    "Effort (Kuiper)": "Effort calibration metric (Kuiper statistic). Measures if effort correlates with problem difficulty. Lower is better.",
    "Model Type": "API = cloud-based model, open-weight = downloadable weights",
    "Tags": "Approach characteristics: Agentic, RAG, search tools, vision capabilities, etc.",
}


def render_leaderboard_table(df: pd.DataFrame, columns: list, show_analyze_column: bool = True, uncertainty_mode: str = "± SE"):
    """Render an HTML table matching the Gradio leaderboard style.

    Args:
        uncertainty_mode: One of "± SE", "90% CI", "95% CI", or "None"
    """
    if df.empty:
        st.warning("No data available")
        return
    # Build table HTML with tooltips
    header_cells = []
    for col in columns:
        # Add line break before brackets for cleaner display
        display_col = col.replace(" (", "
(") if " (" in col else col
        tooltip = METRIC_TOOLTIPS.get(col, "")
        # NOTE(review): the <th>/<td>/<span> markup inside the f-strings of this
        # function appears to have been stripped by extraction; only the value
        # placeholders survive below.
        if tooltip:
            header_cells.append(f'{display_col}')
        else:
            header_cells.append(f'{display_col}')
    # Add "Analyze" column header
    if show_analyze_column:
        header_cells.append('Analyze')
    header_cells = "".join(header_cells)
    # Columns that should be merged for human performance rows
    HUMAN_MERGE_COLS = ["Model", "Organization", "Model Type"]
    rows_html = ""
    for _, row in df.iterrows():
        cells = []
        model_name = row.get("Model", "")
        organization = row.get("Organization", "")
        # Attribution/Kuiper cells are suppressed for the oracle-retriever human baseline.
        hide_attrib_kuiper = model_name == "Human with Oracle Retriever"
        # Check if this is a human performance row (should merge Model, Organization, Model Type)
        is_human_row = organization == "Humanity"
        # Calculate colspan for human rows (count how many merge columns are in selected columns)
        human_colspan = sum(1 for col in HUMAN_MERGE_COLS if col in columns) if is_human_row else 1
        for col in columns:
            value = row.get(col, "")
            # Skip Organization and Model Type for human rows (they're merged into Model)
            if is_human_row and col in ["Organization", "Model Type"]:
                continue
            if col == "Model":
                # Model name with optional link and description
                link = row.get("Link", "")
                description = row.get("Description", "")
                human_icon_html = ""
                if is_human_row and ICON_HUMAN:
                    human_icon_html = (
                        f'Human baseline'
                    )
                if link and str(link).strip():
                    name_html = f'{human_icon_html}{value}'
                else:
                    name_html = f'{human_icon_html}{value}'
                if description and str(description).strip():
                    cell_html = f'{name_html}
{description}'
                else:
                    cell_html = name_html
                # For human rows, use colspan to span Model, Organization, and Model Type columns
                if is_human_row and human_colspan > 1:
                    cells.append(f'{cell_html}')
                else:
                    cells.append(f'{cell_html}')
            elif col == "Model Type":
                # Model type with icon
                cell_html = get_model_type_html(str(value))
                cells.append(f'{cell_html}')
            elif col == "Tags":
                # Render tags as badges
                cell_html = render_tags_html(value)
                cells.append(f'{cell_html}')
            elif col == "Accuracy (LLM judge)" or col == "ANLS* (string)" or col.startswith("Acc."):
                # Format accuracy scores (scale 0-100)
                try:
                    acc_val = f"{float(value):.1f}" if value else "0"
                    acc_float = float(value) if value else 0
                except (ValueError, TypeError):
                    acc_val = str(value)
                    acc_float = 0
                # Add uncertainty based on mode
                cell_html = acc_val
                if uncertainty_mode != "None" and col == "Accuracy (LLM judge)":
                    # Primary metric: use the precomputed (bias-corrected) SE/CI.
                    se = row.get("_Accuracy_SE")
                    ci = row.get("_Accuracy_CI")
                    if uncertainty_mode == "± SE" and se is not None and se > 0:
                        ci_tooltip = f"95% CI: [{ci[0]:.1f}, {ci[1]:.1f}]" if ci else ""
                        uncertainty_text = f' ± {se:.1f}'
                        cell_html = f'{acc_val}{uncertainty_text}'
                    elif uncertainty_mode == "95% CI" and ci:
                        uncertainty_text = f' [{ci[0]:.1f}-{ci[1]:.1f}]'
                        cell_html = f'{acc_val}{uncertainty_text}'
                    elif uncertainty_mode == "90% CI" and se is not None and se > 0:
                        # 90% CI: z=1.645 instead of 1.96, so CI is ~84% of 95% CI width
                        z_90 = 1.645
                        half_width = se * z_90
                        ci_90_low = max(0, acc_float - half_width)
                        ci_90_high = min(100, acc_float + half_width)
                        uncertainty_text = f' [{ci_90_low:.1f}-{ci_90_high:.1f}]'
                        cell_html = f'{acc_val}{uncertainty_text}'
                elif uncertainty_mode != "None" and col.startswith("Acc.") and acc_float > 0:
                    # Compute uncertainty for breakdown accuracy columns
                    n_approx = 150  # Rough estimate for breakdown categories
                    p = acc_float / 100.0
                    if 0 < p < 1:
                        from math import sqrt
                        se_raw = sqrt(p * (1 - p) / n_approx)
                        # Adjust binomial SE for LLM-judge sensitivity/specificity bias.
                        se_adj = se_raw / (LLM_JUDGE_SPECIFICITY + LLM_JUDGE_SENSITIVITY - 1) * 100
                        if uncertainty_mode == "± SE":
                            uncertainty_text = f' ± {se_adj:.1f}'
                            cell_html = f'{acc_val}{uncertainty_text}'
                        elif uncertainty_mode == "95% CI":
                            half_width = se_adj * 1.96
                            ci_low = max(0, acc_float - half_width)
                            ci_high = min(100, acc_float + half_width)
                            uncertainty_text = f' [{ci_low:.1f}-{ci_high:.1f}]'
                            cell_html = f'{acc_val}{uncertainty_text}'
                        elif uncertainty_mode == "90% CI":
                            half_width = se_adj * 1.645
                            ci_low = max(0, acc_float - half_width)
                            ci_high = min(100, acc_float + half_width)
                            uncertainty_text = f' [{ci_low:.1f}-{ci_high:.1f}]'
                            cell_html = f'{acc_val}{uncertainty_text}'
                cells.append(f'{cell_html}')
            elif col.startswith("Attribution"):
                # Format F1 scores (scale 0-100) - NOT bias-adjusted
                if hide_attrib_kuiper:
                    cells.append('—')
                    continue
                try:
                    attr_val = f"{float(value):.1f}" if value else "0"
                    attr_float = float(value) if value else 0
                except (ValueError, TypeError):
                    attr_val = str(value)
                    attr_float = 0
                cell_html = attr_val
                # Add uncertainty for attribution metrics (simple binomial, no bias adjustment)
                if uncertainty_mode != "None" and attr_float > 0:
                    n_approx = 500  # Test set size
                    p = attr_float / 100.0
                    if 0 < p < 1:
                        from math import sqrt
                        se = sqrt(p * (1 - p) / n_approx) * 100  # No bias adjustment
                        if uncertainty_mode == "± SE":
                            uncertainty_text = f' ± {se:.1f}'
                            cell_html = f'{attr_val}{uncertainty_text}'
                        elif uncertainty_mode == "95% CI":
                            half_width = se * 1.96
                            ci_low = max(0, attr_float - half_width)
                            ci_high = min(100, attr_float + half_width)
                            uncertainty_text = f' [{ci_low:.1f}-{ci_high:.1f}]'
                            cell_html = f'{attr_val}{uncertainty_text}'
                        elif uncertainty_mode == "90% CI":
                            half_width = se * 1.645
                            ci_low = max(0, attr_float - half_width)
                            ci_high = min(100, attr_float + half_width)
                            uncertainty_text = f' [{ci_low:.1f}-{ci_high:.1f}]'
                            cell_html = f'{attr_val}{uncertainty_text}'
                cells.append(f'{cell_html}')
            elif col == "Effort (Kuiper)":
                # Format Kuiper statistic (lower is better for calibration)
                # Hide for Conventional RAG models (not meaningful)
                if hide_attrib_kuiper:
                    cells.append('—')
                    continue
                tags = row.get("Tags", [])
                is_conventional_rag = "Conventional RAG" in tags if isinstance(tags, list) else False
                if is_conventional_rag:
                    cell_html = "—"
                else:
                    try:
                        cell_html = f"{float(value):.1f}" if value else "0"
                    except (ValueError, TypeError):
                        cell_html = str(value)
                    # Parenthesize when the agent spends the same effort everywhere
                    # (the statistic carries no signal in that case).
                    if row.get("_effort_uniform", False) and cell_html != "0":
                        tooltip = "This agent uses the same effort for all samples, so effort-invariance metric is not meaningful."
                        cell_html = f'({cell_html})'
                cells.append(f'{cell_html}')
            elif col == "Organization":
                cell_html = str(value) if value else ""
                cells.append(f'{cell_html}')
            else:
                cell_html = str(value) if value else ""
                cells.append(f'{cell_html}')
        # Add "Analyze" link cell
        if show_analyze_column:
            # URL-encode the model name for query param
            encoded_name = quote(str(model_name))
            analyze_link = f'View'
            cells.append(f'{analyze_link}')
        rows_html += f'{"".join(cells)}'
    table_html = f'''
{header_cells}
{rows_html}
''' st.markdown(table_html, unsafe_allow_html=True) def build_csv_download_df(df: pd.DataFrame, columns: list, uncertainty_mode: str) -> pd.DataFrame: """Build a CSV-friendly DataFrame with uncertainty text included.""" if df.empty or not columns: return pd.DataFrame() export_df = df[columns].copy() for idx in export_df.index: row = df.loc[idx] for col in columns: value = row.get(col, "") if col == "Accuracy (LLM judge)" or col == "ANLS* (string)" or col.startswith("Acc."): try: acc_float = float(value) if value else 0.0 acc_val = f"{acc_float:.1f}" except (ValueError, TypeError): export_df.at[idx, col] = value continue text = acc_val if uncertainty_mode != "None": if col == "Accuracy (LLM judge)": se = row.get("_Accuracy_SE") ci = row.get("_Accuracy_CI") if uncertainty_mode == "± SE" and se is not None and se > 0: text = f"{acc_val} ± {se:.1f}" elif uncertainty_mode == "95% CI": if ci: text = f"{acc_val} [{ci[0]:.1f}-{ci[1]:.1f}]" elif se is not None and se > 0: half_width = se * 1.96 text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]" elif uncertainty_mode == "90% CI" and se is not None and se > 0: half_width = se * 1.645 text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]" elif col.startswith("Acc.") and acc_float > 0: n_approx = 150 p = acc_float / 100.0 if 0 < p < 1: from math import sqrt se_raw = sqrt(p * (1 - p) / n_approx) se_adj = se_raw / (LLM_JUDGE_SPECIFICITY + LLM_JUDGE_SENSITIVITY - 1) * 100 if uncertainty_mode == "± SE": text = f"{acc_val} ± {se_adj:.1f}" elif uncertainty_mode == "95% CI": half_width = se_adj * 1.96 text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]" elif uncertainty_mode == "90% CI": half_width = se_adj * 1.645 text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]" export_df.at[idx, col] = text elif col.startswith("Attribution"): try: attr_float = 
def create_accuracy_vs_attribution_plot(df: pd.DataFrame) -> go.Figure:
    """Create scatter plot of Accuracy vs Attribution.

    Expects columns "Model Type", "Model", "Attribution (Page F1)" and
    "Accuracy (LLM judge)". One trace is drawn per model type so the
    legend can toggle API vs open-weight systems.
    """
    if df.empty:
        # Placeholder figure when the leaderboard has no rows yet.
        fig = go.Figure()
        fig.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=20, color="white")
        )
        return fig
    color_map = {
        "api": VALENCIA_ORANGE,  # Orange for API
        "open-weight": STAR_BLUE,  # Star Blue for open-weight
    }
    fig = go.Figure()
    for model_type in df["Model Type"].unique():
        df_type = df[df["Model Type"] == model_type]
        fig.add_trace(go.Scatter(
            x=df_type["Attribution (Page F1)"],
            y=df_type["Accuracy (LLM judge)"],
            mode="markers",
            name=model_type,
            text=df_type["Model"],
            marker=dict(
                size=12,
                # Unknown model types fall back to gray.
                color=color_map.get(model_type, MEDIUM_GRAY),
                line=dict(width=1.5, color="white")
            ),
            # NOTE(review): "<br>" separators reconstructed — confirm against
            # the original hovertemplate string.
            hovertemplate="%{text}<br>Attribution: %{x:.1f}<br>Accuracy: %{y:.1f}",
        ))
    fig.update_layout(
        title=dict(text="Accuracy vs Attribution", font=dict(color="white")),
        xaxis_title="Attribution (Page F1)",
        yaxis_title="Accuracy (LLM judge)",
        hovermode="closest",
        template="plotly_dark",
        height=650,
        showlegend=True,
        legend=dict(title="Model Type", yanchor="top", y=0.99, xanchor="left", x=0.01, font=dict(color="#ccc")),
        # Transparent paper so the chart blends with the Streamlit dark theme.
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
        yaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
    )
    return fig
def create_accuracy_vs_effort_plot(df: pd.DataFrame) -> go.Figure:
    """Create scatter plot of Accuracy vs Effort (Kuiper)."""
    # Filter out Conventional RAG models (Kuiper not meaningful for them)
    def is_not_conventional_rag(tags):
        if isinstance(tags, list):
            return "Conventional RAG" not in tags
        # Missing/malformed tags: keep the row rather than drop it.
        return True
    df_filtered = df[df["Tags"].apply(is_not_conventional_rag)]
    if df_filtered.empty:
        # Placeholder figure when nothing remains after filtering.
        fig = go.Figure()
        fig.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=20, color="white")
        )
        return fig
    color_map = {
        "api": VALENCIA_ORANGE,  # Orange for API
        "open-weight": STAR_BLUE,  # Star Blue for open-weight
    }
    fig = go.Figure()
    for model_type in df_filtered["Model Type"].unique():
        df_type = df_filtered[df_filtered["Model Type"] == model_type]
        fig.add_trace(go.Scatter(
            x=df_type["Effort (Kuiper)"],
            y=df_type["Accuracy (LLM judge)"],
            mode="markers",
            name=model_type,
            text=df_type["Model"],
            marker=dict(
                size=12,
                color=color_map.get(model_type, MEDIUM_GRAY),
                line=dict(width=1.5, color="white")
            ),
            # NOTE(review): "<br>" separators reconstructed — confirm against
            # the original hovertemplate string.
            hovertemplate="%{text}<br>Effort: %{x:.1f}<br>Accuracy: %{y:.1f}",
        ))
    fig.update_layout(
        title=dict(text="Accuracy vs Effort", font=dict(color="white")),
        xaxis_title="Effort (Kuiper) — lower is better",
        yaxis_title="Accuracy (LLM judge)",
        hovermode="closest",
        template="plotly_dark",
        height=650,
        showlegend=True,
        legend=dict(title="Model Type", yanchor="top", y=0.99, xanchor="left", x=0.01, font=dict(color="#ccc")),
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
        yaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
    )
    return fig
def create_domain_accuracy_chart(by_domain: dict, model_name: str, overall_accuracy: float = 0) -> go.Figure:
    """Create a horizontal bar chart showing accuracy by domain.

    Args:
        by_domain: Mapping domain -> {'anls': float, 'n': int, ...}.
        model_name: Displayed in the chart title.
        overall_accuracy: Threshold used to color bars blue (>=) or orange (<).
    """
    # Filter out "Other" category
    filtered_domain = {k: v for k, v in by_domain.items() if k.lower() != 'other'}
    if not filtered_domain:
        fig = go.Figure()
        fig.add_annotation(
            text="No per-domain data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=16, color="white")
        )
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="rgba(0,0,0,0)",
            plot_bgcolor="rgba(14,17,23,0.8)",
        )
        return fig
    # Sort domains by accuracy (descending)
    sorted_domains = sorted(filtered_domain.items(), key=lambda x: x[1].get('anls', 0), reverse=True)
    domains = [d[0] for d in sorted_domains]
    accuracies = [d[1].get('anls', 0) for d in sorted_domains]
    counts = [d[1].get('n', 0) for d in sorted_domains]
    # Color based on above/below overall accuracy
    colors = [SNOWFLAKE_BLUE if acc >= overall_accuracy else VALENCIA_ORANGE for acc in accuracies]
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=domains,
        x=accuracies,
        orientation='h',
        marker=dict(
            color=colors,
            line=dict(width=1, color='white')
        ),
        text=[f"{acc:.1f}% (n={n})" for acc, n in zip(accuracies, counts)],
        textposition='auto',
        textfont=dict(color='white', size=11),
        # NOTE(review): "<br>" separator reconstructed — confirm against
        # the original hovertemplate string.
        hovertemplate="%{y}<br>Accuracy: %{x:.1f}%",
    ))
    fig.update_layout(
        title=dict(
            text=f"Accuracy by Domain: {model_name}",
            font=dict(color="white", size=16)
        ),
        xaxis_title="Accuracy (ANLS* %)",
        yaxis_title="",
        template="plotly_dark",
        height=max(400, len(domains) * 35),  # Dynamic height based on number of domains
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(
            gridcolor=MID_BLUE,
            zerolinecolor=MID_BLUE,
            range=[0, 100]
        ),
        yaxis=dict(
            gridcolor=MID_BLUE,
            autorange="reversed"  # Keep highest at top
        ),
        margin=dict(l=150, r=50, t=60, b=50),
    )
    return fig
def show_model_details(model_name: str):
    """Show detailed per-domain breakdown for a model.

    Renders headline metrics, the per-hop-type accuracy row, and (when
    available) the per-domain bar chart for the given leaderboard entry.
    """
    # Load model data from cached DataFrame
    df = load_eval_results()
    if df.empty:
        st.warning("No model data available")
        return
    model_row = df[df["Model"] == model_name]
    if model_row.empty:
        st.warning(f"Model '{model_name}' not found")
        return
    model_data = model_row.iloc[0]
    # Check if this is a Conventional RAG model
    tags = model_data.get('Tags', [])
    is_conventional_rag = "Conventional RAG" in tags if isinstance(tags, list) else False
    # Display main metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Accuracy (LLM judge)", f"{model_data['Accuracy (LLM judge)']:.1f}%")
    with col2:
        st.metric("Attribution (Page F1)", f"{model_data['Attribution (Page F1)']:.1f}%")
    with col3:
        if is_conventional_rag:
            # Kuiper is undefined for non-iterative systems.
            st.metric("Effort (Kuiper)", "—")
        elif model_data.get('_effort_uniform', False):
            # Parenthesized value signals the metric is not meaningful here.
            kuiper = model_data.get('Effort (Kuiper)', 0)
            st.metric("Effort (Kuiper)", f"({kuiper:.2f})" if kuiper else "N/A", help="This agent uses the same effort for all samples, so effort-invariance metric is not meaningful.")
        else:
            kuiper = model_data.get('Effort (Kuiper)', 0)
            st.metric("Effort (Kuiper)", f"{kuiper:.2f}" if kuiper else "N/A")
    # Show note for Conventional RAG models
    if is_conventional_rag:
        st.caption("*Effort (Kuiper) is only meaningful for Agentic systems with iterative search behavior.*")
    # Display breakdown by hop type
    col1, col2, col3 = st.columns(3)
    with col1:
        single_hop = model_data.get('Acc. Single-Hop', 0)
        st.metric("Acc. Single-Hop", f"{single_hop:.1f}%" if single_hop else "N/A")
    with col2:
        cross_page = model_data.get('Acc. Cross-Page', 0)
        st.metric("Acc. Cross-Page", f"{cross_page:.1f}%" if cross_page else "N/A")
    with col3:
        cross_doc = model_data.get('Acc. Cross-Doc', 0)
        st.metric("Acc. Cross-Doc", f"{cross_doc:.1f}%" if cross_doc else "N/A")
    # Get per-domain data (stored either as a JSON string or a dict)
    by_domain_str = model_data.get('_by_domain', '{}')
    try:
        by_domain = json.loads(by_domain_str) if isinstance(by_domain_str, str) else by_domain_str
    except (json.JSONDecodeError, TypeError):
        by_domain = {}
    if by_domain:
        # Show per-domain chart (use overall accuracy as threshold for coloring)
        overall_accuracy = model_data.get('Accuracy (LLM judge)', 0)
        fig = create_domain_accuracy_chart(by_domain, model_name, overall_accuracy)
        st.plotly_chart(fig, width="stretch")
    else:
        st.info("Per-domain breakdown not available for this submission. Newer submissions will include this data.")
st.columns(3) with col1: single_hop = model_data.get('Acc. Single-Hop', 0) st.metric("Acc. Single-Hop", f"{single_hop:.1f}%" if single_hop else "N/A") with col2: cross_page = model_data.get('Acc. Cross-Page', 0) st.metric("Acc. Cross-Page", f"{cross_page:.1f}%" if cross_page else "N/A") with col3: cross_doc = model_data.get('Acc. Cross-Doc', 0) st.metric("Acc. Cross-Doc", f"{cross_doc:.1f}%" if cross_doc else "N/A") # Get per-domain data by_domain_str = model_data.get('_by_domain', '{}') try: by_domain = json.loads(by_domain_str) if isinstance(by_domain_str, str) else by_domain_str except (json.JSONDecodeError, TypeError): by_domain = {} if by_domain: # Show per-domain chart (use overall accuracy as threshold for coloring) overall_accuracy = model_data.get('Accuracy (LLM judge)', 0) fig = create_domain_accuracy_chart(by_domain, model_name, overall_accuracy) st.plotly_chart(fig, width="stretch") else: st.info("Per-domain breakdown not available for this submission. Newer submissions will include this data.") def _prediction_has_effort(pred: dict) -> bool: """Check if a prediction contains at least one valid effort measure.""" search_history = pred.get('search_history', []) if isinstance(search_history, list) and len(search_history) > 0: return True for key in ('iterations', 'steps', 'llm_calls', 'effort'): val = pred.get(key) if val is not None: try: if float(val) > 0: return True except (TypeError, ValueError): pass trajectory = pred.get('trajectory', {}) if isinstance(trajectory, dict): for key in ('llm_calls', 'effort'): val = trajectory.get(key) if val is not None: try: if float(val) > 0: return True except (TypeError, ValueError): pass return False def validate_jsonl_submission(file_content: str) -> tuple[bool, str, list]: """Validate JSONL submission format and return parsed predictions.""" try: lines = file_content.strip().split("\n") if not lines or (len(lines) == 1 and not lines[0].strip()): return False, "File is empty", [] predictions = [] for line_num, 
def validate_jsonl_submission(file_content: str) -> tuple[bool, str, list]:
    """Validate JSONL submission format and return parsed predictions.

    Returns:
        (is_valid, error_message, predictions). On the first invalid line
        the function stops and reports that line's 1-based number.
    """
    try:
        raw_lines = file_content.strip().split("\n")
        # An empty file reduces to a single blank "line" after stripping.
        if not raw_lines or (len(raw_lines) == 1 and not raw_lines[0].strip()):
            return False, "File is empty", []

        parsed = []
        for num, raw in enumerate(raw_lines, 1):
            stripped = raw.strip()
            if not stripped:
                continue  # Blank interior lines are tolerated.
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError as e:
                return False, f"Line {num}: Invalid JSON - {str(e)}", []
            # Each record must carry both required fields.
            for field in ("question", "answer"):
                if field not in record:
                    return False, f"Line {num}: Missing required field '{field}'", []
            parsed.append(record)
        return True, "", parsed
    except Exception as e:
        # Catch-all keeps the UI from crashing on pathological input
        # (e.g. a line that parses to a non-dict JSON value).
        return False, f"Error reading file: {str(e)}", []
@st.cache_data(ttl=3600)  # Cache for 1 hour
# NOTE(review): caching this cheap pure function looks odd while the expensive
# load_gold_standard below is uncached — confirm the decorator placement.
def derive_hop_type(evidence: list) -> str:
    """Derive hop type from evidence list.

    - single: Single page from a single document
    - cross_page: Multiple pages from the same document
    - cross_doc: Pages from different documents

    Args:
        evidence: List of dicts with 'document' and 'page' keys

    Returns:
        'single', 'cross_page', or 'cross_doc'
    """
    if not evidence:
        return 'single'
    # Get unique documents and pages
    documents = set()
    pages = set()
    for ev in evidence:
        doc = ev.get('document')
        page = ev.get('page')
        if doc is not None:
            documents.add(doc)
        # A page only counts when its document is known.
        if doc is not None and page is not None:
            pages.add((doc, page))
    # Determine hop type based on evidence structure
    if len(documents) > 1:
        return 'cross_doc'  # Multiple documents
    elif len(pages) > 1:
        return 'cross_page'  # Multiple pages from same document
    else:
        return 'single'  # Single page


def load_gold_standard(dataset_name: str = "agentic-document-ai/dataset-PRIVATE", split: str = "test"):
    """Load gold standard from HuggingFace dataset.

    Note: Uses dataset-PRIVATE for test split (contains gold answers).

    Returns:
        (by_text, by_id): gold entries indexed by question text and by
        question id; both empty dicts when loading fails or eval is off.
    """
    if not EVAL_AVAILABLE:
        return {}, {}
    try:
        dataset = load_dataset(dataset_name, split=split)
        by_text = {}
        by_id = {}
        for ex in dataset:
            question = ex['question'].strip()
            qid = ex.get('id', '')
            # Try multiple field names for answers (different splits may use different names)
            answers = ex.get('answer_variants') or ex.get('answers') or []
            # If answers is a string, wrap it in a list
            if isinstance(answers, str):
                answers = [[answers]]
            # If answers is a flat list of strings, wrap each in a list
            elif answers and isinstance(answers[0], str):
                answers = [answers]
            evidence = ex.get('evidence', [])
            gold_data = {
                'answers': answers,
                'evidence': evidence,
                'category': ex.get('document_category', ''),
                'domain': ex.get('domain', ''),
                # Derive hop type from evidence structure
                'hop_type': derive_hop_type(evidence)
            }
            by_text[question] = gold_data
            if qid:
                by_id[qid] = gold_data
        return by_text, by_id
    except Exception as e:
        # Surface the failure in the UI instead of crashing the app.
        st.error(f"Error loading dataset: {e}")
        return {}, {}
def _evaluate_single_item(args, max_retries=3):
    """Evaluate a single prediction item (for parallel processing).

    Args:
        args: Tuple of (idx, prediction dict, gold data dict, use_llm_judge).
        max_retries: Attempts for the LLM-judge call before re-raising.

    Returns:
        Flat dict of per-item metrics consumed by evaluate_predictions.

    Raises:
        Propagates the LLM-judge exception after max_retries failures.
    """
    import time as _time
    idx, pred, gold_data, use_llm_judge = args
    question = pred.get('question', '').strip()
    answer = pred.get('answer', '')
    citations = pred.get('citations', [])
    search_history = pred.get('search_history', [])
    # Prefer search-history length as step count; fall back to 'iterations'.
    steps = len(search_history) if search_history else pred.get('iterations', 0)
    # Look for effort metrics at top level or nested in 'trajectory'
    trajectory = pred.get('trajectory', {})
    # Ensure trajectory is a dict before calling .get() on it
    if not isinstance(trajectory, dict):
        trajectory = {}
    llm_calls = pred.get('llm_calls') or trajectory.get('llm_calls')
    effort = pred.get('effort') or trajectory.get('effort')
    # Calculate non-LLM metrics first
    anls = anls_star(answer, gold_data['answers'])
    doc_f1 = citation_f1(citations, gold_data['evidence'], level='document')
    page_f1 = citation_f1(citations, gold_data['evidence'], level='page')
    # Semantic accuracy with LLM judge (or just ANLS* if disabled)
    if use_llm_judge:
        for attempt in range(max_retries):
            try:
                llm_result = anls_star_llm(answer, gold_data['answers'], question)
                semantic_score = llm_result['score']
                break
            except Exception:
                if attempt < max_retries - 1:
                    _time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise
    else:
        semantic_score = anls
    return {
        'idx': idx,
        'question': question,
        'anls': anls,
        'semantic_score': semantic_score,
        # Binary correctness at the 0.5 semantic-score threshold.
        'correct': semantic_score >= 0.5,
        'doc_f1': doc_f1['f1'],
        'page_f1': page_f1['f1'],
        'steps': steps,
        'llm_calls': llm_calls,
        'effort': effort,
        'hop_type': gold_data.get('hop_type', 'single'),
        'category': gold_data['category'],
        'domain': gold_data['domain']
    }
def evaluate_predictions(
    predictions: list,
    gold_by_text: dict,
    gold_by_id: dict,
    use_llm_judge: bool = True,
    progress_callback=None
) -> dict:
    """Evaluate predictions against gold standard (parallelized when using LLM judge).

    Args:
        predictions: List of prediction dicts
        gold_by_text: Gold data indexed by question text
        gold_by_id: Gold data indexed by question ID
        use_llm_judge: If True, use ANLS*+LLM for semantic accuracy (default)
        progress_callback: Optional callback(current, total) for progress updates

    Returns:
        Results dict with overall, per-hop-type and per-domain aggregates,
        or {"error": ...} when evaluation cannot proceed.
    """
    if not EVAL_AVAILABLE:
        return {"error": "Evaluation module not available"}
    # First pass: match predictions to gold standard
    matched_items = []
    unmatched = []
    for pred in predictions:
        question = pred.get('question', '').strip()
        qid = pred.get('id', '')
        # Match to gold: exact question text first, then question id.
        gold_data = None
        if question in gold_by_text:
            gold_data = gold_by_text[question]
        elif qid and qid in gold_by_id:
            gold_data = gold_by_id[qid]
        if gold_data:
            matched_items.append((pred, gold_data, use_llm_judge))
        else:
            # Ternary spans the whole expression: long questions are truncated.
            unmatched.append(question[:50] + "..." if len(question) > 50 else question)
    if not matched_items:
        return {"error": "No predictions matched the gold standard"}
    # Prepare items with index
    items_with_idx = [(i, pred, gold, llm) for i, (pred, gold, llm) in enumerate(matched_items)]
    total = len(items_with_idx)
    evals = []
    completed = 0
    # Parallel evaluation with ThreadPoolExecutor (much faster for LLM calls)
    with ThreadPoolExecutor(max_workers=MAX_EVAL_WORKERS) as executor:
        futures = {executor.submit(_evaluate_single_item, item): item[0] for item in items_with_idx}
        for future in as_completed(futures):
            result = future.result()  # Will raise if failed after retries
            evals.append(result)
            completed += 1
            if progress_callback:
                progress_callback(completed, total)
    # Aggregate overall metrics
    n = len(evals)
    semantic_scores = [e['semantic_score'] for e in evals]
    # Apply bias correction for semantic accuracy
    if use_llm_judge:
        agg = aggregate_anls_star_llm(semantic_scores, apply_bias_correction=True)
        mean_semantic = agg['adjusted_score'] * 100
        semantic_ci = (agg['ci_lower'] * 100, agg['ci_upper'] * 100)
    else:
        mean_semantic = sum(semantic_scores) / n * 100
        semantic_ci = None
    mean_anls = sum(e['anls'] for e in evals) / n * 100
    accuracy = sum(e['correct'] for e in evals) / n * 100
    mean_doc_f1 = sum(e['doc_f1'] for e in evals) / n * 100
    mean_page_f1 = sum(e['page_f1'] for e in evals) / n * 100
    # Kuiper statistic
    kuiper = kuiper_statistic(evals)
    # By hop type
    single_hop = [e for e in evals if e['hop_type'] == 'single']
    cross_page = [e for e in evals if e['hop_type'] == 'cross_page']
    cross_doc = [e for e in evals if e['hop_type'] == 'cross_doc']
    # By domain
    by_domain = defaultdict(list)
    for e in evals:
        domain = e['domain'] or 'Other'
        by_domain[domain].append(e)
    domain_scores = {}
    for domain, domain_evals in sorted(by_domain.items()):
        domain_semantic_scores = [e['semantic_score'] for e in domain_evals]
        if use_llm_judge:
            domain_agg = aggregate_anls_star_llm(domain_semantic_scores, apply_bias_correction=True)
            domain_semantic = domain_agg['adjusted_score'] * 100
        else:
            domain_semantic = sum(domain_semantic_scores) / len(domain_semantic_scores) * 100
        domain_scores[domain] = {
            'semantic': domain_semantic,
            'anls': sum(e['anls'] for e in domain_evals) / len(domain_evals) * 100,
            'n': len(domain_evals)
        }
    results = {
        'n_evaluated': n,
        'n_unmatched': len(unmatched),
        # Only the first few unmatched questions are surfaced to the UI.
        'unmatched_samples': unmatched[:5],
        'overall': {
            'semantic': mean_semantic,  # Primary metric (ANLS* + LLM judge)
            'semantic_ci': semantic_ci,  # 95% CI if LLM judge used
            'anls': mean_anls,  # Secondary metric (pure ANLS*)
            'accuracy': accuracy,
            'doc_f1': mean_doc_f1,
            'page_f1': mean_page_f1,
            'kuiper': kuiper['kuiper_stat'] if not kuiper.get('degenerate') else None,
        },
        'single_evidence': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in single_hop], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and single_hop)
                else (sum(e['semantic_score'] for e in single_hop) / len(single_hop) * 100 if single_hop else 0)
            ),
            'anls': sum(e['anls'] for e in single_hop) / len(single_hop) * 100 if single_hop else 0,
            'n': len(single_hop)
        },
        'multi_evidence_same_doc': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in cross_page], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and cross_page)
                else (sum(e['semantic_score'] for e in cross_page) / len(cross_page) * 100 if cross_page else 0)
            ),
            'anls': sum(e['anls'] for e in cross_page) / len(cross_page) * 100 if cross_page else 0,
            'n': len(cross_page)
        },
        'multi_evidence_multi_doc': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in cross_doc], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and cross_doc)
                else (sum(e['semantic_score'] for e in cross_doc) / len(cross_doc) * 100 if cross_doc else 0)
            ),
            'anls': sum(e['anls'] for e in cross_doc) / len(cross_doc) * 100 if cross_doc else 0,
            'n': len(cross_doc)
        },
        'by_domain': domain_scores,
        'used_llm_judge': use_llm_judge
    }
    return results
@st.fragment
def submit_results_fragment():
    """Fragment for file upload and evaluation.

    Three-step flow: (1) upload/paste predictions and run evaluation,
    (2) collect model metadata, (3) upload results + raw predictions to
    the results repo on HuggingFace Hub. Requires a logged-in HF user and
    enforces the submission rate limit.
    """
    # Check HuggingFace login
    hf_user = get_hf_user()
    if not hf_user:
        st.warning("**Login Required**: Please sign in with your HuggingFace account to submit results.")
        # Show login button
        if not show_login_button():
            st.info("""
**Login not available.** This feature requires deployment on HuggingFace Spaces with `hf_oauth: true` in the Space's README.md metadata.

For local testing, set: `TEST_HF_USER=your_username`
""")
        return

    # Show logged-in user
    st.success(f"Logged in as **{hf_user['username']}**")

    # Check submission rate limit
    can_submit, limit_msg, hours_left = can_user_submit(hf_user['username'])
    if not can_submit:
        st.warning(f"**Rate Limit**: {limit_msg}")
        st.info("""
This limit helps prevent overfitting to the test set. You can still evaluate locally on the **dev set**:
```bash
python evaluate.py your_predictions.jsonl --dataset agentic-document-ai/dataset --split dev
```
""")
        return

    # Step 1: Upload and Evaluate
    st.markdown("#### Step 1: Upload Predictions")

    # Two options: file upload or paste text
    upload_tab, paste_tab = st.tabs(["Upload File", "Paste JSONL"])
    with upload_tab:
        uploaded_file = st.file_uploader(
            "Upload your predictions JSONL file",
            type=["jsonl"],
            help="One prediction per line with 'question' and 'answer' fields",
        )
    with paste_tab:
        pasted_content = st.text_area(
            "Paste your JSONL content",
            height=200,
            help="One JSON object per line",
            placeholder='{"question": "...", "answer": "...", "citations": [...]}\n{"question": "...", "answer": "...", "citations": [...]}',
        )

    with st.expander("Expected JSONL format"):
        st.code('''{"question": "What is the total revenue?", "answer": "$1.2M", "citations": [{"file": "report.pdf", "page": 5}], "iterations": 3}
{"question": "Who signed the contract?", "answer": ["John Smith", "Jane Doe"], "citations": [{"file": "contract.pdf", "page": 12}], "iterations": 2}''', language="json")
        st.markdown("""
**Required fields:**
- `question`: The question text (must match dataset)
- `answer`: Predicted answer (string or list)

**Optional fields (for full metrics):**
- `citations`: List of `{"file": "...", "page": N}` for attribution metrics
- `id`: Question ID (fallback matching)

**Effort fields (required for Agentic submissions, at least one per sample):**
- `steps`: Number of agentic steps taken (positive integer)
- `search_history`: List of search queries performed (e.g. `["query1", "query2"]`)
- `effort`: Generic effort measure (positive number), should be proportional to the number of searches, LLM calls, or reasoning tokens generated, in this order of preference
""")

    # Initialize session state for evaluation results
    if 'eval_results' not in st.session_state:
        st.session_state.eval_results = None
    if 'predictions' not in st.session_state:
        st.session_state.predictions = None

    # Get content from either file upload or paste
    file_content = None
    if uploaded_file is not None:
        file_content = uploaded_file.read().decode("utf-8")
    elif pasted_content and pasted_content.strip():
        file_content = pasted_content.strip()

    if file_content:
        is_valid, error_msg, predictions = validate_jsonl_submission(file_content)
        if not is_valid:
            st.error(f"Invalid input: {error_msg}")
        else:
            st.success(f"Loaded {len(predictions)} predictions")
            st.session_state.predictions = predictions
            st.session_state.predictions_raw = file_content  # Store raw content for upload

            # Evaluate button
            if st.button("Run Evaluation", type="primary"):
                with st.spinner("Loading gold standard..."):
                    gold_by_text, gold_by_id = load_gold_standard()
                if not gold_by_text:
                    st.error("Failed to load gold standard dataset")
                else:
                    # Progress bar for evaluation
                    progress_bar = st.progress(0, text="Evaluating predictions with semantic accuracy...")
                    status_text = st.empty()

                    def update_progress(current, total):
                        progress_bar.progress(current / total, text=f"Evaluating {current}/{total}...")

                    results = evaluate_predictions(
                        predictions, gold_by_text, gold_by_id,
                        use_llm_judge=True,
                        progress_callback=update_progress
                    )
                    progress_bar.empty()
                    status_text.empty()
                    st.session_state.eval_results = results

    # Show evaluation results
    if st.session_state.eval_results:
        results = st.session_state.eval_results
        if 'error' in results:
            st.error(results['error'])
        else:
            st.markdown("#### Evaluation Results")
            # Summary metrics - use semantic accuracy as primary if available
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                if 'semantic' in results['overall']:
                    ci = results['overall'].get('semantic_ci')
                    ci_text = f" [{ci[0]:.1f}-{ci[1]:.1f}]" if ci else ""
                    st.metric("Accuracy (LLM judge)", f"{results['overall']['semantic']:.1f}{ci_text}")
                else:
                    st.metric("Accuracy (ANLS*)", f"{results['overall']['anls']:.1f}")
            with col2:
                st.metric("Attribution (Page F1)", f"{results['overall']['page_f1']:.1f}")
            with col3:
                kuiper_val = results['overall']['kuiper']
                st.metric("Effort (Kuiper)", f"{kuiper_val:.3f}" if kuiper_val else "N/A")
            with col4:
                st.metric("Evaluated", f"{results['n_evaluated']} / {results['n_evaluated'] + results['n_unmatched']}")

            # Detailed breakdown
            with st.expander("Detailed Breakdown"):
                # Check which metrics are available
                has_semantic = 'semantic' in results['overall']
                if has_semantic:
                    st.markdown(f"""
| Metric | Value |
|--------|-------|
| **Accuracy (LLM judge)** | {results['overall']['semantic']:.1f} |
| **ANLS*** (string match) | {results['overall']['anls']:.1f} |
| **Acc. Single-Hop** (n={results['single_evidence']['n']}) | {results['single_evidence'].get('semantic', results['single_evidence']['anls']):.1f} |
| **Acc. Cross-Page** (n={results['multi_evidence_same_doc']['n']}) | {results['multi_evidence_same_doc'].get('semantic', results['multi_evidence_same_doc']['anls']):.1f} |
| **Acc. Cross-Doc** (n={results['multi_evidence_multi_doc']['n']}) | {results['multi_evidence_multi_doc'].get('semantic', results['multi_evidence_multi_doc']['anls']):.1f} |
| **Attribution (Doc F1)** | {results['overall']['doc_f1']:.1f} |
| **Attribution (Page F1)** | {results['overall']['page_f1']:.1f} |
""")
                else:
                    st.markdown(f"""
| Metric | Value |
|--------|-------|
| **Overall ANLS*** | {results['overall']['anls']:.1f} |
| **Acc. Single-Hop** (n={results['single_evidence']['n']}) | {results['single_evidence']['anls']:.1f} |
| **Acc. Cross-Page** (n={results['multi_evidence_same_doc']['n']}) | {results['multi_evidence_same_doc']['anls']:.1f} |
| **Acc. Cross-Doc** (n={results['multi_evidence_multi_doc']['n']}) | {results['multi_evidence_multi_doc']['anls']:.1f} |
| **Attribution (Doc F1)** | {results['overall']['doc_f1']:.1f} |
| **Attribution (Page F1)** | {results['overall']['page_f1']:.1f} |
""")

            if results['n_unmatched'] > 0:
                with st.expander(f"{results['n_unmatched']} unmatched questions"):
                    for q in results['unmatched_samples']:
                        st.text(f"• {q}")
                    if results['n_unmatched'] > 5:
                        st.text(f"... and {results['n_unmatched'] - 5} more")

            # Step 2: Model Information
            st.markdown("---")
            st.markdown("#### Step 2: Model Information")
            col1, col2 = st.columns(2)
            with col1:
                model_name = st.text_input("Model Name *", placeholder="e.g., GPT-4o-Agent")
                organization = st.text_input("Organization *", placeholder="e.g., OpenAI")
                model_type = st.selectbox("Model Type *", options=["", "api", "open-weight"])
            with col2:
                description = st.text_area(
                    "Description",
                    placeholder="Brief description of your approach (e.g., 'Vision-language model with sparse search tool')",
                    height=80
                )
                link = st.text_input("Link (Optional)", placeholder="https://arxiv.org/abs/... or https://github.com/...")
                selected_tags = st.multiselect(
                    "Tags",
                    options=AVAILABLE_TAGS,
                    default=["Agentic"],
                    help="Select tags that describe your approach"
                )

            # Step 3: Submit
            st.markdown("---")
            st.markdown("#### Step 3: Submit to Leaderboard")
            if st.button("Submit to Leaderboard", type="primary", disabled=not (model_name and organization and model_type)):
                # Validate required fields
                submit_error = None
                if not model_name or not organization or not model_type:
                    submit_error = "Please fill in all required fields (Model Name, Organization, Model Type)"
                elif "Agentic" in selected_tags and st.session_state.predictions:
                    # Agentic submissions must report effort on every sample.
                    missing_effort = [
                        (i + 1, p.get('question', '')[:60])
                        for i, p in enumerate(st.session_state.predictions)
                        if not _prediction_has_effort(p)
                    ]
                    if missing_effort:
                        samples = "; ".join(f"line {ln}: {q}..." for ln, q in missing_effort[:5])
                        extra = f" (and {len(missing_effort) - 5} more)" if len(missing_effort) > 5 else ""
                        submit_error = (
                            f"**Agentic submissions require effort data for every sample.** "
                            f"{len(missing_effort)} prediction(s) are missing effort information "
                            f"(e.g. `iterations`, `steps`, `llm_calls`, `effort`, or `search_history`). "
                            f"Examples: {samples}{extra}"
                        )
                if submit_error:
                    st.error(submit_error)
                else:
                    # Get current user for submission tracking
                    hf_user = get_hf_user()
                    # Prepare submission data
                    submission = {
                        "model_name": model_name.strip(),
                        "organization": organization.strip(),
                        "description": description.strip() if description else "",
                        "link": link.strip() if link else "",
                        "tags": selected_tags,
                        "submitted_by": hf_user['username'] if hf_user else "anonymous",
                        "metadata": {
                            "model_type": model_type,
                        },
                        "results": {
                            "overall": {
                                "semantic": results['overall'].get('semantic'),
                                "semantic_ci": results['overall'].get('semantic_ci'),
                                "anls": results['overall']['anls'],
                                "page_f1": results['overall']['page_f1'],
                                "doc_f1": results['overall']['doc_f1'],
                                "kuiper": results['overall']['kuiper'],
                            },
                            "single_evidence": results['single_evidence'],
                            "multi_evidence_same_doc": results['multi_evidence_same_doc'],
                            "multi_evidence_multi_doc": results['multi_evidence_multi_doc'],
                            "by_domain": results.get('by_domain', {}),
                        },
                        "submission_date": datetime.now(timezone.utc).isoformat(),
                    }

                    # Upload to HuggingFace Hub
                    with st.spinner("Uploading to leaderboard..."):
                        try:
                            # Create path matching expected structure: {org}/{model}_results_{timestamp}.json
                            safe_org = organization.strip().replace(" ", "_").replace("/", "-")
                            safe_model = model_name.strip().replace(" ", "_").replace("/", "-")
                            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
                            filename = f"{safe_model}_results_{timestamp}.json"
                            # FIX: upload under the generated filename. Previously this
                            # used a corrupted literal placeholder, so every submission
                            # wrote to the same bogus path and `filename` was unused.
                            path_in_repo = f"{safe_org}/{filename}"

                            # Upload using HfApi
                            api = HfApi()
                            # Upload results JSON
                            api.upload_file(
                                path_or_fileobj=json.dumps(submission, indent=2).encode("utf-8"),
                                path_in_repo=path_in_repo,
                                repo_id=RESULTS_REPO,
                                repo_type="dataset",
                                token=TOKEN,
                                commit_message=f"Add results for {organization}/{model_name}"
                            )
                            # Upload predictions file
                            if st.session_state.get('predictions_raw'):
                                predictions_filename = f"{safe_model}_predictions_{timestamp}.jsonl"
                                predictions_path = f"{safe_org}/{predictions_filename}"
                                api.upload_file(
                                    path_or_fileobj=st.session_state.predictions_raw.encode("utf-8"),
                                    path_in_repo=predictions_path,
                                    repo_id=RESULTS_REPO,
                                    repo_type="dataset",
                                    token=TOKEN,
                                    commit_message=f"Add predictions for {organization}/{model_name}"
                                )
                            st.success("Successfully submitted to leaderboard!")
                            st.balloons()
                            # Record submission for rate limiting (guarded: hf_user
                            # could in principle be None here).
                            if hf_user:
                                record_submission(hf_user['username'])
                            # Clear cache to force refresh on next load
                            download_data.clear()
                            load_eval_results.clear()
                            # Clear form state
                            st.session_state.eval_results = None
                            st.session_state.predictions = None
                            st.session_state.predictions_raw = None
                            st.info("Your submission has been saved! The leaderboard will update shortly.")
                            # Auto-refresh after a moment
                            st.rerun(scope="app")
                        except Exception as e:
                            st.error(f"Upload failed: {str(e)}")
                            st.warning("Please ensure HF_TOKEN environment variable is set with write access to the repository.")
                            with st.expander("Submission JSON (for manual upload)"):
                                st.code(json.dumps(submission, indent=2), language="json")
                                st.info(f"""
**To submit manually:**
1. Copy the JSON above
2. Save as `{path_in_repo}`
3. Upload to `{RESULTS_REPO}` on HuggingFace Hub

Or contact lukasz.borchmann@snowflake.com
""")
def get_all_submissions() -> list[dict]:
    """Get all submission files with their metadata.

    Scans EVAL_RESULTS_PATH/{org}/*_results_*.json and returns one dict per
    file (including a raw_json dump for the admin editor), newest first.
    Files that fail to parse are still listed, with an error placeholder.
    """
    submissions = []
    results_path = Path(EVAL_RESULTS_PATH)
    if not results_path.exists():
        return submissions
    for org_dir in results_path.iterdir():
        # Skip hidden directories (e.g. .git, .cache) and loose files.
        if org_dir.is_dir() and not org_dir.name.startswith('.'):
            for result_file in org_dir.glob("*_results_*.json"):
                try:
                    with open(result_file) as f:
                        data = json.load(f)
                    submission_date = data.get("submission_date")
                    if not isinstance(submission_date, str):
                        submission_date = ""
                    submissions.append({
                        "file_path": str(result_file),
                        "relative_path": f"{org_dir.name}/{result_file.name}",
                        "model_name": data.get("model_name", "Unknown"),
                        "organization": data.get("organization", org_dir.name),
                        "submitted_by": data.get("submitted_by", "Unknown"),
                        "submission_date": submission_date,
                        "accuracy": data.get("results", {}).get("overall", {}).get("anls", 0.0),
                        "raw_json": json.dumps(data, indent=2),
                    })
                except Exception as e:
                    # Keep unreadable files visible so admins can delete them.
                    submissions.append({
                        "file_path": str(result_file),
                        "relative_path": f"{org_dir.name}/{result_file.name}",
                        "model_name": "Error loading",
                        "organization": org_dir.name,
                        "submitted_by": "Unknown",
                        "submission_date": "Unknown",
                        "accuracy": 0.0,
                        "raw_json": f"Error: {e}",
                    })
    # Sort by submission date (newest first), fallback to empty string
    def _submission_sort_key(item: dict) -> str:
        date_val = item.get("submission_date")
        return date_val if isinstance(date_val, str) else ""
    submissions.sort(key=_submission_sort_key, reverse=True)
    return submissions


def delete_submission_from_hub(relative_path: str) -> tuple[bool, str]:
    """Delete a submission file from the HuggingFace Hub.

    Returns:
        (success, message) tuple; never raises.
    """
    try:
        api = HfApi(token=TOKEN)
        api.delete_file(
            path_in_repo=relative_path,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )
        return True, f"Successfully deleted {relative_path}"
    except Exception as e:
        return False, f"Failed to delete: {str(e)}"
update_submission_on_hub(relative_path: str, json_content: str) -> tuple[bool, str]: """Update a submission file on HuggingFace Hub.""" import tempfile try: # Validate JSON data = json.loads(json_content) # Create temp file with updated content with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(data, f, indent=2) temp_path = f.name api = HfApi(token=TOKEN) api.upload_file( path_or_fileobj=temp_path, path_in_repo=relative_path, repo_id=RESULTS_REPO, repo_type="dataset", token=TOKEN, commit_message=f"Admin edit: {relative_path}" ) os.unlink(temp_path) # Clean up return True, f"Successfully updated {relative_path}" except json.JSONDecodeError as e: return False, f"Invalid JSON: {str(e)}" except Exception as e: return False, f"Failed to update: {str(e)}" @st.fragment def admin_panel(): """Admin panel for managing submissions.""" st.markdown("#### Admin Panel") st.markdown("Manage leaderboard submissions. Changes are permanent.") # Admin action buttons col1, col2 = st.columns(2) with col1: if st.button("Refresh Submissions", use_container_width=True): st.rerun() with col2: if st.button("Reload from HuggingFace", type="primary", use_container_width=True): # Clear all caches download_data.clear() load_eval_results.clear() # Delete local cached files to force fresh download if EVAL_RESULTS_PATH.exists(): shutil.rmtree(EVAL_RESULTS_PATH) if EVAL_REQUESTS_PATH.exists(): shutil.rmtree(EVAL_REQUESTS_PATH) # Re-download data with st.spinner("Re-downloading data from HuggingFace Hub..."): download_data() st.success("Leaderboard data reloaded from source!") st.rerun(scope="app") st.divider() submissions = get_all_submissions() if not submissions: st.info("No submissions found.") return st.markdown(f"**{len(submissions)} submissions found**") # Display each submission for i, sub in enumerate(submissions): with st.expander(f"{sub['model_name']} ({sub['organization']}) - {sub['submission_date'][:10] if len(sub['submission_date']) > 10 else 
sub['submission_date']}"): col1, col2 = st.columns([3, 1]) with col1: st.markdown(f""" **Model:** {sub['model_name']} **Organization:** {sub['organization']} **Submitted by:** {sub['submitted_by']} **Date:** {sub['submission_date']} **Accuracy:** {sub['accuracy']:.1%} **File:** `{sub['relative_path']}` """) with col2: # Edit button if st.button("Edit", key=f"edit_{i}"): st.session_state[f"editing_{i}"] = True st.session_state[f"confirm_delete_{i}"] = False # Delete button with confirmation if st.button("Delete", key=f"delete_{i}", type="secondary"): st.session_state[f"confirm_delete_{i}"] = True st.session_state[f"editing_{i}"] = False if st.session_state.get(f"confirm_delete_{i}", False): st.warning("Are you sure?") col_yes, col_no = st.columns(2) with col_yes: if st.button("Yes", key=f"confirm_yes_{i}", type="primary"): success, message = delete_submission_from_hub(sub['relative_path']) if success: st.success(message) # Clear caches and refresh download_data.clear() load_eval_results.clear() st.session_state[f"confirm_delete_{i}"] = False st.rerun() else: st.error(message) with col_no: if st.button("No", key=f"confirm_no_{i}"): st.session_state[f"confirm_delete_{i}"] = False st.rerun() # Edit mode if st.session_state.get(f"editing_{i}", False): st.markdown("**Edit JSON:**") edited_json = st.text_area( "Edit submission JSON", value=sub['raw_json'], height=400, key=f"json_editor_{i}", label_visibility="collapsed" ) col_save, col_cancel = st.columns(2) with col_save: if st.button("Save Changes", key=f"save_{i}", type="primary"): success, message = update_submission_on_hub(sub['relative_path'], edited_json) if success: st.success(message) # Clear caches and refresh download_data.clear() load_eval_results.clear() st.session_state[f"editing_{i}"] = False st.rerun() else: st.error(message) with col_cancel: if st.button("Cancel", key=f"cancel_{i}"): st.session_state[f"editing_{i}"] = False st.rerun() else: # Show raw JSON (read-only) - use checkbox instead of expander to 
avoid nesting if st.checkbox("Show JSON", key=f"show_json_{i}"): st.code(sub['raw_json'], language="json") # News management section st.divider() st.markdown("#### News Management") news_items = get_news() news_json = json.dumps(news_items, indent=2) with st.expander("Edit News (JSON)", expanded=False): st.markdown(""" **Format:** Array of objects with `date` (YYYY-MM-DD) and `text` fields. ```json [ {"date": "2025-01-04", "text": "Your update message here"}, ... ] ``` """) edited_news = st.text_area( "News JSON", value=news_json, height=300, key="news_editor", label_visibility="collapsed" ) if st.button("Save News", type="primary"): try: parsed_news = json.loads(edited_news) if not isinstance(parsed_news, list): st.error("News must be a JSON array") else: success, message = save_news(parsed_news) if success: st.success(message) st.rerun() else: st.error(message) except json.JSONDecodeError as e: st.error(f"Invalid JSON: {e}") def main(): # Handle OAuth callback (if returning from HuggingFace login) handle_oauth_callback() # Handle "analyze" query parameter from leaderboard analyze_model = st.query_params.get("analyze") if analyze_model: st.session_state.selected_model_for_analysis = unquote(analyze_model) st.session_state.go_to_analysis_tab = True # Clear the query param to avoid re-triggering st.query_params.clear() # Inject JavaScript to click on the Analysis tab import streamlit.components.v1 as components components.html(""" """, height=0) # Download data from HuggingFace Hub with st.spinner("Loading data from HuggingFace Hub..."): download_data() # Load data df = load_eval_results() # Check if admin user is logged in hf_user = get_hf_user() is_admin = hf_user and hf_user.get('username', '').lower() == 'borchmann' # Tabs - show Admin tab only for admin users if is_admin: tab1, tab2, tab3, tab4, tab5 = st.tabs(["Leaderboard", "Analysis", "About", "Submit Results", "Admin"]) else: tab1, tab2, tab3, tab4 = st.tabs(["Leaderboard", "Analysis", "About", "Submit 
Results"]) # ===== LEADERBOARD TAB ===== with tab1: # Header with icon (fallback to emoji if icon doesn't load) if ICON_MEDAL: icon_html = f'' else: icon_html = f'🏆' st.markdown(f'

{icon_html} Leaderboard

', unsafe_allow_html=True) if df.empty: st.warning("No evaluation results found. Submit your results to appear on the leaderboard!") else: # ===== FILTERS SIDE BY SIDE ===== filter_col1, filter_col2 = st.columns(2) with filter_col1: # TAG FILTER - chips use MID_BLUE (darker, gradient start) tags_in_data = get_all_tags_from_df(df) all_available_tags = sorted(list(set(AVAILABLE_TAGS + tags_in_data))) selected_tags = st.multiselect( "Filter by techniques/features:", options=all_available_tags, default=[], placeholder="Click to filter by tags...", key="tag_filter", ) with filter_col2: # COLUMN SELECTOR - chips use SNOWFLAKE_BLUE (lighter, gradient end) # Mapping: short chip name -> full column name COLUMN_CHIP_NAMES = { "Accuracy": "Accuracy (LLM judge)", "Acc. Single-Hop": "Acc. Single-Hop", "Acc. Cross-Page": "Acc. Cross-Page", "Acc. Cross-Doc": "Acc. Cross-Doc", "ANLS*": "ANLS* (string)", "Attribution": "Attribution (Page F1)", "Attribution (Doc)": "Attribution (Doc F1)", "Effort": "Effort (Kuiper)", "Model Type": "Model Type", "Tags": "Tags", } # Reverse mapping for lookup CHIP_TO_COLUMN = COLUMN_CHIP_NAMES COLUMN_TO_CHIP = {v: k for k, v in COLUMN_CHIP_NAMES.items()} all_columns = list(df.columns) # Model and Organization are always visible (not in selector) always_visible = ["Model", "Organization"] # Hidden columns (used internally but not shown as separate columns) hidden_cols = ["Link", "Submission Date", "Description", "_by_domain", "_Accuracy_CI", "_Accuracy_SE"] # Full column names that are optional (Tags moved to end) optional_full_cols = [c for c in all_columns if c not in hidden_cols + always_visible and c != "Tags"] optional_full_cols.append("Tags") # Add Tags at the end # Convert to chip names for display optional_chips = [COLUMN_TO_CHIP.get(c, c) for c in optional_full_cols] default_chips = ["Model Type", "Tags", "Accuracy", "Attribution", "Effort"] default_selected = [c for c in default_chips if c in optional_chips] selected_chips = st.multiselect( 
"Select columns to display:", options=optional_chips, default=default_selected, key="column_selector", ) # Convert selected chips back to full column names selected_optional = [CHIP_TO_COLUMN.get(c, c) for c in selected_chips] # Apply tag filter filtered_df = filter_df_by_tags(df, selected_tags) # Show filter status if selected_tags: st.caption(f"Showing {len(filtered_df)} of {len(df)} models matching selected tags") # Model and Organization are always included first selected_columns = ["Model", "Organization"] + [c for c in optional_full_cols if c in selected_optional] # Initialize uncertainty mode in session state if not present if "uncertainty_mode" not in st.session_state: st.session_state.uncertainty_mode = "± SE" if selected_columns: # Render HTML table with proper styling render_leaderboard_table(filtered_df, selected_columns, uncertainty_mode=st.session_state.uncertainty_mode) # Bottom row: Uncertainty toggle (left) and Download button (right) st.markdown("") # Small spacing col1, col2 = st.columns([3, 1]) with col1: st.radio( "Uncertainty:", options=["± SE", "90% CI", "95% CI", "None"], key="uncertainty_mode", horizontal=True, help="Display uncertainty estimates for accuracy and attribution metrics" ) with col2: # Right-align the download button but keep its natural width st.markdown('''''', unsafe_allow_html=True) csv_df = build_csv_download_df(filtered_df, selected_columns, st.session_state.uncertainty_mode) csv = csv_df.to_csv(index=False) st.download_button( label="Download as CSV", data=csv, file_name="leaderboard.csv", mime="text/csv", key="download_csv_btn", ) # News and Paper section (two columns) st.markdown("
", unsafe_allow_html=True) # Spacing news_col, paper_col = st.columns([2, 1]) with news_col: st.markdown("Updates", unsafe_allow_html=True) news_items = get_news()[:NEWS_MAX_DISPLAY] if news_items: for item in news_items: date_str = item.get('date', '') text = item.get('text', '') # Use full date (YYYY-MM-DD) formatted_date = date_str[:10] if len(date_str) >= 10 else date_str st.caption(f"**{formatted_date}**: {text}") else: st.caption("No updates yet.") with paper_col: st.markdown("""
Strategic Navigation or Stochastic Search?
How Agents and Humans Reason Over Document Collections
""", unsafe_allow_html=True) # ===== VISUALIZATIONS TAB ===== with tab2: if ICON_EYE: icon_html = f'' else: icon_html = f'📈' st.markdown(f'

{icon_html} Analysis

', unsafe_allow_html=True) if df.empty: st.warning("No data available for visualization.") else: # Check if user came from leaderboard with a specific model if st.session_state.get('go_to_analysis_tab'): st.info(f"Showing analysis for: **{st.session_state.get('selected_model_for_analysis', '')}**") st.session_state.go_to_analysis_tab = False # Model details selector - at the top st.markdown("#### Model Details") model_names = df["Model"].tolist() # Use session state to allow setting model from leaderboard if 'selected_model_for_analysis' not in st.session_state: st.session_state.selected_model_for_analysis = model_names[0] if model_names else None # Ensure selected model exists in current data selected_index = 0 if st.session_state.selected_model_for_analysis in model_names: selected_index = model_names.index(st.session_state.selected_model_for_analysis) selected_model = st.selectbox( "Select a model to view detailed breakdown:", model_names, index=selected_index, key="analysis_model_selector" ) if selected_model: st.session_state.selected_model_for_analysis = selected_model show_model_details(selected_model) # Plots below st.markdown("---") st.markdown("#### Comparative Plots") # Two plots side by side col1, col2 = st.columns(2) with col1: fig_attribution = create_accuracy_vs_attribution_plot(df) st.plotly_chart(fig_attribution, width="stretch") with col2: fig_effort = create_accuracy_vs_effort_plot(df) st.plotly_chart(fig_effort, width="stretch") st.markdown(""" **Understanding the plots:** - Each point represents a model submission - **Orange points**: API-based models - **Blue points**: Open-weight models - Hover over points to see model details - **Left plot**: Upper-right = high accuracy with good attribution (optimal) - **Right plot**: Upper-left = high accuracy with good effort calibration (optimal) """) # ===== ABOUT TAB ===== with tab3: if ICON_DOCS: icon_html = f'' else: icon_html = f'📖' st.markdown(f'

{icon_html} About

', unsafe_allow_html=True) about_col1, about_col2 = st.columns(2) with about_col1: st.markdown(""" #### MADQA Benchmark This benchmark evaluates AI systems on **Agentic Document Collection Visual Question Answering** — a task requiring systems to navigate, retrieve, reason over, and aggregate information from heterogeneous document collections. 📄 [Read the paper: *Strategic Navigation or Stochastic Search?*](https://arxiv.org/abs/2603.12180) ##### Dataset - **2,250** human-authored question-answer pairs - **800** multi-page PDF documents from diverse real-world domains - **18,619** total pages with rich visual layouts - **17.3%** multi-hop questions (cross-page and cross-document) - **63** document categories across **13** high-level domains ##### Task Properties The task is characterized by six formal properties: 1. **Extractive**: Answers are drawn from evidence pages, not generated abstractly 2. **Multi-Hop**: Evidence may span multiple disjoint pages requiring aggregation 3. **Closed-World**: Answers must be derivable solely from the corpus 4. **Grounded**: Answers must be faithfully attributed to minimal evidence 5. **Agentic**: Requires iterative retrieval and reasoning (planning, navigation, aggregation) 6. **Visual**: Answering may require non-textual information (layout, tables, figures) """) with about_col2: st.markdown(""" #### Metrics ##### Accuracy (LLM judge) - **Accuracy (LLM judge)**: Primary metric combining ANLS* string matching with an LLM judge (G-Eval framework). Captures semantic correctness beyond exact string matching, with statistical bias correction - **ANLS* (string)**: Pure string-based score using Average Normalized Levenshtein Similarity with optimal element alignment for lists/sets - **Acc. Single-Hop**: Accuracy on questions requiring a single evidence page - **Acc. Cross-Page**: Accuracy on multi-hop questions within the same document - **Acc. 
Cross-Doc**: Accuracy on multi-hop questions spanning multiple documents ##### Attribution (Page F1) - **Attribution (Page F1)**: F1 score measuring overlap between cited pages and gold evidence pages (penalizes both missing and spurious citations) - **Attribution (Doc F1)**: Document-level attribution accuracy (whether the correct documents were identified) ##### Effort (Kuiper) - **Effort (Kuiper)**: Measures whether computational effort correlates with problem difficulty. Lower values indicate better calibration—the system "knows what it knows" and doesn't waste effort on unsolvable queries --- **Contact:** [lukasz.borchmann@snowflake.com](mailto:lukasz.borchmann@snowflake.com) """) # ===== SUBMIT TAB ===== with tab4: if ICON_WRITE: icon_html = f'' else: icon_html = f'📝' st.markdown(f'

{icon_html} Submit Results

', unsafe_allow_html=True) if not EVAL_AVAILABLE: st.warning("Evaluation module not available. Please install dependencies: `pip install anls-star datasets`") # Use fragment to prevent tab switch on file upload submit_results_fragment() # ===== ADMIN TAB (only for admin users) ===== if is_admin: with tab5: admin_panel() if __name__ == "__main__": main()