Borchmann's picture
Add effort validation for agentic submissions and uniform-effort display
dfedb16
raw
history blame
128 kB
"""
MADQA Leaderboard - Streamlit Version
Benchmark for evaluating AI systems on document collection question answering.
Based on the paper: "Strategic Navigation or Stochastic Search?
How Agents and Humans Reason Over Document Collections"
Color palette: Snowflake colors
- SNOWFLAKE BLUE: #29B5E8
- MID-BLUE: #11567F
- MIDNIGHT: #000000
- MEDIUM GRAY: #5B5B5B
- STAR BLUE: #75CDD7
- VALENCIA ORANGE: #FF9F36
- FIRST LIGHT: #D45B90
- PURPLE MOON: #7254A3
"""
import base64
import json
import os
import secrets
import shutil
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlencode, quote, unquote
# Parallelization config for LLM evaluation
MAX_EVAL_WORKERS = 24
import pandas as pd
import plotly.graph_objects as go
import requests
import streamlit as st
from huggingface_hub import snapshot_download, HfApi, hf_hub_download
# Add eval module to path
sys.path.insert(0, str(Path(__file__).parent / "eval"))
try:
from metrics import (
anls_star,
anls_star_llm,
aggregate_anls_star_llm,
standard_error,
confidence_interval,
citation_f1,
kuiper_statistic,
get_effort_value,
LLM_JUDGE_SPECIFICITY,
LLM_JUDGE_SENSITIVITY
)
from datasets import load_dataset
EVAL_AVAILABLE = True
except ImportError:
EVAL_AVAILABLE = False
# Fallback values for constants
LLM_JUDGE_SPECIFICITY = 1.0
LLM_JUDGE_SENSITIVITY = 0.98
# Page configuration
st.set_page_config(
page_title="MADQA Leaderboard",
page_icon="📄",
layout="wide",
initial_sidebar_state="collapsed",
)
# HuggingFace Hub configuration
TOKEN = os.environ.get("HF_TOKEN")
QUEUE_REPO = "agentic-document-ai/backend-requests"
RESULTS_REPO = "agentic-document-ai/backend-results"
CACHE_PATH = os.getenv("HF_HOME", ".")
# Submission rate limiting
SUBMISSION_LIMITS_FILE = "submission_limits.json"
SUBMISSION_LIMIT_HOURS = float(os.environ.get("SUBMISSION_LIMIT_HOURS", 24)) # Configurable, default 24 hours
NEWS_FILE = "news.json"
NEWS_MAX_DISPLAY = 5
def get_submission_limits() -> dict:
    """Fetch the per-user submission timestamp file from the results repo.

    Returns a mapping of username -> ISO timestamp of last submission, or an
    empty dict when the file does not exist yet (or the hub is unreachable).
    """
    try:
        local_path = hf_hub_download(
            repo_id=RESULTS_REPO,
            filename=SUBMISSION_LIMITS_FILE,
            repo_type="dataset",
            token=TOKEN,
        )
        with open(local_path) as fh:
            return json.load(fh)
    except Exception:
        # File doesn't exist yet
        return {}
def can_user_submit(username: str) -> tuple[bool, str, float]:
    """Check if user can submit based on rate limit.

    Args:
        username: HuggingFace username to look up in the limits file.

    Returns:
        (can_submit, message, hours_remaining) — message and hours_remaining
        are only meaningful when can_submit is False.
    """
    limits = get_submission_limits()
    if username not in limits:
        return True, "", 0
    try:
        last_submission = datetime.fromisoformat(limits[username])
    except (TypeError, ValueError):
        # Corrupt or unparseable record: fail open rather than lock the user out.
        return True, "", 0
    if last_submission.tzinfo is None:
        # Defensive: record_submission writes aware UTC timestamps, but a naive
        # one would make the subtraction below raise.  Assume UTC (TODO confirm
        # no other writer stores local-time naive stamps).
        last_submission = last_submission.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    hours_since = (now - last_submission).total_seconds() / 3600
    if hours_since < SUBMISSION_LIMIT_HOURS:
        hours_remaining = SUBMISSION_LIMIT_HOURS - hours_since
        hours = int(hours_remaining)
        minutes = int((hours_remaining - hours) * 60)
        return False, f"Please wait {hours}h {minutes}m before your next test set submission.", hours_remaining
    return True, "", 0
def record_submission(username: str):
    """Record a new submission timestamp for the user.

    Re-fetches the limits file, stamps the user with the current UTC time, and
    uploads the updated file.  Failures only produce a UI warning — submission
    itself is not blocked.
    """
    import tempfile
    # Get current limits (fresh, not cached)
    limits = get_submission_limits()
    # Update with new timestamp
    limits[username] = datetime.now(timezone.utc).isoformat()
    temp_path = None
    try:
        # Serialize to a temp file because upload_file wants a path.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(limits, f, indent=2)
            temp_path = f.name
        api = HfApi(token=TOKEN)
        api.upload_file(
            path_or_fileobj=temp_path,
            path_in_repo=SUBMISSION_LIMITS_FILE,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
            token=TOKEN,
        )
    except Exception as e:
        st.warning(f"Could not record submission time: {e}")
    finally:
        # Always remove the temp file, even when the upload fails
        # (the original only unlinked on the success path, leaking the file).
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
def get_news() -> list:
    """Load news items from HF Hub, newest first.

    Falls back to a single default item when the news file is missing or
    cannot be read.
    """
    try:
        path = hf_hub_download(
            repo_id=RESULTS_REPO,
            filename=NEWS_FILE,
            repo_type="dataset",
            token=TOKEN,
        )
        with open(path) as fh:
            items = json.load(fh)
        # Newest entries first.
        items.sort(key=lambda entry: entry.get('date', ''), reverse=True)
        return items
    except Exception:
        # Return default news if file doesn't exist
        return [
            {"date": "2025-01-04", "text": "Leaderboard launched! Submit your results to appear on the board."}
        ]
def save_news(news: list) -> tuple[bool, str]:
    """Save news items to HF Hub.

    Args:
        news: List of {"date": ..., "text": ...} dicts; sorted newest-first
            in place before upload.

    Returns:
        (success, message).
    """
    import tempfile
    temp_path = None
    try:
        # Sort by date descending before saving
        news.sort(key=lambda x: x.get('date', ''), reverse=True)
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(news, f, indent=2)
            temp_path = f.name
        api = HfApi(token=TOKEN)
        api.upload_file(
            path_or_fileobj=temp_path,
            path_in_repo=NEWS_FILE,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
            token=TOKEN,
            commit_message="Update news"
        )
        return True, "News updated successfully"
    except Exception as e:
        return False, f"Failed to save news: {str(e)}"
    finally:
        # Clean up the temp file whether or not the upload succeeded
        # (the original leaked it whenever upload_file raised).
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
def get_oauth_config() -> dict | None:
"""Get HuggingFace OAuth configuration from environment variables.
These are automatically set by HuggingFace Spaces when hf_oauth: true is in README.md.
See: https://huggingface.co/docs/hub/en/spaces-oauth
"""
client_id = os.environ.get("OAUTH_CLIENT_ID")
client_secret = os.environ.get("OAUTH_CLIENT_SECRET")
if client_id and client_secret:
return {
"client_id": client_id,
"client_secret": client_secret,
"scopes": os.environ.get("OAUTH_SCOPES", "openid profile"),
"provider_url": os.environ.get("OPENID_PROVIDER_URL", "https://huggingface.co"),
}
return None
def get_hf_user() -> dict | None:
    """Return the logged-in HuggingFace user, or None when nobody is signed in.

    The returned dict has 'username', 'name' and 'picture' keys.
    Works on HuggingFace Spaces with hf_oauth: true in README.md.
    For local testing, set environment variable:
        TEST_HF_USER=your_username
    """
    # Local-development override takes precedence over any OAuth session.
    override = os.environ.get("TEST_HF_USER")
    if override:
        return {
            'username': override,
            'name': override,
            'picture': '',
        }
    # The OAuth callback handler stores the user dict in session state.
    if 'hf_user' in st.session_state and st.session_state.hf_user:
        return st.session_state.hf_user
    return None
