import json
import os
import re

import sentencepiece as spm


class MTPTokenizer:
    """Thin wrapper around SentencePiece for training and using a BPE tokenizer."""

    def __init__(self, model_path=None):
        self.sp = None
        self.model_path = model_path
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        # Collect text fields from a JSONL corpus of
        # instruction/input/output records, skipping malformed lines.
        texts = []
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    if 'instruction' in data:
                        texts.append(data['instruction'])
                    if 'input' in data and data['input'].strip():
                        texts.append(data['input'])
                    if 'output' in data:
                        texts.append(data['output'])
                except (json.JSONDecodeError, AttributeError):
                    continue
        if not texts:
            raise ValueError("Empty corpus")

        # SentencePiece trains from a plain-text file, one sentence per line.
        temp_file = 'temp_corpus.txt'
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(texts))

        # Scale the vocabulary with corpus size, but never below `vocab_size`.
        total_chars = sum(len(text) for text in texts)
        target_vocab = max(vocab_size, int(total_chars * 0.15))

        try:
            spm.SentencePieceTrainer.train(
                input=temp_file,
                model_prefix=model_prefix,
                vocab_size=target_vocab,
                model_type='bpe',
                pad_id=0,
                unk_id=1,
                bos_id=2,
                eos_id=3,
                character_coverage=1.0,
                normalization_rule_name='identity',
                num_threads=2,
                split_digits=True,
                max_sentencepiece_length=16,
            )
        except RuntimeError as e:
            # Small corpora cannot support large vocabularies; SentencePiece
            # reports the maximum feasible size in its error message, so
            # retry with that value. Anything else is re-raised.
            match = re.search(r'value <= (\d+)', str(e))
            if "Vocabulary size too high" not in str(e) or not match:
                os.remove(temp_file)
                raise
            spm.SentencePieceTrainer.train(
                input=temp_file,
                model_prefix=model_prefix,
                vocab_size=int(match.group(1)),
                model_type='bpe',
                pad_id=0,
                unk_id=1,
                bos_id=2,
                eos_id=3,
                character_coverage=1.0,
                normalization_rule_name='identity',
                num_threads=2,
            )
        os.remove(temp_file)

        self.model_path = f"{model_prefix}.model"
        self.load(self.model_path)
        print(f"✓ Tokenizer trained: {self.vocab_size()} tokens")

    def load(self, model_path):
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)
        self.model_path = model_path

    def encode(self, text):
        if self.sp is None:
            raise ValueError("Tokenizer not loaded")
        return self.sp.encode_as_ids(text)

    def decode(self, ids):
        if self.sp is None:
            raise ValueError("Tokenizer not loaded")
        return self.sp.decode_ids(ids)

    def vocab_size(self):
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    # Special-token ids as configured at training time
    # (pad=0, unk=1, bos=2, eos=3).
    def bos_id(self):
        return self.sp.bos_id()

    def eos_id(self):
        return self.sp.eos_id()

    def pad_id(self):
        return self.sp.pad_id()

    def unk_id(self):
        return self.sp.unk_id()
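

# Minimal usage sketch. Assumptions (not part of the module above): the
# corpus path 'data/corpus.jsonl' is hypothetical, and each of its lines is
# a JSON object with 'instruction'/'input'/'output' fields, as train() expects.
if __name__ == "__main__":
    tokenizer = MTPTokenizer()
    tokenizer.train('data/corpus.jsonl', model_prefix='mtp_tokenizer')

    ids = tokenizer.encode("Hello world")
    print(ids)                    # list of token ids
    print(tokenizer.decode(ids))  # should decode back to the original text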