AMA-bench-Leaderboard / submission.py
uuuhjb's picture
update push_to_hf
2581d07
"""
Submission handling module for AMA-Bench Leaderboard
Submission format:
{
"episode_id": str,
"question_uuid_list": list[str], # required - UUIDs that map answers to groundtruth
"answer_list": list[str], # required - same length as question_uuid_list
"llm_as_judge_score_list": list[bool] # optional - same length as answer_list
}
Scoring logic:
- Uses llm_as_judge_score_list (true/false) from the submission
- Maps each question to its domain and capability (A/B/C/D) via groundtruth metadata
- Computes per-domain, per-capability accuracy
- Writes entry to data/agent.jsonl or data/model.jsonl (verified=False by default)
"""
import json
import os
import datetime
from email.utils import parseaddr
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
try:
from content import format_error, format_warning, format_log
except ImportError:
def format_error(msg): return f"❌ **Error:** {msg}"
def format_warning(msg): return f"⚠️ **Warning:** {msg}"
def format_log(msg): return f"✅ {msg}"
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def validate_submission_format(data: dict) -> Tuple[bool, str]:
"""
Validate a single submission record.
Required fields: episode_id, question_uuid_list, answer_list
Optional fields: llm_as_judge_score_list, reasoning_trace
"""
if not isinstance(data, dict):
return False, "Submission must be a JSON object"
# episode_id
if "episode_id" not in data:
return False, "Missing required field: episode_id"
if not isinstance(data["episode_id"], str) or not data["episode_id"].strip():
return False, "episode_id must be a non-empty string"
# answer_list
if "answer_list" not in data:
return False, "Missing required field: answer_list"
if not isinstance(data["answer_list"], list) or not data["answer_list"]:
return False, "answer_list must be a non-empty list"
# question_uuid_list
if "question_uuid_list" not in data:
return False, "Missing required field: question_uuid_list"
if not isinstance(data["question_uuid_list"], list):
return False, "question_uuid_list must be a list"
if len(data["question_uuid_list"]) != len(data["answer_list"]):
return False, (
f"question_uuid_list length ({len(data['question_uuid_list'])}) must match "
f"answer_list length ({len(data['answer_list'])})"
)
for i, q in enumerate(data["question_uuid_list"]):
if not isinstance(q, str) or not q.strip():
return False, f"question_uuid_list[{i}] must be a non-empty string"
# llm_as_judge_score_list (required)
if "llm_as_judge_score_list" not in data:
return False, "Missing required field: llm_as_judge_score_list"
score_list = data["llm_as_judge_score_list"]
if not isinstance(score_list, list):
return False, "llm_as_judge_score_list must be a list"
if len(score_list) != len(data["answer_list"]):
return False, (
f"llm_as_judge_score_list length ({len(score_list)}) must match "
f"answer_list length ({len(data['answer_list'])})"
)
for i, score in enumerate(score_list):
if not isinstance(score, bool):
return False, f"llm_as_judge_score_list[{i}] must be true or false (boolean)"
return True, ""
def validate_submission_file(file_path: str) -> Tuple[bool, str, List[dict]]:
"""Validate and load a JSONL submission file."""
if not os.path.exists(file_path):
return False, "File not found", []
if not file_path.endswith(".jsonl"):
return False, "File must be in JSONL format (.jsonl)", []
submissions = []
seen_ids = set()
try:
with open(file_path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
except json.JSONDecodeError as e:
return False, f"JSON parse error on line {line_num}: {e}", []
is_valid, error_msg = validate_submission_format(data)
if not is_valid:
return False, f"Validation error on line {line_num}: {error_msg}", []
episode_id = data["episode_id"]
if episode_id in seen_ids:
return False, f"Duplicate episode_id '{episode_id}' on line {line_num}", []
seen_ids.add(episode_id)
submissions.append(data)
if not submissions:
return False, "File is empty or contains no valid submissions", []
return True, "", submissions
except Exception as e:
return False, f"Error reading file: {e}", []
# ---------------------------------------------------------------------------
# Groundtruth loading
# ---------------------------------------------------------------------------
def load_groundtruth_metadata(dataset_name: str = "AMA-bench/AMA-bench",
token: str = None) -> Dict[str, dict]:
"""
Load groundtruth metadata. Returns a dict with two sub-dicts:
{
"episode_domain": {
"episode_id": "GAME" | "TEXT2SQL" | ...
},
"question_cap": {
"question_uuid": "A" | "B" | "C" | "D"
}
}
- episode_domain: episode_id -> domain (used even when question uuid doesn't match)
- question_cap: question_uuid -> capability letter
"""
episode_domain: Dict[str, str] = {}
question_cap: Dict[str, str] = {}
def _index_rows(rows):
for row in rows:
episode_id = str(row.get("episode_id", ""))
domain = row.get("domain", "UNKNOWN").upper()
episode_domain[episode_id] = domain
for qa in row.get("qa_pairs", []):
question_uuid = qa.get("question_uuid", "").strip()
if not question_uuid:
continue
cap_letter = _normalize_cap(qa.get("type", "A"))
question_cap[question_uuid] = cap_letter
# --- Try HuggingFace ---
try:
from datasets import load_dataset, VerificationMode
dataset = load_dataset(
dataset_name, split="test", token=token,
verification_mode=VerificationMode.NO_CHECKS,
)
_index_rows(dataset)
print(f"[groundtruth] Loaded {len(episode_domain)} episodes, "
f"{len(question_cap)} Q&A entries from HuggingFace (indexed by question_uuid).")
return {"episode_domain": episode_domain, "question_cap": question_cap}
except Exception as hf_err:
print(f"[groundtruth] HuggingFace failed ({hf_err}), trying local fallback…")
# --- Local fallback ---
for local_path in ["test/open_end_qa_set.jsonl", "data/open_end_qa_set.jsonl"]:
if not os.path.exists(local_path):
continue
try:
rows = []
with open(local_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
rows.append(json.loads(line))
_index_rows(rows)
print(f"[groundtruth] Loaded {len(episode_domain)} episodes, "
f"{len(question_cap)} Q&A entries from {local_path} (indexed by question_uuid).")
return {"episode_domain": episode_domain, "question_cap": question_cap}
except Exception as e:
print(f"[groundtruth] Error reading {local_path}: {e}")
print("[groundtruth] WARNING: No groundtruth metadata available.")
return {"episode_domain": {}, "question_cap": {}}
def _normalize_cap(cap: str) -> str:
"""Normalize capability label to single letter A/B/C/D."""
mapping = {
"A": "A", "Recall": "A",
"B": "B", "Causal Inference": "B", "Causal": "B",
"C": "C", "State Updating": "C", "State": "C",
"D": "D", "State Abstraction": "D", "Abstraction": "D",
}
return mapping.get(cap.strip(), "A")
# ---------------------------------------------------------------------------
# Scoring
# ---------------------------------------------------------------------------
VALID_DOMAINS = {"TEXT2SQL", "SOFTWARE", "WEB", "GAME", "EMBODIED_AI", "OPENWORLD_QA"}
VALID_CAPS = ["A", "B", "C", "D"]
def compute_scores_from_submissions(
submissions: List[dict],
groundtruth_meta: Dict[str, dict],
) -> Dict:
"""
Compute per-domain, per-capability accuracy using llm_as_judge_score_list.
Each question is matched to groundtruth via question_uuid.
Score structure matches agent.jsonl / model.jsonl:
{
"TEXT2SQL": [{"A": 0.xx}, {"B": 0.xx}, {"C": 0.xx}, {"D": 0.xx}],
...
}
"""
# domain -> capability -> [scores]
domain_cap_scores: Dict[str, Dict[str, List[float]]] = defaultdict(
lambda: defaultdict(list)
)
scored_questions = 0
skipped_episodes = 0 # no judge scores
unmatched_questions = 0 # question cap not found (domain still resolved via episode)
# Unpack the two indexes from groundtruth metadata
episode_domain: Dict[str, str] = groundtruth_meta.get("episode_domain", {})
question_cap: Dict[str, str] = groundtruth_meta.get("question_cap", {})
for sub in submissions:
episode_id = str(sub["episode_id"])
question_uuid_list = sub["question_uuid_list"]
judge_scores = sub.get("llm_as_judge_score_list")
# Resolve domain via episode_id
domain = episode_domain.get(episode_id, "UNKNOWN").upper()
for i, question_uuid in enumerate(question_uuid_list):
if i >= len(judge_scores):
break
# Resolve capability via question_uuid
cap = question_cap.get(question_uuid.strip())
if cap is None:
unmatched_questions += 1
continue
score = 1.0 if judge_scores[i] is True else 0.0
domain_cap_scores[domain][cap].append(score)
scored_questions += 1
# Build Score dict — always include all 6 known domains
score_dict: Dict[str, List[dict]] = {}
for domain in sorted(VALID_DOMAINS | set(domain_cap_scores.keys())):
cap_data = domain_cap_scores.get(domain, {})
score_dict[domain] = [
{cap: round(sum(cap_data[cap]) / len(cap_data[cap]), 4)
if cap_data.get(cap) else 0.0}
for cap in VALID_CAPS
]
# Coverage warning
coverage_warning = None
parts = []
if skipped_episodes:
parts.append(f"{skipped_episodes} episode(s) had no llm_as_judge_score_list")
if unmatched_questions:
parts.append(f"{unmatched_questions} question(s) not matched in groundtruth")
if parts:
coverage_warning = "; ".join(parts)
return {
"Score": score_dict,
"scored_questions": scored_questions,
"skipped_episodes": skipped_episodes,
"unmatched_questions": unmatched_questions,
"coverage_warning": coverage_warning,
}
# ---------------------------------------------------------------------------
# Leaderboard update
# ---------------------------------------------------------------------------
def update_leaderboard_data(
model_or_agent_name: str,
model_family: str,
submission_type: str,
organisation: str,
score_dict: Dict,
verified: bool = False,
) -> bool:
"""Append a scored entry to data/agent.jsonl or data/model.jsonl."""
try:
os.makedirs("data", exist_ok=True)
data_file = "data/agent.jsonl" if submission_type == "agent" else "data/model.jsonl"
name_key = "agent_name" if submission_type == "agent" else "model"
entry = {
name_key: model_or_agent_name,
"model_family": model_family,
"Date": datetime.datetime.today().strftime("%Y-%m-%d"),
"verified": verified,
"Score": score_dict,
}
with open(data_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"[leaderboard] Appended to {data_file}: {model_or_agent_name}")
return True
except Exception as e:
print(f"[leaderboard] Error: {e}")
import traceback; traceback.print_exc()
return False
# ---------------------------------------------------------------------------
# HuggingFace submission push
# ---------------------------------------------------------------------------
HF_SUBMISSIONS_DATASET = "AMA-bench/AMA_submissions_internal"
def push_submission_to_hf(
submissions: List[dict],
metadata: dict,
score_dict: Dict,
token: str,
timestamp: str,
) -> Tuple[bool, str]:
"""
Push raw submission + metadata + scores to the private HuggingFace dataset
``AMA-bench/AMA_submissions_internal``.
The dataset is expected (or will be created) with a single ``data`` config.
Each call appends one row per episode, using a Parquet shard named by
``{organisation}_{model}_{timestamp}``.
Row schema
----------
submission_id : str – "{organisation}_{model}_{timestamp}"
organisation : str
model_name : str
submission_type : str – "agent" | "model"
timestamp : str – "YYYYMMDD_HHMMSS"
date : str – "YYYY-MM-DD"
episode_id : str
question_uuid_list : str – JSON-encoded list
answer_list : str – JSON-encoded list
llm_as_judge_score_list : str – JSON-encoded list
reasoning_trace : str – optional, empty string if absent
score_json : str – JSON-encoded per-domain score dict
metadata_json : str – JSON-encoded full metadata dict
"""
try:
from huggingface_hub import HfApi
import pandas as pd
import io
api = HfApi(token=token)
organisation = metadata.get("organisation", "unknown")
model_name = metadata.get("model", metadata.get("agent_name", "unknown"))
submission_id = f"{organisation}_{model_name}_{timestamp}"
# Build one row per episode submission
rows = []
for sub in submissions:
rows.append({
"submission_id": submission_id,
"organisation": organisation,
"model_name": model_name,
"submission_type": metadata.get("submission_type", ""),
"timestamp": timestamp,
"date": metadata.get("Date", ""),
"episode_id": str(sub.get("episode_id", "")),
"question_uuid_list": json.dumps(sub.get("question_uuid_list", []), ensure_ascii=False),
"answer_list": json.dumps(sub.get("answer_list", []), ensure_ascii=False),
"llm_as_judge_score_list": json.dumps(sub.get("llm_as_judge_score_list", []), ensure_ascii=False),
"reasoning_trace": str(sub.get("reasoning_trace", "")),
"score_json": json.dumps(score_dict, ensure_ascii=False),
"metadata_json": json.dumps(metadata, ensure_ascii=False),
})
df = pd.DataFrame(rows)
# Serialise to Parquet in memory
buf = io.BytesIO()
df.to_parquet(buf, index=False)
buf.seek(0)
# Upload as a new shard under data/
path_in_repo = f"data/{submission_id}.parquet"
api.upload_file(
path_or_fileobj=buf,
path_in_repo=path_in_repo,
repo_id=HF_SUBMISSIONS_DATASET,
repo_type="dataset",
commit_message=f"Add submission: {submission_id}",
)
print(f"[hf_push] Pushed {len(rows)} row(s) to {HF_SUBMISSIONS_DATASET}/{path_in_repo}")
return True, submission_id
except Exception as e:
import traceback
traceback.print_exc()
print(f"[hf_push] ERROR: {e}")
return False, str(e)
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def add_new_submission(
model: str,
submission_type: str,
url: str,
file,
organisation: str,
mail: str,
model_family: str = "",
) -> str:
"""Validate, score, and record a new submission."""
try:
if file is None:
return format_warning("Please attach a submission file.")
_, parsed_mail = parseaddr(mail)
if "@" not in parsed_mail:
return format_warning("Please provide a valid email address.")
if not model or not submission_type or not organisation:
return format_warning("Please fill in all required fields.")
print(f"[submission] Processing {organisation}/{model} ({submission_type})")
is_valid, error_msg, submissions = validate_submission_file(file.name)
if not is_valid:
return format_error(error_msg)
print(f"[submission] Validated {len(submissions)} episode submissions")
groundtruth_meta = load_groundtruth_metadata(token=os.environ.get("HF_TOKEN") or os.environ.get("TOKEN"))
score_result = compute_scores_from_submissions(submissions, groundtruth_meta)
score_dict = score_result["Score"]
# Save raw submission
submission_dir = f"submissions/{organisation}_{model}"
os.makedirs(submission_dir, exist_ok=True)
timestamp = datetime.datetime.today().strftime("%Y%m%d_%H%M%S")
saved_file = f"{submission_dir}/submission_{timestamp}.jsonl"
with open(saved_file, "w", encoding="utf-8") as f_out:
for sub in submissions:
f_out.write(json.dumps(sub, ensure_ascii=False) + "\n")
# Save metadata
metadata = {
"model" if submission_type.lower() == "model" else "agent_name": model,
"model_family": model_family,
"submission_type": submission_type.lower(),
"organisation": organisation,
"url": url,
"mail": parsed_mail,
"Date": datetime.datetime.today().strftime("%Y-%m-%d"),
"timestamp": timestamp,
"verified": False,
"submission_count": len(submissions),
"scored_questions": score_result["scored_questions"],
"skipped_episodes": score_result["skipped_episodes"],
"unmatched_questions": score_result["unmatched_questions"],
"file_path": saved_file,
}
with open(f"{submission_dir}/metadata_{timestamp}.json", "w", encoding="utf-8") as f_meta:
json.dump(metadata, f_meta, indent=2, ensure_ascii=False)
# Push to HuggingFace private submissions dataset
hf_token = os.environ.get("HF_TOKEN") or os.environ.get("TOKEN")
if hf_token:
hf_ok, hf_result = push_submission_to_hf(
submissions=submissions,
metadata=metadata,
score_dict=score_dict,
token=hf_token,
timestamp=timestamp,
)
if not hf_ok:
print(f"[hf_push] WARNING: Push to HuggingFace failed: {hf_result}")
# Non-fatal — we continue even if HF push fails
else:
print("[hf_push] WARNING: No HF_TOKEN found, skipping HuggingFace push.")
# Update leaderboard
updated = update_leaderboard_data(
model_or_agent_name=model,
model_family=model_family,
submission_type=submission_type.lower(),
organisation=organisation,
score_dict=score_dict,
verified=False,
)
if not updated:
return format_error("Submission validated but failed to update leaderboard data.")
type_label = "Agent" if submission_type.lower() == "agent" else "Model"
# Compute per-domain averages and overall avg
domain_order = ["TEXT2SQL", "SOFTWARE", "WEB", "GAME", "EMBODIED_AI", "OPENWORLD_QA"]
domain_avgs = {}
for dom in domain_order:
caps = score_dict.get(dom, [])
vals = [list(c.values())[0] for c in caps if c]
domain_avgs[dom] = sum(vals) / len(vals) if vals else 0.0
overall_avg = sum(domain_avgs.values()) / len(domain_avgs) if domain_avgs else 0.0
# Build domain rows — all colors explicit to override Gradio dark theme
TD = 'style="padding:8px 12px;text-align:center;color:#2c3e50;background:#ffffff;"'
TD_NAME = 'style="padding:8px 12px;font-weight:600;color:#1a1a2e;background:#ffffff;"'
TD_AVG = 'style="padding:8px 12px;text-align:center;font-weight:700;color:#0e9e7a;background:#ffffff;"'
dom_rows_html = ""
for i, dom in enumerate(domain_order):
caps = score_dict.get(dom, [])
cap_cells = "".join(
f'<td {TD}>{list(c.values())[0]*100:.1f}%</td>'
for c in caps
)
avg = domain_avgs.get(dom, 0.0)
row_bg = "#f9fbff" if i % 2 == 0 else "#ffffff"
dom_rows_html += (
f'<tr style="border-bottom:1px solid #e8eef3;">'
f'<td style="padding:8px 12px;font-weight:600;color:#1a1a2e;background:{row_bg};">{dom}</td>'
f'<td style="padding:8px 12px;text-align:center;font-weight:700;color:#0e9e7a;background:{row_bg};">{avg*100:.2f}%</td>'
+ "".join(
f'<td style="padding:8px 12px;text-align:center;color:#2c3e50;background:{row_bg};">{list(c.values())[0]*100:.1f}%</td>'
for c in caps
)
+ '</tr>'
)
warning_html = (
'<div style="margin-top:12px;padding:10px 14px;background:#fff8e1;'
'border-left:4px solid #f0ad4e;border-radius:6px;font-size:13px;color:#7d5a00;">'
f'&#x26A0;&#xFE0F; {score_result["coverage_warning"]}</div>'
if score_result["coverage_warning"] else ""
)
result_html = (
'<div style="border:1px solid #c8e6c9;border-radius:12px;overflow:hidden;'
'margin-top:16px;font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'background:#ffffff;color:#1a1a2e;">'
# Header
'<div style="background:linear-gradient(135deg,#1abc9c,#16a085);padding:16px 22px;'
'display:flex;align-items:center;gap:12px;">'
'<span style="font-size:24px;">&#x2705;</span>'
'<span style="color:#ffffff;font-size:18px;font-weight:700;letter-spacing:0.3px;">'
'Submission Received Successfully</span>'
'</div>'
# Meta row
'<div style="padding:18px 22px;background:#f0faf7;display:flex;flex-wrap:wrap;'
'gap:28px;border-bottom:1px solid #d5eee8;">'
+ "".join(
f'<div><div style="color:#6b8f85;font-size:11px;font-weight:600;'
f'letter-spacing:0.8px;text-transform:uppercase;">{label}</div>'
f'<div style="font-weight:700;font-size:15px;color:{color};margin-top:3px;">{value}</div></div>'
for label, value, color in [
(type_label, model, "#1a1a2e"),
("Organisation", organisation, "#1a1a2e"),
("Episodes", str(len(submissions)), "#1a1a2e"),
("Questions Scored", str(score_result["scored_questions"]), "#1a1a2e"),
("Overall Avg", f"{overall_avg*100:.2f}%", "#0e9e7a"),
("Submission ID", timestamp, "#666"),
]
)
+ '</div>'
# Score table
'<div style="padding:18px 22px;background:#ffffff;">'
'<div style="font-size:13px;font-weight:600;color:#444;margin-bottom:12px;">'
'&#x1F4CA;&nbsp; Score Preview '
'<span style="font-weight:400;color:#888;">(self-reported · pending official verification)</span>'
'</div>'
'<div style="border-radius:8px;overflow:hidden;border:1px solid #e0eaf0;">'
'<table style="width:100%;border-collapse:collapse;font-size:13px;">'
'<thead>'
'<tr style="background:#e8f4f0;">'
'<th style="padding:9px 12px;text-align:left;color:#1a1a2e;font-weight:600;">Domain</th>'
'<th style="padding:9px 12px;text-align:center;color:#1a1a2e;font-weight:600;">Avg</th>'
'<th style="padding:9px 12px;text-align:center;color:#1a1a2e;font-weight:600;">Recall (A)</th>'
'<th style="padding:9px 12px;text-align:center;color:#1a1a2e;font-weight:600;">Causal Inf. (B)</th>'
'<th style="padding:9px 12px;text-align:center;color:#1a1a2e;font-weight:600;">State Upd. (C)</th>'
'<th style="padding:9px 12px;text-align:center;color:#1a1a2e;font-weight:600;">State Abs. (D)</th>'
'</tr>'
'</thead>'
f'<tbody>{dom_rows_html}</tbody>'
'</table>'
'</div>'
+ warning_html +
'<div style="margin-top:14px;padding:10px 14px;background:#fffbea;border-radius:6px;'
'font-size:12px;color:#7d5a00;line-height:1.7;border-left:3px solid #f5c518;">'
'&#x2139;&#xFE0F;&nbsp; This is a <strong style="color:#5a3e00;">self-reported preview</strong> based on your '
'<code style="background:#f5e9b8;color:#5a3e00;padding:1px 4px;border-radius:3px;">llm_as_judge_score_list</code>. '
'Official scores will be recomputed by LLM-as-Judge — your entry will appear on the leaderboard after weekly verification.'
'</div>'
'</div>'
'</div>'
'</div>'
)
return result_html
except Exception as e:
import traceback; traceback.print_exc()
return format_error(f"An error occurred: {str(e)}")