|
|
""" |
|
|
evaluate.py - DISBench Evaluation Script |
|
|
|
|
|
Responsibilities: |
|
|
1. Scan all JSON submission files in submissions/ directory |
|
|
2. For each unevaluated submission, compare with groundtruth.jsonl to calculate scores |
|
|
3. Append new results to leaderboard_data.json |
|
|
4. (Optional) Commit updated leaderboard_data.json back to HF repository |
|
|
|
|
|
Execution: |
|
|
- Automatic: Called when app.py starts (automatically triggered on Space rebuild) |
|
|
- Manual: python evaluate.py |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Set, Tuple, Optional |
|
|
|
|
|
# Configure root logging once at import time so both entry points
# (automatic via app.py, manual via `python evaluate.py`) share the same
# timestamped log format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Directory scanned for *.json submission files (the single source of truth).
SUBMISSIONS_DIR = "submissions"

# JSONL file with one gold-answer record per line (see load_ground_truth).
GROUND_TRUTH_FILE = "groundtruth.jsonl"

# JSON list of leaderboard entries, regenerated on every evaluation run.
LEADERBOARD_FILE = "leaderboard_data.json"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_em(predicted: Set[str], gold: Set[str]) -> float:
    """Exact Match: 1.0 when the predicted set equals the gold set, else 0.0."""
    # bool -> float gives exactly the 1.0 / 0.0 the caller averages over.
    return float(predicted == gold)
|
|
|
|
|
|
|
|
def compute_f1(predicted: Set[str], gold: Set[str]) -> float:
    """F1 Score: harmonic mean of set-based precision and recall."""
    # Two empty sets are a perfect match; exactly one empty set can
    # never overlap the other.
    if not predicted:
        return 1.0 if not gold else 0.0
    if not gold:
        return 0.0

    overlap = len(predicted & gold)
    if overlap == 0:
        # precision == recall == 0 here, so F1 is 0 (avoids 0/0 below).
        return 0.0

    precision = overlap / len(predicted)
    recall = overlap / len(gold)
    return 2 * precision * recall / (precision + recall)
|
|
|
|
|
|
|
|
def load_ground_truth() -> Dict:
    """
    Load the ground truth file (JSONL format).

    Input format (groundtruth.jsonl), one JSON object per line:
        {
            "query_id": "1",
            "user_id": "...",
            "query": "...",
            "answer": ["photo_id_1", "photo_id_2"],
            "event_type": "intra-event"   // "intra-event" or "inter-event"
        }

    Converted to internal format:
        {
            "queries": {
                "1": {
                    "type": "intra",          # "intra" or "inter"
                    "gold_photos": ["photo_id_1", "photo_id_2"]
                },
                ...
            }
        }

    Returns {} when the ground truth file does not exist (callers treat a
    falsy result as "skip evaluation"). Malformed or incomplete lines are
    logged and skipped instead of aborting the whole load.
    """
    if not os.path.exists(GROUND_TRUTH_FILE):
        logger.warning(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
        return {}

    queries = {}
    with open(GROUND_TRUTH_FILE, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            try:
                entry = json.loads(line)
                query_id = entry.get("query_id")
                if query_id is None:
                    # Without an id this entry can never be matched to a
                    # prediction; skip it instead of storing it under None.
                    logger.warning(f"Missing query_id at line {line_num}, skipping")
                    continue

                answer = entry.get("answer", [])
                event_type = entry.get("event_type", "intra-event")

                # "intra-event" -> "intra", "inter-event" -> "inter"
                query_type = event_type.replace("-event", "")

                if query_id in queries:
                    # Last occurrence wins, but make the collision visible.
                    logger.warning(f"Duplicate query_id '{query_id}' at line {line_num}, overwriting")

                queries[query_id] = {
                    "type": query_type,
                    "gold_photos": answer
                }
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON at line {line_num}: {e}")
                continue
            except Exception as e:
                logger.warning(f"Error processing line {line_num}: {e}")
                continue

    return {"queries": queries}
|
|
|
|
|
|
|
|
def evaluate_predictions(
    predictions: Dict[str, List[str]],
    ground_truth: Dict
) -> Dict[str, float]:
    """
    Score a submission's predictions against the ground truth.

    Args:
        predictions: mapping of query_id -> list of predicted photo ids.
        ground_truth: output of load_ground_truth(), i.e. {"queries": {...}}.

    Returns a dict of percentages rounded to one decimal place:
        {
            "overall_em": float,
            "overall_f1": float,
            "intra_em": float,
            "intra_f1": float,
            "inter_em": float,
            "inter_f1": float
        }
    """
    queries = ground_truth.get("queries", {})

    if not queries:
        logger.warning("Ground truth has no queries, returning zeros.")
        return {
            "overall_em": 0.0, "overall_f1": 0.0,
            "intra_em": 0.0, "intra_f1": 0.0,
            "inter_em": 0.0, "inter_f1": 0.0,
        }

    # Per-type accumulators; queries with an unknown type still count
    # toward the overall averages.
    buckets = {"intra": {"em": [], "f1": []}, "inter": {"em": [], "f1": []}}
    overall_em_scores: List[float] = []
    overall_f1_scores: List[float] = []

    for qid, info in queries.items():
        gold = set(info.get("gold_photos", []))
        pred = set(predictions.get(qid, []))  # missing query -> empty prediction
        qtype = info.get("type", "intra")

        em_score = compute_em(pred, gold)
        f1_score = compute_f1(pred, gold)

        overall_em_scores.append(em_score)
        overall_f1_scores.append(f1_score)

        bucket = buckets.get(qtype)
        if bucket is not None:
            bucket["em"].append(em_score)
            bucket["f1"].append(f1_score)

    def safe_mean(values):
        # Mean as a percentage with one decimal; an empty list maps to 0.0.
        if not values:
            return 0.0
        return round(sum(values) / len(values) * 100, 1)

    return {
        "overall_em": safe_mean(overall_em_scores),
        "overall_f1": safe_mean(overall_f1_scores),
        "intra_em": safe_mean(buckets["intra"]["em"]),
        "intra_f1": safe_mean(buckets["intra"]["f1"]),
        "inter_em": safe_mean(buckets["inter"]["em"]),
        "inter_f1": safe_mean(buckets["inter"]["f1"]),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_entry_key(entry: Dict) -> Tuple:
    """
    Build the unique identifier key for a leaderboard entry.

    The same method may appear under several configurations (different
    backbone, retriever, etc.); two entries count as the same submission
    only when every key configuration field matches.

    Returns: (method, agent, backbone, retriever, track)
    """
    key_fields = (
        ("method", ""),
        ("agent", ""),
        ("backbone", ""),
        ("retriever", ""),
        ("track", "Standard"),
    )
    return tuple(entry.get(field, default) for field, default in key_fields)
|
|
|
|
|
|
|
|
def load_leaderboard() -> list:
    """
    Load the current leaderboard entries from LEADERBOARD_FILE.

    Returns [] when the file is missing, unreadable, or contains invalid
    JSON — a corrupted leaderboard must never crash startup, since the
    next run_evaluation() call rebuilds it from submissions/ anyway.
    """
    if os.path.exists(LEADERBOARD_FILE):
        try:
            with open(LEADERBOARD_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError) as e:
            logger.warning(f"Failed to load {LEADERBOARD_FILE}: {e}")
    return []
|
|
|
|
|
|
|
|
def save_leaderboard(data: list):
    """Write the leaderboard entries to LEADERBOARD_FILE as pretty-printed JSON."""
    with open(LEADERBOARD_FILE, mode='w', encoding='utf-8') as out:
        json.dump(data, out, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
def process_submission(filepath: str, ground_truth: Dict) -> Optional[Dict]:
    """
    Evaluate one submission file.

    Returns a leaderboard entry dict on success, or None when the file
    cannot be parsed or lacks required fields (the problem is logged and
    the submission is skipped).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            submission = json.load(f)

        meta = submission.get("meta", {})
        predictions = submission.get("predictions", {})

        # A method name and at least one prediction are mandatory.
        if not meta.get("method_name"):
            logger.warning(f"Skipping {filepath}: missing method_name")
            return None
        if not predictions:
            logger.warning(f"Skipping {filepath}: empty predictions")
            return None

        scores = evaluate_predictions(predictions, ground_truth)

        # Map submission metadata onto leaderboard columns, with the same
        # fallbacks the UI expects for absent fields.
        entry = {
            "method": meta.get("method_name", "Unknown"),
            "url": meta.get("project_url", "#"),
            "org": meta.get("organization", "Anonymous"),
            "agent": meta.get("agent_framework", "Unknown"),
            "backbone": meta.get("backbone_model", "Unknown"),
            "retriever": meta.get("retriever_model", "Unknown"),
            "track": meta.get("track", "Standard"),
            "date": datetime.now().strftime("%Y-%m-%d"),
        }
        entry.update(scores)

        logger.info(
            f"Evaluated '{entry['method']}': "
            f"Overall EM={scores['overall_em']}, F1={scores['overall_f1']}"
        )
        return entry

    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return None
