"""
Data pipeline: streams and tokenizes FineWeb-Edu for pretraining.
Packs token streams into fixed-length sequences of max_seq_len for efficiency (no padding waste).
"""
from typing import Optional

import torch
from torch.utils.data import DataLoader, IterableDataset
from datasets import load_dataset
from transformers import AutoTokenizer


def get_tokenizer(name: str = "mistralai/Mistral-7B-v0.1"):
    """Use Mistral's tokenizer: 32k vocab, BPE, well trained on diverse data."""
    tok = AutoTokenizer.from_pretrained(name, use_fast=True)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    return tok
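
# Note: the packed dataset below never emits padding tokens; aliasing pad_token to
# eos_token above only keeps downstream utilities that expect a pad token from failing.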


class PackedPretrainDataset(IterableDataset):
    """
    Streams text from a HuggingFace dataset, tokenizes it on the fly,
    and packs the tokens into fixed-length sequences for maximum GPU utilization.
    """

    def __init__(self, tokenizer, max_seq_len: int, split: str = "train",
                 cache_dir: Optional[str] = None, seed: int = 42):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.split = split
        self.cache_dir = cache_dir
        self.seed = seed
        self.eos_id = tokenizer.eos_token_id

    def _token_stream(self):
        ds = load_dataset(
            "HuggingFaceFW/fineweb-edu",
            name="sample-10BT",
            split=self.split,
            streaming=True,
            cache_dir=self.cache_dir,
        )
        ds = ds.shuffle(seed=self.seed, buffer_size=10_000)
        for example in ds:
            text = example.get("text", "")
            # Skip near-empty documents.
            if len(text.strip()) < 50:
                continue
            token_ids = self.tokenizer.encode(text, add_special_tokens=False)
            yield from token_ids
            # Separate documents with a single EOS token.
            yield self.eos_id

    def __iter__(self):
        buffer = []
        for token_id in self._token_stream():
            buffer.append(token_id)
            # Once max_seq_len + 1 tokens are buffered, emit a shifted
            # (input, label) pair for next-token prediction and reset.
            if len(buffer) == self.max_seq_len + 1:
                input_ids = torch.tensor(buffer[:-1], dtype=torch.long)
                labels = torch.tensor(buffer[1:], dtype=torch.long)
                yield input_ids, labels
                buffer = []
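

# Illustration of the packing above (a sketch with an assumed max_seq_len of 4,
# lowercase letters standing in for token ids):
#   buffer fills to 5 ids, e.g. [a, b, c, d, e]
#   input_ids = [a, b, c, d], labels = [b, c, d, e]   (targets shifted by one)
# Sequences may span document boundaries; the only separator is the EOS id.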


def create_dataloader(tokenizer, config, rank: int = 0, world_size: int = 1,
                      seed_override: Optional[int] = None):
    seed = seed_override if seed_override is not None else config.seed
    # Each rank offsets the shuffle seed so ranks see differently ordered streams;
    # the stream itself is not partitioned by world_size, so ranks may overlap.
    dataset = PackedPretrainDataset(
        tokenizer=tokenizer,
        max_seq_len=config.max_seq_len,
        split="train",
        cache_dir=config.data_cache_dir,
        seed=seed + rank,
    )
    # With an IterableDataset, num_workers > 1 replays the same stream in every
    # worker; keep num_workers at 0 or 1 unless worker sharding is added.
    loader_kwargs = dict(
        batch_size=config.batch_size_per_gpu,
        num_workers=config.num_workers,
        pin_memory=True,
    )
    if config.num_workers > 0:
        # prefetch_factor is only valid when worker processes are used.
        loader_kwargs["prefetch_factor"] = 4
    return DataLoader(dataset, **loader_kwargs)
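

# Minimal smoke test (a sketch, not part of the training entry point). The real
# config object lives elsewhere in the repo; the SimpleNamespace below only mimics
# the fields this module reads, and its values are illustrative assumptions.
# Running it needs network access (streaming FineWeb-Edu and downloading the
# Mistral tokenizer, which may require Hugging Face credentials).
if __name__ == "__main__":
    from types import SimpleNamespace

    cfg = SimpleNamespace(
        max_seq_len=1024,
        batch_size_per_gpu=2,
        num_workers=0,  # keep 0 here: streaming without worker sharding
        data_cache_dir=None,
        seed=42,
    )
    tok = get_tokenizer()
    loader = create_dataloader(tok, cfg)
    input_ids, labels = next(iter(loader))
    print(input_ids.shape, labels.shape)  # expect torch.Size([2, 1024]) twice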