import gradio as gr
import pandas as pd
import json
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import os
import datetime
# Import submission handling functions
from submission import add_new_submission
# Optional imports with fallbacks
try:
    from content import format_error, format_warning, format_log
except ImportError:
    # Minimal stand-ins so the app still renders status messages when the
    # project-local `content` module is unavailable.
    # FIX: the original `format_log` f-string literal was split across two
    # physical lines (a syntax error); all three are restored as single-line
    # Markdown formatters with their status emoji reconstructed.
    def format_error(msg):
        """Return *msg* as a Markdown error banner."""
        return f"❌ **Error:** {msg}"

    def format_warning(msg):
        """Return *msg* as a Markdown warning banner."""
        return f"⚠️ **Warning:** {msg}"

    def format_log(msg):
        """Return *msg* as a Markdown success/log line."""
        return f"✅ {msg}"
# Configuration
# Auth token for Hugging Face API calls: HF_TOKEN takes precedence, then TOKEN.
TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("TOKEN", None)
OWNER = "Pettingllms"  # HF organisation hosting the benchmark assets
GROUNDTRUTH_PATH = f"{OWNER}/AMA-bench"  # HF repo id for the groundtruth dataset
LOCAL_DEBUG = True  # NOTE(review): not referenced in this chunk — confirm use before removing
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_jsonl_data(path):
    """Load JSONL data from file.

    Returns a list of parsed records; missing files yield an empty list
    (with a console warning) instead of raising.
    """
    if not os.path.exists(path):
        print(f"Warning: {path} not found, returning empty list")
        return []
    records = []
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            stripped = raw.strip()
            if not stripped:
                continue  # skip blank lines between records
            records.append(json.loads(stripped))
    return records
def load_qa_distribution():
    """Load QA distribution data.

    Returns the parsed JSON object from ``data/qa_distribution.json`` or
    ``None`` when the file does not exist.
    """
    path = "data/qa_distribution.json"
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)
def convert_jsonl_to_dict(jsonl_data, is_agent=False):
    """
    Convert JSONL data to the dictionary format used by visualization functions.

    Args:
        jsonl_data: List of dictionaries from JSONL file. Each entry is
            expected to carry a name ("agent_name" or "model"), optional
            "model_family"/"verified" fields, and a "Score" mapping of
            domain -> list of single-key dicts like [{"A": 0.5}, {"B": 0.6}].
        is_agent: Boolean indicating if this is agent data (selects which
            name field is read).

    Returns:
        Three dictionaries: capability_dict ({capability: {name: scores}}),
        domain_dict ({domain: {name: scores}}), and verified_dict
        ({name: bool}).

    NOTE: reads the module-level global QA_DISTRIBUTION for weighting; the
    module assigns it before this function is first called.
    """
    capability_dict = {
        "Recall": {},
        "Causal Inference": {},
        "State Updating": {},
        "State Abstraction": {}
    }
    domain_dict = {
        "TEXT2SQL": {},
        "SOFTWARE": {},
        "WEB": {},
        "GAME": {},
        "EMBODIED_AI": {},
        "OPENWORLD_QA": {}
    }
    # Store verified status for each model/agent
    verified_dict = {}
    # Letter codes used in the raw score dicts -> human-readable capability names.
    capability_mapping = {
        "A": "Recall",
        "B": "Causal Inference",
        "C": "State Updating",
        "D": "State Abstraction"
    }
    for entry in jsonl_data:
        name = entry.get("agent_name") if is_agent else entry.get("model")
        if not name:
            # Entries without a usable name cannot be keyed; skip them.
            continue
        model_family = entry.get("model_family", "")
        verified = entry.get("verified", False)
        scores = entry.get("Score", {})
        # Store verified status
        verified_dict[name] = verified
        # Process each domain
        for domain, domain_scores in scores.items():
            # domain_scores is a list like [{"A": 0.5}, {"B": 0.6}, {"C": 0.7}, {"D": 0.8}]
            if domain not in domain_dict:
                # Unknown domains are silently ignored.
                continue
            # Flatten the list of single-key dicts into {letter: score}.
            capability_scores_for_domain = {}
            for score_dict in domain_scores:
                for cap_letter, score_value in score_dict.items():
                    capability_scores_for_domain[cap_letter] = score_value
            # Calculate weighted average score for this domain using ratio_in_domain
            avg_domain_score = 0
            if QA_DISTRIBUTION and domain in QA_DISTRIBUTION.get("domain_distribution", {}):
                domain_info = QA_DISTRIBUTION["domain_distribution"][domain]
                problem_types = domain_info.get("problem_types", {})
                weighted_sum = 0
                weight_total = 0
                for cap_letter, score_value in capability_scores_for_domain.items():
                    if cap_letter in problem_types:
                        weight = problem_types[cap_letter].get("ratio_in_domain", 0.0)
                        weighted_sum += score_value * weight
                        weight_total += weight
                avg_domain_score = weighted_sum / weight_total if weight_total > 0 else 0
            else:
                # Fallback to simple average if no distribution data
                domain_score_values = list(capability_scores_for_domain.values())
                avg_domain_score = sum(domain_score_values) / len(domain_score_values) if domain_score_values else 0
            # Store in domain_dict
            domain_dict[domain][name] = {
                "accuracy": avg_domain_score,
                "model_family": model_family,
                "f1": avg_domain_score  # For now, use same value for f1
            }
            # Store in capability_dict with ratio_overall for later weighted averaging
            for cap_letter, score_value in capability_scores_for_domain.items():
                capability_name = capability_mapping.get(cap_letter)
                if capability_name and capability_name in capability_dict:
                    if name not in capability_dict[capability_name]:
                        # Accumulators; normalized by "weight_sum" at the end.
                        capability_dict[capability_name][name] = {
                            "accuracy": 0,
                            "model_family": model_family,
                            "f1": 0,
                            "weight_sum": 0
                        }
                    # Use ratio_overall as weight for this capability score
                    weight = 0
                    if QA_DISTRIBUTION and domain in QA_DISTRIBUTION.get("domain_distribution", {}):
                        domain_info = QA_DISTRIBUTION["domain_distribution"][domain]
                        problem_types = domain_info.get("problem_types", {})
                        if cap_letter in problem_types:
                            weight = problem_types[cap_letter].get("ratio_overall", 0.0)
                    else:
                        # Fallback: equal weight across domains
                        weight = 1.0 / 6  # 6 domains
                    capability_dict[capability_name][name]["accuracy"] += score_value * weight
                    capability_dict[capability_name][name]["f1"] += score_value * weight
                    capability_dict[capability_name][name]["weight_sum"] += weight
    # Calculate weighted averages for capability scores
    for capability_name, models in capability_dict.items():
        for model_name, model_data in models.items():
            weight_sum = model_data.get("weight_sum", 1)
            model_data["accuracy"] = model_data["accuracy"] / weight_sum if weight_sum > 0 else 0
            model_data["f1"] = model_data["f1"] / weight_sum if weight_sum > 0 else 0
            # Drop the internal accumulator before handing data to the UI.
            del model_data["weight_sum"]
    return capability_dict, domain_dict, verified_dict
