"""Materialize a streamed corpus plan into sharded, normalized JSONL files."""

from __future__ import annotations

import json
import re
from collections import OrderedDict
from collections.abc import Iterable
from contextlib import ExitStack
from pathlib import Path
from typing import BinaryIO

from .streaming import CorpusPlanEntry, StreamDocument, iter_corpus_plan_documents

DEFAULT_CACHE_BYTE_LIMIT = 3 * 1024 * 1024 * 1024
DEFAULT_SHARD_BYTE_LIMIT = 256 * 1024 * 1024
_SAFE_NAME_PATTERN = re.compile(r"[^A-Za-z0-9_.-]+")


def _safe_source_name(name: str) -> str:
    """Return a filename-safe version of *name*.

    Every run of characters outside ``[A-Za-z0-9_.-]`` becomes a single
    ``-``; leading and trailing ``-``, ``.`` and ``_`` are stripped. Falls
    back to ``"source"`` when nothing is left.
    """
    cleaned = _SAFE_NAME_PATTERN.sub("-", name.strip()).strip("-._")
    return cleaned or "source"


def _jsonl_bytes(record: dict[str, object]) -> bytes:
    """Serialize *record* as one compact, newline-terminated UTF-8 JSON line."""
    line = json.dumps(record, ensure_ascii=False, separators=(",", ":"))
    return (line + "\n").encode("utf-8")


def _file_entry_for_group(
    *,
    source: str,
    path: Path,
    document: StreamDocument,
    rows: int,
) -> dict[str, object]:
    """Build the plan entry describing one materialized shard file.

    Args:
        source: Name recorded for the entry.
        path: Shard file location; stored as a resolved path string.
        document: Document whose ``weight``, ``readout_weight`` and
            ``transition_weight`` are copied into the entry.
        rows: Number of rows in the shard, stored as the entry's ``limit``.

    Returns:
        A ``"file"`` source entry with fixed filter settings.
    """
    return {
        "source": "file",
        "name": source,
        "path": str(path.resolve()),
        "limit": rows,
        "weight": document.weight,
        "readout_weight": document.readout_weight,
        "transition_weight": document.transition_weight,
        "min_words": 1,
        "max_words": 0,
        "min_alpha_ratio": 0.0,
        "allowed_languages": [],
        "streaming": True,
    }


