File size: 2,628 Bytes
eaf2faa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
Data pipeline: streams and tokenizes OpenWebText for pretraining.
Packs sequences to max_seq_len for efficiency (no padding waste).
"""

import os
import torch
from torch.utils.data import IterableDataset, DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer


def get_tokenizer(name: str = "mistralai/Mistral-7B-v0.1"):
    """Use Mistral's tokenizer — 32k vocab, BPE, well-trained on diverse data."""
    tok = AutoTokenizer.from_pretrained(name, use_fast=True)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    return tok


class PackedPretrainDataset(IterableDataset):
    """
    Streams text from HuggingFace dataset, tokenizes on the fly,
    and packs into fixed-length sequences for maximum GPU utilization.
    """

    def __init__(self, tokenizer, max_seq_len: int, split: str = "train", cache_dir: str = None, seed: int = 42):
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.split = split
        self.cache_dir = cache_dir
        self.seed = seed
        self.eos_id = tokenizer.eos_token_id

    def _token_stream(self):
        ds = load_dataset(
            "HuggingFaceFW/fineweb-edu",
            name="sample-10BT",
            split=self.split,
            streaming=True,
            cache_dir=self.cache_dir,
        )
        ds = ds.shuffle(seed=self.seed, buffer_size=10_000)

        for example in ds:
            text = example.get("text", "")
            if len(text.strip()) < 50:
                continue
            token_ids = self.tokenizer.encode(text, add_special_tokens=False)
            yield from token_ids
            yield self.eos_id

    def __iter__(self):
        buffer = []
        for token_id in self._token_stream():
            buffer.append(token_id)
            if len(buffer) == self.max_seq_len + 1:
                input_ids = torch.tensor(buffer[:-1], dtype=torch.long)
                labels = torch.tensor(buffer[1:], dtype=torch.long)
                yield input_ids, labels
                buffer = []


def create_dataloader(tokenizer, config, rank: int = 0, world_size: int = 1, seed_override: int = None):
    seed = seed_override if seed_override is not None else config.seed
    dataset = PackedPretrainDataset(
        tokenizer=tokenizer,
        max_seq_len=config.max_seq_len,
        split="train",
        cache_dir=config.data_cache_dir,
        seed=seed + rank,
    )
    return DataLoader(
        dataset,
        batch_size=config.batch_size_per_gpu,
        num_workers=config.num_workers,
        pin_memory=True,
        prefetch_factor=4,
    )