def handle_oauth_callback():
    """Handle OAuth callback from HuggingFace.

    After user authorizes, HF redirects back with 'code' and 'state' query params.
    We exchange the code for tokens and store user info in session state.

    Returns:
        True when a login completed (or the user was already logged in),
        False otherwise.  Query params are cleared on every exit path that
        saw a callback, so a stale code is never reprocessed on rerun.

    Note: We don't strictly validate state because Streamlit session state is lost
    during the redirect flow. The OAuth is still secure because:
    1. The code can only be used once
    2. The code is tied to our client_id
    3. We're on HTTPS in production
    """
    try:
        query_params = st.query_params
    except Exception:
        # SessionInfo not yet initialized - skip OAuth handling on this run
        return False
    # Check if this is an OAuth callback
    code = query_params.get("code")
    if not code:
        return False
    # If user is already logged in, just clear the query params
    try:
        if 'hf_user' in st.session_state and st.session_state.hf_user:
            st.query_params.clear()
            return True
    except Exception:
        pass
    oauth_config = get_oauth_config()
    if not oauth_config:
        st.query_params.clear()
        return False
    # Get redirect URI - must match what HuggingFace expects (.hf.space domain)
    space_host = os.environ.get("SPACE_HOST", "")
    if space_host:
        redirect_uri = f"https://{space_host}"
    else:
        redirect_uri = "http://localhost:8501"
    # Exchange code for tokens
    token_url = f"{oauth_config['provider_url']}/oauth/token"
    try:
        # Client authentication: HTTP Basic header of client_id:client_secret.
        credentials = f"{oauth_config['client_id']}:{oauth_config['client_secret']}"
        auth_header = base64.b64encode(credentials.encode()).decode()
        response = requests.post(
            token_url,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": redirect_uri,
                "client_id": oauth_config["client_id"],
            },
            headers={
                "Authorization": f"Basic {auth_header}",
                "Content-Type": "application/x-www-form-urlencoded",
            },
            timeout=10,
        )
        if response.status_code != 200:
            # Code might have been used already or expired - clear and let user retry
            st.query_params.clear()
            return False
        tokens = response.json()
        access_token = tokens.get("access_token")
        # Get user info
        userinfo_url = f"{oauth_config['provider_url']}/oauth/userinfo"
        userinfo_response = requests.get(
            userinfo_url,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        if userinfo_response.status_code == 200:
            userinfo = userinfo_response.json()
            st.session_state.hf_user = {
                'username': userinfo.get('preferred_username', userinfo.get('name', '')),
                'name': userinfo.get('name', ''),
                'picture': userinfo.get('picture', ''),
            }
            # Clean up query params
            st.query_params.clear()
            return True
    except Exception as e:
        # Silent failure - user can retry login
        pass
    # All failure paths (bad userinfo response or exception) end here.
    st.query_params.clear()
    return False
def is_running_on_hf_spaces() -> bool:
    """Check if the app is running on HuggingFace Spaces (SPACE_ID env var set)."""
    return "SPACE_ID" in os.environ
def get_login_url() -> str | None:
    """Generate the HuggingFace OAuth login URL, or None if OAuth is unconfigured."""
    cfg = get_oauth_config()
    if not cfg:
        return None
    # Redirect URI must use the .hf.space domain (required by HuggingFace OAuth);
    # fall back to localhost for local development.
    space_host = os.environ.get("SPACE_HOST", "")
    redirect_uri = f"https://{space_host}" if space_host else "http://localhost:8501"
    # A random state is required by the OAuth spec, but we can't validate it
    # reliably because Streamlit session state is lost during the redirect.
    query = urlencode({
        "client_id": cfg["client_id"],
        "redirect_uri": redirect_uri,
        "scope": cfg["scopes"],
        "state": secrets.token_urlsafe(16),
        "response_type": "code",
    })
    return f"{cfg['provider_url']}/oauth/authorize?{query}"
def show_login_button():
    """Show the HuggingFace login button.

    Renders a styled anchor pointing at the OAuth authorize URL.  Returns True
    when the button was rendered, False when OAuth is not configured (no URL).
    """
    login_url = get_login_url()
    if login_url:
        # Use custom HTML styled like Streamlit's default button, aligned left.
        # target="_self" keeps the OAuth redirect in this tab so the callback
        # query params come back to the app.
        st.markdown(f'''
        <a href="{login_url}" target="_self" style="
            display: inline-flex;
            align-items: center;
            justify-content: center;
            padding: 0.25rem 0.75rem;
            background-color: transparent;
            color: inherit;
            border: 1px solid rgba(250, 250, 250, 0.2);
            border-radius: 0.5rem;
            text-decoration: none;
            font-size: 0.875rem;
            font-weight: 400;
            line-height: 1.6;
            cursor: pointer;
            transition: border-color 0.2s, background-color 0.2s;
        " onmouseover="this.style.borderColor='rgba(250,250,250,0.6)'; this.style.backgroundColor='rgba(250,250,250,0.05)';"
        onmouseout="this.style.borderColor='rgba(250,250,250,0.2)'; this.style.backgroundColor='transparent';">
        Sign in with Hugging Face
        </a>
        ''', unsafe_allow_html=True)
        return True
    return False
def logout():
    """Log out the current user by removing the OAuth user dict from session state."""
    if 'hf_user' in st.session_state:
        del st.session_state.hf_user
# Colors
SNOWFLAKE_BLUE = "#29B5E8"
MID_BLUE = "#11567F"
VALENCIA_ORANGE = "#FF9F36"
STAR_BLUE = "#75CDD7"
FIRST_LIGHT = "#D45B90"
PURPLE_MOON = "#7254A3"
MEDIUM_GRAY = "#5B5B5B"
# Available tags for filtering - can be extended
AVAILABLE_TAGS = [
"Agentic",
"Conventional RAG",
"Sparse Search Tool",
"Semantic Search Tool",
"Vision and Language",
"Text-only",
]
# Tag colors for visual distinction (cycling through Snowflake secondary colors)
TAG_COLORS = {
"Agentic": SNOWFLAKE_BLUE,
"Conventional RAG": STAR_BLUE,
"Sparse Search Tool": VALENCIA_ORANGE,
"Semantic Search Tool": FIRST_LIGHT,
"Vision and Language": PURPLE_MOON,
"Text-only": SNOWFLAKE_BLUE,
}
# Custom CSS following Snowflake Brand Color Guide
# Primary: MID-BLUE (#11567F) for accents/sections, SNOWFLAKE BLUE (#29B5E8) sparingly
# Use white text on dark backgrounds per accessibility guidelines
st.markdown(f"""
<style>
/* Dark theme base - using near-black for good contrast */
.stApp {{
background-color: #0e1117;
}}
/* ===== TAB STYLING ===== */
.stTabs [data-baseweb="tab-list"] {{
gap: 8px;
background-color: transparent;
border-bottom: 2px solid {MID_BLUE};
padding-bottom: 0;
}}
.stTabs [data-baseweb="tab"] {{
height: 50px;
padding: 0 28px;
background-color: transparent !important;
border-radius: 0;
font-weight: 500;
font-size: 18px;
color: {MEDIUM_GRAY} !important;
border-bottom: 3px solid transparent !important;
margin-bottom: -2px;
}}
.stTabs [aria-selected="true"] {{
background-color: transparent !important;
color: {SNOWFLAKE_BLUE} !important;
border-bottom: 3px solid {SNOWFLAKE_BLUE} !important;
}}
.stTabs [data-baseweb="tab"]:hover {{
color: {SNOWFLAKE_BLUE} !important;
}}
/* Tab indicator overrides */
.stTabs [data-baseweb="tab-highlight"],
div[data-baseweb="tab-highlight"] {{
background-color: {SNOWFLAKE_BLUE} !important;
}}
.stTabs [role="tablist"] > div:last-child {{
background-color: {SNOWFLAKE_BLUE} !important;
}}
/* ===== CHECKBOX STYLING - Clean, no background highlight ===== */
.stCheckbox {{
background: transparent !important;
}}
.stCheckbox label {{
background: transparent !important;
color: white !important;
}}
.stCheckbox label span {{
background: transparent !important;
color: white !important;
}}
/* Remove any highlight/selection background from checkbox labels */
.stCheckbox > label,
.stCheckbox label > span,
.stCheckbox label > div {{
background-color: transparent !important;
background: none !important;
}}
/* The checkbox box itself - unchecked */
.stCheckbox [data-baseweb="checkbox"] > div:first-child {{
border-color: {MEDIUM_GRAY} !important;
background-color: transparent !important;
border-width: 2px !important;
}}
/* Checkbox when checked - fill with blue */
.stCheckbox [data-baseweb="checkbox"][aria-checked="true"] > div:first-child,
[data-testid="stCheckbox"] [aria-checked="true"] > div:first-child {{
background-color: {SNOWFLAKE_BLUE} !important;
border-color: {SNOWFLAKE_BLUE} !important;
}}
/* Alternative selector for checked state */
input[type="checkbox"]:checked + div {{
background-color: {SNOWFLAKE_BLUE} !important;
}}
/* Checkmark icon - make it visible */
.stCheckbox [data-baseweb="checkbox"] svg,
[data-baseweb="checkbox"] svg {{
color: white !important;
stroke: white !important;
fill: white !important;
}}
/* ===== BUTTON STYLING - MID-BLUE primary ===== */
.stButton > button {{
background-color: {MID_BLUE} !important;
color: white !important;
border: none !important;
border-radius: 6px;
font-weight: 500;
padding: 0.5rem 1.5rem;
transition: all 0.2s ease;
}}
.stButton > button:hover {{
background-color: {SNOWFLAKE_BLUE} !important;
}}
.stButton > button:active, .stButton > button:focus {{
background-color: {MID_BLUE} !important;
box-shadow: 0 0 0 2px {SNOWFLAKE_BLUE} !important;
}}
/* Download button */
.stDownloadButton > button {{
background-color: {MID_BLUE} !important;
color: white !important;
border: none !important;
}}
.stDownloadButton > button:hover {{
background-color: {SNOWFLAKE_BLUE} !important;
}}
/* ===== FORM ELEMENTS ===== */
/* Text inputs */
.stTextInput > div > div > input {{
border-color: {MEDIUM_GRAY} !important;
background-color: #1a1a2e !important;
}}
.stTextInput > div > div > input:focus {{
border-color: {SNOWFLAKE_BLUE} !important;
box-shadow: 0 0 0 1px {SNOWFLAKE_BLUE} !important;
}}
/* Select boxes */
.stSelectbox [data-baseweb="select"] > div {{
border-color: {MEDIUM_GRAY} !important;
background-color: #1a1a2e !important;
}}
/* Multiselect chips */
.stMultiSelect [data-baseweb="tag"] {{
background-color: {MID_BLUE} !important;
color: white !important;
}}
/* File uploader */
[data-testid="stFileUploader"] {{
border: 2px dashed {MEDIUM_GRAY} !important;
border-radius: 12px;
padding: 2rem 1.5rem !important;
background-color: transparent !important;
transition: all 0.2s ease;
}}
[data-testid="stFileUploader"]:hover {{
border-color: {SNOWFLAKE_BLUE} !important;
background-color: rgba(17, 86, 127, 0.08) !important;
}}
[data-testid="stFileUploaderDropzone"] {{
background-color: transparent !important;
}}
[data-testid="stFileUploader"] section {{
padding: 0 !important;
}}
[data-testid="stFileUploader"] section > div {{
padding: 0.5rem 0 !important;
}}
/* ===== LINKS - Snowflake Blue for visibility ===== */
/* Exclude link buttons from global link styling */
a:not([data-testid*="LinkButton"]):not([class*="LinkButton"]) {{
color: {SNOWFLAKE_BLUE} !important;
text-decoration: none !important;
}}
a:not([data-testid*="LinkButton"]):not([class*="LinkButton"]):hover {{
color: {STAR_BLUE} !important;
text-decoration: underline !important;
}}
/* HuggingFace login button - style for st.link_button */
[data-testid="stLinkButton"] a,
[data-testid="stLinkButton"] a *,
[data-testid="stLinkButton"] a p,
[data-testid="stLinkButton"] a span {{
background: linear-gradient(135deg, #FF9D00 0%, #FFD21E 100%) !important;
color: #000000 !important;
border: none !important;
font-weight: 700 !important;
text-decoration: none !important;
}}
[data-testid="stLinkButton"] a:hover,
[data-testid="stLinkButton"] a:hover *,
[data-testid="stLinkButton"] a:hover p,
[data-testid="stLinkButton"] a:hover span {{
background: linear-gradient(135deg, #FFD21E 0%, #FF9D00 100%) !important;
color: #000000 !important;
text-decoration: none !important;
}}
/* ===== SECTION HEADERS ===== */
h3 {{
color: white;
}}
/* ===== ALERTS/MESSAGES ===== */
/* Base alert styling */
[data-testid="stAlert"] > div {{
border-radius: 8px !important;
padding: 1rem !important;
}}
/* Info messages - Snowflake Blue */
[data-testid="stAlert"][data-baseweb="notification"] {{
background-color: rgba(41, 181, 232, 0.15) !important;
border-left: 4px solid {SNOWFLAKE_BLUE} !important;
border-radius: 8px !important;
}}
/* Target by icon type for more specific styling */
.stAlert div[role="alert"] {{
background-color: rgba(41, 181, 232, 0.15) !important;
border-left: 4px solid {SNOWFLAKE_BLUE} !important;
border-radius: 8px !important;
padding: 1rem !important;
}}
/* Success - has checkmark icon */
.stSuccess div[role="alert"],
[data-testid="stAlert"]:has([data-testid="stIconSuccess"]) div[role="alert"] {{
background-color: rgba(117, 205, 215, 0.15) !important;
border-left: 4px solid {STAR_BLUE} !important;
}}
/* Warning - has warning icon */
.stWarning div[role="alert"],
[data-testid="stAlert"]:has([data-testid="stIconWarning"]) div[role="alert"] {{
background-color: rgba(255, 159, 54, 0.15) !important;
border-left: 4px solid {VALENCIA_ORANGE} !important;
}}
/* Error - has error icon */
.stError div[role="alert"],
[data-testid="stAlert"]:has([data-testid="stIconError"]) div[role="alert"] {{
background-color: rgba(212, 91, 144, 0.15) !important;
border-left: 4px solid {FIRST_LIGHT} !important;
}}
/* Alert text colors */
[data-testid="stAlert"] p,
.stAlert p {{
color: rgba(255, 255, 255, 0.9) !important;
}}
/* ===== SPINNER ===== */
.stSpinner > div {{
border-top-color: {SNOWFLAKE_BLUE} !important;
}}
/* ===== EXPANDER ===== */
.streamlit-expanderHeader {{
border-left: 3px solid {MID_BLUE};
background-color: rgba(17, 86, 127, 0.1) !important;
}}
/* ===== CODE BLOCKS ===== */
code {{
background-color: rgba(17, 86, 127, 0.2);
padding: 0.2em 0.4em;
border-radius: 3px;
color: {STAR_BLUE};
}}
/* ===== SCROLLBAR ===== */
::-webkit-scrollbar {{
width: 8px;
height: 8px;
}}
::-webkit-scrollbar-track {{
background: #1a1a2e;
}}
::-webkit-scrollbar-thumb {{
background: {MID_BLUE};
border-radius: 4px;
}}
::-webkit-scrollbar-thumb:hover {{
background: {SNOWFLAKE_BLUE};
}}
/* ===== ROOT VARIABLES ===== */
:root {{
--primary-color: {SNOWFLAKE_BLUE} !important;
}}
/* ===== MULTISELECT STYLING ===== */
/* Tag filter multiselect - MID_BLUE (gradient start) */
div[data-testid="stHorizontalBlock"] > div:first-child .stMultiSelect [data-baseweb="tag"] {{
background-color: {MID_BLUE} !important;
color: white !important;
}}
/* Column selector multiselect - SNOWFLAKE_BLUE (gradient end) */
div[data-testid="stHorizontalBlock"] > div:last-child .stMultiSelect [data-baseweb="tag"] {{
background-color: {SNOWFLAKE_BLUE} !important;
color: white !important;
}}
/* Default multiselect styling */
.stMultiSelect [data-baseweb="tag"] {{
border-radius: 12px !important;
padding: 2px 10px !important;
margin: 2px !important;
font-weight: 500 !important;
}}
.stMultiSelect [data-baseweb="tag"] span {{
color: inherit !important;
}}
/* Remove button in tag */
.stMultiSelect [data-baseweb="tag"] svg {{
color: white !important;
opacity: 0.8;
}}
.stMultiSelect [data-baseweb="tag"] svg:hover {{
opacity: 1;
}}
/* Placeholder text */
.stMultiSelect input::placeholder {{
color: {MEDIUM_GRAY} !important;
}}
</style>
""", unsafe_allow_html=True)
# Data paths
EVAL_RESULTS_PATH = Path(CACHE_PATH) / "eval-results"
EVAL_REQUESTS_PATH = Path(CACHE_PATH) / "eval-queue"
@st.cache_data(ttl=300)  # Cache for 5 minutes
def download_data():
    """Download the queue and results datasets from HuggingFace Hub.

    Each repo is fetched independently; a failure only emits a UI warning so
    the app can still render with whatever data is available.
    """
    targets = (
        (QUEUE_REPO, EVAL_REQUESTS_PATH, "queue"),
        (RESULTS_REPO, EVAL_RESULTS_PATH, "results"),
    )
    for repo_id, local_dir, label in targets:
        try:
            snapshot_download(
                repo_id=repo_id,
                local_dir=str(local_dir),
                repo_type="dataset",
                tqdm_class=None,
                etag_timeout=30,
                token=TOKEN,
            )
        except Exception as e:
            st.warning(f"Could not download {label} data: {e}")
class ModelType:
    """Model provenance categories and their display colors."""
    API = "api"
    OPEN_WEIGHT = "open-weight"

    @staticmethod
    def get_color(model_type: str) -> str:
        """Map a model type to its chart/label color (gray for unknown types)."""
        palette = {
            ModelType.API: VALENCIA_ORANGE,
            ModelType.OPEN_WEIGHT: STAR_BLUE,
        }
        return palette.get(model_type, MEDIUM_GRAY)
# Load SVG icons from local assets folder
ASSETS_PATH = Path(__file__).resolve().parent / "assets"
def load_svg_icon(icon_name: str, fill_color: str = None) -> str:
    """Load SVG icon and return as data URI with optional color replacement.

    When fill_color is given, black fill/stroke attributes are recolored so
    the icon stays visible on the dark background.  Returns "" when the icon
    is missing or unreadable.  Mirrors the Gradio app's load_svg_data_uri.
    """
    path = ASSETS_PATH / f"{icon_name}.svg"
    if not path.exists():
        return ""
    try:
        svg = path.read_text(encoding="utf-8")
        if fill_color:
            for attr in ("fill", "stroke"):
                svg = svg.replace(f'{attr}="black"', f'{attr}="{fill_color}"')
        encoded = base64.b64encode(svg.encode()).decode()
        return f"data:image/svg+xml;base64,{encoded}"
    except Exception:
        return ""
def load_png_icon(icon_name: str) -> str:
    """Load PNG icon and return as data URI ("" when missing or unreadable)."""
    path = ASSETS_PATH / f"{icon_name}.png"
    if not path.exists():
        return ""
    try:
        encoded = base64.b64encode(path.read_bytes()).decode()
        return f"data:image/png;base64,{encoded}"
    except Exception:
        return ""
# Preload icons with Snowflake colors (matching Gradio app)
ICON_CLOUD = load_svg_icon("snow_cloud2", VALENCIA_ORANGE) # Orange cloud for API (same as Gradio)
ICON_CODE = load_svg_icon("snow_code", STAR_BLUE) # Blue code for open-weight (same as Gradio)
ICON_HUMAN = load_png_icon("human_performance")
# Tab header icons - use white to match header text color
HEADER_ICON_COLOR = "#FFFFFF"
ICON_MEDAL = load_svg_icon("snow_medal", HEADER_ICON_COLOR) # Leaderboard header icon
ICON_EYE = load_svg_icon("snow_eye", HEADER_ICON_COLOR) # Analysis header icon
ICON_DOCS = load_svg_icon("snow_docs", HEADER_ICON_COLOR) # About header icon
ICON_WRITE = load_svg_icon("snow_write", HEADER_ICON_COLOR) # Submit header icon
def generate_placeholder_description(model_name: str, tags: list, model_type: str) -> str:
    """Generate a placeholder description based on model metadata.

    Args:
        model_name: Display name (currently unused; kept for interface stability).
        tags: List of approach tags (see AVAILABLE_TAGS).
        model_type: "api" or "open-weight".

    Returns:
        A short description such as "API-based agentic system with sparse search".
    """
    parts = []
    # Describe model type
    if model_type == "api":
        parts.append("API-based")
    elif model_type == "open-weight":
        parts.append("Open-weight")
    if tags:
        # Describe approach based on tags
        if "Agentic" in tags:
            parts.append("agentic system")
        elif "Conventional RAG" in tags:
            parts.append("RAG pipeline")
        else:
            parts.append("model")
        # Add tool/capability info.  Tag names must match AVAILABLE_TAGS
        # exactly — the original checked "Sparse Search", which never matched
        # the actual "Sparse Search Tool" tag.
        capabilities = []
        if "Sparse Search Tool" in tags:
            capabilities.append("sparse search")
        if "Semantic Search Tool" in tags:
            capabilities.append("semantic search")
        if "Vision and Language" in tags:
            capabilities.append("vision")
        if "Text-only" in tags:
            capabilities.append("text-only")
        if capabilities:
            parts.append(f"with {', '.join(capabilities)}")
    else:
        parts.append("model")
    return " ".join(parts) if parts else ""
def get_model_type_html(model_type: str) -> str:
    """Get HTML for model type with icon and colored text.

    Falls back to an emoji label when the SVG icon failed to load.
    """
    color = ModelType.get_color(model_type)
    is_api = model_type == ModelType.API
    icon_uri = ICON_CLOUD if is_api else ICON_CODE
    if not icon_uri:
        # Icon asset missing: plain emoji fallback.
        fallback_emoji = "☁️" if is_api else "</>"
        return f'<span style="color: {color}; font-weight: 500;">{fallback_emoji} {model_type}</span>'
    return f'''<div style="display: inline-flex; align-items: center; white-space: nowrap;">
        <img src="{icon_uri}" style="width: 20px; height: 20px; vertical-align: middle;" />
        <span style="color: {color}; font-weight: 500; margin-left: 6px;">{model_type}</span>
    </div>'''
def _extract_timestamp_from_filename(filename: str) -> str:
"""Extract timestamp from filename like 'Model_results_20260109_152104.json'."""
import re
match = re.search(r'_(\d{8}_\d{6})\.json$', filename)
return match.group(1) if match else "00000000_000000"
def _detect_effort_uniform(result_file: Path, data: dict) -> bool:
    """Check if all predictions in the companion JSONL have the same effort value.

    Used to flag agentic submissions whose per-question effort never varies
    (suspicious for the Kuiper calibration metric).  Returns True only when
    every prediction with positive effort reports one identical value; False
    on missing/unreadable files or when efforts differ.
    """
    # Prefer an explicit pointer to the predictions file; otherwise derive the
    # path from the results filename (…_results_*.json -> …_predictions_*.jsonl).
    pred_rel = data.get("source_predictions_file")
    if pred_rel:
        pred_path = Path(EVAL_RESULTS_PATH) / pred_rel
    else:
        pred_path = Path(str(result_file).replace("_results_", "_predictions_").replace(".json", ".jsonl"))
    if not pred_path.exists():
        return False
    try:
        effort_values = set()
        with open(pred_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                pred = json.loads(line)
                # Step count: prefer the recorded search history; fall back to
                # the 'iterations' field when the history is absent or empty.
                search_history = pred.get('search_history', [])
                steps = len(search_history) if isinstance(search_history, list) and search_history else 0
                if steps == 0:
                    steps = pred.get('iterations', 0)
                try:
                    steps = float(steps) if steps else 0
                except (TypeError, ValueError):
                    # Non-numeric 'iterations' value — treat as zero effort.
                    steps = 0
                # llm_calls/effort may live at the top level or nested under
                # 'trajectory'; the `or {}` guards against trajectory being None.
                effort_dict = {
                    'steps': steps,
                    'llm_calls': pred.get('llm_calls') or (pred.get('trajectory', {}) or {}).get('llm_calls'),
                    'effort': pred.get('effort') or (pred.get('trajectory', {}) or {}).get('effort'),
                }
                val = get_effort_value(effort_dict)
                if val > 0:
                    effort_values.add(val)
                if len(effort_values) > 1:
                    # Early exit: two distinct effort values already observed.
                    return False
        return len(effort_values) == 1
    except Exception:
        # Corrupt/unreadable predictions file: treat as non-uniform.
        return False
@st.cache_data(ttl=300)  # Cache for 5 minutes
def load_eval_results() -> pd.DataFrame:
    """Load evaluation results from JSON files, keeping only the most recent per model.

    Scans <EVAL_RESULTS_PATH>/<org>/*_results_*.json, builds one leaderboard
    row per model (deduplicated by the timestamp embedded in the filename),
    and returns a DataFrame sorted by "Accuracy (LLM judge)" descending.
    Column names starting with '_' are hidden helper fields for rendering.
    """
    seen_models = {}  # Track: model_name -> (timestamp, result_dict)
    results_path = Path(EVAL_RESULTS_PATH)
    if not results_path.exists():
        return pd.DataFrame()
    for org_dir in results_path.iterdir():
        if org_dir.is_dir() and not org_dir.name.startswith('.'):
            for result_file in org_dir.glob("*_results_*.json"):
                try:
                    with open(result_file) as f:
                        data = json.load(f)
                    # Extract data
                    model_name = data.get("model_name", "Unknown")
                    metadata = data.get("metadata", {})
                    result_scores = data.get("results", {})
                    # Get tags - default to ["Agentic"] if not specified
                    tags = data.get("tags", metadata.get("tags", ["Agentic"]))
                    if isinstance(tags, str):
                        tags = [tags]  # Convert single tag to list
                    # Get per-domain scores if available
                    by_domain = result_scores.get("by_domain", {})
                    # Use semantic accuracy if available, otherwise fall back to ANLS*
                    overall = result_scores.get("overall", {})
                    single_ev = result_scores.get("single_evidence", {})
                    multi_page = result_scores.get("multi_evidence_same_doc", {})
                    multi_doc = result_scores.get("multi_evidence_multi_doc", {})
                    # Primary metric: semantic (ANLS* + LLM) if available, otherwise ANLS*
                    semantic_acc = overall.get("semantic", overall.get("anls", 0.0))
                    semantic_ci = overall.get("semantic_ci")  # 95% CI tuple
                    semantic_se = None
                    # Calculate CI/SE on-the-fly using bias correction if not stored
                    if semantic_acc > 0:
                        try:
                            from metrics import confidence_interval, standard_error
                            n = result_scores.get("single_evidence", {}).get("n", 500)
                            p = semantic_acc / 100.0  # Convert to proportion
                            if not semantic_ci:
                                ci = confidence_interval(p, n)  # Uses calibrated q0, q1, m0, m1
                                semantic_ci = (ci[0] * 100, ci[1] * 100)
                            if semantic_se is None:
                                semantic_se = standard_error(p, n) * 100  # SE in percentage points
                        except Exception:
                            # Keep whatever was stored; leave None when nothing was.
                            semantic_ci = semantic_ci if semantic_ci else None
                            semantic_se = semantic_se if semantic_se is not None else None
                    anls_acc = overall.get("anls", 0.0)
                    # Detect effort uniformity for Agentic models with Kuiper
                    kuiper_val = overall.get("kuiper", 0.0)
                    is_agentic = "Agentic" in tags if isinstance(tags, list) else False
                    effort_uniform = False
                    if is_agentic and kuiper_val and EVAL_AVAILABLE:
                        effort_uniform = _detect_effort_uniform(result_file, data)
                    result_dict = {
                        "Model": model_name,
                        "Organization": data.get("organization", data.get("submitted_by", org_dir.name)),
                        "Model Type": metadata.get("model_type", "unknown"),
                        "Tags": tags,  # Store as list
                        # Primary: Accuracy with LLM judge (ANLS* + LLM with bias correction)
                        "Accuracy (LLM judge)": semantic_acc,
                        "_Accuracy_SE": semantic_se,  # Hidden: for ±SE display
                        "_Accuracy_CI": semantic_ci,  # Hidden: for tooltip display
                        "Acc. Single-Hop": single_ev.get("semantic", single_ev.get("anls", 0.0)),
                        "Acc. Cross-Page": multi_page.get("semantic", multi_page.get("anls", 0.0)),
                        "Acc. Cross-Doc": multi_doc.get("semantic", multi_doc.get("anls", 0.0)),
                        # Secondary: Pure string-based ANLS* (hidden by default)
                        "ANLS* (string)": anls_acc,
                        # Attribution metrics
                        "Attribution (Page F1)": overall.get("page_f1", 0.0),
                        "Attribution (Doc F1)": overall.get("doc_f1", 0.0),
                        # Calibration metric
                        "Effort (Kuiper)": kuiper_val,
                        "_effort_uniform": effort_uniform,
                        "Submission Date": data.get("submission_date", ""),
                        "Link": data.get("link", ""),
                        "Description": data.get("description", metadata.get("description", "")) or
                            generate_placeholder_description(model_name, tags, metadata.get("model_type", "")),
                        # Per-domain scores (stored as JSON string for DataFrame compatibility)
                        "_by_domain": json.dumps(by_domain) if by_domain else "{}",
                    }
                    # Extract timestamp from filename
                    file_timestamp = _extract_timestamp_from_filename(result_file.name)
                    # Keep only the most recent result per model
                    if model_name not in seen_models or file_timestamp > seen_models[model_name][0]:
                        seen_models[model_name] = (file_timestamp, result_dict)
                except Exception as e:
                    st.warning(f"Error loading {result_file}: {e}")
    if not seen_models:
        return pd.DataFrame()
    # Build results list from deduplicated models
    results = [result_dict for _, result_dict in seen_models.values()]
    df = pd.DataFrame(results)
    df = df.sort_values("Accuracy (LLM judge)", ascending=False).reset_index(drop=True)
    return df
def get_all_tags_from_df(df: pd.DataFrame) -> list:
    """Return the sorted union of all tags found in the 'Tags' column.

    Non-list entries are ignored; a missing column yields an empty list.
    """
    unique_tags = set()
    if "Tags" in df.columns:
        for entry in df["Tags"]:
            if isinstance(entry, list):
                unique_tags.update(entry)
    return sorted(unique_tags)
def filter_df_by_tags(df: pd.DataFrame, selected_tags: list) -> pd.DataFrame:
    """Keep only rows whose 'Tags' list shares at least one selected tag.

    An empty selection disables filtering.  Rows whose Tags value is not a
    list are dropped when a filter is active.
    """
    if not selected_tags:
        return df
    wanted = set(selected_tags)

    def _matches(row_tags):
        return isinstance(row_tags, list) and bool(wanted.intersection(row_tags))

    return df[df["Tags"].apply(_matches)]
def render_tags_html(tags: list) -> str:
    """Render tags as styled badges.

    Returns one HTML string of pill-shaped <span> badges, colored per
    TAG_COLORS (MID_BLUE for unknown tags).  Empty string for missing or
    non-list input.
    """
    if not tags or not isinstance(tags, list):
        return ""
    badges = []
    for tag in tags:
        color = TAG_COLORS.get(tag, MID_BLUE)
        # Use lighter background with colored border for better readability;
        # "{color}20" appends a hex alpha channel for a translucent fill.
        badge = f'''<span style="
            display: inline-block;
            padding: 2px 8px;
            margin: 2px 3px;
            border-radius: 12px;
            font-size: 11px;
            font-weight: 500;
            background-color: {color}20;
            color: {color};
            border: 1px solid {color};
            white-space: nowrap;
        ">{tag}</span>'''
        badges.append(badge)
    return "".join(badges)
def format_model_name(row) -> str:
    """Return the model name, hyperlinked when a non-blank Link is present."""
    name = row["Model"]
    url = row.get("Link", "")
    if not (url and url.strip()):
        return name
    return f'<a href="{url}" target="_blank">{name}</a>'
def format_model_type(model_type: str) -> str:
    """Wrap the model type in a span colored and iconized per ModelType."""
    return (
        f'<span style="color: {ModelType.get_color(model_type)};">'
        f'{ModelType.get_icon(model_type)} {model_type}</span>'
    )
# Metric tooltips for table headers.
# Rendered by render_leaderboard_table as the <th title="..."> attribute, so
# keys must match the leaderboard column names exactly and the text must not
# contain double quotes (it is interpolated into an HTML attribute).
METRIC_TOOLTIPS = {
    "Accuracy (LLM judge)": "Answer accuracy using ANLS* + LLM judge with bias correction. Captures semantic correctness beyond string matching. Higher is better.",
    "ANLS* (string)": "String-based accuracy using ANLS* (Average Normalized Levenshtein Similarity). Stricter than semantic. Higher is better.",
    "Acc. Single-Hop": "Accuracy on questions requiring evidence from a single page.",
    "Acc. Cross-Page": "Accuracy on multi-hop questions requiring evidence from multiple pages within the same document.",
    "Acc. Cross-Doc": "Accuracy on multi-hop questions requiring evidence from multiple documents.",
    "Attribution (Page F1)": "F1 score for page-level attribution. Measures overlap between cited pages and gold evidence. Higher is better.",
    "Attribution (Doc F1)": "F1 score for document-level attribution. Measures whether the correct documents were identified. Higher is better.",
    "Effort (Kuiper)": "Effort calibration metric (Kuiper statistic). Measures if effort correlates with problem difficulty. Lower is better.",
    "Model Type": "API = cloud-based model, open-weight = downloadable weights",
    "Tags": "Approach characteristics: Agentic, RAG, search tools, vision capabilities, etc.",
}
def render_leaderboard_table(df: pd.DataFrame, columns: list, show_analyze_column: bool = True, uncertainty_mode: str = "± SE"):
    """Render an HTML table matching the Gradio leaderboard style.

    Args:
        df: Leaderboard rows, one per model. Helper columns prefixed with
            "_" (e.g. _Accuracy_SE, _Accuracy_CI, _effort_uniform) feed the
            uncertainty and effort annotations.
        columns: Column names to display, in order.
        show_analyze_column: If True, append a per-row "Analyze" link column.
        uncertainty_mode: One of "± SE", "90% CI", "95% CI", or "None"
    """
    # Hoisted: previously re-imported inside the per-row loops on every
    # uncertainty computation.
    from math import sqrt

    if df.empty:
        st.warning("No data available")
        return
    # Build table HTML with tooltips
    header_cells = []
    for col in columns:
        # Add line break before brackets for cleaner display
        display_col = col.replace(" (", "<br>(") if " (" in col else col
        tooltip = METRIC_TOOLTIPS.get(col, "")
        if tooltip:
            header_cells.append(f'<th title="{tooltip}" style="cursor: help;">{display_col}</th>')
        else:
            header_cells.append(f'<th>{display_col}</th>')
    # Add "Analyze" column header
    if show_analyze_column:
        header_cells.append('<th style="width: 70px;">Analyze</th>')
    header_cells = "".join(header_cells)
    # Columns that should be merged for human performance rows
    HUMAN_MERGE_COLS = ["Model", "Organization", "Model Type"]
    rows_html = ""
    for _, row in df.iterrows():
        cells = []
        model_name = row.get("Model", "")
        organization = row.get("Organization", "")
        # Oracle-retriever humans have no attribution/effort to report.
        hide_attrib_kuiper = model_name == "Human with Oracle Retriever"
        # Check if this is a human performance row (should merge Model, Organization, Model Type)
        is_human_row = organization == "Humanity"
        # Calculate colspan for human rows (count how many merge columns are in selected columns)
        human_colspan = sum(1 for col in HUMAN_MERGE_COLS if col in columns) if is_human_row else 1
        for col in columns:
            value = row.get(col, "")
            # Skip Organization and Model Type for human rows (they're merged into Model)
            if is_human_row and col in ["Organization", "Model Type"]:
                continue
            if col == "Model":
                # Model name with optional link and description
                link = row.get("Link", "")
                description = row.get("Description", "")
                human_icon_html = ""
                if is_human_row and ICON_HUMAN:
                    human_icon_html = (
                        f'<img src="{ICON_HUMAN}" alt="Human baseline" '
                        'style="width: 20px; height: 20px; vertical-align: text-bottom; margin-right: 6px;" />'
                    )
                if link and str(link).strip():
                    name_html = f'{human_icon_html}<a href="{link}" target="_blank" style="color: #29B5E8; font-weight: 500;">{value}</a>'
                else:
                    name_html = f'{human_icon_html}<span style="font-weight: 500;">{value}</span>'
                if description and str(description).strip():
                    cell_html = f'{name_html}<br><span style="font-size: 12px; color: {MEDIUM_GRAY}; font-weight: normal;">{description}</span>'
                else:
                    cell_html = name_html
                # For human rows, use colspan to span Model, Organization, and Model Type columns
                if is_human_row and human_colspan > 1:
                    cells.append(f'<td colspan="{human_colspan}">{cell_html}</td>')
                else:
                    cells.append(f'<td>{cell_html}</td>')
            elif col == "Model Type":
                # Model type with icon
                cell_html = get_model_type_html(str(value))
                cells.append(f'<td style="text-align: center;">{cell_html}</td>')
            elif col == "Tags":
                # Render tags as badges
                cell_html = render_tags_html(value)
                cells.append(f'<td>{cell_html}</td>')
            elif col == "Accuracy (LLM judge)" or col == "ANLS* (string)" or col.startswith("Acc."):
                # Format accuracy scores (scale 0-100)
                try:
                    acc_val = f"{float(value):.1f}" if value else "0"
                    acc_float = float(value) if value else 0
                except (ValueError, TypeError):
                    acc_val = str(value)
                    acc_float = 0
                # Add uncertainty based on mode
                cell_html = acc_val
                if uncertainty_mode != "None" and col == "Accuracy (LLM judge)":
                    # Primary metric: SE/CI were precomputed during evaluation.
                    se = row.get("_Accuracy_SE")
                    ci = row.get("_Accuracy_CI")
                    if uncertainty_mode == "± SE" and se is not None and se > 0:
                        ci_tooltip = f"95% CI: [{ci[0]:.1f}, {ci[1]:.1f}]" if ci else ""
                        uncertainty_text = f'<span style="font-size: 0.85em; color: #888;" title="{ci_tooltip}"> ± {se:.1f}</span>'
                        cell_html = f'{acc_val}{uncertainty_text}'
                    elif uncertainty_mode == "95% CI" and ci:
                        uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci[0]:.1f}-{ci[1]:.1f}]</span>'
                        cell_html = f'{acc_val}{uncertainty_text}'
                    elif uncertainty_mode == "90% CI" and se is not None and se > 0:
                        # 90% CI: z=1.645 instead of 1.96, so CI is ~84% of 95% CI width
                        z_90 = 1.645
                        half_width = se * z_90
                        ci_90_low = max(0, acc_float - half_width)
                        ci_90_high = min(100, acc_float + half_width)
                        uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci_90_low:.1f}-{ci_90_high:.1f}]</span>'
                        cell_html = f'{acc_val}{uncertainty_text}'
                elif uncertainty_mode != "None" and col.startswith("Acc.") and acc_float > 0:
                    # Compute uncertainty for breakdown accuracy columns
                    n_approx = 150  # Rough estimate for breakdown categories
                    p = acc_float / 100.0
                    if 0 < p < 1:
                        se_raw = sqrt(p * (1 - p) / n_approx)
                        # Correct the binomial SE for LLM-judge sensitivity/specificity.
                        se_adj = se_raw / (LLM_JUDGE_SPECIFICITY + LLM_JUDGE_SENSITIVITY - 1) * 100
                        if uncertainty_mode == "± SE":
                            uncertainty_text = f'<span style="font-size: 0.85em; color: #888;"> ± {se_adj:.1f}</span>'
                            cell_html = f'{acc_val}{uncertainty_text}'
                        elif uncertainty_mode == "95% CI":
                            half_width = se_adj * 1.96
                            ci_low = max(0, acc_float - half_width)
                            ci_high = min(100, acc_float + half_width)
                            uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci_low:.1f}-{ci_high:.1f}]</span>'
                            cell_html = f'{acc_val}{uncertainty_text}'
                        elif uncertainty_mode == "90% CI":
                            half_width = se_adj * 1.645
                            ci_low = max(0, acc_float - half_width)
                            ci_high = min(100, acc_float + half_width)
                            uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci_low:.1f}-{ci_high:.1f}]</span>'
                            cell_html = f'{acc_val}{uncertainty_text}'
                cells.append(f'<td style="text-align: center;">{cell_html}</td>')
            elif col.startswith("Attribution"):
                # Format F1 scores (scale 0-100) - NOT bias-adjusted
                if hide_attrib_kuiper:
                    cells.append('<td style="text-align: center;">—</td>')
                    continue
                try:
                    attr_val = f"{float(value):.1f}" if value else "0"
                    attr_float = float(value) if value else 0
                except (ValueError, TypeError):
                    attr_val = str(value)
                    attr_float = 0
                cell_html = attr_val
                # Add uncertainty for attribution metrics (simple binomial, no bias adjustment)
                if uncertainty_mode != "None" and attr_float > 0:
                    n_approx = 500  # Test set size
                    p = attr_float / 100.0
                    if 0 < p < 1:
                        se = sqrt(p * (1 - p) / n_approx) * 100  # No bias adjustment
                        if uncertainty_mode == "± SE":
                            uncertainty_text = f'<span style="font-size: 0.85em; color: #888;"> ± {se:.1f}</span>'
                            cell_html = f'{attr_val}{uncertainty_text}'
                        elif uncertainty_mode == "95% CI":
                            half_width = se * 1.96
                            ci_low = max(0, attr_float - half_width)
                            ci_high = min(100, attr_float + half_width)
                            uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci_low:.1f}-{ci_high:.1f}]</span>'
                            cell_html = f'{attr_val}{uncertainty_text}'
                        elif uncertainty_mode == "90% CI":
                            half_width = se * 1.645
                            ci_low = max(0, attr_float - half_width)
                            ci_high = min(100, attr_float + half_width)
                            uncertainty_text = f'<span style="font-size: 0.8em; color: #888;"> [{ci_low:.1f}-{ci_high:.1f}]</span>'
                            cell_html = f'{attr_val}{uncertainty_text}'
                cells.append(f'<td style="text-align: center;">{cell_html}</td>')
            elif col == "Effort (Kuiper)":
                # Format Kuiper statistic (lower is better for calibration)
                # Hide for Conventional RAG models (not meaningful)
                if hide_attrib_kuiper:
                    cells.append('<td style="text-align: center;">—</td>')
                    continue
                tags = row.get("Tags", [])
                is_conventional_rag = "Conventional RAG" in tags if isinstance(tags, list) else False
                if is_conventional_rag:
                    cell_html = "—"
                else:
                    try:
                        cell_html = f"{float(value):.1f}" if value else "0"
                    except (ValueError, TypeError):
                        cell_html = str(value)
                    # Uniform-effort agents: show the value parenthesized and
                    # grayed out, since effort calibration is not meaningful.
                    if row.get("_effort_uniform", False) and cell_html != "0":
                        tooltip = "This agent uses the same effort for all samples, so effort-invariance metric is not meaningful."
                        cell_html = f'<span style="color: #888; cursor: help;" title="{tooltip}">({cell_html})</span>'
                cells.append(f'<td style="text-align: center;">{cell_html}</td>')
            elif col == "Organization":
                cell_html = str(value) if value else ""
                cells.append(f'<td style="text-align: center;">{cell_html}</td>')
            else:
                cell_html = str(value) if value else ""
                cells.append(f'<td>{cell_html}</td>')
        # Add "Analyze" link cell
        if show_analyze_column:
            # URL-encode the model name for query param
            encoded_name = quote(str(model_name))
            analyze_link = f'<a href="?analyze={encoded_name}" target="_self" title="View detailed analysis">View</a>'
            cells.append(f'<td style="text-align: center;">{analyze_link}</td>')
        rows_html += f'<tr>{"".join(cells)}</tr>'
    table_html = f'''
    <style>
    .leaderboard-wrapper {{
        border: 2px solid {MID_BLUE};
        border-radius: 8px;
        overflow: hidden;
        font-size: 0;
    }}
    .leaderboard-table {{
        width: 100%;
        border-collapse: collapse;
        border-spacing: 0;
        font-size: 14px;
        background-color: #0e1117;
        margin: 0;
        padding: 0;
        border: none;
    }}
    .leaderboard-table thead tr {{
        background: linear-gradient(135deg, {MID_BLUE} 0%, {SNOWFLAKE_BLUE} 100%);
    }}
    .leaderboard-table thead th {{
        background: transparent;
        color: white;
        text-align: center;
        padding: 1.2em 0.75em;
        font-weight: 500;
        border: none;
        text-transform: none;
    }}
    .leaderboard-table thead th:not(:last-child) {{
        border-right: 1px solid rgba(255,255,255,0.15);
    }}
    .leaderboard-table tbody td {{
        padding: 0.75em;
        border-bottom: 1px solid {MEDIUM_GRAY}40;
        vertical-align: middle;
        color: white;
    }}
    .leaderboard-table tbody tr:last-child td {{
        border-bottom: none;
    }}
    .leaderboard-table tbody tr:nth-child(even) {{
        background-color: rgba(17, 86, 127, 0.12);
    }}
    .leaderboard-table tbody tr:hover {{
        background-color: rgba(17, 86, 127, 0.25);
    }}
    .leaderboard-table td:first-child {{
        min-width: 280px;
        max-width: 350px;
        word-wrap: break-word;
    }}
    /* Links in table use Snowflake Blue */
    .leaderboard-table a {{
        color: {SNOWFLAKE_BLUE};
        text-decoration: none;
    }}
    .leaderboard-table a:hover {{
        color: {STAR_BLUE};
        text-decoration: underline;
    }}
    </style>
    <div class="leaderboard-wrapper">
    <table class="leaderboard-table">
    <thead>
    <tr>{header_cells}</tr>
    </thead>
    <tbody>
    {rows_html}
    </tbody>
    </table>
    </div>
    '''
    st.markdown(table_html, unsafe_allow_html=True)
