File size: 4,798 Bytes
77d62e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import sentencepiece as spm
import os
import json


class MTPTokenizer:
    """Wrapper around a SentencePiece BPE tokenizer.

    Trains on a JSONL instruction/response corpus, or loads an existing
    ``.model`` file. Special token IDs are fixed at training time:
    pad=0, unk=1, bos=2, eos=3.
    """

    def __init__(self, model_path=None):
        """Create the tokenizer, optionally loading an existing model.

        Args:
            model_path: Path to a trained ``.model`` file. If given and the
                file exists, it is loaded immediately; otherwise the tokenizer
                stays unloaded until train() or load() is called.
        """
        # Loaded SentencePieceProcessor, or None until train()/load().
        self.sp = None
        self.model_path = model_path

        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def _run_spm_training(self, input_file, model_prefix, vocab_size):
        """Invoke SentencePiece BPE training with this project's fixed settings.

        Kept in one place so the initial attempt and the vocab-size retry in
        train() cannot drift apart.
        """
        spm.SentencePieceTrainer.train(
            input=input_file,
            model_prefix=model_prefix,
            vocab_size=vocab_size,
            model_type='bpe',
            pad_id=0,
            unk_id=1,
            bos_id=2,
            eos_id=3,
            character_coverage=1.0,
            normalization_rule_name='identity',
            num_threads=4,
            split_digits=True,
            allow_whitespace_only_pieces=False,
            byte_fallback=False,
            max_sentencepiece_length=16
        )

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        """Train a SentencePiece BPE tokenizer on a JSONL corpus.

        Each corpus line is a JSON object; the 'instruction' and 'response'
        fields (when present) are collected as training text. The effective
        vocab size is capped at ~15% of total characters; if SentencePiece
        still rejects it as too high, training is retried once with the
        maximum the library suggests in its error message.

        Args:
            corpus_path: Path to the JSONL corpus file.
            vocab_size: Requested vocabulary size (may be reduced).
            model_prefix: Output prefix; produces ``<prefix>.model``.

        Raises:
            ValueError: If the corpus contains no usable text fields.
            RuntimeError: If SentencePiece training fails for any other reason.
        """
        # Extract text from JSONL corpus
        texts = []
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for line in f:
                data = json.loads(line)
                if 'instruction' in data:
                    texts.append(data['instruction'])
                if 'response' in data:
                    texts.append(data['response'])

        # Fail early with a clear message instead of an opaque SentencePiece
        # error on an empty training file.
        if not texts:
            raise ValueError(
                f"No 'instruction'/'response' fields found in {corpus_path}"
            )

        # Save temporary text file (one sentence per line for SentencePiece)
        temp_file = 'temp_corpus.txt'
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(texts))

        # Calculate optimal vocab size based on corpus
        total_chars = sum(len(text) for text in texts)
        max_vocab = min(vocab_size, int(total_chars * 0.15))  # Heuristic: ~15% of chars

        print(f"   → Corpus stats: {len(texts)} texts, {total_chars} characters")
        print(f"   → Adjusted vocab size: {max_vocab} (requested: {vocab_size})")

        try:
            try:
                self._run_spm_training(temp_file, model_prefix, max_vocab)
            except RuntimeError as e:
                if "Vocabulary size too high" not in str(e):
                    raise
                # SentencePiece reports the usable maximum inside the error
                # message ("... value <= N"); parse it and retry once.
                import re
                match = re.search(r'value <= (\d+)', str(e))
                if not match:
                    raise
                suggested_max = int(match.group(1))
                print(f"   → Retrying with vocab size: {suggested_max}")
                self._run_spm_training(temp_file, model_prefix, suggested_max)
        finally:
            # Clean up the scratch corpus even when training raises
            # (the original only removed it on success, leaking the file).
            os.remove(temp_file)

        # Load the trained model
        self.model_path = f"{model_prefix}.model"
        self.load(self.model_path)

        print(f"✓ Tokenizer trained: {self.vocab_size()} tokens")
        print(f"✓ Model saved: {self.model_path}")

    def load(self, model_path):
        """Load a trained SentencePiece model from ``model_path``."""
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)
        self.model_path = model_path

    def encode(self, text):
        """Encode text to a list of token IDs.

        Raises:
            ValueError: If no model has been trained or loaded yet.
        """
        if self.sp is None:
            raise ValueError("Tokenizer not loaded. Train or load a model first.")
        return self.sp.encode_as_ids(text)

    def decode(self, ids):
        """Decode a list of token IDs back to text.

        Raises:
            ValueError: If no model has been trained or loaded yet.
        """
        if self.sp is None:
            raise ValueError("Tokenizer not loaded. Train or load a model first.")
        return self.sp.decode_ids(ids)

    def vocab_size(self):
        """Return the vocabulary size, or 0 if no model is loaded."""
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    def bos_id(self):
        """Beginning-of-sentence token ID (2, per training config)."""
        return self.sp.bos_id()

    def eos_id(self):
        """End-of-sentence token ID (3, per training config)."""
        return self.sp.eos_id()

    def pad_id(self):
        """Padding token ID (0, per training config)."""
        return self.sp.pad_id()

    def unk_id(self):
        """Unknown token ID (1, per training config)."""
        return self.sp.unk_id()