File size: 6,330 Bytes
d10c06c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
"""
merge_chunks.py

Merge multiple JSONL chunk files into a single chunks.jsonl, with safety checks:
- validates required fields
- fails on duplicate chunk_id
- optional: merges manifests into one manifest.json
- outputs a merge_report.json (doc_id counts, totals, sources)

Usage examples:

  # simplest: merge two chunk files
  python scripts/merge_chunks.py \
    --out data/normalized_v1_1/chunks.jsonl \
    data/normalized_v1_1/chunks_books.jsonl \
    data/normalized_v1_1/chunks_articles.jsonl

  # also merge manifests (optional)
  python scripts/merge_chunks.py \
    --out data/normalized_v1_1/chunks.jsonl \
    --out-manifest data/normalized_v1_1/manifest.json \
    --manifest data/normalized_v1_1/manifest_books.json \
    --manifest data/normalized_v1_1/manifest_articles.json \
    data/normalized_v1_1/chunks_books.jsonl \
    data/normalized_v1_1/chunks_articles.jsonl
"""

import argparse
import json
import os
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional


REQUIRED_FIELDS = ("chunk_id", "doc_id", "doc_title", "text")


def iter_jsonl(path: str) -> Iterable[Dict]:
    """Lazily yield one parsed object per non-blank line of a JSONL file.

    Blank (whitespace-only) lines are skipped. A malformed line aborts
    iteration with a RuntimeError naming the file and line number.
    """
    with open(path, "r", encoding="utf-8") as handle:
        for lineno, raw in enumerate(handle, start=1):
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError as exc:
                raise RuntimeError(f"Invalid JSON in {path} at line {lineno}: {exc}") from exc
            yield record


def validate_record(obj: Dict, path: str) -> None:
    """Raise RuntimeError unless *obj* has every REQUIRED_FIELDS key
    with a value that is neither None nor the empty string."""
    absent = [field for field in REQUIRED_FIELDS if obj.get(field) in (None, "")]
    if absent:
        raise RuntimeError(f"Missing required fields {absent} in {path} for chunk_id={obj.get('chunk_id')!r}")


def load_manifest(path: str) -> Dict:
    """Read *path* as UTF-8 JSON and return the parsed manifest dict."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def merge_manifests(manifest_paths: List[str]) -> Dict:
    """
    Combine several manifest files into one manifest dict.

    Assumes each manifest has:
      {
        "generated_at": "...",
        "documents": [ { "id": ..., "title": ..., "format": ..., "filename": ..., ... }, ... ]
      }
    Documents are combined by `id` — first occurrence wins, later duplicates
    are skipped. Documents with a missing/falsy `id` are dropped silently.

    Returns a dict with a fresh UTC `generated_at` timestamp, the combined
    `documents` list, and the input paths under `sources`.
    """
    combined_docs: List[Dict] = []
    seen_ids = set()

    for p in manifest_paths:
        mf = load_manifest(p)
        for doc in mf.get("documents", []):
            doc_id = doc.get("id")
            if not doc_id:
                # Tolerate malformed entries rather than failing the merge.
                continue
            if doc_id in seen_ids:
                continue  # first occurrence wins
            seen_ids.add(doc_id)
            combined_docs.append(doc)

    return {
        # datetime.utcnow() is deprecated (3.12) and naive; use an aware
        # UTC timestamp and normalize the "+00:00" suffix to "Z".
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "documents": combined_docs,
        "sources": manifest_paths,
    }


def main():
    """CLI entry point.

    Merges the input chunk JSONL files into --out (validating records and
    rejecting duplicate chunk_ids unless --allow-duplicate-chunk-id),
    optionally merges manifests into --out-manifest, and always writes a
    merge report JSON.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", required=True, help="Output merged chunks.jsonl path")
    ap.add_argument("--out-manifest", default=None, help="Output merged manifest.json path (optional)")
    ap.add_argument("--manifest", action="append", default=[], help="Input manifest.json path (repeatable, optional)")
    ap.add_argument("--out-report", default=None, help="Output merge report JSON path (default: <out_dir>/merge_report.json)")
    ap.add_argument("--allow-duplicate-chunk-id", action="store_true",
                    help="If set, duplicate chunk_id records are skipped instead of failing (NOT recommended).")
    ap.add_argument("inputs", nargs="+", help="Input chunk JSONL files to merge, in order")
    args = ap.parse_args()

    out_path = args.out
    out_dir = os.path.dirname(out_path) or "."
    os.makedirs(out_dir, exist_ok=True)

    report_path = args.out_report or os.path.join(out_dir, "merge_report.json")

    # Validate cheap preconditions BEFORE opening (and truncating) the output
    # file, so a trivially bad invocation cannot clobber an existing merge.
    for in_path in args.inputs:
        if not os.path.exists(in_path):
            raise FileNotFoundError(in_path)
    if args.out_manifest and not args.manifest:
        raise RuntimeError("--out-manifest provided but no --manifest inputs were provided.")

    seen_chunk_ids = set()   # duplicate detection across ALL inputs
    doc_counts = Counter()   # chunks written per doc_id (for the report)
    total_written = 0
    inputs_summary = []      # per-input stats (for the report)

    with open(out_path, "w", encoding="utf-8") as out_f:
        for in_path in args.inputs:
            in_written = 0
            in_skipped_dupes = 0

            for obj in iter_jsonl(in_path):
                validate_record(obj, in_path)

                cid = str(obj["chunk_id"])
                if cid in seen_chunk_ids:
                    if args.allow_duplicate_chunk_id:
                        in_skipped_dupes += 1
                        continue
                    raise RuntimeError(f"Duplicate chunk_id detected: {cid} (from {in_path}). Aborting.")

                seen_chunk_ids.add(cid)

                out_f.write(json.dumps(obj, ensure_ascii=False) + "\n")
                total_written += 1
                in_written += 1
                doc_counts[str(obj["doc_id"])] += 1

            inputs_summary.append({
                "input": in_path,
                "written": in_written,
                "skipped_duplicate_chunk_id": in_skipped_dupes,
            })

    # Optional: merged manifest (the --manifest precondition was checked above).
    if args.out_manifest:
        merged = merge_manifests(args.manifest)
        with open(args.out_manifest, "w", encoding="utf-8") as f:
            json.dump(merged, f, indent=2, ensure_ascii=False)

    # Merge report
    report = {
        # Aware UTC timestamp; datetime.utcnow() is deprecated since 3.12.
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "out_chunks": out_path,
        "out_manifest": args.out_manifest,
        "total_chunks_written": total_written,
        "unique_chunk_ids": len(seen_chunk_ids),
        "doc_id_counts": dict(doc_counts),
        "inputs": inputs_summary,
        "notes": [
            "If total_chunks_written != unique_chunk_ids, you allowed duplicate chunk_id skipping.",
            "Pin this merged file as your dataset v1.1.0 before embedding/indexing.",
        ],
    }
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"[OK] Wrote merged chunks: {out_path}")
    if args.out_manifest:
        print(f"[OK] Wrote merged manifest: {args.out_manifest}")
    print(f"[OK] Wrote merge report: {report_path}")
    print(f"[OK] Total chunks: {total_written} | Unique chunk_ids: {len(seen_chunk_ids)}")


# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()