def build_csv_download_df(df: pd.DataFrame, columns: list, uncertainty_mode: str) -> pd.DataFrame:
    """Build a CSV-friendly DataFrame with uncertainty text included.

    Mirrors the formatting rules of render_leaderboard_table so the exported
    CSV matches what the on-screen table shows.

    Args:
        df: Full leaderboard frame, including "_"-prefixed helper columns
            (e.g. _Accuracy_SE, _Accuracy_CI) used for uncertainty.
        columns: Columns to export, in order.
        uncertainty_mode: One of "± SE", "90% CI", "95% CI", or "None".

    Returns:
        A new DataFrame restricted to ``columns`` with formatted score strings;
        empty when ``df`` or ``columns`` is empty.
    """
    if df.empty or not columns:
        return pd.DataFrame()
    # Hoisted: previously re-imported inside the per-cell loops.
    from math import sqrt
    export_df = df[columns].copy()
    for idx in export_df.index:
        row = df.loc[idx]
        for col in columns:
            value = row.get(col, "")
            if col == "Accuracy (LLM judge)" or col == "ANLS* (string)" or col.startswith("Acc."):
                try:
                    acc_float = float(value) if value else 0.0
                    acc_val = f"{acc_float:.1f}"
                except (ValueError, TypeError):
                    # Non-numeric cell: export as-is.
                    export_df.at[idx, col] = value
                    continue
                text = acc_val
                if uncertainty_mode != "None":
                    if col == "Accuracy (LLM judge)":
                        # Primary metric: SE/CI were precomputed during evaluation.
                        se = row.get("_Accuracy_SE")
                        ci = row.get("_Accuracy_CI")
                        if uncertainty_mode == "± SE" and se is not None and se > 0:
                            text = f"{acc_val} ± {se:.1f}"
                        elif uncertainty_mode == "95% CI":
                            if ci:
                                text = f"{acc_val} [{ci[0]:.1f}-{ci[1]:.1f}]"
                            elif se is not None and se > 0:
                                half_width = se * 1.96
                                text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]"
                        elif uncertainty_mode == "90% CI" and se is not None and se > 0:
                            half_width = se * 1.645
                            text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]"
                    elif col.startswith("Acc.") and acc_float > 0:
                        # Breakdown columns: binomial SE corrected for
                        # LLM-judge sensitivity/specificity.
                        n_approx = 150
                        p = acc_float / 100.0
                        if 0 < p < 1:
                            se_raw = sqrt(p * (1 - p) / n_approx)
                            se_adj = se_raw / (LLM_JUDGE_SPECIFICITY + LLM_JUDGE_SENSITIVITY - 1) * 100
                            if uncertainty_mode == "± SE":
                                text = f"{acc_val} ± {se_adj:.1f}"
                            elif uncertainty_mode == "95% CI":
                                half_width = se_adj * 1.96
                                text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]"
                            elif uncertainty_mode == "90% CI":
                                half_width = se_adj * 1.645
                                text = f"{acc_val} [{max(0, acc_float - half_width):.1f}-{min(100, acc_float + half_width):.1f}]"
                export_df.at[idx, col] = text
            elif col.startswith("Attribution"):
                try:
                    attr_float = float(value) if value else 0.0
                    attr_val = f"{attr_float:.1f}"
                except (ValueError, TypeError):
                    export_df.at[idx, col] = value
                    continue
                text = attr_val
                if uncertainty_mode != "None" and attr_float > 0:
                    # Attribution F1: plain binomial SE, no bias adjustment.
                    n_approx = 500
                    p = attr_float / 100.0
                    if 0 < p < 1:
                        se = sqrt(p * (1 - p) / n_approx) * 100
                        if uncertainty_mode == "± SE":
                            text = f"{attr_val} ± {se:.1f}"
                        elif uncertainty_mode == "95% CI":
                            half_width = se * 1.96
                            text = f"{attr_val} [{max(0, attr_float - half_width):.1f}-{min(100, attr_float + half_width):.1f}]"
                        elif uncertainty_mode == "90% CI":
                            half_width = se * 1.645
                            text = f"{attr_val} [{max(0, attr_float - half_width):.1f}-{min(100, attr_float + half_width):.1f}]"
                export_df.at[idx, col] = text
    return export_df
