zeroshotGPU / zsgdp /benchmarks /parser_quality.py
Arjunvir Singh
Initial commit: zeroshotGPU MVP with full eval surface
db06ffa
"""Parser-quality benchmark helpers."""
from __future__ import annotations

import csv
from pathlib import Path
from statistics import mean
from time import perf_counter
from typing import Any

from zsgdp.benchmarks.ablations import ablation_plan
from zsgdp.benchmarks.chunking_quality import chunking_quality_record, score_chunking_quality
from zsgdp.benchmarks.datasets import DatasetDocument, get_dataset_loader
from zsgdp.benchmarks.ground_truth import (
    doclaynet_layout_truths,
    omnidocbench_formula_truths,
    omnidocbench_layout_truths,
    omnidocbench_table_truths,
    parsed_formula_records,
    parsed_layout_predictions,
    parsed_table_records,
)
from zsgdp.benchmarks.per_parser_metrics import compute_per_parser_metrics
from zsgdp.benchmarks.retrieval import run_retrieval_for_document
from zsgdp.benchmarks.structure_quality import score_structure_quality, structure_quality_record
from zsgdp.benchmarks.throughput import summarize_throughput, throughput_record
from zsgdp.config import load_config
from zsgdp.pipeline import parse_document
from zsgdp.utils import write_json
from zsgdp.verify.formula_extraction import compute_formula_extraction
from zsgdp.verify.layout_f1 import compute_layout_f1
from zsgdp.verify.retrieval import compute_retrieval_metrics
from zsgdp.verify.table_structure import compute_table_structure_score
# Layout ground-truth adapters, keyed by dataset id. The selected adapter is
# called with a document's ground truth and its result is scored against the
# parsed layout predictions.
GROUND_TRUTH_ADAPTERS = {
    "doclaynet": doclaynet_layout_truths,
    "omnidocbench": omnidocbench_layout_truths,
}
# Table ground-truth adapters, keyed by dataset id. Datasets without an entry
# are reported as not evaluated for table structure.
TABLE_TRUTH_ADAPTERS = {
    "omnidocbench": omnidocbench_table_truths,
}
# Formula ground-truth adapters, keyed by dataset id. Datasets without an entry
# are reported as not evaluated for formula extraction.
FORMULA_TRUTH_ADAPTERS = {
    "omnidocbench": omnidocbench_formula_truths,
}
def score_parser_quality(records: list[dict]) -> dict[str, float]:
    """Average the ``quality_score`` field over benchmark records.

    A record without ``quality_score`` counts as 0.0. An empty input yields a
    mean of 0.0 rather than raising.
    """
    if not records:
        return {"mean_quality_score": 0.0}
    scores = [float(record.get("quality_score", 0.0)) for record in records]
    return {"mean_quality_score": sum(scores) / len(records)}
