Spaces:
Running on Zero
Running on Zero
| """Parser-quality benchmark helpers.""" | |
from __future__ import annotations

import csv
from collections import Counter
from pathlib import Path
from statistics import mean
from time import perf_counter
from typing import Any

from zsgdp.benchmarks.ablations import ablation_plan
from zsgdp.benchmarks.chunking_quality import chunking_quality_record, score_chunking_quality
from zsgdp.benchmarks.datasets import DatasetDocument, get_dataset_loader
from zsgdp.benchmarks.ground_truth import (
    doclaynet_layout_truths,
    omnidocbench_formula_truths,
    omnidocbench_layout_truths,
    omnidocbench_table_truths,
    parsed_formula_records,
    parsed_layout_predictions,
    parsed_table_records,
)
from zsgdp.benchmarks.per_parser_metrics import compute_per_parser_metrics
from zsgdp.benchmarks.retrieval import run_retrieval_for_document
from zsgdp.benchmarks.structure_quality import score_structure_quality, structure_quality_record
from zsgdp.benchmarks.throughput import summarize_throughput, throughput_record
from zsgdp.config import load_config
from zsgdp.pipeline import parse_document
from zsgdp.utils import write_json
from zsgdp.verify.formula_extraction import compute_formula_extraction
from zsgdp.verify.layout_f1 import compute_layout_f1
from zsgdp.verify.retrieval import compute_retrieval_metrics
from zsgdp.verify.table_structure import compute_table_structure_score
# Registries keyed by ``DatasetDocument.dataset_id``.  Each value is a callable
# that receives the document's ``ground_truth`` and returns the truths used by
# the matching scorer; a dataset missing from a registry is simply not scored
# for that metric family.

# Layout (bounding-box) truths.
GROUND_TRUTH_ADAPTERS = {
    "doclaynet": doclaynet_layout_truths,
    "omnidocbench": omnidocbench_layout_truths,
}

# Table-structure truths.
TABLE_TRUTH_ADAPTERS = {"omnidocbench": omnidocbench_table_truths}

# Formula truths.
FORMULA_TRUTH_ADAPTERS = {"omnidocbench": omnidocbench_formula_truths}
def score_parser_quality(records: list[dict]) -> dict[str, float]:
    """Average the ``quality_score`` field across benchmark records.

    A record without a ``quality_score`` key contributes 0.0.  An empty
    ``records`` collection yields a mean of 0.0 instead of dividing by zero.
    """
    if not records:
        return {"mean_quality_score": 0.0}
    scores = [float(entry.get("quality_score", 0.0)) for entry in records]
    return {"mean_quality_score": sum(scores) / len(records)}