# Load all data files at import time so the UI can render immediately.
AGENT_DATA = load_jsonl_data("data/agent.jsonl")
MODEL_DATA = load_jsonl_data("data/model.jsonl")
# Must be assigned before convert_jsonl_to_dict runs (it reads this global).
QA_DISTRIBUTION = load_qa_distribution()
# Convert to dictionary format for visualization
AGENT_CAPABILITY, AGENT_DOMAIN, AGENT_VERIFIED = convert_jsonl_to_dict(AGENT_DATA, is_agent=True)
MODEL_CAPABILITY, MODEL_DOMAIN, MODEL_VERIFIED = convert_jsonl_to_dict(MODEL_DATA, is_agent=False)
METRICS = ["Recall", "Causal Inference", "State Updating", "State Abstraction"]
# Weighted ratios (from benchmark data distribution)
# Use QA distribution if available, otherwise use hardcoded values
if QA_DISTRIBUTION:
    domain_dist = QA_DISTRIBUTION.get("domain_distribution", {})
    # Share of all QA pairs per domain.
    DOMAIN_RATIO = {
        key: value.get("qa_ratio", 0) for key, value in domain_dist.items()
    }
    problem_types = QA_DISTRIBUTION.get("overall_distribution", {}).get("problem_types", {})
    # Share of all QA pairs per problem type (A/B/C/D letter codes).
    PROBLEM_TYPE_RATIO = {
        "RECALL": problem_types.get("A", {}).get("ratio", 0.336),
        "CAUSAL_INFERENCE": problem_types.get("B", {}).get("ratio", 0.239),
        "STATE_UPDATING": problem_types.get("C", {}).get("ratio", 0.259),
        "STATE_ABSTRACTION": problem_types.get("D", {}).get("ratio", 0.166),
    }
else:
    # Fallback to hardcoded values (counts out of the 2,496-question benchmark).
    DOMAIN_RATIO = {
        "TEXT2SQL": 612 / 2496,
        "SOFTWARE": 432 / 2496,
        "WEB": 372 / 2496,
        "GAME": 360 / 2496,
        "EMBODIED_AI": 360 / 2496,
        "OPENWORLD_QA": 360 / 2496,
    }
    PROBLEM_TYPE_RATIO = {
        "RECALL": 839 / 2496,
        "CAUSAL_INFERENCE": 596 / 2496,
        "STATE_UPDATING": 647 / 2496,
        "STATE_ABSTRACTION": 414 / 2496,
    }
def _normalize_category_key(name: str) -> str:
"""Normalize category key to uppercase snake-style for matching."""
return str(name).strip().upper().replace(" ", "_").replace("-", "_")
def get_category_weights(categories):
    """Return normalized per-category weights based on configured ratios.

    Detects whether *categories* are domains or problem types by counting
    matches against each ratio table, then normalizes the matched ratios to
    sum to 1. Falls back to uniform weights when nothing matches.
    """
    if not categories:
        return {}
    normalized = [_normalize_category_key(c) for c in categories]
    # Pick the ratio table with the most key matches (ties go to domains).
    domain_hits = sum(key in DOMAIN_RATIO for key in normalized)
    type_hits = sum(key in PROBLEM_TYPE_RATIO for key in normalized)
    ratio_table = DOMAIN_RATIO if domain_hits >= type_hits else PROBLEM_TYPE_RATIO
    weights = {
        original: ratio_table.get(key, 0.0)
        for original, key in zip(categories, normalized)
    }
    total = sum(weights.values())
    if total <= 0:
        uniform = 1.0 / len(categories)
        return {c: uniform for c in categories}
    return {c: w / total for c, w in weights.items()}
def get_ratio_overall_weights():
    """
    Get weights based on ratio_overall from qa_distribution.json.
    Returns a nested dict: {domain: {capability: ratio_overall}}
    """
    if not QA_DISTRIBUTION:
        return {}
    letter_to_capability = {
        "A": "Recall",
        "B": "Causal Inference",
        "C": "State Updating",
        "D": "State Abstraction",
    }
    weights = {}
    for domain, domain_data in QA_DISTRIBUTION.get("domain_distribution", {}).items():
        per_capability = {}
        for letter, cap_data in domain_data.get("problem_types", {}).items():
            cap_name = letter_to_capability.get(letter)
            if cap_name:  # ignore unknown letter codes
                per_capability[cap_name] = cap_data.get("ratio_overall", 0.0)
        weights[domain] = per_capability
    return weights
