""" Data pipeline: streams and tokenizes OpenWebText for pretraining. Packs sequences to max_seq_len for efficiency (no padding waste). """ import os import torch from torch.utils.data import IterableDataset, DataLoader from datasets import load_dataset from transformers import AutoTokenizer def get_tokenizer(name: str = "mistralai/Mistral-7B-v0.1"): """Use Mistral's tokenizer — 32k vocab, BPE, well-trained on diverse data.""" tok = AutoTokenizer.from_pretrained(name, use_fast=True) if tok.pad_token is None: tok.pad_token = tok.eos_token return tok class PackedPretrainDataset(IterableDataset): """ Streams text from HuggingFace dataset, tokenizes on the fly, and packs into fixed-length sequences for maximum GPU utilization. """ def __init__(self, tokenizer, max_seq_len: int, split: str = "train", cache_dir: str = None, seed: int = 42): self.tokenizer = tokenizer self.max_seq_len = max_seq_len self.split = split self.cache_dir = cache_dir self.seed = seed self.eos_id = tokenizer.eos_token_id def _token_stream(self): ds = load_dataset( "HuggingFaceFW/fineweb-edu", name="sample-10BT", split=self.split, streaming=True, cache_dir=self.cache_dir, ) ds = ds.shuffle(seed=self.seed, buffer_size=10_000) for example in ds: text = example.get("text", "") if len(text.strip()) < 50: continue token_ids = self.tokenizer.encode(text, add_special_tokens=False) yield from token_ids yield self.eos_id def __iter__(self): buffer = [] for token_id in self._token_stream(): buffer.append(token_id) if len(buffer) == self.max_seq_len + 1: input_ids = torch.tensor(buffer[:-1], dtype=torch.long) labels = torch.tensor(buffer[1:], dtype=torch.long) yield input_ids, labels buffer = [] def create_dataloader(tokenizer, config, rank: int = 0, world_size: int = 1, seed_override: int = None): seed = seed_override if seed_override is not None else config.seed dataset = PackedPretrainDataset( tokenizer=tokenizer, max_seq_len=config.max_seq_len, split="train", cache_dir=config.data_cache_dir, seed=seed + rank, ) return DataLoader( dataset, batch_size=config.batch_size_per_gpu, num_workers=config.num_workers, pin_memory=True, prefetch_factor=4, )