def run_parser_benchmark(
    input_dir: str | Path,
    output_dir: str | Path,
    *,
    config_path: str | Path | None = None,
    selected_parsers: list[str] | None = None,
    dataset_name: str = "custom_folder",
) -> dict:
    """Parse every document yielded by a dataset loader and write benchmark reports.

    Each document is parsed with ``parse_document`` (timed with
    ``perf_counter``), scored for layout, table-structure, formula and
    retrieval quality, and condensed into one record.  The aggregated summary
    is written to ``results.json`` and ``ablations.json`` under ``output_dir``
    together with one CSV per row family, and is also returned.

    Args:
        input_dir: Dataset root handed to the loader.
        output_dir: Directory that receives the reports; created if missing.
            Per-document parser output goes under ``parsed/<doc id or stem>``.
        config_path: Optional config location, passed to ``load_config`` and
            to ``parse_document``.
        selected_parsers: Optional parser subset forwarded to ``parse_document``.
        dataset_name: Key used to look up the loader via ``get_dataset_loader``.

    Returns:
        The summary dictionary that was written to ``results.json``.
    """
    input_root = Path(input_dir)
    output_root = Path(output_dir)
    output_root.mkdir(parents=True, exist_ok=True)
    loader = get_dataset_loader(dataset_name)
    bench_config = load_config(config_path)

    documents: list[dict] = []
    parser_rows: list[dict] = []
    chunk_rows: list[dict] = []
    structure_rows: list[dict] = []
    chunk_quality_rows: list[dict] = []
    throughput_rows: list[dict] = []
    repair_rows: list[dict] = []
    layout_rows: list[dict] = []
    table_structure_rows: list[dict] = []
    formula_rows: list[dict] = []
    retrieval_rows: list[dict] = []
    per_parser_rows: list[dict] = []
    observed_chunk_strategies: set[str] = set()

    for dataset_document in loader(input_root):
        path = dataset_document.path
        doc_out = output_root / "parsed" / (dataset_document.doc_id or path.stem)

        # Only the parse itself is timed; scoring below is excluded.
        started = perf_counter()
        parsed = parse_document(path, doc_out, config_path=config_path, selected_parsers=selected_parsers)
        elapsed_seconds = perf_counter() - started

        chunk_strategy_counts = _chunk_strategy_counts(parsed.chunks)
        observed_chunk_strategies.update(chunk_strategy_counts)

        provenance = parsed.provenance
        disagreement = provenance.get("parser_disagreement", {}) or {}
        repair_success = provenance.get("repair_success", {}) or {}
        # Guarded like its siblings above: a stored ``None`` would otherwise
        # crash the ``.items()`` loop that builds ``parser_rows``.
        parser_metrics = provenance.get("parser_metrics", {}) or {}

        layout_metrics = _layout_metrics(dataset_document, parsed)
        table_metrics = _table_structure_metrics(dataset_document, parsed)
        formula_metrics = _formula_extraction_metrics(dataset_document, parsed)
        retrieval_metrics = _retrieval_metrics(parsed, bench_config)
        contribution = _parser_contribution(parsed)
        per_parser = _compute_per_parser_block(dataset_document, parsed)

        layout_summary = layout_metrics["summary"]
        table_summary = table_metrics["summary"]
        formula_summary = formula_metrics["summary"]
        retrieval_summary = retrieval_metrics["summary"]

        doc_record = {
            "source_path": str(path),
            "dataset_id": dataset_document.dataset_id,
            "dataset_doc_id": dataset_document.doc_id,
            "has_ground_truth": dataset_document.ground_truth is not None,
            "doc_id": parsed.doc_id,
            "file_type": parsed.file_type,
            "quality_score": parsed.quality_report.score,
            "element_count": len(parsed.elements),
            "table_count": len(parsed.tables),
            "figure_count": len(parsed.figures),
            "chunk_count": len(parsed.chunks),
            "chunk_strategy_counts": chunk_strategy_counts,
            "chunk_quality_metrics": _chunk_quality_metrics(parsed.quality_report.metrics),
            "elapsed_seconds": elapsed_seconds,
            "parser_metrics": parser_metrics,
            "parser_failures": provenance.get("parser_failures", {}),
            "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)),
            "candidate_count": int(disagreement.get("candidate_count", 0)),
            "conflict_count": int(disagreement.get("conflict_count", 0)),
            "repair_resolution_rate": float(repair_success.get("repair_resolution_rate", 1.0)),
            "repair_regression_rate": float(repair_success.get("repair_regression_rate", 0.0)),
            "repair_iteration_count": int(repair_success.get("iteration_count", 0)),
            "layout_f1": layout_summary["class_aware_f1"],
            "layout_class_agnostic_f1": layout_summary["class_agnostic_f1"],
            "layout_evaluated": layout_summary["evaluated"],
            "table_structure_score": table_summary["mean_table_score"],
            "table_match_rate": table_summary["table_match_rate"],
            "table_structure_evaluated": table_summary["evaluated"],
            "formula_cer": formula_summary["mean_cer"],
            "formula_accuracy": formula_summary["mean_accuracy"],
            "formula_exact_match_rate": formula_summary["exact_match_rate"],
            "formula_evaluated": formula_summary["evaluated"],
            "retrieval_recall_at_1": retrieval_summary["recall_at_1"],
            "retrieval_recall_at_5": retrieval_summary["recall_at_5"],
            "retrieval_mrr": retrieval_summary["mean_reciprocal_rank"],
            "retrieval_query_count": retrieval_summary["query_count"],
            "retrieval_evaluated": retrieval_summary["evaluated"],
            "parser_contribution_counts": contribution["counts"],
            "parser_contribution_fractions": contribution["fractions"],
            "per_parser_metrics": per_parser,
        }
        documents.append(doc_record)

        for parser_name, block in per_parser.items():
            per_parser_rows.append(_per_parser_row(path, parsed, dataset_document, parser_name, block))
        repair_rows.append(_repair_row(path, parsed, repair_success, disagreement))

        # Metric-specific rows are only recorded when that metric was scored.
        if layout_summary["evaluated"]:
            layout_rows.append(_layout_row(path, parsed, dataset_document, layout_metrics))
        if table_summary["evaluated"]:
            table_structure_rows.append(_table_structure_row(path, parsed, dataset_document, table_metrics))
        if formula_summary["evaluated"]:
            formula_rows.append(_formula_row(path, parsed, dataset_document, formula_metrics))
        if retrieval_summary["evaluated"]:
            retrieval_rows.append(_retrieval_row(path, parsed, retrieval_metrics))

        for parser_name, metrics in parser_metrics.items():
            parser_rows.append(
                {
                    "source_path": str(path),
                    "doc_id": parsed.doc_id,
                    "parser": parser_name,
                    **metrics,
                }
            )
        chunk_rows.extend(_chunk_strategy_rows(path, parsed))
        structure_rows.append(structure_quality_record(parsed, str(path)))
        chunk_quality_rows.append(chunking_quality_record(parsed, str(path)))
        throughput_rows.append(throughput_record(parsed, str(path), elapsed_seconds))

    # Means for a metric family are taken over the documents where that
    # family was actually evaluated; build each subset once.
    layout_docs = [doc for doc in documents if doc.get("layout_evaluated")]
    table_docs = [doc for doc in documents if doc.get("table_structure_evaluated")]
    formula_docs = [doc for doc in documents if doc.get("formula_evaluated")]
    retrieval_docs = [doc for doc in documents if doc.get("retrieval_evaluated")]

    summary = {
        "dataset_name": dataset_name,
        "dataset_root": str(input_root),
        "document_count": len(documents),
        "parser_contribution_summary": _aggregate_parser_contributions(documents),
        "mean_quality_score": mean([doc["quality_score"] for doc in documents]) if documents else 0.0,
        "mean_parser_disagreement_rate": _mean_value(documents, "parser_disagreement_rate"),
        "mean_repair_resolution_rate": _mean_value(documents, "repair_resolution_rate"),
        "mean_repair_regression_rate": _mean_value(documents, "repair_regression_rate"),
        "mean_layout_f1": _mean_value(layout_docs, "layout_f1"),
        "mean_layout_class_agnostic_f1": _mean_value(layout_docs, "layout_class_agnostic_f1"),
        "layout_evaluated_count": len(layout_docs),
        "mean_table_structure_score": _mean_value(table_docs, "table_structure_score"),
        "mean_table_match_rate": _mean_value(table_docs, "table_match_rate"),
        "table_structure_evaluated_count": len(table_docs),
        "mean_formula_cer": _mean_value(formula_docs, "formula_cer"),
        "mean_formula_accuracy": _mean_value(formula_docs, "formula_accuracy"),
        "formula_evaluated_count": len(formula_docs),
        "mean_retrieval_recall_at_1": _mean_value(retrieval_docs, "retrieval_recall_at_1"),
        "mean_retrieval_recall_at_5": _mean_value(retrieval_docs, "retrieval_recall_at_5"),
        "mean_retrieval_mrr": _mean_value(retrieval_docs, "retrieval_mrr"),
        "retrieval_evaluated_count": len(retrieval_docs),
        "documents": documents,
        "parser_leaderboard": _parser_leaderboard(parser_rows),
        "per_parser_gt_leaderboard": _per_parser_gt_leaderboard(per_parser_rows),
        "chunk_strategy_leaderboard": _chunk_strategy_leaderboard(chunk_rows),
        "structure_quality": score_structure_quality(structure_rows),
        "chunking_quality": score_chunking_quality(chunk_quality_rows),
        "throughput": summarize_throughput(throughput_rows),
        "ablation_plan": ablation_plan(bench_config, observed_chunk_strategies),
    }

    write_json(output_root / "results.json", summary)
    write_json(output_root / "ablations.json", summary["ablation_plan"])
    _write_leaderboard_csv(output_root / "leaderboard.csv", summary["parser_leaderboard"])
    _write_parser_rows_csv(output_root / "parser_runs.csv", parser_rows)
    _write_chunk_rows_csv(output_root / "chunk_runs.csv", chunk_rows)
    _write_structure_rows_csv(output_root / "structure_runs.csv", structure_rows)
    _write_chunk_quality_rows_csv(output_root / "chunk_quality.csv", chunk_quality_rows)
    _write_throughput_rows_csv(output_root / "throughput_runs.csv", throughput_rows)
    _write_repair_rows_csv(output_root / "repair_runs.csv", repair_rows)
    _write_layout_rows_csv(output_root / "layout_runs.csv", layout_rows)
    _write_table_structure_rows_csv(output_root / "table_structure_runs.csv", table_structure_rows)
    _write_formula_rows_csv(output_root / "formula_runs.csv", formula_rows)
    _write_retrieval_rows_csv(output_root / "retrieval_runs.csv", retrieval_rows)
    _write_per_parser_rows_csv(output_root / "per_parser_metrics.csv", per_parser_rows)
    _write_per_parser_gt_leaderboard_csv(
        output_root / "per_parser_gt_leaderboard.csv", summary["per_parser_gt_leaderboard"]
    )
    return summary