def filter_data_by_items(data_dict, allowed_items):
    """Filter nested score dict to only keep specified items for each category.

    Every category key is preserved; only the inner item entries are pruned.
    """
    keep = set(allowed_items)
    result = {}
    for category, entries in data_dict.items():
        kept = {}
        for item_name, item_data in entries.items():
            if item_name in keep:
                kept[item_name] = item_data
        result[category] = kept
    return result
# Color palette: Distinct colors for better differentiation.
# Each entry is an rgba string with alpha 0.5; the radar chart derives its
# fill color by string-replacing '0.5' with '0.15', so keep that exact format.
COLORS = [
    'rgba(135, 160, 220, 0.5)',  # Light Blue
    'rgba(230, 150, 120, 0.5)',  # Orange
    'rgba(180, 180, 180, 0.5)',  # Gray
    'rgba(255, 215, 100, 0.5)',  # Yellow
    'rgba(140, 180, 220, 0.5)',  # Sky Blue
    'rgba(140, 200, 150, 0.5)',  # Green
    'rgba(200, 160, 140, 0.5)',  # Brown
    'rgba(130, 140, 200, 0.5)',  # Purple-Blue
    'rgba(255, 180, 150, 0.5)',  # Coral
    'rgba(150, 220, 180, 0.5)',  # Mint Green
]
# ---------------------------------------------------------------------------
# Visualization functions
# ---------------------------------------------------------------------------
def create_radar_chart_from_dict(data_dict, title="Performance Radar Chart", top_n=10):
    """
    Create radar chart from dictionary data showing top N entries.

    Args:
        data_dict: Dictionary with structure {category: {item_name: {accuracy: x, f1: y}}}
        title: Chart title
        top_n: Number of top entries to display (default 10)

    Returns:
        Plotly Figure with radar chart (showing only accuracy)
    """
    if not data_dict:
        fig = go.Figure()
        fig.update_layout(title="No data available")
        return fig
    # Extract categories and the union of all items across categories.
    categories = list(data_dict.keys())
    all_items = set()
    for category_data in data_dict.values():
        all_items.update(category_data.keys())
    # Calculate weighted average accuracy for each item to determine top N.
    category_weights = get_category_weights(categories)
    item_avg_scores = {}
    for item in all_items:
        weighted_sum = 0.0
        weight_sum = 0.0
        for category in categories:
            item_data = data_dict[category].get(item, {})
            # Entries may be plain numbers or {"accuracy": ...} dicts.
            accuracy = item_data.get('accuracy', 0) if isinstance(item_data, dict) else item_data
            weight = category_weights.get(category, 0.0)
            weighted_sum += accuracy * weight
            weight_sum += weight
        item_avg_scores[item] = (weighted_sum / weight_sum) if weight_sum > 0 else 0
    # Get top N items by weighted average accuracy.
    sorted_items = sorted(item_avg_scores.items(), key=lambda x: x[1], reverse=True)
    top_items = [name for name, _ in sorted_items[:top_n]]
    fig = go.Figure()
    # Add one polar trace per top item.
    for idx, item in enumerate(top_items):
        values = []
        for category in categories:
            item_data = data_dict[category].get(item, {})
            accuracy = item_data.get('accuracy', 0) if isinstance(item_data, dict) else item_data
            values.append(accuracy * 100)  # Convert to percentage
        # Close the polygon by repeating the first point.
        values_closed = values + [values[0]]
        categories_closed = categories + [categories[0]]
        color = COLORS[idx % len(COLORS)]
        fig.add_trace(go.Scatterpolar(
            r=values_closed,
            theta=categories_closed,
            mode='lines+markers',
            fill='toself',
            name=item,
            line=dict(color=color, width=2),
            marker=dict(color=color, size=8),
            # Lighten the trace color for the fill (relies on the '0.5' alpha in COLORS).
            fillcolor=color.replace('0.5', '0.15'),
            # FIX: this template was split across two physical lines (invalid
            # string literal); restored as one literal with an HTML line break.
            hovertemplate='%{fullData.name}<br>%{theta}: %{r:.2f}%'
        ))
    # Update layout
    fig.update_layout(
        title=dict(
            text=title,
            x=0.5,
            xanchor='center',
            font=dict(size=20, color='#2c3e50')
        ),
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, 100],
                ticksuffix='%',
                tickfont=dict(size=11),
                gridcolor='rgba(200, 200, 200, 0.3)',
                gridwidth=1
            ),
            angularaxis=dict(
                tickfont=dict(size=13, weight='bold', color='#2c3e50')
            ),
            bgcolor='rgba(245, 245, 245, 0.5)'
        ),
        legend=dict(
            font=dict(size=11),
            title=dict(text="Items", font=dict(size=13)),
            x=1.02,
            y=1,
            xanchor='left',
            yanchor='top',
            bgcolor='rgba(255,255,255,0.8)',
            bordercolor='rgba(100,100,100,0.3)',
            borderwidth=1,
            itemclick="toggleothers",
            itemdoubleclick="toggle"
        ),
        height=600,
        margin=dict(l=80, r=250, t=100, b=80),
        paper_bgcolor='white',
        font=dict(color='#2c3e50')
    )
    return fig