def create_accuracy_vs_attribution_plot(df: pd.DataFrame) -> go.Figure:
    """Create scatter plot of Accuracy vs Attribution."""
    if df.empty:
        empty_fig = go.Figure()
        empty_fig.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=20, color="white"),
        )
        return empty_fig
    # API models in Valencia orange, open-weight in star blue; gray otherwise.
    type_colors = {
        "api": VALENCIA_ORANGE,
        "open-weight": STAR_BLUE,
    }
    fig = go.Figure()
    # One trace per model type so each gets its own legend entry.
    for model_type in df["Model Type"].unique():
        subset = df[df["Model Type"] == model_type]
        fig.add_trace(go.Scatter(
            x=subset["Attribution (Page F1)"],
            y=subset["Accuracy (LLM judge)"],
            mode="markers",
            name=model_type,
            text=subset["Model"],
            marker=dict(
                size=12,
                color=type_colors.get(model_type, MEDIUM_GRAY),
                line=dict(width=1.5, color="white"),
            ),
            hovertemplate="<b>%{text}</b><br>Attribution: %{x:.1f}<br>Accuracy: %{y:.1f}<extra></extra>",
        ))
    fig.update_layout(
        title=dict(text="Accuracy vs Attribution", font=dict(color="white")),
        xaxis_title="Attribution (Page F1)",
        yaxis_title="Accuracy (LLM judge)",
        hovermode="closest",
        template="plotly_dark",
        height=650,
        showlegend=True,
        legend=dict(title="Model Type", yanchor="top", y=0.99, xanchor="left", x=0.01, font=dict(color="#ccc")),
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
        yaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
    )
    return fig