|
|
|
|
|
|
|
|
def run_evaluation():
    """
    Main evaluation pipeline:
    1. Load ground truth
    2. Scan all files in submissions/ and re-evaluate
    3. Deduplicate using configuration combinations (method, agent, backbone, retriever, track)
    4. If multiple submissions exist for the same configuration, keep the latest
       (files are processed in sorted filename order, so the last file wins)
    5. Return (number of entries, total entries)

    Notes:
    - No evaluated.json is maintained, all files are re-evaluated on each startup
    - submissions/ is the single source of truth
    - Benefits: simple logic, no state inconsistency, automatic recalculation
      when evaluation logic changes
    """
    ground_truth = load_ground_truth()
    if not ground_truth:
        logger.info("No ground truth file found. Skipping evaluation.")
        return 0, 0

    if not os.path.exists(SUBMISSIONS_DIR):
        logger.info("No submissions directory found.")
        return 0, 0

    # config key -> (entry, source filename); later files overwrite earlier
    # ones, which implements the "keep the latest submission" rule.
    entries_by_config = {}

    for filename in sorted(os.listdir(SUBMISSIONS_DIR)):
        if not filename.endswith(".json"):
            continue

        filepath = os.path.join(SUBMISSIONS_DIR, filename)
        # BUG FIX: log the actual file being processed instead of a
        # placeholder-free f-string that printed literal "(unknown)".
        logger.info(f"Processing submission: {filename}")

        entry = process_submission(filepath, ground_truth)
        if entry is not None:
            config_key = get_entry_key(entry)

            if config_key in entries_by_config:
                old_filename = entries_by_config[config_key][1]
                # BUG FIX: report the replacing file's real name.
                logger.info(
                    f"Config {config_key} already exists (from {old_filename}), "
                    f"replacing with {filename}"
                )

            entries_by_config[config_key] = (entry, filename)

    leaderboard = [entry for entry, _ in entries_by_config.values()]

    save_leaderboard(leaderboard)
    logger.info(f"Leaderboard updated: {len(leaderboard)} unique configurations.")

    return len(leaderboard), len(leaderboard)
