File size: 3,254 Bytes
657d287 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | """Parse all downloaded documents into uniform ParsedDoc JSON files.
For each file in data/raw/{compliance,credit}/ that has a paired .meta.json,
parse it and write to data/processed/{compliance,credit}/parsed/<doc_id>.json.
Idempotent: skips parsed files that already exist (delete to re-parse).
"""
from __future__ import annotations
import json
import sys
import traceback
from pathlib import Path
from tqdm import tqdm
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from pipelines.shared.document_parser import parse_document # noqa: E402
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"
def gather_documents() -> list[tuple[str, Path, dict]]:
"""Return [(module, raw_path, metadata_dict)] for every parseable doc."""
out: list[tuple[str, Path, dict]] = []
for module in ("compliance", "credit"):
raw_module_dir = RAW_DIR / module
if not raw_module_dir.exists():
continue
for meta_file in sorted(raw_module_dir.glob("*.meta.json")):
doc_id = meta_file.stem.replace(".meta", "")
metadata = json.loads(meta_file.read_text())
# The raw file shares the doc_id stem; figure out its extension.
candidates = list(raw_module_dir.glob(f"{doc_id}.*"))
candidates = [c for c in candidates if not c.name.endswith(".meta.json")]
if not candidates:
print(f" ! no raw file found for {doc_id}", file=sys.stderr)
continue
raw_path = candidates[0]
out.append((module, raw_path, metadata))
return out
def main() -> int:
docs = gather_documents()
print(f"Found {len(docs)} documents to parse")
n_ok = 0
n_skipped = 0
n_failed = 0
failures: list[tuple[str, str]] = []
for module, raw_path, metadata in tqdm(docs, desc="parsing"):
out_dir = PROCESSED_DIR / module / "parsed"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"{metadata['doc_id']}.json"
if out_path.exists() and out_path.stat().st_size > 0:
n_skipped += 1
continue
try:
doc = parse_document(raw_path, metadata, module)
out_path.write_text(json.dumps(doc.to_dict(), indent=None))
n_ok += 1
except Exception as e:
n_failed += 1
failures.append((metadata["doc_id"], f"{type(e).__name__}: {e}"))
traceback.print_exc(file=sys.stderr)
summary = {
"n_total": len(docs),
"n_ok": n_ok,
"n_skipped": n_skipped,
"n_failed": n_failed,
"failures": [{"doc_id": d, "error": e} for d, e in failures],
}
summary_path = PROCESSED_DIR / "_parse_summary.json"
summary_path.parent.mkdir(parents=True, exist_ok=True)
summary_path.write_text(json.dumps(summary, indent=2))
print(f"\n=== Parse summary ===")
print(f" ok: {n_ok}")
print(f" skipped: {n_skipped} (already parsed)")
print(f" failed: {n_failed}")
if failures:
print("\n--- failures ---")
for d, e in failures:
print(f" {d}: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
|