#!/usr/bin/env python3
"""
merge_chunks.py

Merge multiple JSONL chunk files into a single chunks.jsonl, with safety checks:
- validates required fields
- fails on duplicate chunk_id
- optional: merges manifests into one manifest.json
- outputs a merge_report.json (doc_id counts, totals, sources)

All arguments are validated before anything is written, and the merged chunks
are written to a temporary sibling file that replaces the target only after
the whole merge succeeded, so a failed run never clobbers an existing output.

Usage examples:

  # simplest: merge two chunk files
  python scripts/merge_chunks.py \
    --out data/normalized_v1_1/chunks.jsonl \
    data/normalized_v1_1/chunks_books.jsonl \
    data/normalized_v1_1/chunks_articles.jsonl

  # also merge manifests (optional)
  python scripts/merge_chunks.py \
    --out data/normalized_v1_1/chunks.jsonl \
    --out-manifest data/normalized_v1_1/manifest.json \
    --manifest data/normalized_v1_1/manifest_books.json \
    --manifest data/normalized_v1_1/manifest_articles.json \
    data/normalized_v1_1/chunks_books.jsonl \
    data/normalized_v1_1/chunks_articles.jsonl
"""

import argparse
import json
import os
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional, TextIO

REQUIRED_FIELDS = ("chunk_id", "doc_id", "doc_title", "text")


def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo from an
    # aware UTC datetime yields the exact same string format as before.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def iter_jsonl(path: str) -> Iterable[Dict]:
    """Yield one parsed JSON value per non-blank line of the file at *path*.

    Blank (whitespace-only) lines are skipped.

    Raises:
        RuntimeError: if a line is not valid JSON; the message carries the
            path and the 1-based line number.
    """
    # "utf-8-sig" reads plain UTF-8 unchanged but also strips a leading BOM,
    # which would otherwise make json.loads reject the first line.
    with open(path, "r", encoding="utf-8-sig") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                raise RuntimeError(f"Invalid JSON in {path} at line {line_no}: {e}") from e
            yield obj


def validate_record(obj: Dict, path: str) -> None:
    """Check that *obj* is a JSON object carrying every field in REQUIRED_FIELDS.

    A field counts as missing when it is absent, ``None`` or the empty string.

    Raises:
        RuntimeError: if *obj* is not a dict or a required field is missing.
    """
    if not isinstance(obj, dict):
        # Lists / scalars are valid JSON but not chunk records; fail with the
        # same exception type as every other validation error.
        raise RuntimeError(
            f"Expected a JSON object in {path}, got {type(obj).__name__}: {obj!r}"
        )
    missing = [k for k in REQUIRED_FIELDS if obj.get(k) in (None, "")]
    if missing:
        raise RuntimeError(
            f"Missing required fields {missing} in {path} for chunk_id={obj.get('chunk_id')!r}"
        )


def load_manifest(path: str) -> Dict:
    """Load and return the JSON document stored at *path*."""
    with open(path, "r", encoding="utf-8-sig") as f:
        return json.load(f)


def merge_manifests(manifest_paths: List[str]) -> Dict:
    """
    Assumes each manifest has:
      {
        "generated_at": "...",
        "documents": [
          { "id": ..., "title": ..., "format": ..., "filename": ..., ... },
          ...
        ]
      }

    We combine documents by `id` (first wins, duplicates skipped).
    Documents without a truthy `id` are skipped as well.

    Returns a new manifest dict with a fresh "generated_at" timestamp, the
    combined "documents" list and the input paths under "sources".
    """
    combined_docs = []
    seen_ids = set()

    for p in manifest_paths:
        mf = load_manifest(p)
        for doc in mf.get("documents", []):
            doc_id = doc.get("id")
            if not doc_id or doc_id in seen_ids:
                continue
            seen_ids.add(doc_id)
            combined_docs.append(doc)

    return {
        "generated_at": _utc_timestamp(),
        "documents": combined_docs,
        # Copy so the returned manifest does not alias the caller's list.
        "sources": list(manifest_paths),
    }