def create_accuracy_vs_effort_plot(df: pd.DataFrame) -> go.Figure:
    """Create scatter plot of Accuracy vs Effort (Kuiper)."""

    def _keep_row(tags):
        # Kuiper is not meaningful for Conventional RAG systems, so drop them.
        if isinstance(tags, list):
            return "Conventional RAG" not in tags
        return True

    plot_df = df[df["Tags"].apply(_keep_row)]
    if plot_df.empty:
        empty_fig = go.Figure()
        empty_fig.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=20, color="white"),
        )
        return empty_fig
    # API models in Valencia orange, open-weight in star blue; gray otherwise.
    type_colors = {
        "api": VALENCIA_ORANGE,
        "open-weight": STAR_BLUE,
    }
    fig = go.Figure()
    # One trace per model type so each gets its own legend entry.
    for model_type in plot_df["Model Type"].unique():
        subset = plot_df[plot_df["Model Type"] == model_type]
        fig.add_trace(go.Scatter(
            x=subset["Effort (Kuiper)"],
            y=subset["Accuracy (LLM judge)"],
            mode="markers",
            name=model_type,
            text=subset["Model"],
            marker=dict(
                size=12,
                color=type_colors.get(model_type, MEDIUM_GRAY),
                line=dict(width=1.5, color="white"),
            ),
            hovertemplate="<b>%{text}</b><br>Effort: %{x:.1f}<br>Accuracy: %{y:.1f}<extra></extra>",
        ))
    fig.update_layout(
        title=dict(text="Accuracy vs Effort", font=dict(color="white")),
        xaxis_title="Effort (Kuiper) — lower is better",
        yaxis_title="Accuracy (LLM judge)",
        hovermode="closest",
        template="plotly_dark",
        height=650,
        showlegend=True,
        legend=dict(title="Model Type", yanchor="top", y=0.99, xanchor="left", x=0.01, font=dict(color="#ccc")),
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
        yaxis=dict(gridcolor=MID_BLUE, zerolinecolor=MID_BLUE),
    )
    return fig
def create_domain_accuracy_chart(by_domain: dict, model_name: str, overall_accuracy: float = 0) -> go.Figure:
    """Create a horizontal bar chart showing accuracy by domain."""
    # Drop the catch-all "Other" bucket before plotting.
    domain_stats = {name: stats for name, stats in by_domain.items() if name.lower() != 'other'}
    if not domain_stats:
        fig = go.Figure()
        fig.add_annotation(
            text="No per-domain data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
            font=dict(size=16, color="white"),
        )
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="rgba(0,0,0,0)",
            plot_bgcolor="rgba(14,17,23,0.8)",
        )
        return fig
    # Rank domains by accuracy, best first.
    ranked = sorted(domain_stats.items(), key=lambda item: item[1].get('anls', 0), reverse=True)
    domains = [name for name, _ in ranked]
    accuracies = [stats.get('anls', 0) for _, stats in ranked]
    counts = [stats.get('n', 0) for _, stats in ranked]
    # Blue when at/above the model's overall accuracy, orange when below.
    bar_colors = [SNOWFLAKE_BLUE if acc >= overall_accuracy else VALENCIA_ORANGE for acc in accuracies]
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=domains,
        x=accuracies,
        orientation='h',
        marker=dict(
            color=bar_colors,
            line=dict(width=1, color='white'),
        ),
        text=[f"{acc:.1f}% (n={n})" for acc, n in zip(accuracies, counts)],
        textposition='auto',
        textfont=dict(color='white', size=11),
        hovertemplate="<b>%{y}</b><br>Accuracy: %{x:.1f}%<extra></extra>",
    ))
    fig.update_layout(
        title=dict(
            text=f"Accuracy by Domain: {model_name}",
            font=dict(color="white", size=16),
        ),
        xaxis_title="Accuracy (ANLS* %)",
        yaxis_title="",
        template="plotly_dark",
        height=max(400, len(domains) * 35),  # Dynamic height based on number of domains
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(14,17,23,0.8)",
        xaxis=dict(
            gridcolor=MID_BLUE,
            zerolinecolor=MID_BLUE,
            range=[0, 100],
        ),
        yaxis=dict(
            gridcolor=MID_BLUE,
            autorange="reversed",  # Keep highest at top
        ),
        margin=dict(l=150, r=50, t=60, b=50),
    )
    return fig
def show_model_details(model_name: str):
    """Show detailed per-domain breakdown for a model."""
    # Pull the model's row from the cached leaderboard frame.
    leaderboard = load_eval_results()
    if leaderboard.empty:
        st.warning("No model data available")
        return
    matches = leaderboard[leaderboard["Model"] == model_name]
    if matches.empty:
        st.warning(f"Model '{model_name}' not found")
        return
    record = matches.iloc[0]
    # Conventional RAG systems get no effort metric (not meaningful).
    tags = record.get('Tags', [])
    is_conventional_rag = "Conventional RAG" in tags if isinstance(tags, list) else False
    # Headline metrics.
    c1, c2, c3 = st.columns(3)
    with c1:
        st.metric("Accuracy (LLM judge)", f"{record['Accuracy (LLM judge)']:.1f}%")
    with c2:
        st.metric("Attribution (Page F1)", f"{record['Attribution (Page F1)']:.1f}%")
    with c3:
        if is_conventional_rag:
            st.metric("Effort (Kuiper)", "—")
        elif record.get('_effort_uniform', False):
            # Parenthesize the value for uniform-effort agents.
            kuiper = record.get('Effort (Kuiper)', 0)
            st.metric("Effort (Kuiper)", f"({kuiper:.2f})" if kuiper else "N/A", help="This agent uses the same effort for all samples, so effort-invariance metric is not meaningful.")
        else:
            kuiper = record.get('Effort (Kuiper)', 0)
            st.metric("Effort (Kuiper)", f"{kuiper:.2f}" if kuiper else "N/A")
    if is_conventional_rag:
        st.caption("*Effort (Kuiper) is only meaningful for Agentic systems with iterative search behavior.*")
    # Breakdown by hop type, one metric per column.
    hop_columns = st.columns(3)
    for column, label in zip(hop_columns, ("Acc. Single-Hop", "Acc. Cross-Page", "Acc. Cross-Doc")):
        with column:
            score = record.get(label, 0)
            st.metric(label, f"{score:.1f}%" if score else "N/A")
    # Per-domain scores are stored as a JSON string in newer submissions.
    raw_domains = record.get('_by_domain', '{}')
    try:
        by_domain = json.loads(raw_domains) if isinstance(raw_domains, str) else raw_domains
    except (json.JSONDecodeError, TypeError):
        by_domain = {}
    if by_domain:
        overall = record.get('Accuracy (LLM judge)', 0)
        chart = create_domain_accuracy_chart(by_domain, model_name, overall)
        st.plotly_chart(chart, width="stretch")
    else:
        st.info("Per-domain breakdown not available for this submission. Newer submissions will include this data.")
def _prediction_has_effort(pred: dict) -> bool:
"""Check if a prediction contains at least one valid effort measure."""
search_history = pred.get('search_history', [])
if isinstance(search_history, list) and len(search_history) > 0:
return True
for key in ('iterations', 'steps', 'llm_calls', 'effort'):
val = pred.get(key)
if val is not None:
try:
if float(val) > 0:
return True
except (TypeError, ValueError):
pass
trajectory = pred.get('trajectory', {})
if isinstance(trajectory, dict):
for key in ('llm_calls', 'effort'):
val = trajectory.get(key)
if val is not None:
try:
if float(val) > 0:
return True
except (TypeError, ValueError):
pass
return False
def validate_jsonl_submission(file_content: str) -> tuple[bool, str, list]:
    """Validate JSONL submission format and return parsed predictions.

    Args:
        file_content: Raw text of the uploaded file, one JSON object per line.

    Returns:
        (ok, error_message, predictions): ``ok`` is True when every non-empty
        line is a JSON object containing 'question' and 'answer'. On failure,
        ``error_message`` names the offending line and ``predictions`` is [].
    """
    try:
        lines = file_content.strip().split("\n")
        if not lines or (len(lines) == 1 and not lines[0].strip()):
            return False, "File is empty", []
        predictions = []
        for line_num, line in enumerate(lines, 1):
            line = line.strip()
            if not line:
                continue
            try:
                pred = json.loads(line)
            except json.JSONDecodeError as e:
                return False, f"Line {line_num}: Invalid JSON - {str(e)}", []
            # Each line must be a JSON object. Previously a scalar/array line
            # raised TypeError on the membership test below and surfaced as a
            # confusing "Error reading file: ..." message.
            if not isinstance(pred, dict):
                return False, f"Line {line_num}: Expected a JSON object, got {type(pred).__name__}", []
            # Required: question and answer
            if "question" not in pred:
                return False, f"Line {line_num}: Missing required field 'question'", []
            if "answer" not in pred:
                return False, f"Line {line_num}: Missing required field 'answer'", []
            predictions.append(pred)
        return True, "", predictions
    except Exception as e:
        # Catch-all so a malformed upload never crashes the UI.
        return False, f"Error reading file: {str(e)}", []
@st.cache_data(ttl=3600)  # Cache for 1 hour
def derive_hop_type(evidence: list) -> str:
    """Derive hop type from evidence list.

    - single: Single page from a single document
    - cross_page: Multiple pages from the same document
    - cross_doc: Pages from different documents

    Args:
        evidence: List of dicts with 'document' and 'page' keys

    Returns:
        'single', 'cross_page', or 'cross_doc'
    """
    if not evidence:
        return 'single'
    # Distinct documents, and distinct (document, page) pairs.
    doc_ids = {ev.get('document') for ev in evidence if ev.get('document') is not None}
    page_ids = {
        (ev.get('document'), ev.get('page'))
        for ev in evidence
        if ev.get('document') is not None and ev.get('page') is not None
    }
    if len(doc_ids) > 1:
        return 'cross_doc'  # Multiple documents
    if len(page_ids) > 1:
        return 'cross_page'  # Multiple pages from same document
    return 'single'  # Single page
def load_gold_standard(dataset_name: str = "agentic-document-ai/dataset-PRIVATE", split: str = "test"):
    """Load gold standard from HuggingFace dataset.

    Note: Uses dataset-PRIVATE for test split (contains gold answers).

    Returns:
        (by_text, by_id): the same gold records keyed by question text and,
        when an id exists, by question id. Both empty on failure.
    """
    if not EVAL_AVAILABLE:
        return {}, {}
    try:
        dataset = load_dataset(dataset_name, split=split)
        by_text, by_id = {}, {}
        for ex in dataset:
            question = ex['question'].strip()
            qid = ex.get('id', '')
            # Different splits name the answer field differently; normalize
            # everything to a list of answer-variant lists.
            answers = ex.get('answer_variants') or ex.get('answers') or []
            if isinstance(answers, str):
                answers = [[answers]]
            elif answers and isinstance(answers[0], str):
                answers = [answers]
            evidence = ex.get('evidence', [])
            record = {
                'answers': answers,
                'evidence': evidence,
                'category': ex.get('document_category', ''),
                'domain': ex.get('domain', ''),
                # Derive hop type from evidence structure
                'hop_type': derive_hop_type(evidence),
            }
            by_text[question] = record
            if qid:
                by_id[qid] = record
        return by_text, by_id
    except Exception as e:
        st.error(f"Error loading dataset: {e}")
        return {}, {}
def _evaluate_single_item(args, max_retries=3):
    """Evaluate a single prediction item (for parallel processing)."""
    import time as _time
    idx, pred, gold_data, use_llm_judge = args
    question = pred.get('question', '').strip()
    answer = pred.get('answer', '')
    citations = pred.get('citations', [])
    history = pred.get('search_history', [])
    # Effort proxy: length of the search history, else 'iterations'.
    steps = len(history) if history else pred.get('iterations', 0)
    # Effort metrics may sit at the top level or nested in 'trajectory';
    # falsy top-level values (None/0) fall through to the nested ones.
    trajectory = pred.get('trajectory', {})
    if not isinstance(trajectory, dict):
        trajectory = {}
    llm_calls = pred.get('llm_calls') or trajectory.get('llm_calls')
    effort = pred.get('effort') or trajectory.get('effort')
    # Deterministic (non-LLM) metrics first.
    anls = anls_star(answer, gold_data['answers'])
    doc_f1 = citation_f1(citations, gold_data['evidence'], level='document')
    page_f1 = citation_f1(citations, gold_data['evidence'], level='page')
    # Semantic accuracy via the LLM judge (falls back to ANLS* when disabled),
    # retrying transient judge failures with exponential backoff.
    if use_llm_judge:
        for attempt in range(max_retries):
            try:
                semantic_score = anls_star_llm(answer, gold_data['answers'], question)['score']
                break
            except Exception:
                if attempt == max_retries - 1:
                    raise
                _time.sleep(2 ** attempt)
    else:
        semantic_score = anls
    return {
        'idx': idx,
        'question': question,
        'anls': anls,
        'semantic_score': semantic_score,
        'correct': semantic_score >= 0.5,
        'doc_f1': doc_f1['f1'],
        'page_f1': page_f1['f1'],
        'steps': steps,
        'llm_calls': llm_calls,
        'effort': effort,
        'hop_type': gold_data.get('hop_type', 'single'),
        'category': gold_data['category'],
        'domain': gold_data['domain'],
    }
