| """ | |
| Data pipeline: streams and tokenizes OpenWebText for pretraining. | |
| Packs sequences to max_seq_len for efficiency (no padding waste). | |
| """ | |
| import os | |
| import torch | |
| from torch.utils.data import IterableDataset, DataLoader | |
| from datasets import load_dataset | |
| from transformers import AutoTokenizer | |
| def get_tokenizer(name: str = "mistralai/Mistral-7B-v0.1"): | |
| """Use Mistral's tokenizer — 32k vocab, BPE, well-trained on diverse data.""" | |
| tok = AutoTokenizer.from_pretrained(name, use_fast=True) | |
| if tok.pad_token is None: | |
| tok.pad_token = tok.eos_token | |
| return tok | |
| class PackedPretrainDataset(IterableDataset): | |
| """ | |
| Streams text from HuggingFace dataset, tokenizes on the fly, | |
| and packs into fixed-length sequences for maximum GPU utilization. | |
| """ | |
| def __init__(self, tokenizer, max_seq_len: int, split: str = "train", cache_dir: str = None, seed: int = 42): | |
| self.tokenizer = tokenizer | |
| self.max_seq_len = max_seq_len | |
| self.split = split | |
| self.cache_dir = cache_dir | |
| self.seed = seed | |
| self.eos_id = tokenizer.eos_token_id | |
| def _token_stream(self): | |
| ds = load_dataset( | |
| "HuggingFaceFW/fineweb-edu", | |
| name="sample-10BT", | |
| split=self.split, | |
| streaming=True, | |
| cache_dir=self.cache_dir, | |
| ) | |
| ds = ds.shuffle(seed=self.seed, buffer_size=10_000) | |
| for example in ds: | |
| text = example.get("text", "") | |
| if len(text.strip()) < 50: | |
| continue | |
| token_ids = self.tokenizer.encode(text, add_special_tokens=False) | |
| yield from token_ids | |
| yield self.eos_id | |
| def __iter__(self): | |
| buffer = [] | |
| for token_id in self._token_stream(): | |
| buffer.append(token_id) | |
| if len(buffer) == self.max_seq_len + 1: | |
| input_ids = torch.tensor(buffer[:-1], dtype=torch.long) | |
| labels = torch.tensor(buffer[1:], dtype=torch.long) | |
| yield input_ids, labels | |
| buffer = [] | |
| def create_dataloader(tokenizer, config, rank: int = 0, world_size: int = 1, seed_override: int = None): | |
| seed = seed_override if seed_override is not None else config.seed | |
| dataset = PackedPretrainDataset( | |
| tokenizer=tokenizer, | |
| max_seq_len=config.max_seq_len, | |
| split="train", | |
| cache_dir=config.data_cache_dir, | |
| seed=seed + rank, | |
| ) | |
| return DataLoader( | |
| dataset, | |
| batch_size=config.batch_size_per_gpu, | |
| num_workers=config.num_workers, | |
| pin_memory=True, | |
| prefetch_factor=4, | |
| ) | |