Agentic_RAG / scripts /merge_chunks.py
Oleksii Obolonskyi
Initial commit
d10c06c
#!/usr/bin/env python3
"""
merge_chunks.py
Merge multiple JSONL chunk files into a single chunks.jsonl, with safety checks:
- validates required fields
- fails on duplicate chunk_id
- optional: merges manifests into one manifest.json
- outputs a merge_report.json (doc_id counts, totals, sources)
Usage examples:
# simplest: merge two chunk files
python scripts/merge_chunks.py \
--out data/normalized_v1_1/chunks.jsonl \
data/normalized_v1_1/chunks_books.jsonl \
data/normalized_v1_1/chunks_articles.jsonl
# also merge manifests (optional)
python scripts/merge_chunks.py \
--out data/normalized_v1_1/chunks.jsonl \
--out-manifest data/normalized_v1_1/manifest.json \
--manifest data/normalized_v1_1/manifest_books.json \
--manifest data/normalized_v1_1/manifest_articles.json \
data/normalized_v1_1/chunks_books.jsonl \
data/normalized_v1_1/chunks_articles.jsonl
"""
import argparse
import json
import os
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
# Keys every chunk record must carry with a non-empty value (checked by validate_record).
REQUIRED_FIELDS = ("chunk_id", "doc_id", "doc_title", "text")
def iter_jsonl(path: str) -> Iterable[Dict]:
    """Yield one JSON object per non-blank line of a JSONL file.

    Blank (whitespace-only) lines are skipped. Line numbers in error
    messages are 1-based and count blank lines too.

    Args:
        path: Path of the JSONL file to read.

    Yields:
        The decoded JSON object (a dict) of each non-blank line, in file order.

    Raises:
        RuntimeError: If a line is not valid JSON, or decodes to something
            other than a JSON object. The message names the file and line.
    """
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # json.loads would otherwise reject on the first line.
    with open(path, "r", encoding="utf-8-sig") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                raise RuntimeError(f"Invalid JSON in {path} at line {line_no}: {e}") from e
            # Enforce the declared Iterable[Dict] contract here, where the line
            # number is still known, instead of failing later with an
            # unrelated AttributeError/TypeError.
            if not isinstance(obj, dict):
                raise RuntimeError(
                    f"Expected a JSON object in {path} at line {line_no}, got {type(obj).__name__}"
                )
            yield obj
def validate_record(obj: Dict, path: str) -> None:
    """Ensure a chunk record carries every required field with a usable value.

    A field counts as missing when its key is absent from the record or its
    value is ``None`` or the empty string.

    Args:
        obj: Chunk record to check.
        path: File the record came from; used only in the error message.

    Raises:
        RuntimeError: If one or more required fields are missing. The message
            lists the missing fields, the file and the record's chunk_id.
    """
    missing = []
    for field in REQUIRED_FIELDS:
        if field not in obj or obj[field] in (None, ""):
            missing.append(field)
    if not missing:
        return
    raise RuntimeError(f"Missing required fields {missing} in {path} for chunk_id={obj.get('chunk_id')!r}")
def load_manifest(path: str) -> Dict:
    """Read a manifest JSON file and return its decoded content.

    Args:
        path: Path of the manifest file, read as UTF-8.

    Returns:
        Whatever JSON value the file holds.
    """
    with open(path, mode="r", encoding="utf-8") as manifest_file:
        manifest = json.load(manifest_file)
    return manifest
def merge_manifests(manifest_paths: List[str]) -> Dict:
    """
    Merge several manifest files into one manifest dictionary.

    Assumes each manifest has:
    {
      "generated_at": "...",
      "documents": [ { "id": ..., "title": ..., "format": ..., "filename": ..., ... }, ... ]
    }
    We combine documents by `id` (first wins, duplicates skipped).
    Documents whose `id` is missing or falsy are skipped as well.

    Args:
        manifest_paths: Manifest files to merge, in priority order.

    Returns:
        A dict with a fresh UTC "generated_at" timestamp (ISO-8601, "Z"
        suffix), the combined "documents" list and the "sources" paths.
    """
    combined_docs = []
    seen_ids = set()
    for p in manifest_paths:
        mf = load_manifest(p)
        # `or []` also covers a manifest that stores "documents": null.
        for doc in mf.get("documents") or []:
            doc_id = doc.get("id")
            if not doc_id or doc_id in seen_ids:
                continue
            seen_ids.add(doc_id)
            combined_docs.append(doc)
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # time and keep the original "...Z" output format.
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return {
        "generated_at": generated_at,
        "documents": combined_docs,
        # Copy so later changes to the caller's list do not alter the result.
        "sources": list(manifest_paths),
    }