def _per_parser_gt_leaderboard(rows: list[dict]) -> list[dict]:
    """Collapse per-document, per-parser rows into one leaderboard row per parser.

    For each metric family (layout, table, formula) only the rows whose
    ``<family>_evaluated`` flag is truthy feed the means, and the number of
    such rows is reported next to them, so documents where a family was not
    scored do not dilute that family's average.  Element, table and figure
    counts are averaged over every row of the parser.

    The result is ordered best-first: higher class-aware layout F1, then
    higher table-structure score, then lower formula CER.

    NOTE(review): the mean reported for a family with no evaluated rows is
    whatever ``_mean_value`` returns for an empty list (defined outside this
    block); that value also feeds the sort key, so confirm it ranks sensibly.
    """
    by_parser: dict[str, list[dict]] = {}
    for entry in rows:
        parser_name = entry["parser"]
        if parser_name not in by_parser:
            by_parser[parser_name] = []
        by_parser[parser_name].append(entry)

    def _summarize(parser_name: str, members: list[dict]) -> dict:
        # Per-family subsets: only rows where that family was evaluated.
        with_layout = [entry for entry in members if entry.get("layout_evaluated")]
        with_table = [entry for entry in members if entry.get("table_evaluated")]
        with_formula = [entry for entry in members if entry.get("formula_evaluated")]
        return {
            "parser": parser_name,
            "document_count": len(members),
            "layout_evaluated_count": len(with_layout),
            "mean_layout_class_aware_f1": _mean_value(with_layout, "layout_class_aware_f1"),
            "mean_layout_class_agnostic_f1": _mean_value(with_layout, "layout_class_agnostic_f1"),
            "mean_layout_precision": _mean_value(with_layout, "layout_class_aware_precision"),
            "mean_layout_recall": _mean_value(with_layout, "layout_class_aware_recall"),
            "table_evaluated_count": len(with_table),
            "mean_table_structure_score": _mean_value(with_table, "table_structure_score"),
            "mean_table_match_rate": _mean_value(with_table, "table_match_rate"),
            "mean_table_cell_content_f1": _mean_value(with_table, "table_cell_content_f1"),
            "formula_evaluated_count": len(with_formula),
            "mean_formula_cer": _mean_value(with_formula, "formula_cer"),
            "mean_formula_accuracy": _mean_value(with_formula, "formula_accuracy"),
            "mean_formula_exact_match_rate": _mean_value(with_formula, "formula_exact_match_rate"),
            "mean_element_count": _mean_value(members, "element_count"),
            "mean_table_count": _mean_value(members, "table_count"),
            "mean_figure_count": _mean_value(members, "figure_count"),
        }

    ranked = [_summarize(name, members) for name, members in by_parser.items()]
    # CER is negated so that, under the descending sort, lower error ranks first.
    ranked.sort(
        key=lambda entry: (
            entry["mean_layout_class_aware_f1"],
            entry["mean_table_structure_score"],
            -entry["mean_formula_cer"],
        ),
        reverse=True,
    )
    return ranked
