Spaces:
Runtime error
Runtime error
| """ | |
| evaluate.py - DISBench Evaluation Script | |
| Responsibilities: | |
| 1. Scan all JSON submission files in submissions/ directory | |
| 2. For each unevaluated submission, compare with groundtruth.jsonl to calculate scores | |
| 3. Append new results to leaderboard_data.json | |
| 4. (Optional) Commit updated leaderboard_data.json back to HF repository | |
| Execution: | |
| - Automatic: Called when app.py starts (automatically triggered on Space rebuild) | |
| - Manual: python evaluate.py | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, List, Set, Tuple, Optional | |
# Module-wide logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# --- Path Configuration ---
# Directory scanned for per-submission JSON files (single source of truth).
SUBMISSIONS_DIR = "submissions"
# JSONL file with one gold-answer record per query.
GROUND_TRUTH_FILE = "groundtruth.jsonl"
# Aggregated results file read by the leaderboard UI.
LEADERBOARD_FILE = "leaderboard_data.json"
| # ============================================================ | |
| # Core Evaluation Logic | |
| # ============================================================ | |
def compute_em(predicted: Set[str], gold: Set[str]) -> float:
    """Exact Match: 1.0 when the predicted set equals the gold set, else 0.0."""
    return float(predicted == gold)
def compute_f1(predicted: Set[str], gold: Set[str]) -> float:
    """F1 score: harmonic mean of set-based precision and recall.

    Two empty sets count as a perfect match (1.0); exactly one empty set
    scores 0.0, as does a pair with no overlap.
    """
    if not predicted:
        return 1.0 if not gold else 0.0
    if not gold:
        return 0.0
    overlap = len(predicted & gold)
    if overlap == 0:
        # precision and recall are both zero; avoid dividing by zero below.
        return 0.0
    precision = overlap / len(predicted)
    recall = overlap / len(gold)
    return 2 * precision * recall / (precision + recall)
def load_ground_truth() -> Dict:
    """
    Load the ground truth file (JSONL format).

    Input format (groundtruth.jsonl), one JSON object per line:
        {
            "query_id": "1",
            "user_id": "...",
            "query": "...",
            "answer": ["photo_id_1", "photo_id_2"],
            "event_type": "intra-event"   // "intra-event" or "inter-event"
        }

    Returns the internal format ({} when the file is missing):
        {
            "queries": {
                "1": {
                    "type": "intra",      // "intra" or "inter"
                    "gold_photos": ["photo_id_1", "photo_id_2"]
                },
                ...
            }
        }

    Malformed lines (invalid JSON, missing query_id) are logged and skipped.
    """
    if not os.path.exists(GROUND_TRUTH_FILE):
        logger.warning(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
        return {}
    queries: Dict[str, Dict] = {}
    with open(GROUND_TRUTH_FILE, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
                query_id = entry.get("query_id")
                if query_id is None:
                    # BUG FIX: previously a record without a query_id was
                    # stored under the key None; such a record can never be
                    # matched against predictions, so drop it instead.
                    logger.warning(f"Missing query_id at line {line_num}, skipping")
                    continue
                answer = entry.get("answer", [])
                event_type = entry.get("event_type", "intra-event")
                # Convert event_type: "intra-event" -> "intra", "inter-event" -> "inter"
                query_type = event_type.replace("-event", "")
                queries[query_id] = {
                    "type": query_type,
                    "gold_photos": answer
                }
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON at line {line_num}: {e}")
                continue
            except Exception as e:
                logger.warning(f"Error processing line {line_num}: {e}")
                continue
    return {"queries": queries}
def evaluate_predictions(
    predictions: Dict[str, List[str]],
    ground_truth: Dict
) -> Dict[str, float]:
    """
    Score a submission's predictions against the ground truth.

    Returns a dict with six metrics, each a percentage rounded to one
    decimal place:
        overall_em, overall_f1, intra_em, intra_f1, inter_em, inter_f1
    """
    queries = ground_truth.get("queries", {})
    if not queries:
        logger.warning("Ground truth has no queries, returning zeros.")
        return {
            "overall_em": 0.0, "overall_f1": 0.0,
            "intra_em": 0.0, "intra_f1": 0.0,
            "inter_em": 0.0, "inter_f1": 0.0,
        }

    # Accumulate per-query scores overall and per event type.
    per_type = {"intra": {"em": [], "f1": []}, "inter": {"em": [], "f1": []}}
    overall = {"em": [], "f1": []}
    for qid, info in queries.items():
        gold = set(info.get("gold_photos", []))
        pred = set(predictions.get(qid, []))
        qtype = info.get("type", "intra")  # default to intra
        em_score = compute_em(pred, gold)
        f1_score = compute_f1(pred, gold)
        overall["em"].append(em_score)
        overall["f1"].append(f1_score)
        bucket = per_type.get(qtype)
        if bucket is not None:
            bucket["em"].append(em_score)
            bucket["f1"].append(f1_score)

    def pct(values):
        # Mean expressed as a percentage, one decimal; 0.0 for empty buckets.
        return round(sum(values) / len(values) * 100, 1) if values else 0.0

    return {
        "overall_em": pct(overall["em"]),
        "overall_f1": pct(overall["f1"]),
        "intra_em": pct(per_type["intra"]["em"]),
        "intra_f1": pct(per_type["intra"]["f1"]),
        "inter_em": pct(per_type["inter"]["em"]),
        "inter_f1": pct(per_type["inter"]["f1"]),
    }
| # ============================================================ | |
| # Submission Management | |
| # ============================================================ | |
def get_entry_key(entry: Dict) -> Tuple:
    """
    Build the unique identifier key for a leaderboard entry.

    The same method may be submitted under several configurations
    (different backbone, retriever, etc.); two entries count as the same
    submission only when every key field matches.

    Returns: (method, agent, backbone, retriever, track)
    """
    key = tuple(
        entry.get(field, "")
        for field in ("method", "agent", "backbone", "retriever")
    )
    # "track" alone has a non-empty default.
    return key + (entry.get("track", "Standard"),)
def load_leaderboard() -> list:
    """Read the leaderboard JSON file; return [] when it does not exist."""
    if not os.path.exists(LEADERBOARD_FILE):
        return []
    with open(LEADERBOARD_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)
def save_leaderboard(data: list):
    """Write the leaderboard entries to LEADERBOARD_FILE as pretty-printed JSON."""
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    with open(LEADERBOARD_FILE, 'w', encoding='utf-8') as f:
        f.write(payload)
def process_submission(filepath: str, ground_truth: Dict) -> Optional[Dict]:
    """
    Evaluate a single submission file.

    Returns the leaderboard entry dict, or None when the submission is
    invalid (unreadable JSON, missing method_name, empty predictions) or
    any error occurs while scoring it.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            payload = json.load(f)

        meta = payload.get("meta", {})
        predictions = payload.get("predictions", {})

        # Reject submissions that cannot be attributed or scored.
        if not meta.get("method_name"):
            logger.warning(f"Skipping {filepath}: missing method_name")
            return None
        if not predictions:
            logger.warning(f"Skipping {filepath}: empty predictions")
            return None

        scores = evaluate_predictions(predictions, ground_truth)

        row = {
            "method": meta.get("method_name", "Unknown"),
            "url": meta.get("project_url", "#"),
            "org": meta.get("organization", "Anonymous"),
            "agent": meta.get("agent_framework", "Unknown"),
            "backbone": meta.get("backbone_model", "Unknown"),
            "retriever": meta.get("retriever_model", "Unknown"),
            "track": meta.get("track", "Standard"),
            "date": datetime.now().strftime("%Y-%m-%d"),
            **scores,
        }
        logger.info(
            f"Evaluated '{row['method']}': "
            f"Overall EM={scores['overall_em']}, F1={scores['overall_f1']}"
        )
        return row
    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return None
