frankenstallm / data /prepare.py
pathcosmos's picture
feat: Add data pipeline scripts + phase reports (Tier 3 - reproducibility)
b3d361d verified
"""
Prepare raw text files (or a HuggingFace dataset) for LLM training.
Tokenizes all input text, concatenates all token IDs into a single flat
sequence, splits into train / validation sets, and saves each as a uint16
numpy binary file (.bin) ready for TextDataset / PackedDataset.
Usage — glob of local text files:
python data/prepare.py \
--input "data/raw/*.txt" \
--output data/train.bin \
--val_output data/val.bin \
--tokenizer tokenizer/tokenizer.json \
--val_split 0.005 \
--seed 42
Usage — HuggingFace dataset (streaming):
python data/prepare.py \
--hf_dataset allenai/c4 \
--hf_subset en \
--hf_split train \
--hf_text_col text \
--output data/train.bin \
--val_output data/val.bin \
--tokenizer tokenizer/tokenizer.json \
--val_split 0.005
"""
from __future__ import annotations
import argparse
import glob
import os
import random
import sys
from pathlib import Path
import numpy as np
from tokenizers import Tokenizer
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_tokenizer(tokenizer_path: str) -> Tokenizer:
path = Path(tokenizer_path)
if not path.exists():
raise FileNotFoundError(f"Tokenizer not found: {path}")
return Tokenizer.from_file(str(path))
def find_input_files(pattern: str) -> list[str]:
"""Resolve a glob pattern or a plain file path to a list of files."""
if any(c in pattern for c in ("*", "?", "[")):
files = sorted(glob.glob(pattern, recursive=True))
else:
files = [pattern] if Path(pattern).exists() else []
if not files:
raise FileNotFoundError(f"No files matched pattern: {pattern!r}")
return files
def tokenize_file(path: str, tokenizer: Tokenizer) -> list[int]:
"""Read a single text file and return its token IDs."""
with open(path, "r", encoding="utf-8", errors="replace") as fh:
text = fh.read()
return tokenizer.encode(text).ids
def derive_val_path(output_path: Path, val_output_arg: str | None) -> Path:
"""Return the val .bin path, either explicitly provided or auto-derived."""
if val_output_arg:
return Path(val_output_arg)
# If the stem contains "train", swap it for "val".
if "train" in output_path.name:
candidate = output_path.parent / output_path.name.replace("train", "val")
if candidate != output_path:
return candidate
# Generic fallback: append _val before the suffix.
return output_path.with_name(output_path.stem + "_val" + output_path.suffix)
def save_bin(tokens: list[int], path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
np.array(tokens, dtype=np.uint16).tofile(str(path))
def _fmt_bytes(n_tokens: int) -> str:
"""Return a human-readable size string for a uint16 token array."""
nbytes = n_tokens * 2 # uint16 = 2 bytes per token
for unit in ("B", "KB", "MB", "GB", "TB"):
if nbytes < 1024:
return f"{nbytes:.1f} {unit}"
nbytes /= 1024
return f"{nbytes:.1f} PB"
# ---------------------------------------------------------------------------
# Source iterators
# ---------------------------------------------------------------------------
def iter_tokens_from_files(
input_files: list[str],
tokenizer: Tokenizer,
seed: int,
) -> tuple[list[int], int]:
"""
Tokenize every file, shuffle at file level, flatten, and return
(all_tokens_shuffled, file_count).
"""
per_file_tokens: list[list[int]] = []
for fpath in tqdm(input_files, desc="Tokenizing", unit="file"):
per_file_tokens.append(tokenize_file(fpath, tokenizer))
rng = random.Random(seed)
rng.shuffle(per_file_tokens)
all_tokens: list[int] = []
for toks in per_file_tokens:
all_tokens.extend(toks)
return all_tokens, len(input_files)
def iter_tokens_from_hf(
hf_dataset: str,
hf_subset: str | None,
hf_split: str,
hf_text_col: str,
tokenizer: Tokenizer,
) -> tuple[list[int], int]:
"""
Stream a HuggingFace dataset row-by-row, tokenize each row's text column,
and return (all_tokens, row_count).
Rows are appended in streaming order; no shuffle is performed here because
the stream may be very large. A seed-based split by position is used later.
"""
try:
from datasets import load_dataset
except ImportError:
raise ImportError(
"The 'datasets' package is required for --hf_dataset. "
"Install it with: pip install datasets"
)
print(f"Streaming HuggingFace dataset: {hf_dataset}"
+ (f" / {hf_subset}" if hf_subset else "")
+ f" split={hf_split}")
ds = load_dataset(
hf_dataset,
hf_subset,
split=hf_split,
streaming=True,
trust_remote_code=True,
)
all_tokens: list[int] = []
row_count = 0
pbar = tqdm(desc="Tokenizing rows", unit="row")
for row in ds:
text = row.get(hf_text_col, "")
if text:
all_tokens.extend(tokenizer.encode(text).ids)
row_count += 1
pbar.update(1)
pbar.close()
return all_tokens, row_count
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Tokenize text sources and save as uint16 binary files for LLM training. "
"Accepts either a glob of local text files (--input) or a HuggingFace "
"dataset (--hf_dataset)."
)
)
# --- Input source (mutually exclusive) ---
source = parser.add_mutually_exclusive_group()
source.add_argument(
"--input",
default=None,
help='Glob pattern or path to a single text file, e.g. "data/raw/*.txt"',
)
source.add_argument(
"--hf_dataset",
default=None,
metavar="DATASET",
help="HuggingFace dataset name, e.g. allenai/c4 (alternative to --input)",
)
# --- HuggingFace-specific options ---
parser.add_argument(
"--hf_subset",
default=None,
metavar="SUBSET",
help="Dataset subset / config name, e.g. 'en' for allenai/c4",
)
parser.add_argument(
"--hf_split",
default="train",
metavar="SPLIT",
help="Dataset split to use (default: train)",
)
parser.add_argument(
"--hf_text_col",
default="text",
metavar="COLUMN",
help="Name of the text column in the dataset (default: text)",
)
# --- Output paths ---
parser.add_argument(
"--output",
required=True,
help="Output path for the training binary, e.g. data/train.bin",
)
parser.add_argument(
"--val_output",
default=None,
metavar="PATH",
help=(
"Explicit output path for the validation binary "
"(default: auto-derived from --output, e.g. train.bin → val.bin)"
),
)
# --- Tokenizer ---
parser.add_argument(
"--tokenizer",
default="tokenizer/tokenizer.json",
help="Path to a trained tokenizer JSON file (default: tokenizer/tokenizer.json)",
)
# --- Split / reproducibility ---
parser.add_argument(
"--val_split",
type=float,
default=0.005,
help="Fraction of tokens reserved for validation (default: 0.005)",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for reproducible train/val split (default: 42)",
)
args = parser.parse_args()
# Require at least one input source.
if args.input is None and args.hf_dataset is None:
parser.error("One of --input or --hf_dataset is required.")
return args
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
args = parse_args()
# ---- Load tokenizer ----
tokenizer = load_tokenizer(args.tokenizer)
vocab_size = tokenizer.get_vocab_size()
# Warn early if IDs could overflow uint16.
if vocab_size > 65535:
print(
"WARNING: vocab_size > 65535; token IDs above 65535 will be "
"truncated when cast to uint16.",
file=sys.stderr,
)
# ---- Collect tokens from the chosen source ----
if args.hf_dataset:
all_tokens, source_count = iter_tokens_from_hf(
hf_dataset=args.hf_dataset,
hf_subset=args.hf_subset,
hf_split=args.hf_split,
hf_text_col=args.hf_text_col,
tokenizer=tokenizer,
)
source_label = f"{source_count:,} rows"
else:
input_files = find_input_files(args.input)
print(f"Found {len(input_files)} input file(s).")
all_tokens, source_count = iter_tokens_from_files(
input_files=input_files,
tokenizer=tokenizer,
seed=args.seed,
)
source_label = f"{source_count:,} files"
total_tokens = len(all_tokens)
# ---- Split into train / val ----
val_size = max(1, int(total_tokens * args.val_split))
train_size = total_tokens - val_size
train_tokens = all_tokens[:train_size]
val_tokens = all_tokens[train_size:]
# ---- Resolve output paths ----
train_path = Path(args.output)
val_path = derive_val_path(train_path, args.val_output)
# ---- Save ----
print(f"\nSaving train data -> {train_path}")
save_bin(train_tokens, train_path)
print(f"Saving val data -> {val_path}")
save_bin(val_tokens, val_path)
# ---- Final stats ----
tokens_per_step = 8 * 2048 * 4 * 8 # bs=8, seq=2048, accum=4, 8 GPUs
estimated_steps = train_size // tokens_per_step
print()
print(f"Tokenizer: {args.tokenizer} (vocab_size={vocab_size:,})")
print(f"Total tokens: {total_tokens:,}")
print(
f"Train tokens: {train_size:,}"
f" (stored in {train_path}, {_fmt_bytes(train_size)})"
)
print(
f"Val tokens: {val_size:,}"
f" (stored in {val_path}, {_fmt_bytes(val_size)})"
)
print(
f"Estimated steps (bs=8, seq=2048, 8 GPUs, accum=4): {estimated_steps:,}"
)
if __name__ == "__main__":
main()