from __future__ import annotations

import json
import re
from collections import OrderedDict
from collections.abc import Iterable
from pathlib import Path
from typing import BinaryIO

from .streaming import CorpusPlanEntry, StreamDocument, iter_corpus_plan_documents

# Default byte budgets: 3 GiB for the whole materialized cache, 256 MiB per shard.
DEFAULT_CACHE_BYTE_LIMIT = 3 * 1024 * 1024 * 1024
DEFAULT_SHARD_BYTE_LIMIT = 256 * 1024 * 1024
# Anything outside this character set is collapsed to "-" in shard file names.
_SAFE_NAME_PATTERN = re.compile(r"[^A-Za-z0-9_.-]+")


def _safe_source_name(name: str) -> str:
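    """Reduce ``name`` to a filesystem-safe shard prefix, falling back to ``"source"``."""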
    cleaned = _SAFE_NAME_PATTERN.sub("-", name.strip()).strip("-._")
    return cleaned or "source"


def _jsonl_bytes(record: dict[str, object]) -> bytes:
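    """Serialize ``record`` as one compact JSON line, UTF-8 encoded so byte budgets count exactly."""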
    return (json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n").encode("utf-8")


def _file_entry_for_group(
    *,
    source: str,
    path: Path,
    document: StreamDocument,
    rows: int,
) -> dict[str, object]:
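    """Describe one finished shard file as a plan "file" source entry.

    Weights are copied from the shard's first document; the remaining filter
    fields are permissive defaults (no length, alpha-ratio, or language limits).
    """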
    return {
        "source": "file",
        "name": source,
        "path": str(path.resolve()),
        "limit": rows,
        "weight": document.weight,
        "readout_weight": document.readout_weight,
        "transition_weight": document.transition_weight,
        "min_words": 1,
        "max_words": 0,
        "min_alpha_ratio": 0.0,
        "allowed_languages": [],
        "streaming": True,
    }


def materialize_corpus_plan(
    plan: Iterable[CorpusPlanEntry],
    output_dir: str | Path,
    *,
    max_bytes: int = DEFAULT_CACHE_BYTE_LIMIT,
    shard_bytes: int = DEFAULT_SHARD_BYTE_LIMIT,
    log_every: int = 0,
) -> dict[str, object]:
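    """Stream ``plan`` into per-source JSONL shards under ``output_dir``.

    Shards rotate once they reach ``shard_bytes``, and writing stops before the
    total output would exceed ``max_bytes``. A ``materialized-plan.json`` listing
    the shards and a ``materialized-manifest.json`` summary are written alongside
    them; the manifest (plus ``manifest_path``) is returned.

    Illustrative call (the ``plan`` iterable would come from your own corpus
    configuration)::

        summary = materialize_corpus_plan(plan, "cache/materialized")
        print(summary["documents_written"], summary["bytes_written"])
    """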
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive.")
    if shard_bytes <= 0:
        raise ValueError("shard_bytes must be positive.")

    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)

    bytes_written = 0
    documents_written = 0
    source_counts: OrderedDict[str, int] = OrderedDict()  # rows written per shard path
    file_entries: list[dict[str, object]] = []
    open_handles: dict[str, BinaryIO] = {}
    open_paths: dict[str, Path] = {}
    open_sizes: dict[str, int] = {}
    shard_indices: dict[str, int] = {}
    path_sources: dict[str, str] = {}  # shard path -> original (unsanitized) source name
    first_documents: dict[str, StreamDocument] = {}

    def close_all() -> None:
        for handle in open_handles.values():
            handle.close()

    def open_next_shard(source: str) -> BinaryIO:
        """Close the current shard for ``source``, if any, and open the next one."""
        handle = open_handles.pop(source, None)
        if handle is not None:
            handle.close()
        shard_index = shard_indices.get(source, 0)
        shard_indices[source] = shard_index + 1
        path = output / f"{_safe_source_name(source)}-{shard_index:04d}.jsonl"
        open_paths[source] = path
        open_sizes[source] = 0
        path_sources[str(path)] = source
        # Binary mode: _jsonl_bytes already produced UTF-8 with trailing "\n".
        new_handle = path.open("wb")
        open_handles[source] = new_handle
        return new_handle

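    # Append each streamed document to its source's current shard, rotating
    # shards at shard_bytes and stopping once max_bytes would be exceeded.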
    try:
        for document in iter_corpus_plan_documents(plan):
            source = document.source or "source"
            record = {
                "text": document.text,
                "language": document.language,
                "source": source,
            }
            if document.preference_rejected_text:
                record["preference_rejected_text"] = document.preference_rejected_text
            encoded = _jsonl_bytes(record)
            if bytes_written + len(encoded) > max_bytes:
                break

            handle = open_handles.get(source)
            if handle is None:
                handle = open_next_shard(source)
            if open_sizes[source] > 0 and open_sizes[source] + len(encoded) > shard_bytes:
                # Current shard is full: record it as a plan entry, then rotate.
                path = open_paths[source]
                rows = source_counts.get(str(path), 0)
                if rows > 0:
                    file_entries.append(
                        _file_entry_for_group(
                            source=source,
                            path=path,
                            document=first_documents[str(path)],
                            rows=rows,
                        )
                    )
                handle = open_next_shard(source)

            path_key = str(open_paths[source])
            if path_key not in first_documents:
                first_documents[path_key] = document
            handle.write(encoded)
            open_sizes[source] += len(encoded)
            bytes_written += len(encoded)
            documents_written += 1
            source_counts[path_key] = source_counts.get(path_key, 0) + 1
            if log_every > 0 and documents_written % log_every == 0:
                print(
                    f"[materialize] wrote {documents_written} documents "
                    f"({bytes_written} bytes)",
                    flush=True,
                )
    finally:
        close_all()
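
    # Emit plan entries for shards that were still open when the loop ended;
    # the rotation path above only covers shards that filled up.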
    emitted_paths = {entry["path"] for entry in file_entries}
    for path_key, rows in source_counts.items():
        path = Path(path_key)
        if str(path.resolve()) in emitted_paths:
            continue
        if rows <= 0:
            continue
        source = path_sources.get(path_key, path.stem.rsplit("-", 1)[0])
        file_entries.append(
            _file_entry_for_group(
                source=source,
                path=path,
                document=first_documents[path_key],
                rows=rows,
            )
        )

    # Write the machine-readable plan and summary manifest next to the shards.
    plan_path = output / "materialized-plan.json"
    manifest_path = output / "materialized-manifest.json"
    plan_payload = {
        "schema_version": "reframr.materialized_plan.v1",
        "sources": file_entries,
        "notes": [
            "Materialized from a Reframr corpus plan with normalized JSONL rows.",
            "Raw upstream dataset repositories are not cached by this file.",
        ],
    }
    manifest = {
        "status": "materialized",
        "documents_written": documents_written,
        "bytes_written": bytes_written,
        "max_bytes": max_bytes,
        "shard_bytes": shard_bytes,
        "source_count": len(file_entries),
        "plan_path": str(plan_path.resolve()),
    }
    plan_path.write_text(json.dumps(plan_payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    return {**manifest, "manifest_path": str(manifest_path.resolve())}