File size: 4,944 Bytes
eb05668
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from tokenizers import Tokenizer, models, pre_tokenizers, trainers, processors
from tokenizers.implementations import ByteLevelBPETokenizer
import os
from utils import load_config, setup_logging
from glob import glob
from tqdm import tqdm
import json
import torch

class CustomTokenizer:
    """Wrapper around ByteLevelBPETokenizer with additional functionality.

    Adds batch encoding with truncation/padding, a correct attention mask
    (1 for real tokens, 0 for padding), and padding-aware decoding.
    """

    def __init__(self, tokenizer):
        # Cache the vocab size and the IDs of the special tokens used below.
        self.tokenizer = tokenizer
        self._vocab_size = len(tokenizer.get_vocab())
        self.pad_token_id = tokenizer.token_to_id("<|pad|>")
        self.eos_token_id = tokenizer.token_to_id("<|endoftext|>")

    def get_vocab_size(self):
        """Return the size of the underlying tokenizer's vocabulary."""
        return self._vocab_size

    def batch_encode(self, texts, padding=True, truncation=True, max_length=None, return_tensors=None):
        """Encode a batch of strings to token IDs.

        Args:
            texts: Iterable of strings to encode.
            padding: If True, pad every sequence to the batch max length
                with the pad token.
            truncation: If True and ``max_length`` is given, truncate
                longer sequences to ``max_length``.
            max_length: Maximum sequence length after truncation.
            return_tensors: ``"pt"`` for torch tensors, ``None`` for lists.

        Returns:
            Dict with ``input_ids`` and ``attention_mask`` (1 for real
            tokens, 0 for padding positions).
        """
        # Normalize to plain ID lists immediately. The previous version
        # mixed Encoding objects and lists: after truncation, `enc.ids`
        # was called on a plain list and raised AttributeError.
        ids = [enc.ids for enc in self.tokenizer.encode_batch(texts)]
        if max_length and truncation:
            ids = [seq[:max_length] for seq in ids]
        # Build the mask before padding so pad positions get 0, not 1
        # (the old code returned an all-ones mask even over padding).
        mask = [[1] * len(seq) for seq in ids]
        if padding:
            # default=0 keeps an empty batch from raising ValueError.
            max_len = max((len(seq) for seq in ids), default=0)
            ids = [seq + [self.pad_token_id] * (max_len - len(seq)) for seq in ids]
            mask = [m + [0] * (max_len - len(m)) for m in mask]
        if return_tensors == "pt":
            return {
                "input_ids": torch.tensor(ids),
                "attention_mask": torch.tensor(mask),
            }
        return {"input_ids": ids, "attention_mask": mask}

    def decode(self, token_ids):
        """Decode a list (or 1-D tensor) of token IDs back to a string.

        Padding tokens are stripped before decoding.
        """
        if isinstance(token_ids, torch.Tensor):
            token_ids = token_ids.tolist()

        # Filter out padding tokens
        token_ids = [t for t in token_ids if t != self.pad_token_id]

        # Use the underlying tokenizer's decode method
        return self.tokenizer.decode(token_ids)

def train_tokenizer(config):
    """Trains a custom BPE tokenizer using the tokenizers library.

    Args:
        config: Parsed project config dict. Reads ``tokenizer.model_path``
            (required) and the optional keys ``tokenizer.vocab_size``
            (default 50000), ``tokenizer.min_frequency`` (default 2), and
            ``tokenizer.data_dir`` (default ``"data/raw"``).

    Returns:
        The trained ``ByteLevelBPETokenizer``.

    Raises:
        ValueError: If no ``*.txt`` files are found in the data directory.
    """
    setup_logging()

    model_path = config["tokenizer"]["model_path"]
    vocab_size = config["tokenizer"].get("vocab_size", 50000)
    min_frequency = config["tokenizer"].get("min_frequency", 2)
    # Generalized: the data directory is configurable; the default keeps
    # the previous hard-coded "data/raw" behavior.
    data_dir = config["tokenizer"].get("data_dir", "data/raw")

    # Create output directory if it doesn't exist
    os.makedirs(model_path, exist_ok=True)

    # Initialize a new tokenizer
    tokenizer = ByteLevelBPETokenizer()

    # Get all text files from the data directory
    data_files = glob(os.path.join(data_dir, "*.txt"))
    if not data_files:
        raise ValueError(f"No text files found in {data_dir} directory")

    print(f"Training tokenizer on {len(data_files)} files...")
    print(f"Target vocab size: {vocab_size}")
    print(f"Min frequency: {min_frequency}")

    # Train the tokenizer
    tokenizer.train(
        files=data_files,
        vocab_size=vocab_size,
        min_frequency=min_frequency,
        special_tokens=[
            "<|endoftext|>",  # End of text token
            "<|pad|>",        # Padding token
            "<|unk|>",        # Unknown token
            "<|mask|>"        # Mask token
        ]
    )

    # Save the tokenizer files (vocab.json + merges.txt)
    tokenizer.save_model(model_path)

    # Save the tokenizer configuration alongside the model so that
    # downstream loaders can recover the special-token mapping.
    tokenizer_config = {
        "vocab_size": vocab_size,
        "min_frequency": min_frequency,
        "model_type": "byte_level_bpe",
        "special_tokens": {
            "eos_token": "<|endoftext|>",
            "pad_token": "<|pad|>",
            "unk_token": "<|unk|>",
            "mask_token": "<|mask|>"
        }
    }

    with open(os.path.join(model_path, "tokenizer_config.json"), "w") as f:
        json.dump(tokenizer_config, f, indent=2)

    print(f"Tokenizer trained and saved to {model_path}")
    return tokenizer

def get_tokenizer(config):
    """Loads a trained tokenizer.

    Args:
        config: Parsed project config dict; reads ``tokenizer.model_path``.

    Returns:
        A ``CustomTokenizer`` wrapping the loaded ``ByteLevelBPETokenizer``.

    Raises:
        ValueError: If the tokenizer files are missing at ``model_path``.
    """
    model_path = config["tokenizer"]["model_path"]

    vocab_file = os.path.join(model_path, "vocab.json")
    merges_file = os.path.join(model_path, "merges.txt")
    # Both files are required by ByteLevelBPETokenizer; the old check only
    # looked for vocab.json, so a missing merges.txt failed later inside
    # the constructor with a less helpful error.
    if not (os.path.exists(vocab_file) and os.path.exists(merges_file)):
        raise ValueError(f"No tokenizer found at {model_path}. Please train the tokenizer first.")

    base_tokenizer = ByteLevelBPETokenizer(vocab_file, merges_file)

    # Add special tokens if they don't exist
    special_tokens = {
        "eos_token": "<|endoftext|>",
        "pad_token": "<|pad|>",
        "unk_token": "<|unk|>",
        "mask_token": "<|mask|>"
    }
    base_tokenizer.add_special_tokens(list(special_tokens.values()))

    # Create wrapped tokenizer
    tokenizer = CustomTokenizer(base_tokenizer)

    print(f"ByteLevelBPE tokenizer loaded successfully. Vocab size: {tokenizer.get_vocab_size()}")
    return tokenizer

if __name__ == "__main__":
    # Script entry point: load the project config and run tokenizer training.
    cfg = load_config()
    train_tokenizer(cfg)
    print("Tokenizer training complete.")