|
|
|
|
|
|
|
|
def commit_leaderboard_to_repo():
    """
    (Optional) Commit the updated leaderboard_data.json back to HF repository,
    to persist data (avoid re-evaluation on every restart).

    Note: We no longer commit evaluated.json, as we re-evaluate from
    submissions/ on each startup.
    """
    hf_token = os.environ.get("HF_TOKEN")
    space_id = os.environ.get("SPACE_ID")

    # Both credentials are required; without them this is a silent no-op.
    if not (hf_token and space_id):
        logger.info("HF_TOKEN or SPACE_ID not set, skipping repo commit.")
        return

    try:
        # Imported lazily so the module works without huggingface_hub
        # installed (e.g. during a purely local manual run).
        from huggingface_hub import HfApi, CommitOperationAdd

        api = HfApi(token=hf_token)

        if not os.path.exists(LEADERBOARD_FILE):
            logger.warning(f"Leaderboard file {LEADERBOARD_FILE} not found, skipping commit.")
            return

        with open(LEADERBOARD_FILE, 'rb') as f:
            payload = f.read()

        operation = CommitOperationAdd(
            path_in_repo=LEADERBOARD_FILE,
            path_or_fileobj=payload,
        )
        api.create_commit(
            repo_id=space_id,
            repo_type="space",
            operations=[operation],
            commit_message="[Auto] Update leaderboard scores",
        )
        logger.info("Leaderboard committed to repo successfully.")

    except Exception as e:
        logger.error(f"Failed to commit to repo: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual entry point: evaluate everything, then optionally push the
    # refreshed leaderboard back to the HF Space repository.
    banner = "=" * 60
    logger.info(banner)
    logger.info("DISBench Evaluation Pipeline - Manual Run")
    logger.info(banner)

    total, _ = run_evaluation()

    if total > 0:
        logger.info(f"Evaluated all submissions. Committing to repo...")
        commit_leaderboard_to_repo()
    else:
        logger.info("No submissions found.")

    logger.info(f"Leaderboard has {total} unique configurations.")