def materialize_corpus_plan(
    plan: Iterable[CorpusPlanEntry],
    output_dir: str | Path,
    *,
    max_bytes: int = DEFAULT_CACHE_BYTE_LIMIT,
    shard_bytes: int = DEFAULT_SHARD_BYTE_LIMIT,
    log_every: int = 0,
) -> dict[str, object]:
    """Write the documents of *plan* to per-source JSONL shards.

    Documents are pulled from ``iter_corpus_plan_documents(plan)`` and
    appended to the current shard of their source. A shard is rolled over
    before a write that would push a non-empty shard past *shard_bytes*.
    Writing stops at the first document that would push the total past
    *max_bytes*. Afterwards ``materialized-plan.json`` (one ``"file"`` entry
    per non-empty shard) and ``materialized-manifest.json`` are written to
    *output_dir*.

    Args:
        plan: Corpus plan entries to stream documents from.
        output_dir: Directory for shards and metadata; created if missing.
        max_bytes: Upper bound on the total bytes written to shards.
        shard_bytes: Size at which a non-empty shard is rolled over.
        log_every: When positive, print progress every this many documents.

    Returns:
        The manifest dictionary plus ``"manifest_path"``. Its
        ``"source_count"`` value is the number of shard entries in the plan.

    Raises:
        ValueError: If *max_bytes* or *shard_bytes* is not positive.
    """
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive.")
    if shard_bytes <= 0:
        raise ValueError("shard_bytes must be positive.")

    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)

    bytes_written = 0
    documents_written = 0
    file_entries: list[dict[str, object]] = []

    # Per-shard bookkeeping, keyed by str(shard path).
    shard_rows: OrderedDict[str, int] = OrderedDict()
    shard_sources: dict[str, str] = {}
    first_documents: dict[str, StreamDocument] = {}
    emitted_shards: set[str] = set()

    # Per-source bookkeeping, keyed by the original source name.
    open_handles: dict[str, BinaryIO] = {}
    open_paths: dict[str, Path] = {}
    open_sizes: dict[str, int] = {}
    shard_indices: dict[str, int] = {}
    file_stems: dict[str, str] = {}
    used_stems: set[str] = set()

    def close_all() -> None:
        # ExitStack runs every callback even when one close() raises, so a
        # single failing handle cannot leak the remaining ones.
        with ExitStack() as stack:
            for open_handle in open_handles.values():
                stack.callback(open_handle.close)

    def stem_for(source: str) -> str:
        # Distinct sources may sanitize to the same name (e.g. "a/b" and
        # "a b"); give each its own stem so they never share a shard file.
        # Stems are compared case-folded for case-insensitive filesystems.
        stem = file_stems.get(source)
        if stem is None:
            base = _safe_source_name(source)
            stem = base
            suffix = 2
            while stem.casefold() in used_stems:
                stem = f"{base}-{suffix}"
                suffix += 1
            used_stems.add(stem.casefold())
            file_stems[source] = stem
        return stem

    def open_next_shard(source: str) -> BinaryIO:
        previous = open_handles.pop(source, None)
        if previous is not None:
            previous.close()
        shard_index = shard_indices.get(source, 0)
        shard_indices[source] = shard_index + 1
        path = output / f"{stem_for(source)}-{shard_index:04d}.jsonl"
        open_paths[source] = path
        open_sizes[source] = 0
        # Remember the real source name so the plan entry does not have to be
        # reverse-engineered from the sanitized file name.
        shard_sources[str(path)] = source
        # Records are already UTF-8 bytes; binary mode writes them verbatim.
        new_handle = path.open("wb")
        open_handles[source] = new_handle
        return new_handle

    def record_shard(path_key: str) -> None:
        # Add the plan entry for a finished shard, at most once per shard.
        rows = shard_rows.get(path_key, 0)
        if rows <= 0 or path_key in emitted_shards:
            return
        emitted_shards.add(path_key)
        file_entries.append(
            _file_entry_for_group(
                source=shard_sources[path_key],
                path=Path(path_key),
                document=first_documents[path_key],
                rows=rows,
            )
        )

    try:
        for document in iter_corpus_plan_documents(plan):
            source = document.source or "source"
            record: dict[str, object] = {
                "text": document.text,
                "language": document.language,
                "source": source,
            }
            if document.preference_rejected_text:
                record["preference_rejected_text"] = document.preference_rejected_text
            encoded = _jsonl_bytes(record)
            size = len(encoded)

            # Global cap: stop at the first document that does not fit.
            if bytes_written + size > max_bytes:
                break

            handle = open_handles.get(source)
            if handle is None:
                handle = open_next_shard(source)
            elif open_sizes[source] > 0 and open_sizes[source] + size > shard_bytes:
                # Only non-empty shards roll over, so an oversized document
                # still lands in a shard of its own.
                record_shard(str(open_paths[source]))
                handle = open_next_shard(source)

            path_key = str(open_paths[source])
            first_documents.setdefault(path_key, document)
            handle.write(encoded)
            open_sizes[source] += size
            bytes_written += size
            documents_written += 1
            shard_rows[path_key] = shard_rows.get(path_key, 0) + 1

            if log_every > 0 and documents_written % log_every == 0:
                print(
                    f"[materialize] wrote {documents_written} documents "
                    f"({bytes_written} bytes)",
                    flush=True,
                )
    finally:
        close_all()

    # Shards still open at the end have no entry yet; add them in the order
    # the shards were created.
    for path_key in shard_rows:
        record_shard(path_key)

    plan_path = output / "materialized-plan.json"
    manifest_path = output / "materialized-manifest.json"
    plan_payload = {
        "schema_version": "reframr.materialized_plan.v1",
        "sources": file_entries,
        "notes": [
            "Materialized from a Reframr corpus plan with normalized JSONL rows.",
            "Raw upstream dataset repositories are not cached by this file.",
        ],
    }
    manifest = {
        "status": "materialized",
        "documents_written": documents_written,
        "bytes_written": bytes_written,
        "max_bytes": max_bytes,
        "shard_bytes": shard_bytes,
        "source_count": len(file_entries),
        "plan_path": str(plan_path.resolve()),
    }
    plan_path.write_text(
        json.dumps(plan_payload, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    manifest_path.write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    return {**manifest, "manifest_path": str(manifest_path.resolve())}