# reframr/materialize.py
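"""Materialize a streaming Reframr corpus plan into local JSONL shards plus a plan/manifest pair."""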
from __future__ import annotations
import json
import re
from collections import OrderedDict
from collections.abc import Iterable
from pathlib import Path
from typing import TextIO

from .streaming import CorpusPlanEntry, StreamDocument, iter_corpus_plan_documents

DEFAULT_CACHE_BYTE_LIMIT = 3 * 1024 * 1024 * 1024  # 3 GiB total cache budget
DEFAULT_SHARD_BYTE_LIMIT = 256 * 1024 * 1024  # 256 MiB per shard
_SAFE_NAME_PATTERN = re.compile(r"[^A-Za-z0-9_.-]+")

def _safe_source_name(name: str) -> str:
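    """Collapse characters outside ``[A-Za-z0-9_.-]`` into ``-`` to form a filesystem-safe name.

    e.g. ``_safe_source_name("wiki/en dump!")`` -> ``"wiki-en-dump"``.
    """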
cleaned = _SAFE_NAME_PATTERN.sub("-", name.strip()).strip("-._")
    return cleaned or "source"


def _jsonl_bytes(record: dict[str, object]) -> bytes:
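    """Serialize one record as a compact, UTF-8-encoded JSON line."""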
    return (json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n").encode("utf-8")


def _file_entry_for_group(
*,
source: str,
path: Path,
document: StreamDocument,
rows: int,
) -> dict[str, object]:
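    """Describe one written shard as a ``file`` source entry for the materialized plan.

    Per-document weights are copied from the first document written to the
    shard; the word/language filters are left as no-ops because the rows were
    already normalized upstream.
    """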
return {
"source": "file",
"name": source,
"path": str(path.resolve()),
"limit": rows,
"weight": document.weight,
"readout_weight": document.readout_weight,
"transition_weight": document.transition_weight,
"min_words": 1,
"max_words": 0,
"min_alpha_ratio": 0.0,
"allowed_languages": [],
"streaming": True,
    }


def materialize_corpus_plan(
plan: Iterable[CorpusPlanEntry],
output_dir: str | Path,
*,
max_bytes: int = DEFAULT_CACHE_BYTE_LIMIT,
shard_bytes: int = DEFAULT_SHARD_BYTE_LIMIT,
log_every: int = 0,
) -> dict[str, object]:
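    """Stream documents from ``plan`` into per-source JSONL shards under ``output_dir``.

    Writing stops once ``max_bytes`` of encoded rows would be exceeded; each
    source rolls over to a new shard when the current one would pass
    ``shard_bytes``. With ``log_every > 0``, progress is printed every that
    many documents. Writes ``materialized-plan.json`` and
    ``materialized-manifest.json`` next to the shards and returns the manifest
    augmented with its own resolved path.
    """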
if max_bytes <= 0:
raise ValueError("max_bytes must be positive.")
if shard_bytes <= 0:
raise ValueError("shard_bytes must be positive.")
output = Path(output_dir)
output.mkdir(parents=True, exist_ok=True)
bytes_written = 0
documents_written = 0
source_counts: OrderedDict[str, int] = OrderedDict()
file_entries: list[dict[str, object]] = []
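    # Per-source shard state: open file handle, current path/size, next shard
    # index, and the first document written to each shard (for weight metadata).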
    open_handles: dict[str, TextIO] = {}
open_paths: dict[str, Path] = {}
open_sizes: dict[str, int] = {}
shard_indices: dict[str, int] = {}
first_documents: dict[str, StreamDocument] = {}
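
    # Close every open shard handle; invoked from the ``finally`` block below.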
def close_all() -> None:
for handle in open_handles.values():
handle.close()
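
    # Close the current shard for ``source`` (if any) and open the next numbered one.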
    def open_next_shard(source: str) -> TextIO:
handle = open_handles.pop(source, None)
if handle is not None:
handle.close()
shard_index = shard_indices.get(source, 0)
shard_indices[source] = shard_index + 1
path = output / f"{_safe_source_name(source)}-{shard_index:04d}.jsonl"
open_paths[source] = path
open_sizes[source] = 0
new_handle = path.open("w", encoding="utf-8", newline="\n")
open_handles[source] = new_handle
return new_handle
try:
for document in iter_corpus_plan_documents(plan):
source = document.source or "source"
record = {
"text": document.text,
"language": document.language,
"source": source,
}
if document.preference_rejected_text:
record["preference_rejected_text"] = document.preference_rejected_text
encoded = _jsonl_bytes(record)
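            # Stop before the cache as a whole would exceed max_bytes.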
if bytes_written + len(encoded) > max_bytes:
break
handle = open_handles.get(source)
if handle is None:
handle = open_next_shard(source)
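            # Roll over to a fresh shard when this one would exceed shard_bytes,
            # emitting its plan entry first. An oversize record still lands in an
            # otherwise-empty shard, since rollover requires open_sizes > 0.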
if open_sizes[source] > 0 and open_sizes[source] + len(encoded) > shard_bytes:
path = open_paths[source]
rows = source_counts.get(str(path), 0)
if rows > 0:
file_entries.append(
_file_entry_for_group(
source=source,
path=path,
document=first_documents[str(path)],
rows=rows,
)
)
handle = open_next_shard(source)
path_key = str(open_paths[source])
if path_key not in first_documents:
first_documents[path_key] = document
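            # The handle is text-mode, but both budgets are tracked in encoded bytes.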
handle.write(encoded.decode("utf-8"))
open_sizes[source] += len(encoded)
bytes_written += len(encoded)
documents_written += 1
source_counts[path_key] = source_counts.get(path_key, 0) + 1
if log_every > 0 and documents_written % log_every == 0:
print(
f"[materialize] wrote {documents_written} documents "
f"({bytes_written} bytes)",
flush=True,
)
finally:
close_all()
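    # Emit plan entries for shards that were still open when the loop ended; the
    # source name recovered from the filename is the sanitized form.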
emitted_paths = {entry["path"] for entry in file_entries}
for path_key, rows in source_counts.items():
path = Path(path_key)
if str(path.resolve()) in emitted_paths:
continue
if rows <= 0:
continue
source = path.stem.rsplit("-", 1)[0]
file_entries.append(
_file_entry_for_group(
source=source,
path=path,
document=first_documents[path_key],
rows=rows,
)
)
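    # Write the machine-readable plan and manifest next to the shards.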
plan_path = output / "materialized-plan.json"
manifest_path = output / "materialized-manifest.json"
plan_payload = {
"schema_version": "reframr.materialized_plan.v1",
"sources": file_entries,
"notes": [
"Materialized from a Reframr corpus plan with normalized JSONL rows.",
"Raw upstream dataset repositories are not cached by this file.",
],
}
manifest = {
"status": "materialized",
"documents_written": documents_written,
"bytes_written": bytes_written,
"max_bytes": max_bytes,
"shard_bytes": shard_bytes,
"source_count": len(file_entries),
"plan_path": str(plan_path.resolve()),
}
plan_path.write_text(json.dumps(plan_payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return {**manifest, "manifest_path": str(manifest_path.resolve())}
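
# Example usage (a minimal sketch, not part of the release API): ``plan_entries``
# is a hypothetical iterable of ``CorpusPlanEntry``, whose construction depends
# on the definitions in ``reframr/streaming.py``.
#
#     from reframr.materialize import materialize_corpus_plan
#
#     manifest = materialize_corpus_plan(
#         plan_entries,                  # iterable of CorpusPlanEntry
#         "cache/materialized",          # output directory for shards
#         max_bytes=512 * 1024 * 1024,   # cap the cache at 512 MiB
#         shard_bytes=64 * 1024 * 1024,  # roll shards at 64 MiB
#         log_every=10_000,              # print progress every 10k documents
#     )
#     print(manifest["documents_written"], manifest["manifest_path"])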