def run_parser_benchmark(
    input_dir: str | Path,
    output_dir: str | Path,
    *,
    config_path: str | Path | None = None,
    selected_parsers: list[str] | None = None,
    dataset_name: str = "custom_folder",
) -> dict:
    """Parse every document of a dataset and write the benchmark artifacts.

    Each document yielded by the dataset loader is parsed into
    ``<output_dir>/parsed/<doc id or file stem>``, timed, and scored for
    layout, table structure, formula extraction and retrieval. Per-document
    records plus aggregate leaderboards are written to ``results.json``,
    ``ablations.json`` and a set of CSV files under ``output_dir``.

    Args:
        input_dir: Dataset root handed to the dataset loader.
        output_dir: Directory receiving all artifacts; created if missing.
        config_path: Optional config file, used both for parsing and for the
            benchmark-level config (retriever, ablation plan).
        selected_parsers: Optional parser subset forwarded to ``parse_document``.
        dataset_name: Name used to look up the dataset loader.

    Returns:
        The summary dictionary that is also written to ``results.json``.
    """
    input_root = Path(input_dir)
    output_root = Path(output_dir)
    output_root.mkdir(parents=True, exist_ok=True)
    loader = get_dataset_loader(dataset_name)
    bench_config = load_config(config_path)

    documents: list[dict] = []
    parser_rows: list[dict] = []
    chunk_rows: list[dict] = []
    structure_rows: list[dict] = []
    chunk_quality_rows: list[dict] = []
    throughput_rows: list[dict] = []
    repair_rows: list[dict] = []
    layout_rows: list[dict] = []
    table_structure_rows: list[dict] = []
    formula_rows: list[dict] = []
    retrieval_rows: list[dict] = []
    per_parser_rows: list[dict] = []
    observed_chunk_strategies: set[str] = set()

    for dataset_document in loader(input_root):
        path = dataset_document.path
        doc_out = output_root / "parsed" / (dataset_document.doc_id or path.stem)

        # Only the parse itself is timed; scoring below is excluded.
        started = perf_counter()
        parsed = parse_document(path, doc_out, config_path=config_path, selected_parsers=selected_parsers)
        elapsed_seconds = perf_counter() - started

        chunk_strategy_counts = _chunk_strategy_counts(parsed.chunks)
        observed_chunk_strategies.update(chunk_strategy_counts)
        # Provenance entries may be missing or None; normalise to empty dicts.
        disagreement = parsed.provenance.get("parser_disagreement", {}) or {}
        repair_success = parsed.provenance.get("repair_success", {}) or {}
        layout_metrics = _layout_metrics(dataset_document, parsed)
        table_metrics = _table_structure_metrics(dataset_document, parsed)
        formula_metrics = _formula_extraction_metrics(dataset_document, parsed)
        retrieval_metrics = _retrieval_metrics(parsed, bench_config)
        contribution = _parser_contribution(parsed)
        per_parser = _compute_per_parser_block(dataset_document, parsed)

        doc_record = {
            "source_path": str(path),
            "dataset_id": dataset_document.dataset_id,
            "dataset_doc_id": dataset_document.doc_id,
            "has_ground_truth": dataset_document.ground_truth is not None,
            "doc_id": parsed.doc_id,
            "file_type": parsed.file_type,
            "quality_score": parsed.quality_report.score,
            "element_count": len(parsed.elements),
            "table_count": len(parsed.tables),
            "figure_count": len(parsed.figures),
            "chunk_count": len(parsed.chunks),
            "chunk_strategy_counts": chunk_strategy_counts,
            "chunk_quality_metrics": _chunk_quality_metrics(parsed.quality_report.metrics),
            "elapsed_seconds": elapsed_seconds,
            "parser_metrics": parsed.provenance.get("parser_metrics", {}),
            "parser_failures": parsed.provenance.get("parser_failures", {}),
            "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)),
            "candidate_count": int(disagreement.get("candidate_count", 0)),
            "conflict_count": int(disagreement.get("conflict_count", 0)),
            "repair_resolution_rate": float(repair_success.get("repair_resolution_rate", 1.0)),
            "repair_regression_rate": float(repair_success.get("repair_regression_rate", 0.0)),
            "repair_iteration_count": int(repair_success.get("iteration_count", 0)),
            "layout_f1": layout_metrics["summary"]["class_aware_f1"],
            "layout_class_agnostic_f1": layout_metrics["summary"]["class_agnostic_f1"],
            "layout_evaluated": layout_metrics["summary"]["evaluated"],
            "table_structure_score": table_metrics["summary"]["mean_table_score"],
            "table_match_rate": table_metrics["summary"]["table_match_rate"],
            "table_structure_evaluated": table_metrics["summary"]["evaluated"],
            "formula_cer": formula_metrics["summary"]["mean_cer"],
            "formula_accuracy": formula_metrics["summary"]["mean_accuracy"],
            "formula_exact_match_rate": formula_metrics["summary"]["exact_match_rate"],
            "formula_evaluated": formula_metrics["summary"]["evaluated"],
            "retrieval_recall_at_1": retrieval_metrics["summary"]["recall_at_1"],
            "retrieval_recall_at_5": retrieval_metrics["summary"]["recall_at_5"],
            "retrieval_mrr": retrieval_metrics["summary"]["mean_reciprocal_rank"],
            "retrieval_query_count": retrieval_metrics["summary"]["query_count"],
            "retrieval_evaluated": retrieval_metrics["summary"]["evaluated"],
            "parser_contribution_counts": contribution["counts"],
            "parser_contribution_fractions": contribution["fractions"],
            "per_parser_metrics": per_parser,
        }
        documents.append(doc_record)

        for parser_name, block in per_parser.items():
            per_parser_rows.append(_per_parser_row(path, parsed, dataset_document, parser_name, block))
        repair_rows.append(_repair_row(path, parsed, repair_success, disagreement))
        # Ground-truth CSV rows are emitted only for documents actually scored.
        if layout_metrics["summary"]["evaluated"]:
            layout_rows.append(_layout_row(path, parsed, dataset_document, layout_metrics))
        if table_metrics["summary"]["evaluated"]:
            table_structure_rows.append(_table_structure_row(path, parsed, dataset_document, table_metrics))
        if formula_metrics["summary"]["evaluated"]:
            formula_rows.append(_formula_row(path, parsed, dataset_document, formula_metrics))
        if retrieval_metrics["summary"]["evaluated"]:
            retrieval_rows.append(_retrieval_row(path, parsed, retrieval_metrics))
        for parser_name, metrics in doc_record["parser_metrics"].items():
            parser_rows.append(
                {
                    "source_path": str(path),
                    "doc_id": parsed.doc_id,
                    "parser": parser_name,
                    **metrics,
                }
            )
        chunk_rows.extend(_chunk_strategy_rows(path, parsed))
        structure_rows.append(structure_quality_record(parsed, str(path)))
        chunk_quality_rows.append(chunking_quality_record(parsed, str(path)))
        throughput_rows.append(throughput_record(parsed, str(path), elapsed_seconds))

    # Means for ground-truth metrics are taken over evaluated documents only,
    # so the placeholder values of unevaluated documents never enter a mean.
    layout_docs = [doc for doc in documents if doc.get("layout_evaluated")]
    table_docs = [doc for doc in documents if doc.get("table_structure_evaluated")]
    formula_docs = [doc for doc in documents if doc.get("formula_evaluated")]
    retrieval_docs = [doc for doc in documents if doc.get("retrieval_evaluated")]

    parser_contribution_summary = _aggregate_parser_contributions(documents)
    summary = {
        "dataset_name": dataset_name,
        "dataset_root": str(input_root),
        "document_count": len(documents),
        "parser_contribution_summary": parser_contribution_summary,
        "mean_quality_score": mean([doc["quality_score"] for doc in documents]) if documents else 0.0,
        "mean_parser_disagreement_rate": _mean_value(documents, "parser_disagreement_rate"),
        "mean_repair_resolution_rate": _mean_value(documents, "repair_resolution_rate"),
        "mean_repair_regression_rate": _mean_value(documents, "repair_regression_rate"),
        "mean_layout_f1": _mean_value(layout_docs, "layout_f1"),
        "mean_layout_class_agnostic_f1": _mean_value(layout_docs, "layout_class_agnostic_f1"),
        "layout_evaluated_count": len(layout_docs),
        "mean_table_structure_score": _mean_value(table_docs, "table_structure_score"),
        "mean_table_match_rate": _mean_value(table_docs, "table_match_rate"),
        "table_structure_evaluated_count": len(table_docs),
        "mean_formula_cer": _mean_value(formula_docs, "formula_cer"),
        "mean_formula_accuracy": _mean_value(formula_docs, "formula_accuracy"),
        "formula_evaluated_count": len(formula_docs),
        "mean_retrieval_recall_at_1": _mean_value(retrieval_docs, "retrieval_recall_at_1"),
        "mean_retrieval_recall_at_5": _mean_value(retrieval_docs, "retrieval_recall_at_5"),
        "mean_retrieval_mrr": _mean_value(retrieval_docs, "retrieval_mrr"),
        "retrieval_evaluated_count": len(retrieval_docs),
        "documents": documents,
        "parser_leaderboard": _parser_leaderboard(parser_rows),
        "per_parser_gt_leaderboard": _per_parser_gt_leaderboard(per_parser_rows),
        "chunk_strategy_leaderboard": _chunk_strategy_leaderboard(chunk_rows),
        "structure_quality": score_structure_quality(structure_rows),
        "chunking_quality": score_chunking_quality(chunk_quality_rows),
        "throughput": summarize_throughput(throughput_rows),
        # Uses the config loaded before the loop, so it is defined even when
        # the dataset yields no documents.
        "ablation_plan": ablation_plan(bench_config, observed_chunk_strategies),
    }

    write_json(output_root / "results.json", summary)
    write_json(output_root / "ablations.json", summary["ablation_plan"])
    _write_leaderboard_csv(output_root / "leaderboard.csv", summary["parser_leaderboard"])
    _write_parser_rows_csv(output_root / "parser_runs.csv", parser_rows)
    _write_chunk_rows_csv(output_root / "chunk_runs.csv", chunk_rows)
    _write_structure_rows_csv(output_root / "structure_runs.csv", structure_rows)
    _write_chunk_quality_rows_csv(output_root / "chunk_quality.csv", chunk_quality_rows)
    _write_throughput_rows_csv(output_root / "throughput_runs.csv", throughput_rows)
    _write_repair_rows_csv(output_root / "repair_runs.csv", repair_rows)
    _write_layout_rows_csv(output_root / "layout_runs.csv", layout_rows)
    _write_table_structure_rows_csv(output_root / "table_structure_runs.csv", table_structure_rows)
    _write_formula_rows_csv(output_root / "formula_runs.csv", formula_rows)
    _write_retrieval_rows_csv(output_root / "retrieval_runs.csv", retrieval_rows)
    _write_per_parser_rows_csv(output_root / "per_parser_metrics.csv", per_parser_rows)
    _write_per_parser_gt_leaderboard_csv(
        output_root / "per_parser_gt_leaderboard.csv", summary["per_parser_gt_leaderboard"]
    )
    return summary