def _write_per_parser_gt_leaderboard_csv(path: Path, rows: list[dict]) -> None:
    """Hand the per-parser ground-truth leaderboard to ``_write_csv`` for ``path``.

    Every row is projected onto the fixed column list below; a key missing
    from a row becomes an empty string.
    """
    columns = [
        "parser",
        "document_count",
        "layout_evaluated_count",
        "mean_layout_class_aware_f1",
        "mean_layout_class_agnostic_f1",
        "mean_layout_precision",
        "mean_layout_recall",
        "table_evaluated_count",
        "mean_table_structure_score",
        "mean_table_match_rate",
        "mean_table_cell_content_f1",
        "formula_evaluated_count",
        "mean_formula_cer",
        "mean_formula_accuracy",
        "mean_formula_exact_match_rate",
        "mean_element_count",
        "mean_table_count",
        "mean_figure_count",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _compute_per_parser_block(dataset_document: DatasetDocument, parsed) -> dict[str, dict[str, Any]]:
    """Score ``parsed`` against whatever ground truth this dataset offers.

    Layout, table and formula truths are each derived by the adapter
    registered for the document's ``dataset_id``, provided the document
    carries ground truth.  When none of the three yields any truths an empty
    dict is returned; otherwise the available truths are forwarded to
    ``compute_per_parser_metrics`` (empty truth collections are passed as
    ``None``).
    """
    dataset_id = dataset_document.dataset_id
    ground_truth = dataset_document.ground_truth

    def _truths_from(registry: dict):
        adapter = registry.get(dataset_id)
        if adapter and ground_truth is not None:
            return adapter(ground_truth)
        return None

    layout_truths = _truths_from(GROUND_TRUTH_ADAPTERS)
    table_truths = _truths_from(TABLE_TRUTH_ADAPTERS)
    formula_truths = _truths_from(FORMULA_TRUTH_ADAPTERS)

    if not any((layout_truths, table_truths, formula_truths)):
        return {}

    return compute_per_parser_metrics(
        parsed,
        layout_truths=layout_truths or None,
        table_truths=table_truths or None,
        formula_truths=formula_truths or None,
    )
| def _per_parser_row( | |
| path: Path, | |
| parsed, | |
| dataset_document: DatasetDocument, | |
| parser_name: str, | |
| block: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| layout = block.get("layout") or {} | |
| table = block.get("table_structure") or {} | |
| formula = block.get("formula") or {} | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "dataset_id": dataset_document.dataset_id, | |
| "parser": parser_name, | |
| "element_count": int(block.get("element_count", 0)), | |
| "table_count": int(block.get("table_count", 0)), | |
| "figure_count": int(block.get("figure_count", 0)), | |
| "layout_evaluated": "layout" in block, | |
| "table_evaluated": "table_structure" in block, | |
| "formula_evaluated": "formula" in block, | |
| "layout_prediction_count": int(layout.get("prediction_count", 0)), | |
| "layout_class_aware_f1": float(layout.get("class_aware_f1", 0.0)), | |
| "layout_class_aware_precision": float(layout.get("class_aware_precision", 0.0)), | |
| "layout_class_aware_recall": float(layout.get("class_aware_recall", 0.0)), | |
| "layout_class_agnostic_f1": float(layout.get("class_agnostic_f1", 0.0)), | |
| "table_structure_score": float(table.get("mean_table_score", 0.0)), | |
| "table_match_rate": float(table.get("table_match_rate", 0.0)), | |
| "table_cell_content_f1": float(table.get("mean_cell_content_f1", 0.0)), | |
| "formula_cer": float(formula.get("mean_cer", 0.0)) if formula else 0.0, | |
| "formula_accuracy": float(formula.get("mean_accuracy", 0.0)) if formula else 0.0, | |
| "formula_exact_match_rate": float(formula.get("exact_match_rate", 0.0)) if formula else 0.0, | |
| } | |
def _write_per_parser_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document, per-parser metric rows to ``_write_csv`` for ``path``.

    Rows are reduced to the columns listed here, in this order; absent keys
    are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "parser",
        "element_count",
        "table_count",
        "figure_count",
        "layout_evaluated",
        "table_evaluated",
        "formula_evaluated",
        "layout_prediction_count",
        "layout_class_aware_f1",
        "layout_class_aware_precision",
        "layout_class_aware_recall",
        "layout_class_agnostic_f1",
        "table_structure_score",
        "table_match_rate",
        "table_cell_content_f1",
        "formula_cer",
        "formula_accuracy",
        "formula_exact_match_rate",
    ]

    def _project(entry: dict) -> dict:
        return {column: entry.get(column, "") for column in columns}

    _write_csv(path, [_project(entry) for entry in rows], columns)
def _retrieval_metrics(parsed, config: dict | None = None) -> dict:
    """Run the retrieval benchmark for one parsed document and summarise it.

    A retriever is built from ``config`` when one is given; otherwise
    ``None`` is passed to ``run_retrieval_for_document``.  The result always
    has ``summary``, ``metrics`` and ``reason`` keys: when the run reports it
    was not evaluated the summary is zeroed and ``reason`` carries the run's
    explanation, otherwise ``metrics`` holds the full output of
    ``compute_retrieval_metrics``.
    """
    # Imported here rather than at module level, as in the original code.
    from zsgdp.benchmarks.embedding_retriever import build_retriever

    retriever = None
    if config:
        retriever = build_retriever(config)
    run = run_retrieval_for_document(parsed, retriever=retriever)

    if run["evaluated"]:
        pairs = ((result["retrieved"], result["truths"]) for result in run["results"])
        metrics = compute_retrieval_metrics(pairs)
        summary = {
            "evaluated": True,
            "query_count": int(metrics["query_count"]),
            "recall_at_1": float(metrics["recall_at_k"].get(1, 0.0)),
            "recall_at_5": float(metrics["recall_at_k"].get(5, 0.0)),
            "mean_reciprocal_rank": float(metrics["mean_reciprocal_rank"]),
        }
        return {"summary": summary, "metrics": metrics, "reason": None}

    empty_summary = {
        "evaluated": False,
        "query_count": 0,
        "recall_at_1": 0.0,
        "recall_at_5": 0.0,
        "mean_reciprocal_rank": 0.0,
    }
    return {"summary": empty_summary, "metrics": None, "reason": run.get("reason")}