def create_capability_subplots(data_dict, title="Capability Performance", top_n=10):
    """
    Create 2x2 subplot layout with one bar chart per capability, showing top N entries.
    Optimized for responsive sizing with equal spacing across all subplots.

    Args:
        data_dict: Dictionary with structure {capability: {item_name: {accuracy: x, f1: y}}}
        title: Overall chart title
        top_n: Number of top entries to display per subplot (default 10)

    Returns:
        Plotly Figure with 2x2 subplots (showing only accuracy)
    """
    if not data_dict:
        fig = go.Figure()
        fig.update_layout(title="No data available")
        return fig
    # Extract capabilities (only the first four fit the 2x2 grid).
    capabilities = list(data_dict.keys())
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=capabilities[:4],
        vertical_spacing=0.15,
        horizontal_spacing=0.12,
        specs=[[{"secondary_y": False}, {"secondary_y": False}],
               [{"secondary_y": False}, {"secondary_y": False}]]
    )
    # Position mapping for 2x2 grid
    positions = [(1, 1), (1, 2), (2, 1), (2, 2)]
    # Collect all unique items for consistent coloring across subplots.
    all_items = set()
    for capability_data in data_dict.values():
        all_items.update(capability_data.keys())
    all_items = sorted(all_items)
    # IMPROVEMENT: O(1) color-index lookup instead of list.index per bar.
    color_index = {name: i for i, name in enumerate(all_items)}
    # Create a bar chart for each capability
    for idx, capability in enumerate(capabilities[:4]):
        row, col = positions[idx]
        capability_data = data_dict[capability]
        # Sort items by accuracy for this capability and keep the top N.
        sorted_items = sorted(
            capability_data.items(),
            key=lambda x: x[1].get('accuracy', 0) if isinstance(x[1], dict) else x[1],
            reverse=True
        )[:top_n]
        item_names = [item[0] for item in sorted_items]
        item_scores = [
            (item[1].get('accuracy', 0) if isinstance(item[1], dict) else item[1]) * 100
            for item in sorted_items
        ]
        # Assign colors based on the item's global index so each item keeps
        # the same color in every subplot.
        colors = [COLORS[color_index[name] % len(COLORS)] for name in item_names]
        fig.add_trace(
            go.Bar(
                x=item_names,
                y=item_scores,
                marker=dict(
                    color=colors,
                    line=dict(color='rgba(50, 50, 50, 0.5)', width=1)
                ),
                showlegend=False,
                # FIX: this template was split across two physical lines (invalid
                # string literal); restored as one literal with an HTML line break.
                hovertemplate='%{x}<br>Score: %{y:.2f}%',
                width=0.7
            ),
            row=row, col=col
        )
        # Update axes with consistent styling
        fig.update_xaxes(
            tickangle=-45,
            tickfont=dict(size=9),
            tickmode='linear',
            row=row, col=col,
            showgrid=False,
            showline=True,
            linewidth=1,
            linecolor='rgba(200, 200, 200, 0.5)'
        )
        fig.update_yaxes(
            range=[0, 100],
            title_text="Performance (%)",
            title_font=dict(size=12),
            tickfont=dict(size=10),
            gridcolor='rgba(200, 200, 200, 0.3)',
            row=row, col=col,
            showline=True,
            linewidth=1,
            linecolor='rgba(200, 200, 200, 0.5)'
        )
    # Update overall layout with fully responsive sizing
    fig.update_layout(
        title=dict(
            text=title,
            x=0.5,
            xanchor='center',
            font=dict(size=20, color='#2c3e50')
        ),
        height=900,
        autosize=True,
        showlegend=False,
        plot_bgcolor='rgba(245, 245, 245, 0.5)',
        paper_bgcolor='white',
        font=dict(color='#2c3e50', family="Arial, sans-serif"),
        margin=dict(l=80, r=80, t=100, b=120),
        hovermode='closest'
    )
    # Update subplot titles styling
    for annotation in fig['layout']['annotations']:
        annotation['font'] = dict(size=14, color='#2c3e50')
        annotation['xanchor'] = 'center'
        annotation['showarrow'] = False
    return fig
def _rank_prefix(i):
medals = ["đĨ", "đĨ", "đĨ"]
return f"{medals[i]} {i+1}" if i < 3 else str(i + 1)
def _fmt(v):
return f"{v * 100:.2f}%"
def _build_rows_sorted(items, verified_dict, score_fn, type_name):
    """
    Build rows for verified entries only (verified=True).
    Unverified submissions are excluded from the leaderboard display.
    """
    verified_rows = [
        score_fn(item, True, type_name)
        for item in sorted(items)
        if verified_dict.get(item, False)
    ]
    # Highest score first; score_fn stores the raw value under "_sort".
    verified_rows.sort(key=lambda row: row["_sort"], reverse=True)
    for idx, row in enumerate(verified_rows):
        row["Rank"] = _rank_prefix(idx)
    return verified_rows
def create_capability_table(capability_dict, domain_dict, verified_dict, type_name="Agent"):
    """
    Summary table grouped by capability (A/B/C/D).

    Only verified entries (verified=True) appear: `_build_rows_sorted` skips
    unverified items entirely and ranks the remainder by weighted average
    score. Returns a pandas DataFrame with Rank first and the internal
    "_sort" key removed.
    """
    # Union of every item present in at least one domain.
    items = set()
    for d in domain_dict.values():
        items.update(d.keys())
    if not items:
        return pd.DataFrame()
    # Capability name -> display column label.
    cap_cols = {
        "Recall": "Recall (A)",
        "Causal Inference": "Causal Inf. (B)",
        "State Updating": "State Upd. (C)",
        "State Abstraction": "State Abs. (D)",
    }
    # Per-capability weights from the overall problem-type distribution (if loaded).
    cap_weights = {}
    if QA_DISTRIBUTION:
        pt = QA_DISTRIBUTION.get("overall_distribution", {}).get("problem_types", {})
        letter_to_cap = {"A": "Recall", "B": "Causal Inference",
                         "C": "State Updating", "D": "State Abstraction"}
        for letter, info in pt.items():
            cap_weights[letter_to_cap.get(letter, "")] = info.get("ratio", 0.0)

    def score_fn(item, is_verified, type_name):
        # Take the first non-empty model_family found across capability entries.
        model_family = ""
        for cd in capability_dict.values():
            if item in cd and isinstance(cd[item], dict):
                model_family = cd[item].get("model_family", "")
                if model_family:
                    break
        cap_scores = {}
        for cap_name in cap_cols:
            d = capability_dict.get(cap_name, {}).get(item, {})
            cap_scores[cap_name] = d.get("accuracy", 0.0) if isinstance(d, dict) else 0.0
        # Weighted average; falls back to the plain mean when no weights exist.
        w_sum = sum(cap_scores[c] * cap_weights.get(c, 0.0) for c in cap_cols)
        w_tot = sum(cap_weights.get(c, 0.0) for c in cap_cols)
        avg = w_sum / w_tot if w_tot > 0 else sum(cap_scores.values()) / len(cap_scores)
        row = {
            type_name: f"{item} {'â' if is_verified else 'â'}",
            "Model Family": model_family,
            "Avg Score": _fmt(avg),
            "_sort": avg,  # raw value used only for ordering; stripped below
        }
        for cap_name, col_label in cap_cols.items():
            row[f"{col_label}_score"] = _fmt(cap_scores[cap_name])
        return row

    rows = _build_rows_sorted(items, verified_dict, score_fn, type_name)
    # Rebuild rows with Rank first and the private "_sort" key dropped.
    return pd.DataFrame([
        {"Rank": r["Rank"], **{k: v for k, v in r.items() if k not in ("Rank", "_sort")}}
        for r in rows
    ])