def _per_parser_gt_leaderboard(rows: list[dict]) -> list[dict]:
    """Collapse per-document, per-parser rows into one ranked entry per parser.

    A row feeds a metric's mean only when its matching ``*_evaluated`` flag
    (``layout_evaluated``, ``table_evaluated``, ``formula_evaluated``) is
    truthy, so documents where a parser was not scored for that metric do not
    pull the mean towards zero. The number of contributing rows is reported
    next to each group of means. The element/table/figure count means use
    every row of the parser.

    Entries are ordered best-first: higher layout class-aware F1, then higher
    table structure score, then lower formula CER.
    """
    by_parser: dict[str, list[dict]] = {}
    for entry in rows:
        by_parser.setdefault(entry["parser"], []).append(entry)

    board: list[dict] = []
    for name, entries in by_parser.items():
        with_layout = [entry for entry in entries if entry.get("layout_evaluated")]
        with_table = [entry for entry in entries if entry.get("table_evaluated")]
        with_formula = [entry for entry in entries if entry.get("formula_evaluated")]
        board.append(
            {
                "parser": name,
                "document_count": len(entries),
                "layout_evaluated_count": len(with_layout),
                "mean_layout_class_aware_f1": _mean_value(with_layout, "layout_class_aware_f1"),
                "mean_layout_class_agnostic_f1": _mean_value(with_layout, "layout_class_agnostic_f1"),
                "mean_layout_precision": _mean_value(with_layout, "layout_class_aware_precision"),
                "mean_layout_recall": _mean_value(with_layout, "layout_class_aware_recall"),
                "table_evaluated_count": len(with_table),
                "mean_table_structure_score": _mean_value(with_table, "table_structure_score"),
                "mean_table_match_rate": _mean_value(with_table, "table_match_rate"),
                "mean_table_cell_content_f1": _mean_value(with_table, "table_cell_content_f1"),
                "formula_evaluated_count": len(with_formula),
                "mean_formula_cer": _mean_value(with_formula, "formula_cer"),
                "mean_formula_accuracy": _mean_value(with_formula, "formula_accuracy"),
                "mean_formula_exact_match_rate": _mean_value(with_formula, "formula_exact_match_rate"),
                "mean_element_count": _mean_value(entries, "element_count"),
                "mean_table_count": _mean_value(entries, "table_count"),
                "mean_figure_count": _mean_value(entries, "figure_count"),
            }
        )

    def _rank(entry: dict) -> tuple:
        # CER is negated so that, under the descending sort, lower CER ranks first.
        return (
            entry["mean_layout_class_aware_f1"],
            entry["mean_table_structure_score"],
            -entry["mean_formula_cer"],
        )

    board.sort(key=_rank, reverse=True)
    return board