| def _retrieval_row(path: Path, parsed, retrieval_metrics: dict) -> dict: | |
| detail = retrieval_metrics["metrics"] or {} | |
| recall = detail.get("recall_at_k", {}) | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "query_count": int(detail.get("query_count", 0)), | |
| "recall_at_1": float(recall.get(1, 0.0)), | |
| "recall_at_3": float(recall.get(3, 0.0)), | |
| "recall_at_5": float(recall.get(5, 0.0)), | |
| "mean_reciprocal_rank": float(detail.get("mean_reciprocal_rank", 0.0)), | |
| "citation_accuracy_at_5": float(detail.get("citation_accuracy_at_k", {}).get(5, 0.0)), | |
| } | |
| def _parser_contribution(parsed) -> dict[str, Any]: | |
| """Count which parser produced each merged element/table/figure. | |
| Counts are over the *post-merge* output, not the pre-merge candidates. | |
| This is a "contribution" view (which parser's output survived) rather | |
| than an "ablation" view (which parser would do best alone). | |
| """ | |
| counts: dict[str, int] = {} | |
| for element in parsed.elements: | |
| counts[element.source_parser] = counts.get(element.source_parser, 0) + 1 | |
| for table in parsed.tables: | |
| counts[table.source_parser] = counts.get(table.source_parser, 0) + 1 | |
| for figure in parsed.figures: | |
| counts[figure.source_parser] = counts.get(figure.source_parser, 0) + 1 | |
| total = sum(counts.values()) | |
| fractions = {parser: (count / total) for parser, count in counts.items()} if total else {} | |
| return {"counts": counts, "fractions": fractions, "total": total} | |
| def _aggregate_parser_contributions(documents: list[dict]) -> dict[str, Any]: | |
| parser_totals: dict[str, int] = {} | |
| grand_total = 0 | |
| for doc in documents: | |
| counts = doc.get("parser_contribution_counts") or {} | |
| if not isinstance(counts, dict): | |
| continue | |
| for parser, count in counts.items(): | |
| parser_totals[parser] = parser_totals.get(parser, 0) + int(count) | |
| grand_total += int(count) | |
| fractions = {parser: (count / grand_total) for parser, count in parser_totals.items()} if grand_total else {} | |
| return {"counts": dict(sorted(parser_totals.items())), "fractions": dict(sorted(fractions.items())), "total": grand_total} | |
def _write_retrieval_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document retrieval rows to ``_write_csv`` for ``path``.

    Only the columns listed here are kept; a key a row lacks is written as
    an empty string.
    """
    columns = [
        "source_path",
        "doc_id",
        "query_count",
        "recall_at_1",
        "recall_at_3",
        "recall_at_5",
        "mean_reciprocal_rank",
        "citation_accuracy_at_5",
    ]
    projected = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, projected, columns)
def _table_structure_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score the parsed tables against the dataset's table ground truth.

    Returns a dict with ``summary``, ``metrics`` and ``reason``.  Scoring is
    skipped (``evaluated`` False, ``reason`` set, placeholder scores of 1.0)
    when the dataset has no table adapter, the document has no ground truth,
    or there are neither truths nor predictions to compare.
    """

    def _not_evaluated(reason: str) -> dict:
        return {
            "summary": {"evaluated": False, "mean_table_score": 1.0, "table_match_rate": 1.0},
            "metrics": None,
            "reason": reason,
        }

    adapter = TABLE_TRUTH_ADAPTERS.get(dataset_document.dataset_id)
    if adapter is None:
        return _not_evaluated("no_table_truth_adapter")
    if dataset_document.ground_truth is None:
        return _not_evaluated("no_ground_truth")

    truths = adapter(dataset_document.ground_truth)
    predictions = parsed_table_records(parsed)
    if not (truths or predictions):
        return _not_evaluated("no_truths_and_no_predictions")

    metrics = compute_table_structure_score(predictions, truths)
    summary = {
        "evaluated": True,
        "mean_table_score": float(metrics["mean_table_score"]),
        "table_match_rate": float(metrics["table_match_rate"]),
    }
    return {"summary": summary, "metrics": metrics, "reason": None}
