import json
import os
import re
import tempfile

import sentencepiece as spm


class MTPTokenizer:
    """Thin wrapper around a SentencePiece BPE model.

    The tokenizer can be trained from a JSON-lines corpus whose records carry
    ``instruction`` / ``input`` / ``output`` text fields, or loaded from an
    existing ``.model`` file.

    Attributes:
        sp: The loaded ``SentencePieceProcessor``, or ``None`` until a model
            has been loaded successfully.
        model_path: Path of the model file passed to the constructor or
            loaded most recently.
    """

    # Trainer options shared by the first attempt and the capped retry, so
    # both runs produce a model with the same segmentation rules and the
    # same special-token ids.
    _TRAINER_OPTIONS = {
        "model_type": "bpe",
        "pad_id": 0,
        "unk_id": 1,
        "bos_id": 2,
        "eos_id": 3,
        "character_coverage": 1.0,
        "normalization_rule_name": "identity",
        "num_threads": 2,
        "split_digits": True,
        "max_sentencepiece_length": 16,
    }

    def __init__(self, model_path=None):
        """Create the tokenizer, loading ``model_path`` if that file exists.

        Args:
            model_path: Optional path to a trained model. A missing file is
                not an error: the path is remembered and the tokenizer stays
                unloaded.
        """
        self.sp = None
        self.model_path = model_path
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        """Train a BPE model from a JSON-lines corpus and load the result.

        Args:
            corpus_path: UTF-8 file with one JSON object per line.
            vocab_size: Minimum vocabulary size to request. The request is
                raised to 15% of the corpus character count when that is
                larger; if the trainer reports the size as too high, training
                is retried once with the limit the trainer suggests.
            model_prefix: Output prefix; the model is written to
                ``<model_prefix>.model``.

        Raises:
            ValueError: If the corpus yields no usable text.
            RuntimeError: If training fails for any reason other than a
                vocabulary size the retry can correct.
        """
        texts = self._read_corpus_texts(corpus_path)
        if not texts:
            raise ValueError("Corpus vacío")

        total_chars = sum(len(text) for text in texts)
        target_vocab = max(vocab_size, int(total_chars * 0.15))

        # A uniquely named temp file avoids clobbering anything in the
        # working directory and keeps concurrent runs from colliding.
        tmp = tempfile.NamedTemporaryFile(
            'w', encoding='utf-8', prefix='mtp_corpus_', suffix='.txt',
            delete=False,
        )
        try:
            # Close the file before training so the trainer can reopen it.
            with tmp:
                tmp.write('\n'.join(texts))
            self._train_model(tmp.name, model_prefix, target_vocab)
        finally:
            os.remove(tmp.name)

        self.load(f"{model_prefix}.model")
        print(f"✓ Tokenizer trained: {self.vocab_size()} tokens")

    @staticmethod
    def _read_corpus_texts(corpus_path):
        """Return the text fields found in the JSON-lines file, in order."""
        texts = []
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # Best effort: skip malformed lines, keep the rest.
                    continue
                if not isinstance(record, dict):
                    continue
                for key in ('instruction', 'input', 'output'):
                    value = record.get(key)
                    # Non-string values (null, numbers, lists) cannot be
                    # written to the training file, so they are skipped
                    # without discarding the record's other fields.
                    if not isinstance(value, str):
                        continue
                    # Only "input" is dropped when blank.
                    if key == 'input' and not value.strip():
                        continue
                    texts.append(value)
        return texts

    def _train_model(self, input_path, model_prefix, vocab_size):
        """Run the trainer, retrying once with the size limit it reports."""
        options = dict(
            self._TRAINER_OPTIONS, input=input_path, model_prefix=model_prefix
        )
        try:
            spm.SentencePieceTrainer.train(vocab_size=vocab_size, **options)
        except RuntimeError as e:
            message = str(e)
            match = re.search(r'value <= (\d+)', message)
            if "Vocabulary size too high" not in message or match is None:
                # Not recoverable here; surfacing it prevents loading a
                # missing or stale model file afterwards.
                raise
            spm.SentencePieceTrainer.train(
                vocab_size=int(match.group(1)), **options
            )

    def load(self, model_path):
        """Load a trained model from ``model_path``.

        The processor is stored only after loading returns, so an exception
        raised while loading leaves the previous state untouched.
        """
        processor = spm.SentencePieceProcessor()
        processor.load(model_path)
        self.sp = processor
        self.model_path = model_path

    def _require_loaded(self):
        """Return the processor, raising ``ValueError`` if none is loaded."""
        if self.sp is None:
            raise ValueError("Tokenizer not loaded")
        return self.sp

    def encode(self, text):
        """Return the token ids for ``text``.

        Raises:
            ValueError: If no model is loaded.
        """
        return self._require_loaded().encode_as_ids(text)

    def decode(self, ids):
        """Return the text for a sequence of token ids.

        Raises:
            ValueError: If no model is loaded.
        """
        return self._require_loaded().decode_ids(ids)

    def vocab_size(self):
        """Return the number of pieces in the model, or 0 when unloaded."""
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    def bos_id(self):
        """Return the beginning-of-sequence id (ValueError when unloaded)."""
        return self._require_loaded().bos_id()

    def eos_id(self):
        """Return the end-of-sequence id (ValueError when unloaded)."""
        return self._require_loaded().eos_id()

    def pad_id(self):
        """Return the padding id (ValueError when unloaded)."""
        return self._require_loaded().pad_id()

    def unk_id(self):
        """Return the unknown-token id (ValueError when unloaded)."""
        return self._require_loaded().unk_id()