def create_domain_table(capability_dict, domain_dict, verified_dict, type_name="Agent"):
    """
    Summary table grouped by domain.

    Only verified entries (verified=True) appear: `_build_rows_sorted` skips
    unverified items entirely and ranks the remainder by weighted average
    score. Returns a pandas DataFrame with Rank first and the internal
    "_sort" key removed.
    """
    # Union of every item present in at least one domain.
    items = set()
    for d in domain_dict.values():
        items.update(d.keys())
    if not items:
        return pd.DataFrame()
    # Fixed column order for the six benchmark domains.
    domain_order = ["TEXT2SQL", "SOFTWARE", "WEB", "GAME", "EMBODIED_AI", "OPENWORLD_QA"]
    # Per-domain weights (qa_ratio) from the distribution file, if loaded.
    domain_weights = {}
    if QA_DISTRIBUTION:
        for dom, info in QA_DISTRIBUTION.get("domain_distribution", {}).items():
            domain_weights[dom] = info.get("qa_ratio", 0.0)

    def score_fn(item, is_verified, type_name):
        # Take the first non-empty model_family found across capability entries.
        model_family = ""
        for cd in capability_dict.values():
            if item in cd and isinstance(cd[item], dict):
                model_family = cd[item].get("model_family", "")
                if model_family:
                    break
        dom_scores = {}
        for dom in domain_order:
            d = domain_dict.get(dom, {}).get(item, {})
            dom_scores[dom] = d.get("accuracy", 0.0) if isinstance(d, dict) else 0.0
        # Weighted average; falls back to the plain mean when no weights exist.
        w_sum = sum(dom_scores[d] * domain_weights.get(d, 0.0) for d in domain_order)
        w_tot = sum(domain_weights.get(d, 0.0) for d in domain_order)
        avg = w_sum / w_tot if w_tot > 0 else sum(dom_scores.values()) / len(dom_scores)
        row = {
            type_name: f"{item} {'â' if is_verified else 'â'}",
            "Model Family": model_family,
            "Avg Score": _fmt(avg),
            "_sort": avg,  # raw value used only for ordering; stripped below
        }
        for dom in domain_order:
            row[f"{dom}_score"] = _fmt(dom_scores[dom])
        return row

    rows = _build_rows_sorted(items, verified_dict, score_fn, type_name)
    # Rebuild rows with Rank first and the private "_sort" key dropped.
    return pd.DataFrame([{"Rank": r["Rank"], **{k: v for k, v in r.items() if k != "Rank" and k != "_sort"}}
                         for r in rows])
def create_summary_table(capability_dict, domain_dict, verified_dict, type_name="Agent"):
    """
    Create summary table showing rank, average accuracy and F1 scores.
    Uses ratio from qa_distribution.json's overall problem-type distribution
    for weighting (equal weights when the distribution is absent).

    Args:
        capability_dict: Dictionary with capability scores
        domain_dict: Dictionary with domain scores
        verified_dict: Dictionary mapping item names to verified status
        type_name: "Agent" or "Model"

    Returns:
        pandas DataFrame with rank, verified status, accuracy and F1 columns.
        Note: F1 reuses the accuracy value (see comments below).
    """
    if not capability_dict and not domain_dict:
        return pd.DataFrame()
    # Union of every item present in at least one domain.
    items = set()
    for category_data in domain_dict.values():
        items.update(category_data.keys())
    rows = []
    for item in sorted(items):
        weighted_accuracy_sum = 0.0
        weighted_f1_sum = 0.0
        total_weight = 0.0
        model_family = ""
        # Get model family from the first capability entry containing this item.
        for cap_data in capability_dict.values():
            if item in cap_data:
                item_data = cap_data[item]
                if isinstance(item_data, dict) and not model_family:
                    model_family = item_data.get('model_family', '')
                break
        # Collect per-capability accuracy scores for this item.
        capability_scores = {}
        for capability, cap_data in capability_dict.items():
            if item in cap_data:
                item_data = cap_data[item]
                if isinstance(item_data, dict):
                    capability_scores[capability] = item_data.get('accuracy', 0)
        # Calculate weighted average using ratio from overall problem type distribution
        if QA_DISTRIBUTION:
            problem_types = QA_DISTRIBUTION.get("overall_distribution", {}).get("problem_types", {})
            capability_to_letter = {
                "Recall": "A",
                "Causal Inference": "B",
                "State Updating": "C",
                "State Abstraction": "D"
            }
            for capability, score in capability_scores.items():
                letter = capability_to_letter.get(capability)
                if letter and letter in problem_types:
                    weight = problem_types[letter].get("ratio", 0)
                    weighted_accuracy_sum += score * weight
                    weighted_f1_sum += score * weight  # Using same value for f1
                    total_weight += weight
        else:
            # Fallback: equal weights
            for score in capability_scores.values():
                weighted_accuracy_sum += score
                weighted_f1_sum += score
                total_weight += 1
        avg_accuracy = (weighted_accuracy_sum / total_weight) if total_weight > 0 else 0
        avg_f1 = (weighted_f1_sum / total_weight) if total_weight > 0 else 0
        # Get verified status and append the status icon to the display name.
        is_verified = verified_dict.get(item, False)
        verified_icon = " â" if is_verified else " â"
        display_name = f"{item}{verified_icon}"
        rows.append({
            type_name: display_name,
            "Model Family": model_family,
            "Avg Accuracy": avg_accuracy,
            "Avg F1": avg_f1,
            "_acc_sort": avg_accuracy,  # raw value for sorting; dropped below
            "_verified": is_verified
        })
    df = pd.DataFrame(rows)
    df = df.sort_values(by="_acc_sort", ascending=False).reset_index(drop=True)
    # Add rank column with medals for top 3
    medals = ["đĨ", "đĨ", "đĨ"]
    ranks = []
    for i in range(len(df)):
        if i < 3:
            ranks.append(f"{medals[i]} {i+1}")
        else:
            ranks.append(str(i+1))
    df.insert(0, "Rank", ranks)
    # Format accuracy and F1 as percentages
    df["Avg Accuracy"] = df["Avg Accuracy"].apply(lambda x: f"{x * 100:.2f}%")
    df["Avg F1"] = df["Avg F1"].apply(lambda x: f"{x * 100:.2f}%")
    # Drop internal sorting/status columns before display.
    df = df.drop(columns=["_acc_sort", "_verified"])
    return df