def _formula_extraction_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score the parsed formulas against the dataset's formula ground truth.

    Returns a dict with ``summary``, ``metrics`` and ``reason``.  When the
    dataset has no formula adapter, the document has no ground truth, or
    both truths and predictions are empty, nothing is scored: ``evaluated``
    is False, ``reason`` names the cause and the summary carries placeholder
    values (CER 0.0, accuracy and exact-match rate 1.0).
    """
    adapter = FORMULA_TRUTH_ADAPTERS.get(dataset_document.dataset_id)

    skip_reason = None
    truths = predictions = None
    if adapter is None:
        skip_reason = "no_formula_truth_adapter"
    elif dataset_document.ground_truth is None:
        skip_reason = "no_ground_truth"
    else:
        truths = adapter(dataset_document.ground_truth)
        predictions = parsed_formula_records(parsed)
        if not (truths or predictions):
            skip_reason = "no_truths_and_no_predictions"

    if skip_reason is not None:
        return {
            "summary": {"evaluated": False, "mean_cer": 0.0, "mean_accuracy": 1.0, "exact_match_rate": 1.0},
            "metrics": None,
            "reason": skip_reason,
        }

    metrics = compute_formula_extraction(predictions, truths)
    return {
        "summary": {
            "evaluated": True,
            "mean_cer": float(metrics["mean_cer"]),
            "mean_accuracy": float(metrics["mean_accuracy"]),
            "exact_match_rate": float(metrics["exact_match_rate"]),
        },
        "metrics": metrics,
        "reason": None,
    }
| def _table_structure_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict: | |
| detail = metrics["metrics"] or {} | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "dataset_id": dataset_document.dataset_id, | |
| "prediction_count": int(detail.get("prediction_count", 0)), | |
| "truth_count": int(detail.get("truth_count", 0)), | |
| "matched_pair_count": int(detail.get("matched_pair_count", 0)), | |
| "table_match_rate": float(detail.get("table_match_rate", 0.0)), | |
| "mean_table_score": float(detail.get("mean_table_score", 0.0)), | |
| "mean_shape_similarity": float(detail.get("mean_shape_similarity", 0.0)), | |
| "mean_cell_content_f1": float(detail.get("mean_cell_content_f1", 0.0)), | |
| "table_count_delta": int(detail.get("table_count_delta", 0)), | |
| } | |
| def _formula_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict: | |
| detail = metrics["metrics"] or {} | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "dataset_id": dataset_document.dataset_id, | |
| "prediction_count": int(detail.get("prediction_count", 0)), | |
| "truth_count": int(detail.get("truth_count", 0)), | |
| "matched_pair_count": int(detail.get("matched_pair_count", 0)), | |
| "mean_cer": float(detail.get("mean_cer", 1.0)), | |
| "mean_accuracy": float(detail.get("mean_accuracy", 0.0)), | |
| "exact_match_rate": float(detail.get("exact_match_rate", 0.0)), | |
| "formula_precision": float(detail.get("formula_precision", 0.0)), | |
| "formula_recall": float(detail.get("formula_recall", 0.0)), | |
| } | |
def _write_table_structure_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document table-structure rows to ``_write_csv`` for ``path``.

    Rows are trimmed to the columns below, with empty strings standing in
    for keys a row does not have.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "prediction_count",
        "truth_count",
        "matched_pair_count",
        "table_match_rate",
        "mean_table_score",
        "mean_shape_similarity",
        "mean_cell_content_f1",
        "table_count_delta",
    ]
    projected = []
    for entry in rows:
        trimmed = {}
        for column in columns:
            trimmed[column] = entry.get(column, "")
        projected.append(trimmed)
    _write_csv(path, projected, columns)
def _write_formula_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document formula rows to ``_write_csv`` for ``path``.

    Each row is restricted to the columns below; a missing key is written as
    an empty string.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "prediction_count",
        "truth_count",
        "matched_pair_count",
        "mean_cer",
        "mean_accuracy",
        "exact_match_rate",
        "formula_precision",
        "formula_recall",
    ]

    def _project(entry: dict) -> dict:
        return {column: entry.get(column, "") for column in columns}

    _write_csv(path, [_project(entry) for entry in rows], columns)
def _layout_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score the parsed layout against the dataset's layout ground truth.

    Returns a dict with ``summary``, ``metrics`` and ``reason``.  Scoring is
    skipped (``evaluated`` False, both F1 values 0.0, ``reason`` set) when
    the dataset has no layout adapter, the document has no ground truth, or
    there are neither truths nor predictions.  Otherwise ``metrics`` is the
    full output of ``compute_layout_f1``.
    """
    adapter = GROUND_TRUTH_ADAPTERS.get(dataset_document.dataset_id)

    skip_reason = None
    if adapter is None:
        skip_reason = "no_ground_truth_adapter"
    elif dataset_document.ground_truth is None:
        skip_reason = "no_ground_truth"
    else:
        truths = adapter(dataset_document.ground_truth)
        predictions = parsed_layout_predictions(parsed)
        if truths or predictions:
            metrics = compute_layout_f1(predictions, truths)
            summary = {
                "evaluated": True,
                "class_aware_f1": float(metrics["class_aware"]["f1"]),
                "class_agnostic_f1": float(metrics["class_agnostic"]["f1"]),
            }
            return {"summary": summary, "metrics": metrics, "reason": None}
        skip_reason = "no_truths_and_no_predictions"

    return {
        "summary": {"evaluated": False, "class_aware_f1": 0.0, "class_agnostic_f1": 0.0},
        "metrics": None,
        "reason": skip_reason,
    }
| def _layout_row(path: Path, parsed, dataset_document: DatasetDocument, layout_metrics: dict) -> dict: | |
| metrics = layout_metrics["metrics"] or {} | |
| class_aware = metrics.get("class_aware", {}) | |
| class_agnostic = metrics.get("class_agnostic", {}) | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "dataset_id": dataset_document.dataset_id, | |
| "iou_threshold": float(metrics.get("iou_threshold", 0.5)), | |
| "prediction_count": int(metrics.get("prediction_count", 0)), | |
| "truth_count": int(metrics.get("truth_count", 0)), | |
| "class_aware_precision": float(class_aware.get("precision", 0.0)), | |
| "class_aware_recall": float(class_aware.get("recall", 0.0)), | |
| "class_aware_f1": float(class_aware.get("f1", 0.0)), | |
| "class_agnostic_precision": float(class_agnostic.get("precision", 0.0)), | |
| "class_agnostic_recall": float(class_agnostic.get("recall", 0.0)), | |
| "class_agnostic_f1": float(class_agnostic.get("f1", 0.0)), | |
| } | |
def _write_layout_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document layout rows to ``_write_csv`` for ``path``.

    Rows are limited to the columns below, in this order; a key missing from
    a row is written as an empty string.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "iou_threshold",
        "prediction_count",
        "truth_count",
        "class_aware_precision",
        "class_aware_recall",
        "class_aware_f1",
        "class_agnostic_precision",
        "class_agnostic_recall",
        "class_agnostic_f1",
    ]
    projected = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, projected, columns)