def run_evaluation():
    """
    Main evaluation pipeline.

    1. Load ground truth
    2. Scan every file in submissions/ and re-evaluate it
    3. Deduplicate by configuration key (method, agent, backbone, retriever, track)
    4. When several submissions share a configuration, keep the latest
       (files are processed in sorted filename order, so the last one wins)
    5. Return (number of entries, total entries)

    Notes:
        - No evaluated.json is maintained; all files are re-evaluated on each
          startup and submissions/ is the single source of truth.
        - Benefits: simple logic, no state inconsistency, automatic
          recalculation whenever the evaluation logic changes.
    """
    # 1. Load ground truth
    ground_truth = load_ground_truth()
    if not ground_truth:
        logger.info("No ground truth file found. Skipping evaluation.")
        return 0, 0

    # 2. Scan all submissions and evaluate
    if not os.path.exists(SUBMISSIONS_DIR):
        logger.info("No submissions directory found.")
        return 0, 0

    # Map configuration key -> (entry, filename). Later files overwrite
    # earlier ones, so the latest submission per configuration is kept.
    entries_by_config = {}
    for filename in sorted(os.listdir(SUBMISSIONS_DIR)):
        if not filename.endswith(".json"):
            continue
        filepath = os.path.join(SUBMISSIONS_DIR, filename)
        # BUG FIX: these two log messages previously had no placeholder and
        # printed a literal "(unknown)" instead of the submission file name.
        logger.info(f"Processing submission: {filename}")
        entry = process_submission(filepath, ground_truth)
        if entry is not None:
            config_key = get_entry_key(entry)
            # A duplicate configuration means a re-submission: replace old with new.
            if config_key in entries_by_config:
                old_filename = entries_by_config[config_key][1]
                logger.info(
                    f"Config {config_key} already exists (from {old_filename}), "
                    f"replacing with {filename}"
                )
            entries_by_config[config_key] = (entry, filename)

    # 3. Extract all unique entries
    leaderboard = [entry for entry, _ in entries_by_config.values()]

    # 4. Save results
    save_leaderboard(leaderboard)
    logger.info(f"Leaderboard updated: {len(leaderboard)} unique configurations.")
    return len(leaderboard), len(leaderboard)