def evaluate_predictions(
    predictions: list,
    gold_by_text: dict,
    gold_by_id: dict,
    use_llm_judge: bool = True,
    progress_callback=None
) -> dict:
    """Evaluate predictions against gold standard (parallelized when using LLM judge).

    Matches each prediction to a gold record (by question text, then by id),
    scores all matches in a thread pool via _evaluate_single_item, then
    aggregates overall / per-hop-type / per-domain metrics.

    Args:
        predictions: List of prediction dicts
        gold_by_text: Gold data indexed by question text
        gold_by_id: Gold data indexed by question ID
        use_llm_judge: If True, use ANLS*+LLM for semantic accuracy (default)
        progress_callback: Optional callback(current, total) for progress updates

    Returns:
        Dict of aggregated metrics, or {"error": ...} when evaluation is
        unavailable or nothing matched.
    """
    if not EVAL_AVAILABLE:
        return {"error": "Evaluation module not available"}
    # First pass: match predictions to gold standard
    matched_items = []
    unmatched = []
    for pred in predictions:
        question = pred.get('question', '').strip()
        qid = pred.get('id', '')
        # Match to gold: exact question text first, id as fallback.
        gold_data = None
        if question in gold_by_text:
            gold_data = gold_by_text[question]
        elif qid and qid in gold_by_id:
            gold_data = gold_by_id[qid]
        if gold_data:
            matched_items.append((pred, gold_data, use_llm_judge))
        else:
            # Keep a truncated sample of unmatched questions for the report.
            unmatched.append(question[:50] + "..." if len(question) > 50 else question)
    if not matched_items:
        return {"error": "No predictions matched the gold standard"}
    # Prepare items with index
    items_with_idx = [(i, pred, gold, llm) for i, (pred, gold, llm) in enumerate(matched_items)]
    total = len(items_with_idx)
    evals = []
    completed = 0
    # Parallel evaluation with ThreadPoolExecutor (much faster for LLM calls)
    # NOTE(review): evals is filled in completion order (as_completed), not
    # submission order; each result carries its 'idx' if ordering matters.
    with ThreadPoolExecutor(max_workers=MAX_EVAL_WORKERS) as executor:
        futures = {executor.submit(_evaluate_single_item, item): item[0]
                   for item in items_with_idx}
        for future in as_completed(futures):
            result = future.result()  # Will raise if failed after retries
            evals.append(result)
            completed += 1
            if progress_callback:
                progress_callback(completed, total)
    # Aggregate overall metrics
    n = len(evals)
    semantic_scores = [e['semantic_score'] for e in evals]
    # Apply bias correction for semantic accuracy
    if use_llm_judge:
        agg = aggregate_anls_star_llm(semantic_scores, apply_bias_correction=True)
        mean_semantic = agg['adjusted_score'] * 100
        semantic_ci = (agg['ci_lower'] * 100, agg['ci_upper'] * 100)
    else:
        mean_semantic = sum(semantic_scores) / n * 100
        semantic_ci = None
    mean_anls = sum(e['anls'] for e in evals) / n * 100
    accuracy = sum(e['correct'] for e in evals) / n * 100
    mean_doc_f1 = sum(e['doc_f1'] for e in evals) / n * 100
    mean_page_f1 = sum(e['page_f1'] for e in evals) / n * 100
    # Kuiper statistic (effort-calibration metric over the eval records)
    kuiper = kuiper_statistic(evals)
    # By hop type
    single_hop = [e for e in evals if e['hop_type'] == 'single']
    cross_page = [e for e in evals if e['hop_type'] == 'cross_page']
    cross_doc = [e for e in evals if e['hop_type'] == 'cross_doc']
    # By domain ('Other' bucket for records without a domain)
    by_domain = defaultdict(list)
    for e in evals:
        domain = e['domain'] or 'Other'
        by_domain[domain].append(e)
    domain_scores = {}
    for domain, domain_evals in sorted(by_domain.items()):
        domain_semantic_scores = [e['semantic_score'] for e in domain_evals]
        if use_llm_judge:
            # Bias correction is applied per domain, same as overall.
            domain_agg = aggregate_anls_star_llm(domain_semantic_scores, apply_bias_correction=True)
            domain_semantic = domain_agg['adjusted_score'] * 100
        else:
            domain_semantic = sum(domain_semantic_scores) / len(domain_semantic_scores) * 100
        domain_scores[domain] = {
            'semantic': domain_semantic,
            'anls': sum(e['anls'] for e in domain_evals) / len(domain_evals) * 100,
            'n': len(domain_evals)
        }
    results = {
        'n_evaluated': n,
        'n_unmatched': len(unmatched),
        'unmatched_samples': unmatched[:5],
        'overall': {
            'semantic': mean_semantic,  # Primary metric (ANLS* + LLM judge)
            'semantic_ci': semantic_ci,  # 95% CI if LLM judge used
            'anls': mean_anls,  # Secondary metric (pure ANLS*)
            'accuracy': accuracy,
            'doc_f1': mean_doc_f1,
            'page_f1': mean_page_f1,
            # None when the Kuiper computation reports a degenerate case.
            'kuiper': kuiper['kuiper_stat'] if not kuiper.get('degenerate') else None,
        },
        'single_evidence': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in single_hop], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and single_hop) else (sum(e['semantic_score'] for e in single_hop) / len(single_hop) * 100 if single_hop else 0)
            ),
            'anls': sum(e['anls'] for e in single_hop) / len(single_hop) * 100 if single_hop else 0,
            'n': len(single_hop)
        },
        'multi_evidence_same_doc': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in cross_page], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and cross_page) else (sum(e['semantic_score'] for e in cross_page) / len(cross_page) * 100 if cross_page else 0)
            ),
            'anls': sum(e['anls'] for e in cross_page) / len(cross_page) * 100 if cross_page else 0,
            'n': len(cross_page)
        },
        'multi_evidence_multi_doc': {
            'semantic': (
                aggregate_anls_star_llm([e['semantic_score'] for e in cross_doc], apply_bias_correction=True)['adjusted_score'] * 100
                if (use_llm_judge and cross_doc) else (sum(e['semantic_score'] for e in cross_doc) / len(cross_doc) * 100 if cross_doc else 0)
            ),
            'anls': sum(e['anls'] for e in cross_doc) / len(cross_doc) * 100 if cross_doc else 0,
            'n': len(cross_doc)
        },
        'by_domain': domain_scores,
        'used_llm_judge': use_llm_judge
    }
    return results
@st.fragment
def submit_results_fragment():
    """Fragment for file upload, evaluation, and leaderboard submission.

    Runs as a Streamlit fragment so widget interactions re-render only this
    section instead of the whole app. Flow:

    1. Require a HuggingFace login (``get_hf_user``) and enforce the per-user
       submission rate limit (``can_user_submit``).
    2. Step 1: accept predictions via file upload or pasted JSONL, validate
       them, and evaluate against the gold standard with the LLM judge.
    3. Step 2: collect model metadata (name, organization, type, tags).
    4. Step 3: upload the results JSON and raw predictions to RESULTS_REPO.
    """
    # Check HuggingFace login
    hf_user = get_hf_user()
    if not hf_user:
        st.warning("**Login Required**: Please sign in with your HuggingFace account to submit results.")
        # Show login button
        if not show_login_button():
            st.info("""
            **Login not available.** This feature requires deployment on HuggingFace Spaces
            with `hf_oauth: true` in the Space's README.md metadata.
            For local testing, set: `TEST_HF_USER=your_username`
            """)
        return
    # Show logged-in user
    st.success(f"Logged in as **{hf_user['username']}**")
    # Check submission rate limit (prevents overfitting to the test set)
    can_submit, limit_msg, hours_left = can_user_submit(hf_user['username'])
    if not can_submit:
        st.warning(f"**Rate Limit**: {limit_msg}")
        st.info("""
        This limit helps prevent overfitting to the test set.
        You can still evaluate locally on the **dev set**:
        ```bash
        python evaluate.py your_predictions.jsonl --dataset agentic-document-ai/dataset --split dev
        ```
        """)
        return
    # Step 1: Upload and Evaluate
    st.markdown("#### Step 1: Upload Predictions")
    # Two options: file upload or paste text
    upload_tab, paste_tab = st.tabs(["Upload File", "Paste JSONL"])
    with upload_tab:
        uploaded_file = st.file_uploader(
            "Upload your predictions JSONL file",
            type=["jsonl"],
            help="One prediction per line with 'question' and 'answer' fields",
        )
    with paste_tab:
        pasted_content = st.text_area(
            "Paste your JSONL content",
            height=200,
            help="One JSON object per line",
            placeholder='{"question": "...", "answer": "...", "citations": [...]}\n{"question": "...", "answer": "...", "citations": [...]}',
        )
    with st.expander("Expected JSONL format"):
        st.code('''{"question": "What is the total revenue?", "answer": "$1.2M", "citations": [{"file": "report.pdf", "page": 5}], "iterations": 3}
{"question": "Who signed the contract?", "answer": ["John Smith", "Jane Doe"], "citations": [{"file": "contract.pdf", "page": 12}], "iterations": 2}''', language="json")
        st.markdown("""
        **Required fields:**
        - `question`: The question text (must match dataset)
        - `answer`: Predicted answer (string or list)
        **Optional fields (for full metrics):**
        - `citations`: List of `{"file": "...", "page": N}` for attribution metrics
        - `id`: Question ID (fallback matching)
        **Effort fields (required for Agentic submissions, at least one per sample):**
        - `steps`: Number of agentic steps taken (positive integer)
        - `search_history`: List of search queries performed (e.g. `["query1", "query2"]`)
        - `effort`: Generic effort measure (positive number), should be proportional to the number of searches, LLM calls, or reasoning tokens generated, in this order of preference
        """)
    # Initialize session state for evaluation results
    if 'eval_results' not in st.session_state:
        st.session_state.eval_results = None
    if 'predictions' not in st.session_state:
        st.session_state.predictions = None
    # Get content from either file upload or paste (upload takes precedence)
    file_content = None
    if uploaded_file is not None:
        file_content = uploaded_file.read().decode("utf-8")
    elif pasted_content and pasted_content.strip():
        file_content = pasted_content.strip()
    if file_content:
        is_valid, error_msg, predictions = validate_jsonl_submission(file_content)
        if not is_valid:
            st.error(f"Invalid input: {error_msg}")
        else:
            st.success(f"Loaded {len(predictions)} predictions")
            st.session_state.predictions = predictions
            st.session_state.predictions_raw = file_content  # Store raw content for upload
            # Evaluate button
            if st.button("Run Evaluation", type="primary"):
                with st.spinner("Loading gold standard..."):
                    gold_by_text, gold_by_id = load_gold_standard()
                if not gold_by_text:
                    st.error("Failed to load gold standard dataset")
                else:
                    # Progress bar for evaluation
                    progress_bar = st.progress(0, text="Evaluating predictions with semantic accuracy...")
                    status_text = st.empty()

                    def update_progress(current, total):
                        # Callback forwarded into evaluate_predictions
                        progress_bar.progress(current / total, text=f"Evaluating {current}/{total}...")

                    results = evaluate_predictions(
                        predictions,
                        gold_by_text,
                        gold_by_id,
                        use_llm_judge=True,
                        progress_callback=update_progress
                    )
                    progress_bar.empty()
                    status_text.empty()
                    st.session_state.eval_results = results
    # Show evaluation results (persisted in session state across reruns)
    if st.session_state.eval_results:
        results = st.session_state.eval_results
        if 'error' in results:
            st.error(results['error'])
        else:
            st.markdown("#### Evaluation Results")
            # Summary metrics - use semantic accuracy as primary if available
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                if 'semantic' in results['overall']:
                    ci = results['overall'].get('semantic_ci')
                    ci_text = f" [{ci[0]:.1f}-{ci[1]:.1f}]" if ci else ""
                    st.metric("Accuracy (LLM judge)", f"{results['overall']['semantic']:.1f}{ci_text}")
                else:
                    st.metric("Accuracy (ANLS*)", f"{results['overall']['anls']:.1f}")
            with col2:
                st.metric("Attribution (Page F1)", f"{results['overall']['page_f1']:.1f}")
            with col3:
                kuiper_val = results['overall']['kuiper']
                st.metric("Effort (Kuiper)", f"{kuiper_val:.3f}" if kuiper_val else "N/A")
            with col4:
                st.metric("Evaluated", f"{results['n_evaluated']} / {results['n_evaluated'] + results['n_unmatched']}")
            # Detailed breakdown
            with st.expander("Detailed Breakdown"):
                # Check which metrics are available
                has_semantic = 'semantic' in results['overall']
                if has_semantic:
                    st.markdown(f"""
                    | Metric | Value |
                    |--------|-------|
                    | **Accuracy (LLM judge)** | {results['overall']['semantic']:.1f} |
                    | **ANLS*** (string match) | {results['overall']['anls']:.1f} |
                    | **Acc. Single-Hop** (n={results['single_evidence']['n']}) | {results['single_evidence'].get('semantic', results['single_evidence']['anls']):.1f} |
                    | **Acc. Cross-Page** (n={results['multi_evidence_same_doc']['n']}) | {results['multi_evidence_same_doc'].get('semantic', results['multi_evidence_same_doc']['anls']):.1f} |
                    | **Acc. Cross-Doc** (n={results['multi_evidence_multi_doc']['n']}) | {results['multi_evidence_multi_doc'].get('semantic', results['multi_evidence_multi_doc']['anls']):.1f} |
                    | **Attribution (Doc F1)** | {results['overall']['doc_f1']:.1f} |
                    | **Attribution (Page F1)** | {results['overall']['page_f1']:.1f} |
                    """)
                else:
                    st.markdown(f"""
                    | Metric | Value |
                    |--------|-------|
                    | **Overall ANLS*** | {results['overall']['anls']:.1f} |
                    | **Acc. Single-Hop** (n={results['single_evidence']['n']}) | {results['single_evidence']['anls']:.1f} |
                    | **Acc. Cross-Page** (n={results['multi_evidence_same_doc']['n']}) | {results['multi_evidence_same_doc']['anls']:.1f} |
                    | **Acc. Cross-Doc** (n={results['multi_evidence_multi_doc']['n']}) | {results['multi_evidence_multi_doc']['anls']:.1f} |
                    | **Attribution (Doc F1)** | {results['overall']['doc_f1']:.1f} |
                    | **Attribution (Page F1)** | {results['overall']['page_f1']:.1f} |
                    """)
            if results['n_unmatched'] > 0:
                with st.expander(f"{results['n_unmatched']} unmatched questions"):
                    for q in results['unmatched_samples']:
                        st.text(f"• {q}")
                    if results['n_unmatched'] > 5:
                        st.text(f"... and {results['n_unmatched'] - 5} more")
            # Step 2: Model Information
            st.markdown("---")
            st.markdown("#### Step 2: Model Information")
            col1, col2 = st.columns(2)
            with col1:
                model_name = st.text_input("Model Name *", placeholder="e.g., GPT-4o-Agent")
                organization = st.text_input("Organization *", placeholder="e.g., OpenAI")
                model_type = st.selectbox("Model Type *", options=["", "api", "open-weight"])
            with col2:
                description = st.text_area(
                    "Description",
                    placeholder="Brief description of your approach (e.g., 'Vision-language model with sparse search tool')",
                    height=80
                )
                link = st.text_input("Link (Optional)", placeholder="https://arxiv.org/abs/... or https://github.com/...")
            selected_tags = st.multiselect(
                "Tags",
                options=AVAILABLE_TAGS,
                default=["Agentic"],
                help="Select tags that describe your approach"
            )
            # Step 3: Submit
            st.markdown("---")
            st.markdown("#### Step 3: Submit to Leaderboard")
            if st.button("Submit to Leaderboard", type="primary", disabled=not (model_name and organization and model_type)):
                # Validate required fields
                submit_error = None
                if not model_name or not organization or not model_type:
                    submit_error = "Please fill in all required fields (Model Name, Organization, Model Type)"
                elif "Agentic" in selected_tags and st.session_state.predictions:
                    # Agentic submissions must carry effort data on every sample
                    missing_effort = [
                        (i + 1, p.get('question', '')[:60])
                        for i, p in enumerate(st.session_state.predictions)
                        if not _prediction_has_effort(p)
                    ]
                    if missing_effort:
                        samples = "; ".join(f"line {ln}: {q}..." for ln, q in missing_effort[:5])
                        extra = f" (and {len(missing_effort) - 5} more)" if len(missing_effort) > 5 else ""
                        submit_error = (
                            f"**Agentic submissions require effort data for every sample.** "
                            f"{len(missing_effort)} prediction(s) are missing effort information "
                            f"(e.g. `iterations`, `steps`, `llm_calls`, `effort`, or `search_history`). "
                            f"Examples: {samples}{extra}"
                        )
                if submit_error:
                    st.error(submit_error)
                else:
                    # Get current user for submission tracking
                    hf_user = get_hf_user()
                    # Prepare submission data
                    submission = {
                        "model_name": model_name.strip(),
                        "organization": organization.strip(),
                        "description": description.strip() if description else "",
                        "link": link.strip() if link else "",
                        "tags": selected_tags,
                        "submitted_by": hf_user['username'] if hf_user else "anonymous",
                        "metadata": {
                            "model_type": model_type,
                        },
                        "results": {
                            "overall": {
                                "semantic": results['overall'].get('semantic'),
                                "semantic_ci": results['overall'].get('semantic_ci'),
                                "anls": results['overall']['anls'],
                                "page_f1": results['overall']['page_f1'],
                                "doc_f1": results['overall']['doc_f1'],
                                "kuiper": results['overall']['kuiper'],
                            },
                            "single_evidence": results['single_evidence'],
                            "multi_evidence_same_doc": results['multi_evidence_same_doc'],
                            "multi_evidence_multi_doc": results['multi_evidence_multi_doc'],
                            "by_domain": results.get('by_domain', {}),
                        },
                        "submission_date": datetime.now(timezone.utc).isoformat(),
                    }
                    # Upload to HuggingFace Hub
                    with st.spinner("Uploading to leaderboard..."):
                        try:
                            # Create path matching expected structure: {org}/{model}_results_{timestamp}.json
                            safe_org = organization.strip().replace(" ", "_").replace("/", "-")
                            safe_model = model_name.strip().replace(" ", "_").replace("/", "-")
                            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
                            filename = f"{safe_model}_results_{timestamp}.json"
                            # Fix: use the computed filename. Previously a stray
                            # "(unknown)" placeholder was interpolated here, so every
                            # upload collided on the same bogus path per organization.
                            path_in_repo = f"{safe_org}/{filename}"
                            # Upload using HfApi
                            api = HfApi()
                            # Upload results JSON (in-memory bytes, no temp file needed)
                            api.upload_file(
                                path_or_fileobj=json.dumps(submission, indent=2).encode("utf-8"),
                                path_in_repo=path_in_repo,
                                repo_id=RESULTS_REPO,
                                repo_type="dataset",
                                token=TOKEN,
                                commit_message=f"Add results for {organization}/{model_name}"
                            )
                            # Upload predictions file alongside the results
                            if st.session_state.get('predictions_raw'):
                                predictions_filename = f"{safe_model}_predictions_{timestamp}.jsonl"
                                predictions_path = f"{safe_org}/{predictions_filename}"
                                api.upload_file(
                                    path_or_fileobj=st.session_state.predictions_raw.encode("utf-8"),
                                    path_in_repo=predictions_path,
                                    repo_id=RESULTS_REPO,
                                    repo_type="dataset",
                                    token=TOKEN,
                                    commit_message=f"Add predictions for {organization}/{model_name}"
                                )
                            st.success("Successfully submitted to leaderboard!")
                            st.balloons()
                            # Record submission for rate limiting
                            record_submission(hf_user['username'])
                            # Clear cache to force refresh on next load
                            download_data.clear()
                            load_eval_results.clear()
                            # Clear form state
                            st.session_state.eval_results = None
                            st.session_state.predictions = None
                            st.session_state.predictions_raw = None
                            st.info("Your submission has been saved! The leaderboard will update shortly.")
                            # Auto-refresh after a moment
                            st.rerun(scope="app")
                        except Exception as e:
                            st.error(f"Upload failed: {str(e)}")
                            st.warning("Please ensure HF_TOKEN environment variable is set with write access to the repository.")
                            with st.expander("Submission JSON (for manual upload)"):
                                st.code(json.dumps(submission, indent=2), language="json")
                                st.info(f"""
                                **To submit manually:**
                                1. Copy the JSON above
                                2. Save as `{path_in_repo}`
                                3. Upload to `{RESULTS_REPO}` on HuggingFace Hub
                                Or contact lukasz.borchmann@snowflake.com
                                """)
