import sentencepiece as spm
import os
import json
class MTPTokenizer:
    """SentencePiece BPE tokenizer for instruction/response (JSONL) corpora.

    Wraps ``sentencepiece`` training and inference. A corpus line is a JSON
    object whose ``instruction``, ``context`` and ``response`` string fields
    are used as training text.
    """

    def __init__(self, model_path=None):
        """Create a tokenizer, optionally loading an existing model.

        Args:
            model_path: Path to a trained ``.model`` file. If given and the
                file exists, it is loaded immediately.
        """
        self.sp = None
        self.model_path = model_path
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        """Train a SentencePiece BPE tokenizer on a JSONL corpus.

        Malformed JSON lines and non-string / empty fields are skipped.
        The trained model is loaded into ``self`` when training finishes.

        Args:
            corpus_path: Path to the JSONL corpus file.
            vocab_size: Requested vocabulary size; may be reduced for small
                corpora (see heuristic below).
            model_prefix: Prefix for the emitted ``.model`` / ``.vocab`` files.

        Raises:
            ValueError: If the corpus yields no usable text.
        """
        import tempfile  # local import: only needed during training

        # Collect every available text field from the JSONL corpus.
        texts = []
        print(f" → Procesando corpus para entrenar tokenizer...")
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for raw_line in f:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    data = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue  # skip malformed lines silently (best-effort)
                for field in ('instruction', 'context', 'response'):
                    value = data.get(field)
                    # isinstance guard: a non-string field (e.g. null, list)
                    # would otherwise crash on .strip()
                    if isinstance(value, str) and value.strip():
                        texts.append(value.strip())

        if not texts:
            raise ValueError("No se encontraron textos válidos en el corpus")

        # Heuristic: cap the vocabulary at ~15% of the total character count
        # (never below 256) so a tiny corpus cannot request more pieces than
        # SentencePiece can actually build.
        total_chars = sum(len(text) for text in texts)
        max_vocab = min(vocab_size, max(256, int(total_chars * 0.15)))
        print(f" → Corpus stats: {len(texts)} textos, {total_chars} caracteres")
        print(f" → Vocabulario ajustado: {max_vocab} (solicitado: {vocab_size})")

        # Write training text to a unique temp file and ALWAYS remove it,
        # even if training raises (the previous version leaked a hard-coded
        # 'temp_corpus.txt' in the CWD on failure).
        tmp = tempfile.NamedTemporaryFile(
            'w', encoding='utf-8', suffix='.txt', delete=False)
        try:
            with tmp:
                for text in texts:
                    tmp.write(text + '\n')
            # Parameters tuned for Q&A-style text: identity normalization and
            # no dummy prefix / whitespace stripping keep the exact formatting.
            spm.SentencePieceTrainer.train(
                input=tmp.name,
                model_prefix=model_prefix,
                vocab_size=max_vocab,
                model_type='bpe',
                pad_id=0,
                unk_id=1,
                bos_id=2,
                eos_id=3,
                character_coverage=1.0,
                normalization_rule_name='identity',
                num_threads=4,
                split_digits=True,
                allow_whitespace_only_pieces=False,
                byte_fallback=False,
                max_sentencepiece_length=16,
                add_dummy_prefix=False,
                remove_extra_whitespaces=False,
            )
        finally:
            os.remove(tmp.name)

        # Load the freshly trained model into this instance.
        self.model_path = f"{model_prefix}.model"
        self.load(self.model_path)
        print(f"✓ Tokenizer entrenado: {self.vocab_size()} tokens")
        print(f"✓ Modelo guardado: {self.model_path}")

    def load(self, model_path):
        """Load a trained SentencePiece model from *model_path*."""
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)
        self.model_path = model_path

    def _require_model(self):
        """Raise ValueError if no model has been trained or loaded yet."""
        if self.sp is None:
            raise ValueError("Tokenizer not loaded. Train or load a model first.")

    def encode(self, text):
        """Encode *text* into a list of token IDs.

        Raises:
            ValueError: If no model is loaded.
        """
        self._require_model()
        return self.sp.encode_as_ids(text)

    def decode(self, ids):
        """Decode a list of token IDs back into text.

        Raises:
            ValueError: If no model is loaded.
        """
        self._require_model()
        return self.sp.decode_ids(ids)

    def vocab_size(self):
        """Return the vocabulary size, or 0 if no model is loaded."""
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    def bos_id(self):
        """Beginning-of-sentence token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.bos_id()

    def eos_id(self):
        """End-of-sentence token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.eos_id()

    def pad_id(self):
        """Padding token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.pad_id()

    def unk_id(self):
        """Unknown token ID (raises ValueError if unloaded)."""
        self._require_model()
        return self.sp.unk_id()