def commit_leaderboard_to_repo():
    """
    (Optional) Push the updated leaderboard_data.json back to the HF
    repository so the data persists (avoids re-evaluation on restart).

    Note: evaluated.json is no longer committed, since submissions/ is
    re-evaluated on every startup.
    """
    hf_token = os.environ.get("HF_TOKEN")
    space_id = os.environ.get("SPACE_ID")
    if not hf_token or not space_id:
        logger.info("HF_TOKEN or SPACE_ID not set, skipping repo commit.")
        return
    try:
        from huggingface_hub import HfApi, CommitOperationAdd

        api = HfApi(token=hf_token)
        # Only commit leaderboard_data.json
        if not os.path.exists(LEADERBOARD_FILE):
            logger.warning(f"Leaderboard file {LEADERBOARD_FILE} not found, skipping commit.")
            return

        with open(LEADERBOARD_FILE, 'rb') as f:
            contents = f.read()
            operation = CommitOperationAdd(
                path_in_repo=LEADERBOARD_FILE,
                path_or_fileobj=contents,
            )
            api.create_commit(
                repo_id=space_id,
                repo_type="space",
                operations=[operation],
                commit_message="[Auto] Update leaderboard scores",
            )
        logger.info("Leaderboard committed to repo successfully.")
    except Exception as e:
        logger.error(f"Failed to commit to repo: {e}")
| # ============================================================ | |
| # Entry Point | |
| # ============================================================ | |
if __name__ == "__main__":
    # Manual entry point: evaluate everything, then (optionally) persist
    # the leaderboard back to the HF Space repository.
    logger.info("=" * 60)
    logger.info("DISBench Evaluation Pipeline - Manual Run")
    logger.info("=" * 60)
    total, _ = run_evaluation()
    if total > 0:
        # Dropped the spurious f-prefix: the message has no placeholders (F541).
        logger.info("Evaluated all submissions. Committing to repo...")
        commit_leaderboard_to_repo()
    else:
        logger.info("No submissions found.")
    logger.info(f"Leaderboard has {total} unique configurations.")