#!/usr/bin/env python3 """ Batch re-evaluate all submissions with the new Semantic Accuracy metric. This script downloads all prediction files from HuggingFace Hub and re-evaluates them with the ANLS* + LLM judge metric. Usage: # Dry run - list files only python batch_reevaluate.py --dry-run # Re-evaluate all files python batch_reevaluate.py # Re-evaluate specific organization python batch_reevaluate.py --org OpenAI # Upload results after review python batch_reevaluate.py --upload """ import json import os import sys import time from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from huggingface_hub import HfApi, hf_hub_download, list_repo_files from datasets import load_dataset # Add parent for imports sys.path.insert(0, str(Path(__file__).parent)) from metrics import ( anls_star, anls_star_llm, aggregate_anls_star_llm, citation_f1, kuiper_statistic ) # Parallelization config MAX_WORKERS = 24 # Config RESULTS_REPO = "agentic-document-ai/backend-results" TOKEN = os.environ.get("HF_TOKEN") OUTPUT_DIR = Path(__file__).parent / "reevaluated_results" def load_gold_data(): """Load gold standard from HuggingFace.""" print("Loading gold standard...") dataset = load_dataset("agentic-document-ai/dataset-PRIVATE", split="test") gold_by_id = {} gold_by_text = {} def _derive_hop_type(evidence: list) -> str: if not evidence: return 'single' documents = set() pages = set() for ev in evidence: doc = ev.get('document') page = ev.get('page') if doc is not None: documents.add(doc) if page is not None: pages.add(page) if len(documents) > 1: return 'cross_doc' if len(pages) > 1: return 'cross_page' return 'single' for ex in dataset: qid = ex.get('id', '') question = ex['question'].strip() evidence = ex.get('evidence', []) data = { 'question': question, 'answers': ex.get('answer_variants', []), 'evidence': evidence, 'category': ex.get('document_category', ''), 'domain': ex.get('domain', ''), 'hop_type': _derive_hop_type(evidence), } gold_by_id[qid] = data gold_by_text[question] = data return gold_by_id, gold_by_text def find_prediction_files(org_filter: str = None): """Find all prediction JSONL files in the results repo.""" files = list_repo_files(RESULTS_REPO, repo_type="dataset", token=TOKEN) pred_files = [f for f in files if '_predictions' in f and f.endswith('.jsonl')] if org_filter: pred_files = [f for f in pred_files if f.startswith(org_filter + '/')] return pred_files def find_result_file(pred_file: str): """Find the corresponding results JSON file for a predictions file.""" # Pattern: {org}/{model}_predictions_{timestamp}.jsonl -> {org}/{model}_results_{timestamp}.json parts = pred_file.rsplit('_predictions_', 1) if len(parts) == 2: result_file = parts[0] + '_results_' + parts[1].replace('.jsonl', '.json') return result_file return None def download_file(filepath: str) -> str: """Download a file from HuggingFace Hub.""" return hf_hub_download( repo_id=RESULTS_REPO, filename=filepath, repo_type="dataset", token=TOKEN ) def _evaluate_single_prediction(args, max_retries=3): """Evaluate a single prediction (for parallel processing).""" idx, pred, gold_data = args answer = pred.get('answer', '') question = pred.get('question', '').strip() citations = pred.get('citations', []) search_history = pred.get('search_history', []) steps = len(search_history) if search_history else pred.get('iterations', 0) # Calculate non-LLM metrics first anls = anls_star(answer, gold_data['answers']) doc_f1 = citation_f1(citations, gold_data['evidence'], level='document') page_f1 = citation_f1(citations, gold_data['evidence'], level='page') # Retry LLM call on failure for attempt in range(max_retries): try: llm_result = anls_star_llm(answer, gold_data['answers'], question) semantic_score = llm_result['score'] break except Exception as e: if attempt < max_retries - 1: print(f" Item {idx} attempt {attempt+1} failed: {e}, retrying...") time.sleep(2 ** attempt) # Exponential backoff else: print(f" Failed item {idx} after {max_retries} retries: {e}") raise return { 'idx': idx, 'anls': anls, 'semantic_score': semantic_score, 'correct': semantic_score >= 0.5, 'doc_f1': doc_f1['f1'], 'page_f1': page_f1['f1'], 'steps': steps, 'hop_type': gold_data.get('hop_type', 'single'), 'category': gold_data['category'], 'domain': gold_data['domain'] } def evaluate_with_semantic(predictions: list, gold_by_id: dict, gold_by_text: dict) -> dict: """Evaluate predictions with semantic accuracy metric (parallelized).""" # First, filter predictions to only those in test set matched_predictions = [] for pred in predictions: question = pred.get('question', '').strip() qid = pred.get('id', '') gold_data = None if question in gold_by_text: gold_data = gold_by_text[question] elif qid and qid in gold_by_id: gold_data = gold_by_id[qid] if gold_data: matched_predictions.append((pred, gold_data)) unmatched = len(predictions) - len(matched_predictions) print(f" Matched {len(matched_predictions)}/{len(predictions)} predictions to test set (skipping {unmatched})") total = len(matched_predictions) evals = [] completed = 0 # Prepare items with index for tracking items_with_idx = [(i, pred, gold) for i, (pred, gold) in enumerate(matched_predictions)] # Parallel evaluation with ThreadPoolExecutor print(f" Evaluating with {MAX_WORKERS} parallel workers...") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = {executor.submit(_evaluate_single_prediction, item): item[0] for item in items_with_idx} completed_indices = set() try: for future in as_completed(futures, timeout=600): # 10 min overall timeout try: result = future.result(timeout=120) # 2 min per item max evals.append(result) completed_indices.add(result['idx']) completed += 1 if completed % 50 == 0 or completed == total: print(f" Progress: {completed}/{total}") except TimeoutError: idx = futures[future] print(f" TIMEOUT: Item {idx} took too long, skipping") completed += 1 except TimeoutError: # Find which items are still pending pending = set(range(total)) - completed_indices print(f" OVERALL TIMEOUT: {len(pending)} items still pending: {sorted(pending)[:10]}...") # Cancel remaining futures for future in futures: future.cancel() if not evals: return None # Aggregate n = len(evals) semantic_scores = [e['semantic_score'] for e in evals] agg = aggregate_anls_star_llm(semantic_scores, apply_bias_correction=True) mean_anls = sum(e['anls'] for e in evals) / n * 100 mean_doc_f1 = sum(e['doc_f1'] for e in evals) / n * 100 mean_page_f1 = sum(e['page_f1'] for e in evals) / n * 100 kuiper = kuiper_statistic(evals) # By hop type single_hop = [e for e in evals if e['hop_type'] == 'single'] cross_page = [e for e in evals if e['hop_type'] == 'cross_page'] cross_doc = [e for e in evals if e['hop_type'] == 'cross_doc'] # By domain by_domain = defaultdict(list) for e in evals: domain = e['domain'] or 'Other' by_domain[domain].append(e) domain_scores = {} for domain, domain_evals in sorted(by_domain.items()): domain_semantic_scores = [e['semantic_score'] for e in domain_evals] domain_agg = aggregate_anls_star_llm(domain_semantic_scores, apply_bias_correction=True) domain_scores[domain] = { 'semantic': domain_agg['adjusted_score'] * 100, 'anls': sum(e['anls'] for e in domain_evals) / len(domain_evals) * 100, 'n': len(domain_evals) } return { 'overall': { 'semantic': agg['adjusted_score'] * 100, 'semantic_ci': (agg['ci_lower'] * 100, agg['ci_upper'] * 100), # 95% CI 'anls': mean_anls, 'page_f1': mean_page_f1, 'doc_f1': mean_doc_f1, 'kuiper': kuiper['kuiper_stat'] if not kuiper.get('degenerate') else None, }, 'single_evidence': { 'semantic': aggregate_anls_star_llm([e['semantic_score'] for e in single_hop], apply_bias_correction=True)['adjusted_score'] * 100 if single_hop else 0, 'anls': sum(e['anls'] for e in single_hop) / len(single_hop) * 100 if single_hop else 0, 'n': len(single_hop) }, 'multi_evidence_same_doc': { 'semantic': aggregate_anls_star_llm([e['semantic_score'] for e in cross_page], apply_bias_correction=True)['adjusted_score'] * 100 if cross_page else 0, 'anls': sum(e['anls'] for e in cross_page) / len(cross_page) * 100 if cross_page else 0, 'n': len(cross_page) }, 'multi_evidence_multi_doc': { 'semantic': aggregate_anls_star_llm([e['semantic_score'] for e in cross_doc], apply_bias_correction=True)['adjusted_score'] * 100 if cross_doc else 0, 'anls': sum(e['anls'] for e in cross_doc) / len(cross_doc) * 100 if cross_doc else 0, 'n': len(cross_doc) }, 'by_domain': domain_scores, 'n_evaluated': n, 'n_unmatched': unmatched } def main(): import argparse parser = argparse.ArgumentParser(description="Batch re-evaluate submissions") parser.add_argument('--dry-run', action='store_true', help="List files only, don't evaluate") parser.add_argument('--org', type=str, help="Filter by organization (e.g., 'OpenAI')") parser.add_argument('--upload', action='store_true', help="Upload already processed results to HuggingFace Hub (no re-evaluation)") parser.add_argument('--skip-existing', action='store_true', help="Skip already evaluated files") args = parser.parse_args() OUTPUT_DIR.mkdir(exist_ok=True) # Upload-only mode: just upload existing files if args.upload: print("Uploading existing results to HuggingFace Hub...") api = HfApi() result_files = list(OUTPUT_DIR.glob("**/*.json")) print(f"Found {len(result_files)} result files to upload") for result_file in result_files: rel_path = result_file.relative_to(OUTPUT_DIR) print(f" Uploading: {rel_path}") try: api.upload_file( path_or_fileobj=str(result_file), path_in_repo=str(rel_path), repo_id=RESULTS_REPO, repo_type="dataset", token=TOKEN, commit_message=f"Re-evaluate with semantic accuracy: {rel_path.stem}" ) print(f" ✓ Done") except Exception as e: print(f" ✗ Error: {e}") print("\nUpload complete!") return # Find prediction files print("Finding prediction files...") pred_files = find_prediction_files(args.org) print(f"Found {len(pred_files)} prediction files") if args.dry_run: for f in pred_files: print(f" - {f}") return # Load gold standard gold_by_id, gold_by_text = load_gold_data() print(f"Loaded {len(gold_by_id)} gold examples") # Process each file for i, pred_file in enumerate(pred_files): print(f"\n{'='*60}") print(f"[{i+1}/{len(pred_files)}] Processing: {pred_file}") print('='*60) # Check if already processed output_file = OUTPUT_DIR / (Path(pred_file).stem.replace('_predictions', '_results') + '_reevaluated.json') if args.skip_existing and output_file.exists(): print(" Skipping (already processed)") continue try: # Download predictions print(" Downloading predictions...") local_pred = download_file(pred_file) predictions = [] with open(local_pred) as f: for line in f: if line.strip(): predictions.append(json.loads(line)) print(f" Loaded {len(predictions)} predictions") # Download original results to preserve metadata result_file = find_result_file(pred_file) original_metadata = {} if result_file: try: local_result = download_file(result_file) with open(local_result) as f: original_data = json.load(f) original_metadata = { 'model_name': original_data.get('model_name'), 'organization': original_data.get('organization'), 'description': original_data.get('description'), 'link': original_data.get('link'), 'tags': original_data.get('tags'), 'submitted_by': original_data.get('submitted_by'), 'metadata': original_data.get('metadata'), 'submission_date': original_data.get('submission_date'), } print(f" Loaded metadata: model_name={original_metadata.get('model_name')}") except Exception as e: print(f" Warning: Could not load original results: {e}") # Fallback: extract metadata from filename if not found if not original_metadata.get('model_name'): # Pattern: Org/Model_Name_with_Stuff_predictions_timestamp.jsonl filename = Path(pred_file).stem # e.g., GPT-5_(2025-08-07)_with_BM25_Search_Tool_predictions_20260109_152104 parts = filename.rsplit('_predictions_', 1) if parts: model_name = parts[0].replace('_', ' ') # Convert underscores to spaces org = Path(pred_file).parts[0] if '/' in pred_file else 'Unknown' original_metadata = { 'model_name': model_name, 'organization': org.replace('_', ' '), 'description': '', 'tags': ['Agentic'], 'metadata': {'model_type': 'unknown'}, } print(f" Using fallback metadata: model_name={model_name}, org={org}") # Evaluate print(" Evaluating with semantic accuracy...") start_time = time.time() results = evaluate_with_semantic(predictions, gold_by_id, gold_by_text) elapsed = time.time() - start_time if results: print(f"\n Results (took {elapsed:.1f}s):") print(f" Semantic Accuracy: {results['overall']['semantic']:.1f}") print(f" ANLS*: {results['overall']['anls']:.1f}") print(f" Page F1: {results['overall']['page_f1']:.1f}") # Save with original metadata org = Path(pred_file).parts[0] if '/' in pred_file else 'Unknown' output_filename = Path(pred_file).name.replace('_predictions', '_results').replace('.jsonl', '.json') full_result = { **original_metadata, 'results': results, 'reevaluated_date': datetime.now(timezone.utc).isoformat(), 'source_predictions_file': pred_file, 'result_file_path': f"{org}/{output_filename}", } # Create org subfolder org_dir = OUTPUT_DIR / org org_dir.mkdir(exist_ok=True) output_file = org_dir / output_filename with open(output_file, 'w') as f: json.dump(full_result, f, indent=2) print(f" Saved to: {output_file}") else: print(" No valid evaluations") except Exception as e: print(f" Error: {e}") import traceback traceback.print_exc() continue print(f"\n{'='*60}") print("DONE!") print(f"Results saved to: {OUTPUT_DIR}") print(f"\nTo upload results, run: python batch_reevaluate.py --upload") if __name__ == "__main__": main()