def _merge_inputs(inputs: List[str], out_f: TextIO, allow_duplicates: bool):
    """Stream every record of *inputs* (in order) into *out_f*.

    Returns ``(seen_chunk_ids, doc_counts, inputs_summary)``.

    Raises:
        RuntimeError: on an invalid record, or on a repeated chunk_id unless
            *allow_duplicates* is set (then the repeat is skipped and counted).
    """
    seen_chunk_ids = set()
    doc_counts = Counter()
    inputs_summary = []

    for in_path in inputs:
        in_written = 0
        in_skipped_dupes = 0

        for obj in iter_jsonl(in_path):
            validate_record(obj, in_path)

            # chunk ids are compared as strings, so 1 and "1" collide.
            cid = str(obj["chunk_id"])
            if cid in seen_chunk_ids:
                if not allow_duplicates:
                    raise RuntimeError(
                        f"Duplicate chunk_id detected: {cid} (from {in_path}). Aborting."
                    )
                in_skipped_dupes += 1
                continue

            seen_chunk_ids.add(cid)
            out_f.write(json.dumps(obj, ensure_ascii=False) + "\n")
            in_written += 1
            doc_counts[str(obj["doc_id"])] += 1

        inputs_summary.append({
            "input": in_path,
            "written": in_written,
            "skipped_duplicate_chunk_id": in_skipped_dupes,
        })

    return seen_chunk_ids, doc_counts, inputs_summary


def main() -> None:
    """Command-line entry point: merge chunk files, optional manifests, and write a report.

    Raises:
        FileNotFoundError: if an input chunk file does not exist.
        RuntimeError: if --out-manifest is given without any --manifest, or a
            record is invalid / duplicated (see ``_merge_inputs``).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", required=True, help="Output merged chunks.jsonl path")
    ap.add_argument("--out-manifest", default=None, help="Output merged manifest.json path (optional)")
    ap.add_argument("--manifest", action="append", default=[], help="Input manifest.json path (repeatable, optional)")
    ap.add_argument("--out-report", default=None,
                    help="Output merge report JSON path (default: <out dir>/merge_report.json)")
    ap.add_argument("--allow-duplicate-chunk-id", action="store_true",
                    help="If set, duplicate chunk_id records are skipped instead of failing (NOT recommended).")
    ap.add_argument("inputs", nargs="+", help="Input chunk JSONL files to merge, in order")
    args = ap.parse_args()

    # Validate everything that can be checked up front, BEFORE touching any
    # output file, so a bad invocation never truncates an existing dataset.
    if args.out_manifest and not args.manifest:
        raise RuntimeError("--out-manifest provided but no --manifest inputs were provided.")
    for in_path in args.inputs:
        if not os.path.exists(in_path):
            raise FileNotFoundError(in_path)

    out_path = args.out
    out_dir = os.path.dirname(out_path) or "."
    os.makedirs(out_dir, exist_ok=True)

    report_path = args.out_report or os.path.join(out_dir, "merge_report.json")

    # Write to a sibling temp file and swap it in only on success. This keeps
    # a previous output intact if the merge fails and also makes it safe to
    # list the output path itself among the inputs.
    tmp_path = f"{out_path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as out_f:
            seen_chunk_ids, doc_counts, inputs_summary = _merge_inputs(
                args.inputs, out_f, args.allow_duplicate_chunk_id
            )
        os.replace(tmp_path, out_path)
    finally:
        # After a successful replace the temp file is gone; on failure this
        # removes the partial output.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    total_written = sum(entry["written"] for entry in inputs_summary)

    # Optional: merged manifest
    if args.out_manifest:
        merged = merge_manifests(args.manifest)
        with open(args.out_manifest, "w", encoding="utf-8") as f:
            json.dump(merged, f, indent=2, ensure_ascii=False)

    # Merge report
    report = {
        "generated_at": _utc_timestamp(),
        "out_chunks": out_path,
        "out_manifest": args.out_manifest,
        "total_chunks_written": total_written,
        "unique_chunk_ids": len(seen_chunk_ids),
        "doc_id_counts": dict(doc_counts),
        "inputs": inputs_summary,
        "notes": [
            # Skipped duplicates are never written, so the written total always
            # equals the unique-id count; the per-input skip counters are the
            # real signal that duplicates were present.
            "Non-zero skipped_duplicate_chunk_id counts under inputs mean duplicate "
            "chunk_id records were skipped (--allow-duplicate-chunk-id).",
            "Pin this merged file as your dataset v1.1.0 before embedding/indexing.",
        ],
    }
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"[OK] Wrote merged chunks: {out_path}")
    if args.out_manifest:
        print(f"[OK] Wrote merged manifest: {args.out_manifest}")
    print(f"[OK] Wrote merge report: {report_path}")
    print(f"[OK] Total chunks: {total_written} | Unique chunk_ids: {len(seen_chunk_ids)}")


if __name__ == "__main__":
    main()