# ---------------------------------------------------------------------------
# Build Gradio interface
# ---------------------------------------------------------------------------
def build_app():
"""Build the Gradio application."""
CSS = """
.markdown-text {
font-size: 16px !important;
}
.intro-box {
background: linear-gradient(135deg, rgba(26, 188, 156, 0.1) 0%, rgba(52, 152, 219, 0.1) 100%);
padding: 25px;
border-radius: 10px;
margin: 20px 0;
border-left: 4px solid #1abc9c;
}
"""
# Keep Model Domain view strictly model-only
model_items = set()
for capability_data in MODEL_CAPABILITY.values():
model_items.update(capability_data.keys())
model_domain_filtered = filter_data_by_items(MODEL_DOMAIN, model_items)
if not any(len(category_data) > 0 for category_data in model_domain_filtered.values()):
model_domain_filtered = {}
import base64, pathlib
_logo_path = pathlib.Path("assets/ama_logo.jpg")
if _logo_path.exists():
_logo_b64 = base64.b64encode(_logo_path.read_bytes()).decode()
_logo_tag = (
'
'
)
else:
_logo_tag = "đ¤ "
with gr.Blocks(title="AMA-Bench Leaderboard", theme=gr.themes.Soft()) as demo:
# Header
gr.HTML(
"""
"""
+ _logo_tag
+ """
AMA-Bench: Leaderboard
Agent Memory Assessment Benchmark - Performance Visualization
"""
)
# Links bar
gr.HTML("""
""")
# Welcome Banner
gr.HTML("""
đ¯ Welcome to AMA-Bench!
Evaluate agent memory itself, not just dialogue.
Built from real agent environment streams and scalable long-horizon trajectories across
representative domains, AMA-Bench tests whether LLM agents can recall,
perform causal inference, update state, and
abstract state information over long runs.
đ Paper: https://arxiv.org/abs/2602.22769
""")
with gr.Tabs():
# ============================================================
# Tab 1: Agent Performance
# ============================================================
with gr.Tab("đ¤ Agent Performance"):
gr.Markdown("""
### Agent Performance Analysis
Explore agent performance across different domains and capabilities.
""")
with gr.Tabs():
# Domain Sub-tab (Radar Chart)
with gr.Tab("đ¯ Domain Performance"):
gr.Markdown("""
**Radar chart** showing agent performance across different domains.
Click legend items to isolate specific agents.
""")
with gr.Row():
agent_domain_top_n = gr.Slider(
minimum=1,
maximum=10,
value=8,
step=1,
label="Show Top N Agents",
info="Select how many top agents to display (1-10)"
)
agent_domain_chart = gr.Plot(
value=create_radar_chart_from_dict(
AGENT_DOMAIN,
"Agent Performance Across Domains",
top_n=8
)
)
with gr.Accordion("đ Summary Statistics", open=True):
gr.Markdown("""
**Verification Status:** Only officially verified entries (â) are shown. User-submitted results (â) will appear after weekly LLM-as-Judge evaluation.
""")
agent_domain_table = gr.Dataframe(
value=create_domain_table(AGENT_CAPABILITY, AGENT_DOMAIN, AGENT_VERIFIED, "Agent"),
label="Scores by Domain"
)
# Update chart when slider changes
agent_domain_top_n.change(
fn=lambda n: create_radar_chart_from_dict(
AGENT_DOMAIN,
"Agent Performance Across Domains",
top_n=int(n)
),
inputs=[agent_domain_top_n],
outputs=[agent_domain_chart]
)
# Capability Sub-tab (Bar Chart)
with gr.Tab("⥠Capability Performance"):
gr.Markdown("""
Showing agent performance for each capability.
Each subplot represents one capability with comparative performance across all agents.
""")
with gr.Row():
agent_capability_top_n = gr.Slider(
minimum=1,
maximum=10,
value=8,
step=1,
label="Show Top N Agents",
info="Select how many top agents to display per capability (1-10)"
)
agent_capability_chart = gr.Plot(
value=create_capability_subplots(
AGENT_CAPABILITY,
"Agent Performance by Capability",
top_n=8
)
)
with gr.Accordion("đ Summary Statistics", open=True):
gr.Markdown("""
**Verification Status:** Only officially verified entries (â) are shown. User-submitted results (â) will appear after weekly LLM-as-Judge evaluation.
""")
agent_capability_table = gr.Dataframe(
value=create_capability_table(AGENT_CAPABILITY, AGENT_DOMAIN, AGENT_VERIFIED, "Agent"),
label="Scores by Capability"
)
# Update chart when slider changes
agent_capability_top_n.change(
fn=lambda n: create_capability_subplots(
AGENT_CAPABILITY,
"Agent Performance by Capability",
top_n=int(n)
),
inputs=[agent_capability_top_n],
outputs=[agent_capability_chart]
)
# ============================================================
# Tab 2: Model Performance
# ============================================================
with gr.Tab("đŦ Model Performance"):
gr.Markdown("""
### Model Performance Analysis
Explore model performance across different domains and capabilities.
""")
with gr.Tabs():
# Domain Sub-tab (Radar Chart)
with gr.Tab("đ¯ Domain Performance"):
gr.Markdown("""
**Radar chart** showing model performance across different domains.
Click legend items to isolate specific models.
""")
with gr.Row():
model_domain_top_n = gr.Slider(
minimum=1,
maximum=10,
value=8,
step=1,
label="Show Top N Models",
info="Select how many top models to display (1-10)"
)
model_domain_chart = gr.Plot(
value=create_radar_chart_from_dict(
model_domain_filtered,
"Model Performance Across Domains",
top_n=8
)
)
with gr.Accordion("đ Summary Statistics", open=True):
gr.Markdown("""
**Verification Status:** Only officially verified entries (â) are shown. User-submitted results (â) will appear after weekly LLM-as-Judge evaluation.
""")
model_domain_table = gr.Dataframe(
value=create_domain_table(MODEL_CAPABILITY, model_domain_filtered, MODEL_VERIFIED, "Model"),
label="Scores by Domain"
)
# Update chart when slider changes
model_domain_top_n.change(
fn=lambda n: create_radar_chart_from_dict(
model_domain_filtered,
"Model Performance Across Domains",
top_n=int(n)
),
inputs=[model_domain_top_n],
outputs=[model_domain_chart]
)
# Capability Sub-tab (Bar Chart)
with gr.Tab("⥠Capability Performance"):
gr.Markdown("""
Show model performance for each capability.
Each subplot represents one capability with comparative performance across all models.
""")
with gr.Row():
model_capability_top_n = gr.Slider(
minimum=1,
maximum=10,
value=8,
step=1,
label="Show Top N Models",
info="Select how many top models to display per capability (1-10)"
)
model_capability_chart = gr.Plot(
value=create_capability_subplots(
MODEL_CAPABILITY,
"Model Performance by Capability",
top_n=8
)
)
with gr.Accordion("đ Summary Statistics", open=True):
gr.Markdown("""
**Verification Status:** Only officially verified entries (â) are shown. User-submitted results (â) will appear after weekly LLM-as-Judge evaluation.
""")
model_capability_table = gr.Dataframe(
value=create_capability_table(MODEL_CAPABILITY, MODEL_DOMAIN, MODEL_VERIFIED, "Model"),
label="Scores by Capability"
)
# Update chart when slider changes
model_capability_top_n.change(
fn=lambda n: create_capability_subplots(
MODEL_CAPABILITY,
"Model Performance by Capability",
top_n=int(n)
),
inputs=[model_capability_top_n],
outputs=[model_capability_chart]
)
# ============================================================
# Tab 3: Submit
# ============================================================
with gr.Tab("đ¤ Submit"):
gr.Markdown("""
### Submit Your Model/Agent for Evaluation
Submit your model or agent predictions to be evaluated on AMA-Bench.
Your results will be reviewed and scored weekly by our LLM-as-Judge system.
**â° Submission Policy:**
- Each user can submit **once per week**
- Submissions are evaluated **weekly** using our LLM-as-Judge system
- Official scores (`verified=true`) are computed by our evaluation system
- You can also run your own evaluation if you have access to the groundtruth data
""")
# ------------------------------------------------------------------
# Submission form: collects model/agent metadata plus a JSONL
# predictions file, then wires the Submit button to the handler
# imported from submission.add_new_submission.
# ------------------------------------------------------------------
with gr.Row():
with gr.Column():
# Left column: identity of the submitted system.
model_name_textbox = gr.Textbox(
label="Model/Agent Name",
placeholder="e.g., GPT-4 or MyAgent-v2"
)
# "Model" vs "Agent" decides which leaderboard the entry joins.
submission_type = gr.Radio(
choices=["Model", "Agent"],
label="Submission Type",
value="Model"
)
url_textbox = gr.Textbox(
label="URL to Model/Agent Information",
placeholder="https://..."
)
with gr.Column():
# Right column: contact / provenance metadata.
organisation = gr.Textbox(
label="Organisation",
placeholder="e.g., OpenAI, Anthropic"
)
model_family_textbox = gr.Textbox(
label="Model Family",
placeholder="e.g., GPT-4, Claude-3, Qwen3-32B"
)
mail = gr.Textbox(
label="Contact Email",
placeholder="your.email@example.com"
)
# Predictions upload; restricted to .jsonl so the backend only
# ever receives line-delimited JSON.
file_upload = gr.File(
label="Submission File (JSONL format)",
file_types=[".jsonl"]
)
# Inline format reference shown next to the form (runtime Markdown text).
gr.Markdown("""
**đ Submission Format:**
Your JSONL file should contain one line per episode:
```json
{
"episode_id": "trajectory_id",
"question_uuid_list": ["uuid-1", "uuid-2", "uuid-3"],
"answer_list": ["The agent moved right.", "..."],
"llm_as_judge_score_list": [true, false, true]
}
```
**Field Descriptions:**
- `episode_id` *(required)*: The episode identifier â used to automatically look up the domain
- `question_uuid_list` *(required)*: UUIDs of the benchmark questions in the same order as `answer_list` â used to look up each question's capability (A/B/C/D).
- `answer_list` *(required)*: Your model/agent's answers, one per question
- `llm_as_judge_score_list` *(required)*: `true`/`false` per answer â your self-evaluated correctness scores used for leaderboard ranking.
**Important Notes:**
- `question_uuid_list`, `answer_list`, and `llm_as_judge_score_list` must all be the same length
- Domain is resolved automatically from `episode_id`; capability (A/B/C/D) is resolved from `question_uuid_list` â no need to supply them manually
- All submissions start as `verified=false` and become `verified=true` after official LLM-as-Judge evaluation
""")
with gr.Row():
submit_button = gr.Button("Submit", variant="primary", size="lg")
# HTML output area for the success/error message returned by the handler.
submission_result = gr.HTML()
# Three-step event chain:
#   1) disable the button and show a progress label,
#   2) run the actual submission handler,
#   3) re-enable the button.
# NOTE(review): step 3 is expected to fire even when step 2 reports an
# error (Gradio `.then()` runs after the previous step regardless of its
# outcome) â confirm against the Gradio version in use.
submit_button.click(
fn=lambda: gr.update(interactive=False, value="âŗ Submitting..."),
inputs=[],
outputs=[submit_button],
).then(
fn=add_new_submission,
inputs=[
model_name_textbox,
submission_type,
url_textbox,
file_upload,
organisation,
mail,
model_family_textbox,
],
outputs=[submission_result],
).then(
fn=lambda: gr.update(interactive=True, value="Submit"),
inputs=[],
outputs=[submit_button],
)
# ============================================================
# Tab 4: About
# ============================================================
with gr.Tab("âšī¸ About"):
# Static documentation page: benchmark overview, leaderboard tab
# guide, metrics, question-type distribution, and the full
# submission rules. The body below is a single runtime Markdown
# string and must be kept verbatim.
gr.Markdown("""
## AMA-Bench: Agent Memory Assessment Benchmark
AMA-Bench evaluates memory capabilities of LLMs and memory-augmented agents across four cognitive dimensions:
**Recall** (retrieving stored info), **Causal Inference** (cause-and-effect reasoning),
**State Updating** (tracking evolving states), and **State Abstraction** (forming higher-level representations).
### Benchmarks
We evaluate on two complementary subsets:
1. **Real-world Subset:** 2,496 QA pairs from real agent environment streams
2. **Synthetic Subset:** 1,200 QA pairs stratified across five trajectory lengths (8K, 16K, 32K, 64K, and 128K tokens)
### Leaderboard Tabs
- **Agent Performance**: Compares RAG and Agent Memory methods
- Domain Performance: Radar charts across 6 domains (GAME, Embodied AI, Web, Text2SQL, Openworld QA, Software Engineer)
- Capability Performance: showing performance on 4 capabilities
- **Top N Selection**: Choose to display top 1-10 performers
- **Model Performance**: Compares LLM models directly
- Domain Performance: Radar charts showing performance across different application domains
- Capability Performance: showing performance on each cognitive capability
- **Top N Selection**: Choose to display top 1-10 performers
### Metrics
Results are reported as **Accuracy** and **F1 Score**:
- Charts display **Accuracy** only for clarity
- Summary statistics tables show both **Avg Accuracy** and **Avg F1**
- Tables include **Rank** with đĨđĨđĨ medals for top 3 performers
### Problem Type Distribution
- **Type A (Recall)**: 33.6% - 839 questions
- **Type B (Causal Inference)**: 23.9% - 596 questions
- **Type C (State Updating)**: 25.9% - 647 questions
- **Type D (State Abstraction)**: 16.6% - 414 questions
### Submission Rules
**đ File Format**
- Submissions must be in **JSONL format** (`.jsonl`), one line per episode
- Each line must be a valid JSON object containing the required fields below
- `question_uuid_list`, `answer_list`, and `llm_as_judge_score_list` must all be the **same length**
- Files containing duplicate `episode_id` entries will be rejected
**đ Required Fields**
| Field | Type | Description |
|---|---|---|
| `episode_id` | string | Episode identifier, used to automatically resolve domain |
| `question_uuid_list` | list[string] | UUIDs mapping each answer to a benchmark question, used to resolve capability (A/B/C/D) |
| `answer_list` | list[string] | Your model/agent's free-text answers, in the same order as `question_uuid_list` |
| `llm_as_judge_score_list` | list[bool] | Self-evaluated correctness (`true`/`false`) per answer |
**â
Verification & Scoring**
- All submissions initially appear as `verified=false` (self-reported preview)
- The score shown immediately after submission is based on your `llm_as_judge_score_list`
- Official scores (`verified=true`) are recomputed weekly by our **LLM-as-Judge** evaluation system
- Only `verified=true` entries are displayed on the public leaderboard
**â ī¸ Important Notes**
- Domain is resolved automatically from `episode_id` â no need to supply it manually
- Capability (A/B/C/D) is resolved automatically from each `question_uuid` â no need to supply it manually
- Official scores may differ from your self-reported preview after LLM-as-Judge re-evaluation
- We reserve the right to remove submissions that appear to contain fabricated or manipulated scores
---
**Paper:** [https://arxiv.org/abs/2602.22769](https://arxiv.org/abs/2602.22769)
*For questions or submissions, please open a discussion in the Community tab.*
""")
return demo
if __name__ == "__main__":
    # Build the Gradio app and serve it; debug mode keeps the process in
    # the foreground and show_error surfaces handler tracebacks in the UI.
    build_app().launch(debug=True, show_error=True)