def _write_per_parser_gt_leaderboard_csv(path: Path, rows: list[dict]) -> None:
    """Write the per-parser ground-truth leaderboard to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "parser",
        "document_count",
        "layout_evaluated_count",
        "mean_layout_class_aware_f1",
        "mean_layout_class_agnostic_f1",
        "mean_layout_precision",
        "mean_layout_recall",
        "table_evaluated_count",
        "mean_table_structure_score",
        "mean_table_match_rate",
        "mean_table_cell_content_f1",
        "formula_evaluated_count",
        "mean_formula_cer",
        "mean_formula_accuracy",
        "mean_formula_exact_match_rate",
        "mean_element_count",
        "mean_table_count",
        "mean_figure_count",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _compute_per_parser_block(dataset_document: DatasetDocument, parsed) -> dict[str, dict[str, Any]]:
    """Score each parser's output against whatever ground truth is available.

    Layout, table and formula truths are derived through the adapter
    registered for the document's dataset. When no adapter applies, the
    document has no ground truth, or every derived truth set is empty, an
    empty dict is returned and per-parser scoring is skipped.
    """
    dataset_id = dataset_document.dataset_id
    gt = dataset_document.ground_truth

    def _truths_from(registry: dict):
        # None means "nothing to score": no adapter for this dataset or no ground truth.
        adapter = registry.get(dataset_id)
        if adapter and gt is not None:
            return adapter(gt)
        return None

    layout_truths = _truths_from(GROUND_TRUTH_ADAPTERS)
    table_truths = _truths_from(TABLE_TRUTH_ADAPTERS)
    formula_truths = _truths_from(FORMULA_TRUTH_ADAPTERS)

    if not any((layout_truths, table_truths, formula_truths)):
        return {}

    # Empty truth collections are passed as None, same as missing ones.
    return compute_per_parser_metrics(
        parsed,
        layout_truths=layout_truths or None,
        table_truths=table_truths or None,
        formula_truths=formula_truths or None,
    )
def _per_parser_row(
    path: Path,
    parsed,
    dataset_document: DatasetDocument,
    parser_name: str,
    block: dict[str, Any],
) -> dict[str, Any]:
    """Flatten one parser's metric block for one document into a CSV-ready row.

    The ``*_evaluated`` flags record whether the corresponding section key is
    present in ``block`` at all; a missing or empty section contributes zeros
    for its metrics.
    """
    layout_section = block.get("layout") or {}
    table_section = block.get("table_structure") or {}
    formula_section = block.get("formula") or {}

    row: dict[str, Any] = {
        "source_path": str(path),
        "doc_id": parsed.doc_id,
        "dataset_id": dataset_document.dataset_id,
        "parser": parser_name,
    }
    for count_key in ("element_count", "table_count", "figure_count"):
        row[count_key] = int(block.get(count_key, 0))

    row["layout_evaluated"] = "layout" in block
    row["table_evaluated"] = "table_structure" in block
    row["formula_evaluated"] = "formula" in block

    row["layout_prediction_count"] = int(layout_section.get("prediction_count", 0))
    row["layout_class_aware_f1"] = float(layout_section.get("class_aware_f1", 0.0))
    row["layout_class_aware_precision"] = float(layout_section.get("class_aware_precision", 0.0))
    row["layout_class_aware_recall"] = float(layout_section.get("class_aware_recall", 0.0))
    row["layout_class_agnostic_f1"] = float(layout_section.get("class_agnostic_f1", 0.0))

    row["table_structure_score"] = float(table_section.get("mean_table_score", 0.0))
    row["table_match_rate"] = float(table_section.get("table_match_rate", 0.0))
    row["table_cell_content_f1"] = float(table_section.get("mean_cell_content_f1", 0.0))

    # An empty formula section already falls back to 0.0 through the defaults.
    row["formula_cer"] = float(formula_section.get("mean_cer", 0.0))
    row["formula_accuracy"] = float(formula_section.get("mean_accuracy", 0.0))
    row["formula_exact_match_rate"] = float(formula_section.get("exact_match_rate", 0.0))
    return row
def _write_per_parser_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document, per-parser metric rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "parser",
        "element_count",
        "table_count",
        "figure_count",
        "layout_evaluated",
        "table_evaluated",
        "formula_evaluated",
        "layout_prediction_count",
        "layout_class_aware_f1",
        "layout_class_aware_precision",
        "layout_class_aware_recall",
        "layout_class_agnostic_f1",
        "table_structure_score",
        "table_match_rate",
        "table_cell_content_f1",
        "formula_cer",
        "formula_accuracy",
        "formula_exact_match_rate",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _retrieval_metrics(parsed, config: dict | None = None) -> dict:
    """Run the retrieval benchmark for one parsed document and summarise it.

    A retriever is built from ``config`` when one is given; otherwise ``None``
    is passed and the retrieval runner picks its own. The result always has a
    ``summary`` with the same keys, the full ``metrics`` (or ``None``) and a
    ``reason`` explaining why the document was not evaluated.
    """
    # Imported lazily, as in the rest of the module's use of the retriever.
    from zsgdp.benchmarks.embedding_retriever import build_retriever

    retriever = None
    if config:
        retriever = build_retriever(config)
    run = run_retrieval_for_document(parsed, retriever=retriever)

    if run["evaluated"]:
        pairs = ((result["retrieved"], result["truths"]) for result in run["results"])
        metrics = compute_retrieval_metrics(pairs)
        summary = {
            "evaluated": True,
            "query_count": int(metrics["query_count"]),
            "recall_at_1": float(metrics["recall_at_k"].get(1, 0.0)),
            "recall_at_5": float(metrics["recall_at_k"].get(5, 0.0)),
            "mean_reciprocal_rank": float(metrics["mean_reciprocal_rank"]),
        }
        return {"summary": summary, "metrics": metrics, "reason": None}

    placeholder = {
        "evaluated": False,
        "query_count": 0,
        "recall_at_1": 0.0,
        "recall_at_5": 0.0,
        "mean_reciprocal_rank": 0.0,
    }
    return {"summary": placeholder, "metrics": None, "reason": run.get("reason")}
def _retrieval_row(path: Path, parsed, retrieval_metrics: dict) -> dict:
    """Flatten one document's retrieval metrics into a CSV-ready row.

    Missing metrics (``None``) or missing cut-offs fall back to zero.
    """
    detail = retrieval_metrics["metrics"] or {}
    recall_by_k = detail.get("recall_at_k", {})
    row = {"source_path": str(path), "doc_id": parsed.doc_id}
    row["query_count"] = int(detail.get("query_count", 0))
    for cutoff in (1, 3, 5):
        row[f"recall_at_{cutoff}"] = float(recall_by_k.get(cutoff, 0.0))
    row["mean_reciprocal_rank"] = float(detail.get("mean_reciprocal_rank", 0.0))
    citation_by_k = detail.get("citation_accuracy_at_k", {})
    row["citation_accuracy_at_5"] = float(citation_by_k.get(5, 0.0))
    return row
def _parser_contribution(parsed) -> dict[str, Any]:
    """Tally how many merged elements, tables and figures each parser supplied.

    The tally is taken over the merged document (``parsed.elements``,
    ``parsed.tables``, ``parsed.figures``), so it shows whose output survived
    merging rather than how a parser would score on its own. ``fractions`` is
    empty when nothing was produced at all.
    """
    tally: dict[str, int] = {}
    for collection in (parsed.elements, parsed.tables, parsed.figures):
        for item in collection:
            tally[item.source_parser] = tally.get(item.source_parser, 0) + 1

    total = sum(tally.values())
    if total:
        fractions = {parser: count / total for parser, count in tally.items()}
    else:
        fractions = {}
    return {"counts": tally, "fractions": fractions, "total": total}
def _aggregate_parser_contributions(documents: list[dict]) -> dict[str, Any]:
    """Sum per-document parser contribution counts across the whole run.

    Documents whose ``parser_contribution_counts`` is missing, empty or not a
    dict are ignored. ``counts`` and ``fractions`` are returned sorted by
    parser name; ``fractions`` is empty when the grand total is zero.
    """
    totals: dict[str, int] = {}
    grand_total = 0
    for record in documents:
        per_doc = record.get("parser_contribution_counts") or {}
        if isinstance(per_doc, dict):
            for name, raw_count in per_doc.items():
                amount = int(raw_count)
                totals[name] = totals.get(name, 0) + amount
                grand_total += amount

    shares: dict[str, float] = {}
    if grand_total:
        shares = {name: amount / grand_total for name, amount in totals.items()}
    return {
        "counts": dict(sorted(totals.items())),
        "fractions": dict(sorted(shares.items())),
        "total": grand_total,
    }
def _write_retrieval_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document retrieval rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "query_count",
        "recall_at_1",
        "recall_at_3",
        "recall_at_5",
        "mean_reciprocal_rank",
        "citation_accuracy_at_5",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _table_structure_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score parsed tables against the dataset's table ground truth.

    Returns a dict with ``summary``, the full ``metrics`` (or ``None``) and a
    ``reason``. The document is reported as not evaluated when the dataset has
    no table adapter, the document has no ground truth, or there are neither
    truths nor predictions to compare.
    """

    def _not_evaluated(reason: str) -> dict:
        # Placeholder scores; callers are expected to check ``evaluated`` first.
        return {
            "summary": {"evaluated": False, "mean_table_score": 1.0, "table_match_rate": 1.0},
            "metrics": None,
            "reason": reason,
        }

    adapter = TABLE_TRUTH_ADAPTERS.get(dataset_document.dataset_id)
    if adapter is None:
        return _not_evaluated("no_table_truth_adapter")
    if dataset_document.ground_truth is None:
        return _not_evaluated("no_ground_truth")

    truths = adapter(dataset_document.ground_truth)
    predictions = parsed_table_records(parsed)
    if not (truths or predictions):
        return _not_evaluated("no_truths_and_no_predictions")

    metrics = compute_table_structure_score(predictions, truths)
    summary = {
        "evaluated": True,
        "mean_table_score": float(metrics["mean_table_score"]),
        "table_match_rate": float(metrics["table_match_rate"]),
    }
    return {"summary": summary, "metrics": metrics, "reason": None}