| def _repair_row(path: Path, parsed, repair_success: dict, disagreement: dict) -> dict: | |
| return { | |
| "source_path": str(path), | |
| "doc_id": parsed.doc_id, | |
| "candidate_count": int(disagreement.get("candidate_count", 0)), | |
| "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)), | |
| "conflict_count": int(disagreement.get("conflict_count", 0)), | |
| "iteration_count": int(repair_success.get("iteration_count", 0)), | |
| "total_actions": int(repair_success.get("total_actions", 0)), | |
| "score_delta": float(repair_success.get("score_delta", 0.0)), | |
| "pre_repair_blocking_count": int(repair_success.get("pre_repair_blocking_count", 0)), | |
| "post_repair_blocking_count": int(repair_success.get("post_repair_blocking_count", 0)), | |
| "resolved_blocking_count": int(repair_success.get("resolved_blocking_count", 0)), | |
| "regressed_blocking_count": int(repair_success.get("regressed_blocking_count", 0)), | |
| "repair_resolution_rate": float(repair_success.get("repair_resolution_rate", 1.0)), | |
| "repair_regression_rate": float(repair_success.get("repair_regression_rate", 0.0)), | |
| } | |
def _write_repair_rows_csv(path: Path, rows: list[dict]) -> None:
    """Pass the per-document repair rows to ``_write_csv`` for ``path``.

    Only the columns listed here are kept; keys a row lacks are written as
    empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "candidate_count",
        "parser_disagreement_rate",
        "conflict_count",
        "iteration_count",
        "total_actions",
        "score_delta",
        "pre_repair_blocking_count",
        "post_repair_blocking_count",
        "resolved_blocking_count",
        "regressed_blocking_count",
        "repair_resolution_rate",
        "repair_regression_rate",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _chunk_strategy_counts(chunks: list) -> dict[str, int]:
    """Count chunks per ``strategy`` attribute, keyed in first-seen order."""
    tally: dict[str, int] = {}
    for item in chunks:
        name = item.strategy
        if name in tally:
            tally[name] += 1
        else:
            tally[name] = 1
    return tally
def _chunk_quality_metrics(metrics: dict) -> dict:
    """Return the subset of ``metrics`` limited to the chunk-related keys.

    Keys absent from ``metrics`` are omitted rather than defaulted, and the
    result follows the fixed key order below, not the input's order.
    """
    wanted = (
        "chunk_count",
        "parent_chunk_count",
        "child_chunk_count",
        "avg_chunk_tokens",
        "max_chunk_tokens",
        "table_chunk_coverage",
        "figure_chunk_coverage",
    )
    selected: dict = {}
    for name in wanted:
        if name in metrics:
            selected[name] = metrics[name]
    return selected
def _chunk_strategy_rows(path: Path, parsed) -> list[dict]:
    """Summarise a parsed document's chunks, one row per chunking strategy.

    Args:
        path: Source document path; stored as a string under ``source_path``.
        parsed: Parsed document; reads ``chunks``, ``doc_id`` and
            ``quality_report.score``. Each chunk must expose ``strategy``,
            ``token_count``, ``table_ids``, ``figure_ids`` and
            ``requires_visual_context``.

    Returns:
        Rows ordered by strategy name. ``quality_score`` is the document-level
        score, repeated on every row.
    """
    by_strategy: dict[str, list] = {}
    for chunk in parsed.chunks:
        name = chunk.strategy
        if name not in by_strategy:
            by_strategy[name] = []
        by_strategy[name].append(chunk)

    summaries: list[dict] = []
    for strategy in sorted(by_strategy):
        members = by_strategy[strategy]
        sizes = [member.token_count for member in members]
        summary = {
            "source_path": str(path),
            "doc_id": parsed.doc_id,
            "strategy": strategy,
            "quality_score": parsed.quality_report.score,
            "chunk_count": len(members),
        }
        if sizes:
            summary["avg_tokens"] = mean(sizes)
            summary["max_tokens"] = max(sizes)
        else:
            summary["avg_tokens"] = 0.0
            summary["max_tokens"] = 0
        # A chunk counts as "linked" when its id list is truthy (non-empty).
        summary["table_linked_chunks"] = len([m for m in members if m.table_ids])
        summary["figure_linked_chunks"] = len([m for m in members if m.figure_ids])
        summary["visual_context_chunks"] = len([m for m in members if m.requires_visual_context])
        summaries.append(summary)
    return summaries
def _chunk_strategy_leaderboard(rows: list[dict]) -> list[dict]:
    """Aggregate per-document chunk-strategy rows into one entry per strategy.

    Args:
        rows: Per-document strategy rows; each must contain a ``strategy``
            key. Missing count columns are treated as zero when totalled.

    Returns:
        One summary dict per strategy, ordered by ``mean_quality_score`` and
        then ``total_chunks``, both descending.
    """
    by_strategy: dict[str, list[dict]] = {}
    for row in rows:
        name = row["strategy"]
        if name not in by_strategy:
            by_strategy[name] = []
        by_strategy[name].append(row)

    def total(members: list[dict], column: str) -> int:
        # Sum a count column across a strategy's rows, coercing each to int.
        return sum(int(member.get(column, 0)) for member in members)

    entries = [
        {
            "strategy": strategy,
            "runs": len(members),
            "total_chunks": total(members, "chunk_count"),
            "mean_chunk_count": _mean_value(members, "chunk_count"),
            "mean_avg_tokens": _mean_value(members, "avg_tokens"),
            "mean_max_tokens": _mean_value(members, "max_tokens"),
            "mean_quality_score": _mean_value(members, "quality_score"),
            "total_table_linked_chunks": total(members, "table_linked_chunks"),
            "total_figure_linked_chunks": total(members, "figure_linked_chunks"),
            "total_visual_context_chunks": total(members, "visual_context_chunks"),
        }
        for strategy, members in by_strategy.items()
    ]
    entries.sort(
        key=lambda entry: (entry["mean_quality_score"], entry["total_chunks"]),
        reverse=True,
    )
    return entries
def _parser_leaderboard(rows: list[dict]) -> list[dict]:
    """Aggregate per-run parser rows into one leaderboard entry per parser.

    Args:
        rows: Per-run parser rows; each must contain a ``parser`` key. A row
            counts as a failure when its ``failed`` value is truthy.

    Returns:
        One summary dict per parser. Means are computed over successful runs
        only. Entries are ordered by text coverage (higher first), then valid
        table ratio (higher first), then elapsed time (lower first).
    """
    by_parser: dict[str, list[dict]] = {}
    for row in rows:
        name = row["parser"]
        if name not in by_parser:
            by_parser[name] = []
        by_parser[name].append(row)

    # (output column, input column) pairs averaged over the successful runs.
    averaged = (
        ("mean_elapsed_seconds", "elapsed_seconds"),
        ("mean_text_coverage_ratio", "text_coverage_ratio"),
        ("mean_element_count", "element_count"),
        ("mean_table_count", "table_count"),
        ("mean_figure_count", "figure_count"),
        ("mean_valid_table_ratio", "valid_table_ratio"),
    )

    entries: list[dict] = []
    for parser_name, attempts in by_parser.items():
        succeeded = [attempt for attempt in attempts if not attempt.get("failed")]
        entry = {
            "parser": parser_name,
            "runs": len(attempts),
            "successes": len(succeeded),
            "failures": len(attempts) - len(succeeded),
        }
        for target, source in averaged:
            entry[target] = _mean_value(succeeded, source)
        entries.append(entry)

    def rank(entry: dict) -> tuple:
        # Elapsed time is negated so that, under the descending sort, faster
        # parsers win ties on the two quality ratios.
        return (
            entry["mean_text_coverage_ratio"],
            entry["mean_valid_table_ratio"],
            -entry["mean_elapsed_seconds"],
        )

    entries.sort(key=rank, reverse=True)
    return entries
def _mean_value(rows: list[dict], key: str) -> float:
    """Return the mean of ``row[key]`` across ``rows``, or ``0.0`` if none apply.

    Rows where ``key`` is missing or ``None`` are skipped; every remaining
    value is coerced with ``float`` before averaging.
    """
    numbers = []
    for row in rows:
        raw = row.get(key)
        if raw is not None:
            numbers.append(float(raw))
    if not numbers:
        return 0.0
    return mean(numbers)
def _write_leaderboard_csv(path: Path, rows: list[dict]) -> None:
    """Write parser leaderboard rows to ``path`` as CSV with fixed columns.

    Rows are projected onto the column list before writing, the same way the
    other ``_write_*_csv`` helpers in this module do: keys outside the list
    are dropped and missing keys are written as empty strings.

    Args:
        path: Destination CSV file.
        rows: Leaderboard entries, one dict per parser.
    """
    fieldnames = [
        "parser",
        "runs",
        "successes",
        "failures",
        "mean_elapsed_seconds",
        "mean_text_coverage_ratio",
        "mean_element_count",
        "mean_table_count",
        "mean_figure_count",
        "mean_valid_table_ratio",
    ]
    # Without this projection, csv.DictWriter (default extrasaction="raise")
    # raises ValueError as soon as a row carries a key not in ``fieldnames``.
    normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows]
    _write_csv(path, normalized, fieldnames)
def _write_parser_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-run parser rows to ``path`` as CSV.

    Each row is projected onto the fixed column list below: keys outside the
    list are dropped and missing keys are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "parser",
        "failed",
        "error",
        "elapsed_seconds",
        "page_count",
        "element_count",
        "table_count",
        "figure_count",
        "text_chars",
        "expected_text_chars",
        "text_coverage_ratio",
        "valid_table_ratio",
        "has_bboxes",
        "has_page_images",
    ]
    records = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, records, columns)
def _write_chunk_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document chunk-strategy rows to ``path`` as CSV.

    Each row is projected onto the fixed column list below: keys outside the
    list are dropped and missing keys are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "strategy",
        "quality_score",
        "chunk_count",
        "avg_tokens",
        "max_tokens",
        "table_linked_chunks",
        "figure_linked_chunks",
        "visual_context_chunks",
    ]
    records = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, records, columns)
def _write_structure_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document structure-quality rows to ``path`` as CSV.

    Each row is projected onto the fixed column list below: keys outside the
    list are dropped and missing keys are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "table_count",
        "valid_table_count",
        "table_exact",
        "figure_count",
        "captioned_figure_count",
        "figure_caption_correct",
        "reading_order_issue_count",
        "reading_order_health",
        "document_text_coverage",
    ]
    records = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, records, columns)
def _write_chunk_quality_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document chunking-quality rows to ``path`` as CSV.

    Each row is projected onto the fixed column list below: keys outside the
    list are dropped and missing keys are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "chunk_count",
        "boundary_precision",
        "parent_child_resolution",
        "provenance_completeness",
        "retrieval_readiness",
        "table_chunk_coverage",
        "figure_chunk_coverage",
        "avg_tokens",
        "max_tokens",
    ]
    records = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, records, columns)
def _write_throughput_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document throughput rows to ``path`` as CSV.

    Each row is projected onto the fixed column list below: keys outside the
    list are dropped and missing keys are written as empty strings.
    """
    columns = [
        "source_path",
        "doc_id",
        "page_count",
        "elapsed_seconds",
        "pages_per_second",
        "elements_per_second",
        "chunks_per_second",
        "gpu_task_count",
        "runtime_device",
        "max_gpu_seconds_per_doc",
    ]
    records = [
        {column: entry.get(column, "") for column in columns}
        for entry in rows
    ]
    _write_csv(path, records, columns)
def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 CSV with a header line.

    Args:
        path: Destination file; opened in write mode, so existing content is
            replaced.
        rows: Dict rows to write, in order.
        fieldnames: Column names, which also fix the column order.
    """
    # newline="" leaves line endings to the csv module, as its docs require.
    with path.open("w", encoding="utf-8", newline="") as stream:
        emitter = csv.DictWriter(stream, fieldnames=fieldnames)
        emitter.writeheader()
        for record in rows:
            emitter.writerow(record)