#!/usr/bin/env python3
"""
Quick spot check for Docling parsing quality.
Usage:
python scripts/eval_spot_check.py /path/to/documents
python scripts/eval_spot_check.py /path/to/single/file.pdf
Outputs a visual summary of how Docling parsed each document.
"""
import sys
import os
from pathlib import Path
from collections import Counter
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.ingestion.docling_loader import (
load_document_with_docling,
load_documents_with_docling,
SUPPORTED_EXTENSIONS,
ParsedDocument
)
def print_header(text: str, char: str = "="):
    """Print *text* framed above and below by a 60-character rule of *char*."""
    rule = char * 60
    print(f"\n{rule}")
    print(f" {text}")
    print(rule)
def analyze_document(doc: ParsedDocument, verbose: bool = True) -> dict:
    """Analyze a single parsed document and return a metrics dict.

    Args:
        doc: A parsed document from the Docling loader; must expose
            ``filename``, ``format``, ``status``, ``error``, ``elements``
            (each with ``element_type`` and ``text``), ``chars``, ``words``
            and ``page_count``.
        verbose: When True, print a human-readable breakdown to stdout.

    Returns:
        Dict with the document's identity (filename/format/status), size
        counters, per-element-type counts, and a list of issue strings.
    """
    # Count elements by type (e.g. "text", "table", "heading").
    type_counts = Counter(el.element_type for el in doc.elements)

    # Heuristic quality checks — these flag things worth eyeballing,
    # they are not hard failures.
    issues = []
    if doc.status != "OK":
        issues.append(f"Status: {doc.status} - {doc.error}")
    if not doc.elements:
        issues.append("No elements extracted!")
    if doc.chars == 0:
        issues.append("Zero characters extracted!")
    if type_counts.get("table", 0) == 0 and doc.format == ".pdf":
        # PDFs often have tables - flag if none found
        issues.append("No tables detected (may be expected)")

    metrics = {
        "filename": doc.filename,
        "format": doc.format,
        "status": doc.status,
        "total_elements": len(doc.elements),
        "total_chars": doc.chars,
        "total_words": doc.words,
        "page_count": doc.page_count,
        "element_types": dict(type_counts),
        "issues": issues,
    }

    if verbose:
        print_header(f"{doc.filename} ({doc.format})", "-")
        print(f" Status: {doc.status}")
        print(f" Elements: {len(doc.elements)}")
        print(f" Characters: {doc.chars:,}")
        print(f" Words: {doc.words:,}")
        if doc.page_count:
            print(f" Pages: {doc.page_count}")

        # f-prefixes removed from constant strings below — no placeholders.
        print("\n Element breakdown:")
        for el_type, count in sorted(type_counts.items()):
            print(f" {el_type}: {count}")

        if issues:
            print("\n ⚠️ Potential issues:")
            for issue in issues:
                print(f" - {issue}")

        # Show a short preview so parsing glitches are visible at a glance.
        print("\n Sample elements (first 5):")
        for i, el in enumerate(doc.elements[:5]):
            text_preview = el.text[:80].replace('\n', ' ')
            if len(el.text) > 80:
                text_preview += "..."
            print(f" [{el.element_type}] {text_preview}")

        # Show table preview if any
        tables = [el for el in doc.elements if el.element_type == "table"]
        if tables:
            print("\n Table preview (first table):")
            table_text = tables[0].text[:300].replace('\n', '\n ')
            print(f" {table_text}")
            if len(tables[0].text) > 300:
                print(" ...")

    return metrics
def run_spot_check(path: str, verbose: bool = True):
    """Run spot check on a file or directory.

    Parses the target with Docling, prints per-document breakdowns
    (when *verbose*) plus an aggregate summary, and returns the list
    of per-document metric dicts ([] when the path does not exist).
    """
    target = Path(path)
    print_header("DOCLING PARSING SPOT CHECK")
    print(f" Path: {target}")
    print(f" Supported formats: {', '.join(sorted(SUPPORTED_EXTENSIONS))}")

    # Normalize file vs. directory into a single list of parsed documents.
    if target.is_file():
        parsed_docs = [load_document_with_docling(str(target))]
    elif target.is_dir():
        parsed_docs = load_documents_with_docling(str(target), recursive=True)
        print(f" Found {len(parsed_docs)} documents")
    else:
        print(f" ERROR: Path not found: {target}")
        return []

    all_metrics = [analyze_document(d, verbose=verbose) for d in parsed_docs]

    # Aggregate summary across every processed document.
    print_header("SUMMARY")
    ok_count = sum(1 for m in all_metrics if m["status"] == "OK")
    print(f" Documents processed: {len(all_metrics)}")
    print(f" Successful (OK): {ok_count}")
    print(f" Failed/Skipped: {len(all_metrics) - ok_count}")
    print(f" Total elements: {sum(m['total_elements'] for m in all_metrics)}")
    print(f" Total characters: {sum(m['total_chars'] for m in all_metrics):,}")

    # Merge the per-document type counts; most_common() gives the same
    # count-descending, insertion-stable order as the original sort.
    combined_types = Counter()
    for m in all_metrics:
        combined_types.update(m["element_types"])
    print("\n Element types across all docs:")
    for el_type, count in combined_types.most_common():
        print(f" {el_type}: {count}")

    # Flatten issues, prefixing each with the filename it came from.
    all_issues = [
        f"{m['filename']}: {issue}"
        for m in all_metrics
        for issue in m["issues"]
    ]
    if all_issues:
        print("\n ⚠️ Issues found:")
        for issue in all_issues[:10]:
            print(f" - {issue}")
        if len(all_issues) > 10:
            print(f" ... and {len(all_issues) - 10} more")
    else:
        print("\n ✅ No issues detected")

    return all_metrics
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python scripts/eval_spot_check.py /path/to/documents")
print("\nExamples:")
print(" python scripts/eval_spot_check.py ./tests/eval_data/documents")
print(" python scripts/eval_spot_check.py ./report.pdf")
sys.exit(1)
target_path = sys.argv[1]
verbose = "--quiet" not in sys.argv
run_spot_check(target_path, verbose=verbose)