def get_all_submissions() -> list[dict]:
    """Get all submission files with their metadata."""
    collected: list[dict] = []
    root = Path(EVAL_RESULTS_PATH)
    if not root.exists():
        return collected
    for org_dir in root.iterdir():
        # Skip plain files and hidden directories (e.g. ".cache")
        if not org_dir.is_dir() or org_dir.name.startswith('.'):
            continue
        for result_file in org_dir.glob("*_results_*.json"):
            rel_path = f"{org_dir.name}/{result_file.name}"
            try:
                with open(result_file) as fh:
                    payload = json.load(fh)
                when = payload.get("submission_date")
                if not isinstance(when, str):
                    when = ""
                entry = {
                    "file_path": str(result_file),
                    "relative_path": rel_path,
                    "model_name": payload.get("model_name", "Unknown"),
                    "organization": payload.get("organization", org_dir.name),
                    "submitted_by": payload.get("submitted_by", "Unknown"),
                    "submission_date": when,
                    "accuracy": payload.get("results", {}).get("overall", {}).get("anls", 0.0),
                    "raw_json": json.dumps(payload, indent=2),
                }
            except Exception as exc:
                # Keep a placeholder entry so broken files remain visible/deletable
                entry = {
                    "file_path": str(result_file),
                    "relative_path": rel_path,
                    "model_name": "Error loading",
                    "organization": org_dir.name,
                    "submitted_by": "Unknown",
                    "submission_date": "Unknown",
                    "accuracy": 0.0,
                    "raw_json": f"Error: {exc}",
                }
            collected.append(entry)

    # Sort newest first; non-string dates fall back to the empty string
    def _sort_key(item: dict) -> str:
        value = item.get("submission_date")
        return value if isinstance(value, str) else ""

    collected.sort(key=_sort_key, reverse=True)
    return collected
def delete_submission_from_hub(relative_path: str) -> tuple[bool, str]:
    """Delete a submission file from the HuggingFace Hub."""
    try:
        client = HfApi(token=TOKEN)
        client.delete_file(
            path_in_repo=relative_path,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )
    except Exception as e:
        return False, f"Failed to delete: {str(e)}"
    return True, f"Successfully deleted {relative_path}"
def update_submission_on_hub(relative_path: str, json_content: str) -> tuple[bool, str]:
    """Update a submission file on HuggingFace Hub.

    Args:
        relative_path: Path of the file inside RESULTS_REPO
            (e.g. ``"org/model_results_20250101_000000.json"``).
        json_content: New file content; must parse as JSON.

    Returns:
        ``(success, message)`` tuple; the message is user-displayable.
    """
    # Validate JSON first so we never touch the Hub with malformed content.
    try:
        data = json.loads(json_content)
    except json.JSONDecodeError as e:
        return False, f"Invalid JSON: {str(e)}"
    try:
        api = HfApi(token=TOKEN)
        # Upload the re-serialized JSON directly from memory (same pattern as
        # the submission flow). The previous implementation wrote a temp file
        # and only unlinked it on success, leaking it whenever upload_file
        # raised; uploading bytes removes the temp file entirely.
        api.upload_file(
            path_or_fileobj=json.dumps(data, indent=2).encode("utf-8"),
            path_in_repo=relative_path,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
            token=TOKEN,
            commit_message=f"Admin edit: {relative_path}"
        )
        return True, f"Successfully updated {relative_path}"
    except Exception as e:
        return False, f"Failed to update: {str(e)}"
@st.fragment
def admin_panel():
    """Admin panel for managing submissions.

    Lets the admin refresh/reload leaderboard data, inspect every stored
    submission, edit or delete submission JSON on the Hub, and edit the
    news items. All Hub changes are permanent.
    """
    st.markdown("#### Admin Panel")
    st.markdown("Manage leaderboard submissions. Changes are permanent.")
    # Admin action buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Refresh Submissions", use_container_width=True):
            st.rerun()
    with col2:
        if st.button("Reload from HuggingFace", type="primary", use_container_width=True):
            # Clear all caches
            download_data.clear()
            load_eval_results.clear()
            # Delete local cached files to force fresh download
            if EVAL_RESULTS_PATH.exists():
                shutil.rmtree(EVAL_RESULTS_PATH)
            if EVAL_REQUESTS_PATH.exists():
                shutil.rmtree(EVAL_REQUESTS_PATH)
            # Re-download data
            with st.spinner("Re-downloading data from HuggingFace Hub..."):
                download_data()
            st.success("Leaderboard data reloaded from source!")
            st.rerun(scope="app")
    st.divider()
    submissions = get_all_submissions()
    if not submissions:
        st.info("No submissions found.")
        return
    st.markdown(f"**{len(submissions)} submissions found**")
    # Display each submission
    for i, sub in enumerate(submissions):
        with st.expander(f"{sub['model_name']} ({sub['organization']}) - {sub['submission_date'][:10] if len(sub['submission_date']) > 10 else sub['submission_date']}"):
            col1, col2 = st.columns([3, 1])
            with col1:
                # 'accuracy' holds the stored ANLS* score, which this app writes
                # on a 0-100 scale (see submit flow) — so format with :.1f.
                # The previous :.1%-format multiplied by 100 again and rendered
                # e.g. "8540.0%" instead of "85.4".
                st.markdown(f"""
                **Model:** {sub['model_name']}
                **Organization:** {sub['organization']}
                **Submitted by:** {sub['submitted_by']}
                **Date:** {sub['submission_date']}
                **Accuracy:** {sub['accuracy']:.1f}
                **File:** `{sub['relative_path']}`
                """)
            with col2:
                # Edit button — entering edit mode cancels any pending delete
                if st.button("Edit", key=f"edit_{i}"):
                    st.session_state[f"editing_{i}"] = True
                    st.session_state[f"confirm_delete_{i}"] = False
                # Delete button with confirmation — cancels any pending edit
                if st.button("Delete", key=f"delete_{i}", type="secondary"):
                    st.session_state[f"confirm_delete_{i}"] = True
                    st.session_state[f"editing_{i}"] = False
                if st.session_state.get(f"confirm_delete_{i}", False):
                    st.warning("Are you sure?")
                    col_yes, col_no = st.columns(2)
                    with col_yes:
                        if st.button("Yes", key=f"confirm_yes_{i}", type="primary"):
                            success, message = delete_submission_from_hub(sub['relative_path'])
                            if success:
                                st.success(message)
                                # Clear caches and refresh
                                download_data.clear()
                                load_eval_results.clear()
                                st.session_state[f"confirm_delete_{i}"] = False
                                st.rerun()
                            else:
                                st.error(message)
                    with col_no:
                        if st.button("No", key=f"confirm_no_{i}"):
                            st.session_state[f"confirm_delete_{i}"] = False
                            st.rerun()
            # Edit mode
            if st.session_state.get(f"editing_{i}", False):
                st.markdown("**Edit JSON:**")
                edited_json = st.text_area(
                    "Edit submission JSON",
                    value=sub['raw_json'],
                    height=400,
                    key=f"json_editor_{i}",
                    label_visibility="collapsed"
                )
                col_save, col_cancel = st.columns(2)
                with col_save:
                    if st.button("Save Changes", key=f"save_{i}", type="primary"):
                        success, message = update_submission_on_hub(sub['relative_path'], edited_json)
                        if success:
                            st.success(message)
                            # Clear caches and refresh
                            download_data.clear()
                            load_eval_results.clear()
                            st.session_state[f"editing_{i}"] = False
                            st.rerun()
                        else:
                            st.error(message)
                with col_cancel:
                    if st.button("Cancel", key=f"cancel_{i}"):
                        st.session_state[f"editing_{i}"] = False
                        st.rerun()
            else:
                # Show raw JSON (read-only) - use checkbox instead of expander to avoid nesting
                if st.checkbox("Show JSON", key=f"show_json_{i}"):
                    st.code(sub['raw_json'], language="json")
    # News management section
    st.divider()
    st.markdown("#### News Management")
    news_items = get_news()
    news_json = json.dumps(news_items, indent=2)
    with st.expander("Edit News (JSON)", expanded=False):
        st.markdown("""
        **Format:** Array of objects with `date` (YYYY-MM-DD) and `text` fields.
        ```json
        [
            {"date": "2025-01-04", "text": "Your update message here"},
            ...
        ]
        ```
        """)
        edited_news = st.text_area(
            "News JSON",
            value=news_json,
            height=300,
            key="news_editor",
            label_visibility="collapsed"
        )
        if st.button("Save News", type="primary"):
            try:
                parsed_news = json.loads(edited_news)
                if not isinstance(parsed_news, list):
                    st.error("News must be a JSON array")
                else:
                    success, message = save_news(parsed_news)
                    if success:
                        st.success(message)
                        st.rerun()
                    else:
                        st.error(message)
            except json.JSONDecodeError as e:
                st.error(f"Invalid JSON: {e}")