def _formula_extraction_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score parsed formulas against the dataset's formula ground truth.

    Returns a dict with ``summary``, the full ``metrics`` (or ``None``) and a
    ``reason``. The document is reported as not evaluated when the dataset has
    no formula adapter, the document has no ground truth, or there are neither
    truths nor predictions to compare.
    """

    def _not_evaluated(reason: str) -> dict:
        # Placeholder scores; callers are expected to check ``evaluated`` first.
        return {
            "summary": {"evaluated": False, "mean_cer": 0.0, "mean_accuracy": 1.0, "exact_match_rate": 1.0},
            "metrics": None,
            "reason": reason,
        }

    adapter = FORMULA_TRUTH_ADAPTERS.get(dataset_document.dataset_id)
    if adapter is None:
        return _not_evaluated("no_formula_truth_adapter")
    if dataset_document.ground_truth is None:
        return _not_evaluated("no_ground_truth")

    truths = adapter(dataset_document.ground_truth)
    predictions = parsed_formula_records(parsed)
    if not (truths or predictions):
        return _not_evaluated("no_truths_and_no_predictions")

    metrics = compute_formula_extraction(predictions, truths)
    summary = {
        "evaluated": True,
        "mean_cer": float(metrics["mean_cer"]),
        "mean_accuracy": float(metrics["mean_accuracy"]),
        "exact_match_rate": float(metrics["exact_match_rate"]),
    }
    return {"summary": summary, "metrics": metrics, "reason": None}
def _table_structure_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict:
    """Flatten one document's table-structure metrics into a CSV-ready row.

    Missing metrics (``None``) or missing fields fall back to zero.
    """
    detail = metrics["metrics"] or {}
    row = {
        "source_path": str(path),
        "doc_id": parsed.doc_id,
        "dataset_id": dataset_document.dataset_id,
    }
    for count_key in ("prediction_count", "truth_count", "matched_pair_count"):
        row[count_key] = int(detail.get(count_key, 0))
    for score_key in (
        "table_match_rate",
        "mean_table_score",
        "mean_shape_similarity",
        "mean_cell_content_f1",
    ):
        row[score_key] = float(detail.get(score_key, 0.0))
    row["table_count_delta"] = int(detail.get("table_count_delta", 0))
    return row
def _formula_row(path: Path, parsed, dataset_document: DatasetDocument, metrics: dict) -> dict:
    """Flatten one document's formula-extraction metrics into a CSV-ready row.

    Missing fields fall back to zero, except ``mean_cer`` which falls back to
    1.0.
    """
    detail = metrics["metrics"] or {}
    row = {
        "source_path": str(path),
        "doc_id": parsed.doc_id,
        "dataset_id": dataset_document.dataset_id,
    }
    for count_key in ("prediction_count", "truth_count", "matched_pair_count"):
        row[count_key] = int(detail.get(count_key, 0))
    row["mean_cer"] = float(detail.get("mean_cer", 1.0))
    for score_key in ("mean_accuracy", "exact_match_rate", "formula_precision", "formula_recall"):
        row[score_key] = float(detail.get(score_key, 0.0))
    return row
def _write_table_structure_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document table-structure rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "prediction_count",
        "truth_count",
        "matched_pair_count",
        "table_match_rate",
        "mean_table_score",
        "mean_shape_similarity",
        "mean_cell_content_f1",
        "table_count_delta",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _write_formula_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document formula-extraction rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "prediction_count",
        "truth_count",
        "matched_pair_count",
        "mean_cer",
        "mean_accuracy",
        "exact_match_rate",
        "formula_precision",
        "formula_recall",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _layout_metrics(dataset_document: DatasetDocument, parsed) -> dict:
    """Score parsed layout predictions against the dataset's layout ground truth.

    Returns a dict with ``summary``, the full ``metrics`` (or ``None``) and a
    ``reason``. The document is reported as not evaluated when the dataset has
    no layout adapter, the document has no ground truth, or there are neither
    truths nor predictions to compare.
    """

    def _not_evaluated(reason: str) -> dict:
        return {
            "summary": {"evaluated": False, "class_aware_f1": 0.0, "class_agnostic_f1": 0.0},
            "metrics": None,
            "reason": reason,
        }

    adapter = GROUND_TRUTH_ADAPTERS.get(dataset_document.dataset_id)
    if adapter is None:
        return _not_evaluated("no_ground_truth_adapter")
    if dataset_document.ground_truth is None:
        return _not_evaluated("no_ground_truth")

    truths = adapter(dataset_document.ground_truth)
    predictions = parsed_layout_predictions(parsed)
    if not (truths or predictions):
        return _not_evaluated("no_truths_and_no_predictions")

    metrics = compute_layout_f1(predictions, truths)
    summary = {
        "evaluated": True,
        "class_aware_f1": float(metrics["class_aware"]["f1"]),
        "class_agnostic_f1": float(metrics["class_agnostic"]["f1"]),
    }
    return {"summary": summary, "metrics": metrics, "reason": None}
def _layout_row(path: Path, parsed, dataset_document: DatasetDocument, layout_metrics: dict) -> dict:
    """Flatten one document's layout metrics into a CSV-ready row.

    Missing fields fall back to zero, except ``iou_threshold`` which falls
    back to 0.5.
    """
    detail = layout_metrics["metrics"] or {}
    row = {
        "source_path": str(path),
        "doc_id": parsed.doc_id,
        "dataset_id": dataset_document.dataset_id,
        "iou_threshold": float(detail.get("iou_threshold", 0.5)),
        "prediction_count": int(detail.get("prediction_count", 0)),
        "truth_count": int(detail.get("truth_count", 0)),
    }
    # Both scoring modes expose the same precision/recall/f1 triple.
    for mode in ("class_aware", "class_agnostic"):
        scores = detail.get(mode, {})
        for stat in ("precision", "recall", "f1"):
            row[f"{mode}_{stat}"] = float(scores.get(stat, 0.0))
    return row
def _write_layout_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document layout rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "dataset_id",
        "iou_threshold",
        "prediction_count",
        "truth_count",
        "class_aware_precision",
        "class_aware_recall",
        "class_aware_f1",
        "class_agnostic_precision",
        "class_agnostic_recall",
        "class_agnostic_f1",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _repair_row(path: Path, parsed, repair_success: dict, disagreement: dict) -> dict:
    """Flatten one document's disagreement and repair statistics into a row.

    Missing fields fall back to zero, except ``repair_resolution_rate`` which
    falls back to 1.0.
    """
    row = {
        "source_path": str(path),
        "doc_id": parsed.doc_id,
        "candidate_count": int(disagreement.get("candidate_count", 0)),
        "parser_disagreement_rate": float(disagreement.get("disagreement_rate", 0.0)),
        "conflict_count": int(disagreement.get("conflict_count", 0)),
        "iteration_count": int(repair_success.get("iteration_count", 0)),
        "total_actions": int(repair_success.get("total_actions", 0)),
        "score_delta": float(repair_success.get("score_delta", 0.0)),
    }
    for count_key in (
        "pre_repair_blocking_count",
        "post_repair_blocking_count",
        "resolved_blocking_count",
        "regressed_blocking_count",
    ):
        row[count_key] = int(repair_success.get(count_key, 0))
    row["repair_resolution_rate"] = float(repair_success.get("repair_resolution_rate", 1.0))
    row["repair_regression_rate"] = float(repair_success.get("repair_regression_rate", 0.0))
    return row
def _write_repair_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document repair rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "candidate_count",
        "parser_disagreement_rate",
        "conflict_count",
        "iteration_count",
        "total_actions",
        "score_delta",
        "pre_repair_blocking_count",
        "post_repair_blocking_count",
        "resolved_blocking_count",
        "regressed_blocking_count",
        "repair_resolution_rate",
        "repair_regression_rate",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _chunk_strategy_counts(chunks: list) -> dict[str, int]:
    """Count chunks per ``strategy``, in order of first appearance."""
    tally: dict[str, int] = {}
    for chunk in chunks:
        label = chunk.strategy
        if label in tally:
            tally[label] += 1
        else:
            tally[label] = 1
    return tally
def _chunk_quality_metrics(metrics: dict) -> dict:
    """Pick the chunk-related entries out of a quality-report metrics dict.

    Only keys actually present in ``metrics`` are copied, in a fixed order.
    """
    wanted = (
        "chunk_count",
        "parent_chunk_count",
        "child_chunk_count",
        "avg_chunk_tokens",
        "max_chunk_tokens",
        "table_chunk_coverage",
        "figure_chunk_coverage",
    )
    selected = {}
    for name in wanted:
        if name in metrics:
            selected[name] = metrics[name]
    return selected
def _chunk_strategy_rows(path: Path, parsed) -> list[dict]:
    """Build one summary row per chunking strategy used in ``parsed``.

    Rows are ordered by strategy name. Each carries the document's quality
    score, the chunk count and token statistics for that strategy, and how
    many of its chunks link to tables, link to figures, or need visual context.
    """
    by_strategy: dict[str, list] = {}
    for chunk in parsed.chunks:
        by_strategy.setdefault(chunk.strategy, []).append(chunk)

    def _summarize(strategy: str, members: list) -> dict:
        sizes = [member.token_count for member in members]
        return {
            "source_path": str(path),
            "doc_id": parsed.doc_id,
            "strategy": strategy,
            "quality_score": parsed.quality_report.score,
            "chunk_count": len(members),
            "avg_tokens": mean(sizes) if sizes else 0.0,
            "max_tokens": max(sizes) if sizes else 0,
            "table_linked_chunks": sum(1 for member in members if member.table_ids),
            "figure_linked_chunks": sum(1 for member in members if member.figure_ids),
            "visual_context_chunks": sum(1 for member in members if member.requires_visual_context),
        }

    return [_summarize(strategy, by_strategy[strategy]) for strategy in sorted(by_strategy)]
def _chunk_strategy_leaderboard(rows: list[dict]) -> list[dict]:
    """Aggregate per-document chunk rows into one ranked entry per strategy.

    Entries are ordered by mean quality score, then by total chunk count,
    both descending.
    """
    by_strategy: dict[str, list[dict]] = {}
    for entry in rows:
        by_strategy.setdefault(entry["strategy"], []).append(entry)

    def _total(entries: list[dict], field: str) -> int:
        return sum(int(entry.get(field, 0)) for entry in entries)

    board = [
        {
            "strategy": strategy,
            "runs": len(entries),
            "total_chunks": _total(entries, "chunk_count"),
            "mean_chunk_count": _mean_value(entries, "chunk_count"),
            "mean_avg_tokens": _mean_value(entries, "avg_tokens"),
            "mean_max_tokens": _mean_value(entries, "max_tokens"),
            "mean_quality_score": _mean_value(entries, "quality_score"),
            "total_table_linked_chunks": _total(entries, "table_linked_chunks"),
            "total_figure_linked_chunks": _total(entries, "figure_linked_chunks"),
            "total_visual_context_chunks": _total(entries, "visual_context_chunks"),
        }
        for strategy, entries in by_strategy.items()
    ]
    board.sort(key=lambda entry: (entry["mean_quality_score"], entry["total_chunks"]), reverse=True)
    return board
def _parser_leaderboard(rows: list[dict]) -> list[dict]:
    """Aggregate per-document parser rows into one ranked entry per parser.

    Means are computed over successful runs only (rows without a truthy
    ``failed``). Entries are ordered best-first: higher text coverage, then
    higher valid-table ratio, then lower elapsed time.
    """
    by_parser: dict[str, list[dict]] = {}
    for entry in rows:
        by_parser.setdefault(entry["parser"], []).append(entry)

    board: list[dict] = []
    for name, entries in by_parser.items():
        succeeded = [entry for entry in entries if not entry.get("failed")]
        board.append(
            {
                "parser": name,
                "runs": len(entries),
                "successes": len(succeeded),
                "failures": len(entries) - len(succeeded),
                "mean_elapsed_seconds": _mean_value(succeeded, "elapsed_seconds"),
                "mean_text_coverage_ratio": _mean_value(succeeded, "text_coverage_ratio"),
                "mean_element_count": _mean_value(succeeded, "element_count"),
                "mean_table_count": _mean_value(succeeded, "table_count"),
                "mean_figure_count": _mean_value(succeeded, "figure_count"),
                "mean_valid_table_ratio": _mean_value(succeeded, "valid_table_ratio"),
            }
        )

    def _rank(entry: dict) -> tuple:
        # Elapsed time is negated so that, under the descending sort, faster ranks first.
        return (
            entry["mean_text_coverage_ratio"],
            entry["mean_valid_table_ratio"],
            -entry["mean_elapsed_seconds"],
        )

    board.sort(key=_rank, reverse=True)
    return board
def _mean_value(rows: list[dict], key: str) -> float:
    """Return the mean of ``row[key]`` over rows where it is present and not None.

    Values are converted with ``float``. The result is 0.0 when no row
    supplies a value.
    """
    present = [float(row[key]) for row in rows if row.get(key) is not None]
    if not present:
        return 0.0
    return mean(present)
def _write_leaderboard_csv(path: Path, rows: list[dict]) -> None:
    """Write the parser leaderboard to ``path`` as CSV.

    Rows are projected onto the fixed column list before writing, as the other
    CSV writers in this module do: a key missing from a row becomes an empty
    cell and keys outside the column list are dropped. Without the projection
    a leaderboard row carrying any extra key would make ``csv.DictWriter``
    raise ``ValueError``.
    """
    fieldnames = [
        "parser",
        "runs",
        "successes",
        "failures",
        "mean_elapsed_seconds",
        "mean_text_coverage_ratio",
        "mean_element_count",
        "mean_table_count",
        "mean_figure_count",
        "mean_valid_table_ratio",
    ]
    normalized = [{field: row.get(field, "") for field in fieldnames} for row in rows]
    _write_csv(path, normalized, fieldnames)
def _write_parser_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document parser run rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "parser",
        "failed",
        "error",
        "elapsed_seconds",
        "page_count",
        "element_count",
        "table_count",
        "figure_count",
        "text_chars",
        "expected_text_chars",
        "text_coverage_ratio",
        "valid_table_ratio",
        "has_bboxes",
        "has_page_images",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _write_chunk_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document chunk-strategy rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "strategy",
        "quality_score",
        "chunk_count",
        "avg_tokens",
        "max_tokens",
        "table_linked_chunks",
        "figure_linked_chunks",
        "visual_context_chunks",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _write_structure_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document structure-quality rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "table_count",
        "valid_table_count",
        "table_exact",
        "figure_count",
        "captioned_figure_count",
        "figure_caption_correct",
        "reading_order_issue_count",
        "reading_order_health",
        "document_text_coverage",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _write_chunk_quality_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document chunking-quality rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "chunk_count",
        "boundary_precision",
        "parent_child_resolution",
        "provenance_completeness",
        "retrieval_readiness",
        "table_chunk_coverage",
        "figure_chunk_coverage",
        "avg_tokens",
        "max_tokens",
    ]
    projected = []
    for entry in rows:
        projected.append({column: entry.get(column, "") for column in columns})
    _write_csv(path, projected, columns)
def _write_throughput_rows_csv(path: Path, rows: list[dict]) -> None:
    """Write per-document throughput rows to ``path`` as CSV.

    Columns are emitted in a fixed order; a key missing from a row becomes an
    empty cell and keys outside the column list are dropped.
    """
    columns = [
        "source_path",
        "doc_id",
        "page_count",
        "elapsed_seconds",
        "pages_per_second",
        "elements_per_second",
        "chunks_per_second",
        "gpu_task_count",
        "runtime_device",
        "max_gpu_seconds_per_doc",
    ]
    projected = [{column: entry.get(column, "") for column in columns} for entry in rows]
    _write_csv(path, projected, columns)
def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None:
    """Write ``rows`` to ``path`` as UTF-8 CSV with a header line.

    The file is opened with ``newline=""`` so the csv module controls line
    endings. Existing content at ``path`` is overwritten.
    """
    with path.open(mode="w", encoding="utf-8", newline="") as stream:
        sheet = csv.DictWriter(stream, fieldnames=fieldnames)
        sheet.writeheader()
        for record in rows:
            sheet.writerow(record)