def main(argv: Optional[List[str]] = None) -> None:
    """Merge chunk JSONL files (and optionally manifests) and write a report.

    All inputs are validated before any output is touched, and the merged
    chunks are written to a temporary file that replaces the real output only
    after every input was merged successfully. A failed run therefore never
    leaves a truncated or partial chunks file behind.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            ``sys.argv[1:]`` when omitted (argparse behaviour).

    Raises:
        FileNotFoundError: If an input chunk file does not exist.
        RuntimeError: If --out-manifest is given without any --manifest, if
            an input file is also the output file, or if a duplicate
            chunk_id is found and --allow-duplicate-chunk-id is not set.
            Errors raised while reading or validating records propagate too.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", required=True, help="Output merged chunks.jsonl path")
    ap.add_argument("--out-manifest", default=None, help="Output merged manifest.json path (optional)")
    ap.add_argument("--manifest", action="append", default=[], help="Input manifest.json path (repeatable, optional)")
    ap.add_argument("--out-report", default=None, help="Output merge report JSON path (default: <out_dir>/merge_report.json)")
    ap.add_argument("--allow-duplicate-chunk-id", action="store_true",
                    help="If set, duplicate chunk_id records are skipped instead of failing (NOT recommended).")
    ap.add_argument("inputs", nargs="+", help="Input chunk JSONL files to merge, in order")
    args = ap.parse_args(argv)

    out_path = args.out
    out_dir = os.path.dirname(out_path) or "."
    report_path = args.out_report or os.path.join(out_dir, "merge_report.json")

    # Fail fast: check every precondition before creating or truncating files.
    if args.out_manifest and not args.manifest:
        raise RuntimeError("--out-manifest provided but no --manifest inputs were provided.")
    out_real = os.path.realpath(out_path)
    for in_path in args.inputs:
        if not os.path.exists(in_path):
            raise FileNotFoundError(in_path)
        if os.path.realpath(in_path) == out_real:
            raise RuntimeError(f"Input file is also the output file: {in_path}. Aborting.")

    os.makedirs(out_dir, exist_ok=True)

    seen_chunk_ids = set()
    doc_counts = Counter()
    total_written = 0
    total_skipped_dupes = 0
    inputs_summary = []

    # Write next to the final path so os.replace stays on one filesystem.
    tmp_path = out_path + ".tmp"
    merged_ok = False
    try:
        with open(tmp_path, "w", encoding="utf-8") as out_f:
            for in_path in args.inputs:
                in_written = 0
                in_skipped_dupes = 0
                for obj in iter_jsonl(in_path):
                    validate_record(obj, in_path)
                    # Compare ids as strings so 1 and "1" count as the same id.
                    cid = str(obj["chunk_id"])
                    if cid in seen_chunk_ids:
                        if args.allow_duplicate_chunk_id:
                            in_skipped_dupes += 1
                            continue
                        raise RuntimeError(f"Duplicate chunk_id detected: {cid} (from {in_path}). Aborting.")
                    seen_chunk_ids.add(cid)
                    out_f.write(json.dumps(obj, ensure_ascii=False) + "\n")
                    in_written += 1
                    doc_counts[str(obj["doc_id"])] += 1
                total_written += in_written
                total_skipped_dupes += in_skipped_dupes
                inputs_summary.append({
                    "input": in_path,
                    "written": in_written,
                    "skipped_duplicate_chunk_id": in_skipped_dupes,
                })
        os.replace(tmp_path, out_path)
        merged_ok = True
    finally:
        # On any failure drop the partial temp file; the previous output (if
        # any) is left exactly as it was.
        if not merged_ok and os.path.exists(tmp_path):
            os.remove(tmp_path)

    # Optional: merged manifest
    if args.out_manifest:
        merged = merge_manifests(args.manifest)
        os.makedirs(os.path.dirname(args.out_manifest) or ".", exist_ok=True)
        with open(args.out_manifest, "w", encoding="utf-8") as f:
            json.dump(merged, f, indent=2, ensure_ascii=False)

    # Merge report
    report = {
        # datetime.utcnow() is deprecated since Python 3.12; same "...Z" format.
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "out_chunks": out_path,
        "out_manifest": args.out_manifest,
        "total_chunks_written": total_written,
        "unique_chunk_ids": len(seen_chunk_ids),
        "total_skipped_duplicate_chunk_id": total_skipped_dupes,
        "doc_id_counts": dict(doc_counts),
        "inputs": inputs_summary,
        "notes": [
            # Skipped duplicates are never written, so total_chunks_written
            # always equals unique_chunk_ids; point at the real indicator.
            "If total_skipped_duplicate_chunk_id > 0, you allowed duplicate chunk_id skipping.",
            "Pin this merged file as your dataset v1.1.0 before embedding/indexing.",
        ],
    }
    os.makedirs(os.path.dirname(report_path) or ".", exist_ok=True)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"[OK] Wrote merged chunks: {out_path}")
    if args.out_manifest:
        print(f"[OK] Wrote merged manifest: {args.out_manifest}")
    print(f"[OK] Wrote merge report: {report_path}")
    print(f"[OK] Total chunks: {total_written} | Unique chunk_ids: {len(seen_chunk_ids)}")
# Run the merge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()