def main():
    """App entry point: render the Leaderboard / Analysis / About / Submit tabs.

    Handles the OAuth return trip and the ``?analyze=<model>`` deep link before
    rendering, then downloads leaderboard data from the Hub and builds the tab
    layout. An extra Admin tab appears only for the hard-coded admin user.
    """
    # Handle OAuth callback (if returning from HuggingFace login)
    handle_oauth_callback()
    # Handle "analyze" query parameter from leaderboard
    analyze_model = st.query_params.get("analyze")
    if analyze_model:
        st.session_state.selected_model_for_analysis = unquote(analyze_model)
        st.session_state.go_to_analysis_tab = True
        # Clear the query param to avoid re-triggering
        st.query_params.clear()
        # Inject JavaScript to click on the Analysis tab
        # (Streamlit has no API to programmatically switch tabs, so we click
        # the tab element in the parent document; retries until tabs render.)
        import streamlit.components.v1 as components
        components.html("""
        <script>
        // Wait for Streamlit to render, then click Analysis tab
        function clickAnalysisTab() {
            const tabs = window.parent.document.querySelectorAll('[data-baseweb="tab"]');
            if (tabs.length > 1) {
                tabs[1].click(); // Analysis is the second tab (index 1)
            } else {
                // Retry if tabs not yet rendered
                setTimeout(clickAnalysisTab, 100);
            }
        }
        setTimeout(clickAnalysisTab, 200);
        </script>
        """, height=0)
    # Download data from HuggingFace Hub
    with st.spinner("Loading data from HuggingFace Hub..."):
        download_data()
    # Load data
    df = load_eval_results()
    # Check if admin user is logged in (admin access is tied to one username)
    hf_user = get_hf_user()
    is_admin = hf_user and hf_user.get('username', '').lower() == 'borchmann'
    # Tabs - show Admin tab only for admin users
    if is_admin:
        tab1, tab2, tab3, tab4, tab5 = st.tabs(["Leaderboard", "Analysis", "About", "Submit Results", "Admin"])
    else:
        tab1, tab2, tab3, tab4 = st.tabs(["Leaderboard", "Analysis", "About", "Submit Results"])
    # ===== LEADERBOARD TAB =====
    with tab1:
        # Header with icon (fallback to emoji if icon doesn't load)
        if ICON_MEDAL:
            icon_html = f'<img src="{ICON_MEDAL}" style="width: 40px; height: 40px; vertical-align: middle; margin-right: 12px;" />'
        else:
            icon_html = f'<span style="font-size: 36px; margin-right: 12px;">🏆</span>'
        st.markdown(f'<h3 style="display: flex; align-items: center; margin-top: 1.5rem; margin-bottom: 1.2rem;">{icon_html} Leaderboard</h3>', unsafe_allow_html=True)
        if df.empty:
            st.warning("No evaluation results found. Submit your results to appear on the leaderboard!")
        else:
            # ===== FILTERS SIDE BY SIDE =====
            filter_col1, filter_col2 = st.columns(2)
            with filter_col1:
                # TAG FILTER - chips use MID_BLUE (darker, gradient start)
                tags_in_data = get_all_tags_from_df(df)
                all_available_tags = sorted(list(set(AVAILABLE_TAGS + tags_in_data)))
                selected_tags = st.multiselect(
                    "Filter by techniques/features:",
                    options=all_available_tags,
                    default=[],
                    placeholder="Click to filter by tags...",
                    key="tag_filter",
                )
            with filter_col2:
                # COLUMN SELECTOR - chips use SNOWFLAKE_BLUE (lighter, gradient end)
                # Mapping: short chip name -> full column name
                COLUMN_CHIP_NAMES = {
                    "Accuracy": "Accuracy (LLM judge)",
                    "Acc. Single-Hop": "Acc. Single-Hop",
                    "Acc. Cross-Page": "Acc. Cross-Page",
                    "Acc. Cross-Doc": "Acc. Cross-Doc",
                    "ANLS*": "ANLS* (string)",
                    "Attribution": "Attribution (Page F1)",
                    "Attribution (Doc)": "Attribution (Doc F1)",
                    "Effort": "Effort (Kuiper)",
                    "Model Type": "Model Type",
                    "Tags": "Tags",
                }
                # CHIP_TO_COLUMN is the same chip->column dict under a clearer
                # name; COLUMN_TO_CHIP is the actual reverse (column->chip).
                CHIP_TO_COLUMN = COLUMN_CHIP_NAMES
                COLUMN_TO_CHIP = {v: k for k, v in COLUMN_CHIP_NAMES.items()}
                all_columns = list(df.columns)
                # Model and Organization are always visible (not in selector)
                always_visible = ["Model", "Organization"]
                # Hidden columns (used internally but not shown as separate columns)
                hidden_cols = ["Link", "Submission Date", "Description", "_by_domain", "_Accuracy_CI", "_Accuracy_SE"]
                # Full column names that are optional (Tags moved to end)
                optional_full_cols = [c for c in all_columns if c not in hidden_cols + always_visible and c != "Tags"]
                optional_full_cols.append("Tags")  # Add Tags at the end
                # Convert to chip names for display
                optional_chips = [COLUMN_TO_CHIP.get(c, c) for c in optional_full_cols]
                default_chips = ["Model Type", "Tags", "Accuracy", "Attribution", "Effort"]
                default_selected = [c for c in default_chips if c in optional_chips]
                selected_chips = st.multiselect(
                    "Select columns to display:",
                    options=optional_chips,
                    default=default_selected,
                    key="column_selector",
                )
                # Convert selected chips back to full column names
                selected_optional = [CHIP_TO_COLUMN.get(c, c) for c in selected_chips]
            # Apply tag filter
            filtered_df = filter_df_by_tags(df, selected_tags)
            # Show filter status
            if selected_tags:
                st.caption(f"Showing {len(filtered_df)} of {len(df)} models matching selected tags")
            # Model and Organization are always included first
            selected_columns = ["Model", "Organization"] + [c for c in optional_full_cols if c in selected_optional]
            # Initialize uncertainty mode in session state if not present
            if "uncertainty_mode" not in st.session_state:
                st.session_state.uncertainty_mode = "± SE"
            if selected_columns:
                # Render HTML table with proper styling
                render_leaderboard_table(filtered_df, selected_columns, uncertainty_mode=st.session_state.uncertainty_mode)
                # Bottom row: Uncertainty toggle (left) and Download button (right)
                st.markdown("")  # Small spacing
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.radio(
                        "Uncertainty:",
                        options=["± SE", "90% CI", "95% CI", "None"],
                        key="uncertainty_mode",
                        horizontal=True,
                        help="Display uncertainty estimates for accuracy and attribution metrics"
                    )
                with col2:
                    # Right-align the download button but keep its natural width
                    st.markdown('''<style>
                    .st-key-download_csv_btn {
                        width: 100% !important;
                        display: flex;
                        justify-content: flex-end;
                    }
                    .st-key-download_csv_btn button {
                        margin-left: auto !important;
                    }
                    </style>''', unsafe_allow_html=True)
                    csv_df = build_csv_download_df(filtered_df, selected_columns, st.session_state.uncertainty_mode)
                    csv = csv_df.to_csv(index=False)
                    st.download_button(
                        label="Download as CSV",
                        data=csv,
                        file_name="leaderboard.csv",
                        mime="text/csv",
                        key="download_csv_btn",
                    )
            # News and Paper section (two columns)
            st.markdown("<br>", unsafe_allow_html=True)  # Spacing
            news_col, paper_col = st.columns([2, 1])
            with news_col:
                st.markdown("<span style='font-size: 1rem; font-weight: normal;'>Updates</span>", unsafe_allow_html=True)
                news_items = get_news()[:NEWS_MAX_DISPLAY]
                if news_items:
                    for item in news_items:
                        date_str = item.get('date', '')
                        text = item.get('text', '')
                        # Use full date (YYYY-MM-DD)
                        formatted_date = date_str[:10] if len(date_str) >= 10 else date_str
                        st.caption(f"**{formatted_date}**: {text}")
                else:
                    st.caption("No updates yet.")
            with paper_col:
                st.markdown("""
                <div style="text-align: right;">
                    <a href="https://arxiv.org/abs/2603.12180" target="_blank" style="color: #9CA3AF; text-decoration: none;">Strategic Navigation or Stochastic Search?<br>How Agents and Humans Reason Over Document Collections</a>
                </div>
                """, unsafe_allow_html=True)
    # ===== VISUALIZATIONS TAB =====
    with tab2:
        if ICON_EYE:
            icon_html = f'<img src="{ICON_EYE}" style="width: 40px; height: 40px; vertical-align: middle; margin-right: 12px;" />'
        else:
            icon_html = f'<span style="font-size: 36px; margin-right: 12px;">📈</span>'
        st.markdown(f'<h3 style="display: flex; align-items: center; margin-top: 1.5rem; margin-bottom: 1.2rem;">{icon_html} Analysis</h3>', unsafe_allow_html=True)
        if df.empty:
            st.warning("No data available for visualization.")
        else:
            # Check if user came from leaderboard with a specific model
            if st.session_state.get('go_to_analysis_tab'):
                st.info(f"Showing analysis for: **{st.session_state.get('selected_model_for_analysis', '')}**")
                st.session_state.go_to_analysis_tab = False
            # Model details selector - at the top
            st.markdown("#### Model Details")
            model_names = df["Model"].tolist()
            # Use session state to allow setting model from leaderboard
            if 'selected_model_for_analysis' not in st.session_state:
                st.session_state.selected_model_for_analysis = model_names[0] if model_names else None
            # Ensure selected model exists in current data
            selected_index = 0
            if st.session_state.selected_model_for_analysis in model_names:
                selected_index = model_names.index(st.session_state.selected_model_for_analysis)
            selected_model = st.selectbox(
                "Select a model to view detailed breakdown:",
                model_names,
                index=selected_index,
                key="analysis_model_selector"
            )
            if selected_model:
                st.session_state.selected_model_for_analysis = selected_model
                show_model_details(selected_model)
            # Plots below
            st.markdown("---")
            st.markdown("#### Comparative Plots")
            # Two plots side by side
            col1, col2 = st.columns(2)
            with col1:
                fig_attribution = create_accuracy_vs_attribution_plot(df)
                st.plotly_chart(fig_attribution, width="stretch")
            with col2:
                fig_effort = create_accuracy_vs_effort_plot(df)
                st.plotly_chart(fig_effort, width="stretch")
            st.markdown("""
            **Understanding the plots:**
            - Each point represents a model submission
            - **Orange points**: API-based models
            - **Blue points**: Open-weight models
            - Hover over points to see model details
            - **Left plot**: Upper-right = high accuracy with good attribution (optimal)
            - **Right plot**: Upper-left = high accuracy with good effort calibration (optimal)
            """)
    # ===== ABOUT TAB =====
    with tab3:
        if ICON_DOCS:
            icon_html = f'<img src="{ICON_DOCS}" style="width: 40px; height: 40px; vertical-align: middle; margin-right: 12px;" />'
        else:
            icon_html = f'<span style="font-size: 36px; margin-right: 12px;">📖</span>'
        st.markdown(f'<h3 style="display: flex; align-items: center; margin-top: 1.5rem; margin-bottom: 1.2rem;">{icon_html} About</h3>', unsafe_allow_html=True)
        about_col1, about_col2 = st.columns(2)
        with about_col1:
            st.markdown("""
            #### MADQA Benchmark
            This benchmark evaluates AI systems on **Agentic Document Collection Visual Question Answering** —
            a task requiring systems to navigate, retrieve, reason over, and aggregate information from
            heterogeneous document collections.
            📄 [Read the paper: *Strategic Navigation or Stochastic Search?*](https://arxiv.org/abs/2603.12180)
            ##### Dataset
            - **2,250** human-authored question-answer pairs
            - **800** multi-page PDF documents from diverse real-world domains
            - **18,619** total pages with rich visual layouts
            - **17.3%** multi-hop questions (cross-page and cross-document)
            - **63** document categories across **13** high-level domains
            ##### Task Properties
            The task is characterized by six formal properties:
            1. **Extractive**: Answers are drawn from evidence pages, not generated abstractly
            2. **Multi-Hop**: Evidence may span multiple disjoint pages requiring aggregation
            3. **Closed-World**: Answers must be derivable solely from the corpus
            4. **Grounded**: Answers must be faithfully attributed to minimal evidence
            5. **Agentic**: Requires iterative retrieval and reasoning (planning, navigation, aggregation)
            6. **Visual**: Answering may require non-textual information (layout, tables, figures)
            """)
        with about_col2:
            st.markdown("""
            #### Metrics
            ##### Accuracy (LLM judge)
            - **Accuracy (LLM judge)**: Primary metric combining ANLS* string matching with an LLM judge (G-Eval framework). Captures semantic correctness beyond exact string matching, with statistical bias correction
            - **ANLS* (string)**: Pure string-based score using Average Normalized Levenshtein Similarity with optimal element alignment for lists/sets
            - **Acc. Single-Hop**: Accuracy on questions requiring a single evidence page
            - **Acc. Cross-Page**: Accuracy on multi-hop questions within the same document
            - **Acc. Cross-Doc**: Accuracy on multi-hop questions spanning multiple documents
            ##### Attribution (Page F1)
            - **Attribution (Page F1)**: F1 score measuring overlap between cited pages and gold evidence pages (penalizes both missing and spurious citations)
            - **Attribution (Doc F1)**: Document-level attribution accuracy (whether the correct documents were identified)
            ##### Effort (Kuiper)
            - **Effort (Kuiper)**: Measures whether computational effort correlates with problem difficulty. Lower values indicate better calibration—the system "knows what it knows" and doesn't waste effort on unsolvable queries
            ---
            **Contact:** [lukasz.borchmann@snowflake.com](mailto:lukasz.borchmann@snowflake.com)
            """)
    # ===== SUBMIT TAB =====
    with tab4:
        if ICON_WRITE:
            icon_html = f'<img src="{ICON_WRITE}" style="width: 40px; height: 40px; vertical-align: middle; margin-right: 12px;" />'
        else:
            icon_html = f'<span style="font-size: 36px; margin-right: 12px;">📝</span>'
        st.markdown(f'<h3 style="display: flex; align-items: center; margin-top: 1.5rem; margin-bottom: 1.2rem;">{icon_html} Submit Results</h3>', unsafe_allow_html=True)
        if not EVAL_AVAILABLE:
            st.warning("Evaluation module not available. Please install dependencies: `pip install anls-star datasets`")
        # Use fragment to prevent tab switch on file upload
        submit_results_fragment()
    # ===== ADMIN TAB (only for admin users) =====
    if is_admin:
        with tab5:
            admin_panel()


if __name__ == "__main__":
    main()