"""Parser-quality benchmark helpers.""" from __future__ import annotations import csv from pathlib import Path from statistics import mean from time import perf_counter from zsgdp.benchmarks.ablations import ablation_plan from zsgdp.benchmarks.chunking_quality import chunking_quality_record, score_chunking_quality from zsgdp.benchmarks.datasets import DatasetDocument, get_dataset_loader from zsgdp.benchmarks.ground_truth import ( doclaynet_layout_truths, omnidocbench_formula_truths, omnidocbench_layout_truths, omnidocbench_table_truths, parsed_formula_records, parsed_layout_predictions, parsed_table_records, ) from zsgdp.benchmarks.per_parser_metrics import compute_per_parser_metrics from zsgdp.benchmarks.retrieval import run_retrieval_for_document from zsgdp.benchmarks.structure_quality import score_structure_quality, structure_quality_record from zsgdp.benchmarks.throughput import summarize_throughput, throughput_record from zsgdp.config import load_config from zsgdp.pipeline import parse_document from zsgdp.utils import write_json from zsgdp.verify.formula_extraction import compute_formula_extraction from zsgdp.verify.layout_f1 import compute_layout_f1 from zsgdp.verify.retrieval import compute_retrieval_metrics from zsgdp.verify.table_structure import compute_table_structure_score GROUND_TRUTH_ADAPTERS = { "doclaynet": doclaynet_layout_truths, "omnidocbench": omnidocbench_layout_truths, } TABLE_TRUTH_ADAPTERS = { "omnidocbench": omnidocbench_table_truths, } FORMULA_TRUTH_ADAPTERS = { "omnidocbench": omnidocbench_formula_truths, } def score_parser_quality(records: list[dict]) -> dict[str, float]: if not records: return {"mean_quality_score": 0.0} return { "mean_quality_score": sum(float(record.get("quality_score", 0.0)) for record in records) / len(records) } def run_parser_benchmark( input_dir: str | Path, output_dir: str | Path, *, config_path: str | Path | None = None, selected_parsers: list[str] | None = None, dataset_name: str = "custom_folder", ) -> dict: input_root = Path(input_dir) output_root = Path(output_dir) output_root.mkdir(parents=True, exist_ok=True) loader = get_dataset_loader(dataset_name) bench_config = load_config(config_path) documents: list[dict] = [] parser_rows: list[dict] = [] chunk_rows: list[dict] = [] structure_rows: list[dict] = [] chunk_quality_rows: list[dict] = [] throughput_rows: list[dict] = [] repair_rows: list[dict] = [] layout_rows: list[dict] = [] table_structure_rows: list[dict] = [] formula_rows: list[dict] = [] retrieval_rows: list[dict] = [] per_parser_rows: list[dict] = [] observed_chunk_strategies: set[str] = set() for dataset_document in loader(input_root): path = dataset_document.path doc_out = output_root / "parsed" / (dataset_document.doc_id or path.stem) started = perf_counter() parsed = parse_document(path, doc_out, config_path=config_path, selected_parsers=selected_parsers) elapsed_seconds = perf_counter() - started chunk_strategy_counts = _chunk_strategy_counts(parsed.chunks) observed_chunk_strategies.update(chunk_strategy_counts) disagreement = parsed.provenance.get("parser_disagreement", {}) or {} repair_success = parsed.provenance.get("repair_success", {}) or {} layout_metrics = _layout_metrics(dataset_document, parsed) table_metrics = _table_structure_metrics(dataset_document, parsed) formula_metrics = _formula_extraction_metrics(dataset_document, parsed) retrieval_metrics = _retrieval_metrics(parsed, bench_config) contribution = _parser_contribution(parsed) per_parser = _compute_per_parser_block(dataset_document, parsed) doc_record = { "source_path": str(path), "dataset_id": dataset_document.dataset_id, "dataset_doc_id": dataset_document.doc_id, "has_ground_truth": dataset_document.ground_truth is not None, "doc_id": parsed.doc_id, "file_type": parsed.file_type, "quality_score": parsed.quality_report.score, "element_count": len(parsed.elements), "table_count": len(parsed.tables), "figure_count": len(parsed.figures), "chunk_count": len(parsed.chunks), "chunk_strategy_counts": chunk_strategy_counts, "chunk_quality_metrics": _chunk_quality_metrics(parsed.quality_report.metrics), "elapsed_seconds": elapsed_seconds, "parser_metrics": parsed.provenance.get("parser_metrics", {}), "parser_failures": parsed.provenance.get("parser_failures", {}), "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)), "candidate_count": int(disagreement.get("candidate_count", 0)), "conflict_count": int(disagreement.get("conflict_count", 0)), "repair_resolution_rate": float(repair_success.get("repair_resolution_rate", 1.0)), "repair_regression_rate": float(repair_success.get("repair_regression_rate", 0.0)), "repair_iteration_count": int(repair_success.get("iteration_count", 0)), "layout_f1": layout_metrics["summary"]["class_aware_f1"], "layout_class_agnostic_f1": layout_metrics["summary"]["class_agnostic_f1"], "layout_evaluated": layout_metrics["summary"]["evaluated"], "table_structure_score": table_metrics["summary"]["mean_table_score"], "table_match_rate": table_metrics["summary"]["table_match_rate"], "table_structure_evaluated": table_metrics["summary"]["evaluated"], "formula_cer": formula_metrics["summary"]["mean_cer"], "formula_accuracy": formula_metrics["summary"]["mean_accuracy"], "formula_exact_match_rate": formula_metrics["summary"]["exact_match_rate"], "formula_evaluated": formula_metrics["summary"]["evaluated"], "retrieval_recall_at_1": retrieval_metrics["summary"]["recall_at_1"], "retrieval_recall_at_5": retrieval_metrics["summary"]["recall_at_5"], "retrieval_mrr": retrieval_metrics["summary"]["mean_reciprocal_rank"], "retrieval_query_count": retrieval_metrics["summary"]["query_count"], "retrieval_evaluated": retrieval_metrics["summary"]["evaluated"], "parser_contribution_counts": contribution["counts"], "parser_contribution_fractions": contribution["fractions"], "per_parser_metrics": per_parser, } documents.append(doc_record) for parser_name, block in per_parser.items(): per_parser_rows.append(_per_parser_row(path, parsed, dataset_document, parser_name, block)) repair_rows.append(_repair_row(path, parsed, repair_success, disagreement)) if layout_metrics["summary"]["evaluated"]: layout_rows.append(_layout_row(path, parsed, dataset_document, layout_metrics)) if table_metrics["summary"]["evaluated"]: table_structure_rows.append(_table_structure_row(path, parsed, dataset_document, table_metrics)) if formula_metrics["summary"]["evaluated"]: formula_rows.append(_formula_row(path, parsed, dataset_document, formula_metrics)) if retrieval_metrics["summary"]["evaluated"]: retrieval_rows.append(_retrieval_row(path, parsed, retrieval_metrics)) for parser_name, metrics in doc_record["parser_metrics"].items(): parser_rows.append( { "source_path": str(path), "doc_id": parsed.doc_id, "parser": parser_name, **metrics, } ) chunk_rows.extend(_chunk_strategy_rows(path, parsed)) structure_rows.append(structure_quality_record(parsed, str(path))) chunk_quality_rows.append(chunking_quality_record(parsed, str(path))) throughput_rows.append(throughput_record(parsed, str(path), elapsed_seconds)) config = bench_config parser_contribution_summary = _aggregate_parser_contributions(documents) summary = { "dataset_name": dataset_name, "dataset_root": str(input_root), "document_count": len(documents), "parser_contribution_summary": parser_contribution_summary, "mean_quality_score": mean([doc["quality_score"] for doc in documents]) if documents else 0.0, "mean_parser_disagreement_rate": _mean_value(documents, "parser_disagreement_rate"), "mean_repair_resolution_rate": _mean_value(documents, "repair_resolution_rate"), "mean_repair_regression_rate": _mean_value(documents, "repair_regression_rate"), "mean_layout_f1": _mean_value( [doc for doc in documents if doc.get("layout_evaluated")], "layout_f1" ), "mean_layout_class_agnostic_f1": _mean_value( [doc for doc in documents if doc.get("layout_evaluated")], "layout_class_agnostic_f1" ), "layout_evaluated_count": sum(1 for doc in documents if doc.get("layout_evaluated")), "mean_table_structure_score": _mean_value( [doc for doc in documents if doc.get("table_structure_evaluated")], "table_structure_score" ), "mean_table_match_rate": _mean_value( [doc for doc in documents if doc.get("table_structure_evaluated")], "table_match_rate" ), "table_structure_evaluated_count": sum(1 for doc in documents if doc.get("table_structure_evaluated")), "mean_formula_cer": _mean_value( [doc for doc in documents if doc.get("formula_evaluated")], "formula_cer" ), "mean_formula_accuracy": _mean_value( [doc for doc in documents if doc.get("formula_evaluated")], "formula_accuracy" ), "formula_evaluated_count": sum(1 for doc in documents if doc.get("formula_evaluated")), "mean_retrieval_recall_at_1": _mean_value( [doc for doc in documents if doc.get("retrieval_evaluated")], "retrieval_recall_at_1" ), "mean_retrieval_recall_at_5": _mean_value( [doc for doc in documents if doc.get("retrieval_evaluated")], "retrieval_recall_at_5" ), "mean_retrieval_mrr": _mean_value( [doc for doc in documents if doc.get("retrieval_evaluated")], "retrieval_mrr" ), "retrieval_evaluated_count": sum(1 for doc in documents if doc.get("retrieval_evaluated")), "documents": documents, "parser_leaderboard": _parser_leaderboard(parser_rows), "per_parser_gt_leaderboard": _per_parser_gt_leaderboard(per_parser_rows), "chunk_strategy_leaderboard": _chunk_strategy_leaderboard(chunk_rows), "structure_quality": score_structure_quality(structure_rows), "chunking_quality": score_chunking_quality(chunk_quality_rows), "throughput": summarize_throughput(throughput_rows), "ablation_plan": ablation_plan(config, observed_chunk_strategies), } write_json(output_root / "results.json", summary) write_json(output_root / "ablations.json", summary["ablation_plan"]) _write_leaderboard_csv(output_root / "leaderboard.csv", summary["parser_leaderboard"]) _write_parser_rows_csv(output_root / "parser_runs.csv", parser_rows) _write_chunk_rows_csv(output_root / "chunk_runs.csv", chunk_rows) _write_structure_rows_csv(output_root / "structure_runs.csv", structure_rows) _write_chunk_quality_rows_csv(output_root / "chunk_quality.csv", chunk_quality_rows) _write_throughput_rows_csv(output_root / "throughput_runs.csv", throughput_rows) _write_repair_rows_csv(output_root / "repair_runs.csv", repair_rows) _write_layout_rows_csv(output_root / "layout_runs.csv", layout_rows) _write_table_structure_rows_csv(output_root / "table_structure_runs.csv", table_structure_rows) _write_formula_rows_csv(output_root / "formula_runs.csv", formula_rows) _write_retrieval_rows_csv(output_root / "retrieval_runs.csv", retrieval_rows) _write_per_parser_rows_csv(output_root / "per_parser_metrics.csv", per_parser_rows) _write_per_parser_gt_leaderboard_csv( output_root / "per_parser_gt_leaderboard.csv", summary["per_parser_gt_leaderboard"] ) return summary def _per_parser_gt_leaderboard(rows: list[dict]) -> list[dict]: """Aggregate per-document per-parser rows into one leaderboard row per parser. A metric contributes to a parser's mean only when that parser actually had a non-zero prediction count for that metric on that document; this keeps "0.00 from no predictions" from dragging the mean down for parsers that simply don't emit bboxes (text/markdown). The number of documents contributing to each metric is reported alongside the mean. """ grouped: dict[str, list[dict]] = {} for row in rows: grouped.setdefault(row["parser"], []).append(row) leaderboard: list[dict] = [] for parser_name, parser_rows in grouped.items(): layout_rows = [row for row in parser_rows if row.get("layout_evaluated")] table_rows = [row for row in parser_rows if row.get("table_evaluated")] formula_rows = [row for row in parser_rows if row.get("formula_evaluated")] leaderboard.append( { "parser": parser_name, "document_count": len(parser_rows), "layout_evaluated_count": len(layout_rows), "mean_layout_class_aware_f1": _mean_value(layout_rows, "layout_class_aware_f1"), "mean_layout_class_agnostic_f1": _mean_value(layout_rows, "layout_class_agnostic_f1"), "mean_layout_precision": _mean_value(layout_rows, "layout_class_aware_precision"), "mean_layout_recall": _mean_value(layout_rows, "layout_class_aware_recall"), "table_evaluated_count": len(table_rows), "mean_table_structure_score": _mean_value(table_rows, "table_structure_score"), "mean_table_match_rate": _mean_value(table_rows, "table_match_rate"), "mean_table_cell_content_f1": _mean_value(table_rows, "table_cell_content_f1"), "formula_evaluated_count": len(formula_rows), "mean_formula_cer": _mean_value(formula_rows, "formula_cer"), "mean_formula_accuracy": _mean_value(formula_rows, "formula_accuracy"), "mean_formula_exact_match_rate": _mean_value(formula_rows, "formula_exact_match_rate"), "mean_element_count": _mean_value(parser_rows, "element_count"), "mean_table_count": _mean_value(parser_rows, "table_count"), "mean_figure_count": _mean_value(parser_rows, "figure_count"), } ) return sorted( leaderboard, key=lambda row: ( row["mean_layout_class_aware_f1"], row["mean_table_structure_score"], -row["mean_formula_cer"], ), reverse=True, ) def _write_per_parser_gt_leaderboard_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "parser", "document_count", "layout_evaluated_count", "mean_layout_class_aware_f1", "mean_layout_class_agnostic_f1", "mean_layout_precision", "mean_layout_recall", "table_evaluated_count", "mean_table_structure_score", "mean_table_match_rate", "mean_table_cell_content_f1", "formula_evaluated_count", "mean_formula_cer", "mean_formula_accuracy", "mean_formula_exact_match_rate", "mean_element_count", "mean_table_count", "mean_figure_count", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _compute_per_parser_block(dataset_document: DatasetDocument, parsed) -> dict[str, dict[str, Any]]: layout_adapter = GROUND_TRUTH_ADAPTERS.get(dataset_document.dataset_id) table_adapter = TABLE_TRUTH_ADAPTERS.get(dataset_document.dataset_id) formula_adapter = FORMULA_TRUTH_ADAPTERS.get(dataset_document.dataset_id) gt = dataset_document.ground_truth layout_truths = layout_adapter(gt) if (layout_adapter and gt is not None) else None table_truths = table_adapter(gt) if (table_adapter and gt is not None) else None formula_truths = formula_adapter(gt) if (formula_adapter and gt is not None) else None if not (layout_truths or table_truths or formula_truths): return {} return compute_per_parser_metrics( parsed, layout_truths=layout_truths or None, table_truths=table_truths or None, formula_truths=formula_truths or None, ) def _per_parser_row( path: Path, parsed, dataset_document: DatasetDocument, parser_name: str, block: dict[str, Any], ) -> dict[str, Any]: layout = block.get("layout") or {} table = block.get("table_structure") or {} formula = block.get("formula") or {} return { "source_path": str(path), "doc_id": parsed.doc_id, "dataset_id": dataset_document.dataset_id, "parser": parser_name, "element_count": int(block.get("element_count", 0)), "table_count": int(block.get("table_count", 0)), "figure_count": int(block.get("figure_count", 0)), "layout_evaluated": "layout" in block, "table_evaluated": "table_structure" in block, "formula_evaluated": "formula" in block, "layout_prediction_count": int(layout.get("prediction_count", 0)), "layout_class_aware_f1": float(layout.get("class_aware_f1", 0.0)), "layout_class_aware_precision": float(layout.get("class_aware_precision", 0.0)), "layout_class_aware_recall": float(layout.get("class_aware_recall", 0.0)), "layout_class_agnostic_f1": float(layout.get("class_agnostic_f1", 0.0)), "table_structure_score": float(table.get("mean_table_score", 0.0)), "table_match_rate": float(table.get("table_match_rate", 0.0)), "table_cell_content_f1": float(table.get("mean_cell_content_f1", 0.0)), "formula_cer": float(formula.get("mean_cer", 0.0)) if formula else 0.0, "formula_accuracy": float(formula.get("mean_accuracy", 0.0)) if formula else 0.0, "formula_exact_match_rate": float(formula.get("exact_match_rate", 0.0)) if formula else 0.0, } def _write_per_parser_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "dataset_id", "parser", "element_count", "table_count", "figure_count", "layout_evaluated", "table_evaluated", "formula_evaluated", "layout_prediction_count", "layout_class_aware_f1", "layout_class_aware_precision", "layout_class_aware_recall", "layout_class_agnostic_f1", "table_structure_score", "table_match_rate", "table_cell_content_f1", "formula_cer", "formula_accuracy", "formula_exact_match_rate", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _retrieval_metrics(parsed, config: dict | None = None) -> dict: from zsgdp.benchmarks.embedding_retriever import build_retriever retriever = build_retriever(config) if config else None run = run_retrieval_for_document(parsed, retriever=retriever) if not run["evaluated"]: return { "summary": { "evaluated": False, "query_count": 0, "recall_at_1": 0.0, "recall_at_5": 0.0, "mean_reciprocal_rank": 0.0, }, "metrics": None, "reason": run.get("reason"), } metrics = compute_retrieval_metrics( ((result["retrieved"], result["truths"]) for result in run["results"]), ) return { "summary": { "evaluated": True, "query_count": int(metrics["query_count"]), "recall_at_1": float(metrics["recall_at_k"].get(1, 0.0)), "recall_at_5": float(metrics["recall_at_k"].get(5, 0.0)), "mean_reciprocal_rank": float(metrics["mean_reciprocal_rank"]), }, "metrics": metrics, "reason": None, } def _retrieval_row(path: Path, parsed, retrieval_metrics: dict) -> dict: detail = retrieval_metrics["metrics"] or {} recall = detail.get("recall_at_k", {}) return { "source_path": str(path), "doc_id": parsed.doc_id, "query_count": int(detail.get("query_count", 0)), "recall_at_1": float(recall.get(1, 0.0)), "recall_at_3": float(recall.get(3, 0.0)), "recall_at_5": float(recall.get(5, 0.0)), "mean_reciprocal_rank": float(detail.get("mean_reciprocal_rank", 0.0)), "citation_accuracy_at_5": float(detail.get("citation_accuracy_at_k", {}).get(5, 0.0)), } def _parser_contribution(parsed) -> dict[str, Any]: """Count which parser produced each merged element/table/figure. Counts are over the *post-merge* output, not the pre-merge candidates. This is a "contribution" view (which parser's output survived) rather than an "ablation" view (which parser would do best alone). """ counts: dict[str, int] = {} for element in parsed.elements: counts[element.source_parser] = counts.get(element.source_parser, 0) + 1 for table in parsed.tables: counts[table.source_parser] = counts.get(table.source_parser, 0) + 1 for figure in parsed.figures: counts[figure.source_parser] = counts.get(figure.source_parser, 0) + 1 total = sum(counts.values()) fractions = {parser: (count / total) for parser, count in counts.items()} if total else {} return {"counts": counts, "fractions": fractions, "total": total} def _aggregate_parser_contributions(documents: list[dict]) -> dict[str, Any]: parser_totals: dict[str, int] = {} grand_total = 0 for doc in documents: counts = doc.get("parser_contribution_counts") or {} if not isinstance(counts, dict): continue for parser, count in counts.items(): parser_totals[parser] = parser_totals.get(parser, 0) + int(count) grand_total += int(count) fractions = {parser: (count / grand_total) for parser, count in parser_totals.items()} if grand_total else {} return {"counts": dict(sorted(parser_totals.items())), "fractions": dict(sorted(fractions.items())), "total": grand_total} def _write_retrieval_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "query_count", "recall_at_1", "recall_at_3", "recall_at_5", "mean_reciprocal_rank", "citation_accuracy_at_5", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _table_structure_metrics(dataset_document: DatasetDocument, parsed) -> dict: adapter = TABLE_TRUTH_ADAPTERS.get(dataset_document.dataset_id) if adapter is None or dataset_document.ground_truth is None: return { "summary": {"evaluated": False, "mean_table_score": 1.0, "table_match_rate": 1.0}, "metrics": None, "reason": "no_table_truth_adapter" if adapter is None else "no_ground_truth", } truths = adapter(dataset_document.ground_truth) predictions = parsed_table_records(parsed) if not truths and not predictions: return { "summary": {"evaluated": False, "mean_table_score": 1.0, "table_match_rate": 1.0}, "metrics": None, "reason": "no_truths_and_no_predictions", } metrics = compute_table_structure_score(predictions, truths) return { "summary": { "evaluated": True, "mean_table_score": float(metrics["mean_table_score"]), "table_match_rate": float(metrics["table_match_rate"]), }, "metrics": metrics, "reason": None, } def _formula_extraction_metrics(dataset_document: DatasetDocument, parsed) -> dict: adapter = FORMULA_TRUTH_ADAPTERS.get(dataset_document.dataset_id) if adapter is None or dataset_document.ground_truth is None: return { "summary": {"evaluated": False, "mean_cer": 0.0, "mean_accuracy": 1.0, "exact_match_rate": 1.0}, "metrics": None, "reason": "no_formula_truth_adapter" if adapter is None else "no_ground_truth", } truths = adapter(dataset_document.ground_truth) predictions = parsed_formula_records(parsed) if not truths and not predictions: return { "summary": {"evaluated": False, "mean_cer": 0.0, "mean_accuracy": 1.0, "exact_match_rate": 1.0}, "metrics": None, "reason": "no_truths_and_no_predictions", } metrics = compute_formula_extraction(predictions, truths) return { "summary": { "evaluated": True, "mean_cer": float(metrics["mean_cer"]), "mean_accuracy": float(metrics["mean_accuracy"]), "exact_match_rate": float(metrics["exact_match_rate"]), }, "metrics": metrics, "reason": None, } def _table_structure_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict: detail = metrics["metrics"] or {} return { "source_path": str(path), "doc_id": parsed.doc_id, "dataset_id": dataset_document.dataset_id, "prediction_count": int(detail.get("prediction_count", 0)), "truth_count": int(detail.get("truth_count", 0)), "matched_pair_count": int(detail.get("matched_pair_count", 0)), "table_match_rate": float(detail.get("table_match_rate", 0.0)), "mean_table_score": float(detail.get("mean_table_score", 0.0)), "mean_shape_similarity": float(detail.get("mean_shape_similarity", 0.0)), "mean_cell_content_f1": float(detail.get("mean_cell_content_f1", 0.0)), "table_count_delta": int(detail.get("table_count_delta", 0)), } def _formula_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict: detail = metrics["metrics"] or {} return { "source_path": str(path), "doc_id": parsed.doc_id, "dataset_id": dataset_document.dataset_id, "prediction_count": int(detail.get("prediction_count", 0)), "truth_count": int(detail.get("truth_count", 0)), "matched_pair_count": int(detail.get("matched_pair_count", 0)), "mean_cer": float(detail.get("mean_cer", 1.0)), "mean_accuracy": float(detail.get("mean_accuracy", 0.0)), "exact_match_rate": float(detail.get("exact_match_rate", 0.0)), "formula_precision": float(detail.get("formula_precision", 0.0)), "formula_recall": float(detail.get("formula_recall", 0.0)), } def _write_table_structure_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "dataset_id", "prediction_count", "truth_count", "matched_pair_count", "table_match_rate", "mean_table_score", "mean_shape_similarity", "mean_cell_content_f1", "table_count_delta", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_formula_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "dataset_id", "prediction_count", "truth_count", "matched_pair_count", "mean_cer", "mean_accuracy", "exact_match_rate", "formula_precision", "formula_recall", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _layout_metrics(dataset_document: DatasetDocument, parsed) -> dict: adapter = GROUND_TRUTH_ADAPTERS.get(dataset_document.dataset_id) if adapter is None or dataset_document.ground_truth is None: return { "summary": {"evaluated": False, "class_aware_f1": 0.0, "class_agnostic_f1": 0.0}, "metrics": None, "reason": "no_ground_truth_adapter" if adapter is None else "no_ground_truth", } truths = adapter(dataset_document.ground_truth) predictions = parsed_layout_predictions(parsed) if not truths and not predictions: return { "summary": {"evaluated": False, "class_aware_f1": 0.0, "class_agnostic_f1": 0.0}, "metrics": None, "reason": "no_truths_and_no_predictions", } metrics = compute_layout_f1(predictions, truths) return { "summary": { "evaluated": True, "class_aware_f1": float(metrics["class_aware"]["f1"]), "class_agnostic_f1": float(metrics["class_agnostic"]["f1"]), }, "metrics": metrics, "reason": None, } def _layout_row(path: Path, parsed, dataset_document: DatasetDocument, layout_metrics: dict) -> dict: metrics = layout_metrics["metrics"] or {} class_aware = metrics.get("class_aware", {}) class_agnostic = metrics.get("class_agnostic", {}) return { "source_path": str(path), "doc_id": parsed.doc_id, "dataset_id": dataset_document.dataset_id, "iou_threshold": float(metrics.get("iou_threshold", 0.5)), "prediction_count": int(metrics.get("prediction_count", 0)), "truth_count": int(metrics.get("truth_count", 0)), "class_aware_precision": float(class_aware.get("precision", 0.0)), "class_aware_recall": float(class_aware.get("recall", 0.0)), "class_aware_f1": float(class_aware.get("f1", 0.0)), "class_agnostic_precision": float(class_agnostic.get("precision", 0.0)), "class_agnostic_recall": float(class_agnostic.get("recall", 0.0)), "class_agnostic_f1": float(class_agnostic.get("f1", 0.0)), } def _write_layout_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "dataset_id", "iou_threshold", "prediction_count", "truth_count", "class_aware_precision", "class_aware_recall", "class_aware_f1", "class_agnostic_precision", "class_agnostic_recall", "class_agnostic_f1", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _repair_row(path: Path, parsed, repair_success: dict, disagreement: dict) -> dict: return { "source_path": str(path), "doc_id": parsed.doc_id, "candidate_count": int(disagreement.get("candidate_count", 0)), "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)), "conflict_count": int(disagreement.get("conflict_count", 0)), "iteration_count": int(repair_success.get("iteration_count", 0)), "total_actions": int(repair_success.get("total_actions", 0)), "score_delta": float(repair_success.get("score_delta", 0.0)), "pre_repair_blocking_count": int(repair_success.get("pre_repair_blocking_count", 0)), "post_repair_blocking_count": int(repair_success.get("post_repair_blocking_count", 0)), "resolved_blocking_count": int(repair_success.get("resolved_blocking_count", 0)), "regressed_blocking_count": int(repair_success.get("regressed_blocking_count", 0)), "repair_resolution_rate": float(repair_success.get("repair_resolution_rate", 1.0)), "repair_regression_rate": float(repair_success.get("repair_regression_rate", 0.0)), } def _write_repair_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "candidate_count", "parser_disagreement_rate", "conflict_count", "iteration_count", "total_actions", "score_delta", "pre_repair_blocking_count", "post_repair_blocking_count", "resolved_blocking_count", "regressed_blocking_count", "repair_resolution_rate", "repair_regression_rate", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _chunk_strategy_counts(chunks: list) -> dict[str, int]: counts: dict[str, int] = {} for chunk in chunks: counts[chunk.strategy] = counts.get(chunk.strategy, 0) + 1 return counts def _chunk_quality_metrics(metrics: dict) -> dict: keys = [ "chunk_count", "parent_chunk_count", "child_chunk_count", "avg_chunk_tokens", "max_chunk_tokens", "table_chunk_coverage", "figure_chunk_coverage", ] return {key: metrics[key] for key in keys if key in metrics} def _chunk_strategy_rows(path: Path, parsed) -> list[dict]: grouped: dict[str, list] = {} for chunk in parsed.chunks: grouped.setdefault(chunk.strategy, []).append(chunk) rows: list[dict] = [] for strategy, chunks in sorted(grouped.items()): token_counts = [chunk.token_count for chunk in chunks] rows.append( { "source_path": str(path), "doc_id": parsed.doc_id, "strategy": strategy, "quality_score": parsed.quality_report.score, "chunk_count": len(chunks), "avg_tokens": mean(token_counts) if token_counts else 0.0, "max_tokens": max(token_counts) if token_counts else 0, "table_linked_chunks": sum(1 for chunk in chunks if chunk.table_ids), "figure_linked_chunks": sum(1 for chunk in chunks if chunk.figure_ids), "visual_context_chunks": sum(1 for chunk in chunks if chunk.requires_visual_context), } ) return rows def _chunk_strategy_leaderboard(rows: list[dict]) -> list[dict]: grouped: dict[str, list[dict]] = {} for row in rows: grouped.setdefault(row["strategy"], []).append(row) leaderboard: list[dict] = [] for strategy, strategy_rows in grouped.items(): leaderboard.append( { "strategy": strategy, "runs": len(strategy_rows), "total_chunks": sum(int(row.get("chunk_count", 0)) for row in strategy_rows), "mean_chunk_count": _mean_value(strategy_rows, "chunk_count"), "mean_avg_tokens": _mean_value(strategy_rows, "avg_tokens"), "mean_max_tokens": _mean_value(strategy_rows, "max_tokens"), "mean_quality_score": _mean_value(strategy_rows, "quality_score"), "total_table_linked_chunks": sum(int(row.get("table_linked_chunks", 0)) for row in strategy_rows), "total_figure_linked_chunks": sum(int(row.get("figure_linked_chunks", 0)) for row in strategy_rows), "total_visual_context_chunks": sum(int(row.get("visual_context_chunks", 0)) for row in strategy_rows), } ) return sorted(leaderboard, key=lambda row: (row["mean_quality_score"], row["total_chunks"]), reverse=True) def _parser_leaderboard(rows: list[dict]) -> list[dict]: grouped: dict[str, list[dict]] = {} for row in rows: grouped.setdefault(row["parser"], []).append(row) leaderboard: list[dict] = [] for parser_name, parser_rows in grouped.items(): successes = [row for row in parser_rows if not row.get("failed")] leaderboard.append( { "parser": parser_name, "runs": len(parser_rows), "successes": len(successes), "failures": len(parser_rows) - len(successes), "mean_elapsed_seconds": _mean_value(successes, "elapsed_seconds"), "mean_text_coverage_ratio": _mean_value(successes, "text_coverage_ratio"), "mean_element_count": _mean_value(successes, "element_count"), "mean_table_count": _mean_value(successes, "table_count"), "mean_figure_count": _mean_value(successes, "figure_count"), "mean_valid_table_ratio": _mean_value(successes, "valid_table_ratio"), } ) return sorted( leaderboard, key=lambda row: (row["mean_text_coverage_ratio"], row["mean_valid_table_ratio"], -row["mean_elapsed_seconds"]), reverse=True, ) def _mean_value(rows: list[dict], key: str) -> float: values = [float(row[key]) for row in rows if row.get(key) is not None] return mean(values) if values else 0.0 def _write_leaderboard_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "parser", "runs", "successes", "failures", "mean_elapsed_seconds", "mean_text_coverage_ratio", "mean_element_count", "mean_table_count", "mean_figure_count", "mean_valid_table_ratio", ] _write_csv(path, rows, fieldnames) def _write_parser_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "parser", "failed", "error", "elapsed_seconds", "page_count", "element_count", "table_count", "figure_count", "text_chars", "expected_text_chars", "text_coverage_ratio", "valid_table_ratio", "has_bboxes", "has_page_images", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_chunk_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "strategy", "quality_score", "chunk_count", "avg_tokens", "max_tokens", "table_linked_chunks", "figure_linked_chunks", "visual_context_chunks", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_structure_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "table_count", "valid_table_count", "table_exact", "figure_count", "captioned_figure_count", "figure_caption_correct", "reading_order_issue_count", "reading_order_health", "document_text_coverage", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_chunk_quality_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "chunk_count", "boundary_precision", "parent_child_resolution", "provenance_completeness", "retrieval_readiness", "table_chunk_coverage", "figure_chunk_coverage", "avg_tokens", "max_tokens", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_throughput_rows_csv(path: Path, rows: list[dict]) -> None: fieldnames = [ "source_path", "doc_id", "page_count", "elapsed_seconds", "pages_per_second", "elements_per_second", "chunks_per_second", "gpu_task_count", "runtime_device", "max_gpu_seconds_per_doc", ] normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows] _write_csv(path, normalized, fieldnames) def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None: with path.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows)