|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import tokenizers |
|
|
import sys |
|
|
import subprocess |
|
|
import logging |
|
|
import spacy |
|
|
import numpy as np |
|
|
from tokenizers.models import BPE |
|
|
from tokenizers.trainers import BpeTrainer |
|
|
from tokenizers.pre_tokenizers import Whitespace |
|
|
from tokenizers.normalizers import NFKC |
|
|
from transformers import PreTrainedTokenizerFast |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SegmentationTokenizer:
    """Whitespace-pretokenized BPE tokenizer: trained with `tokenizers`,
    served through a HuggingFace `PreTrainedTokenizerFast`.

    Typical lifecycle:
        train_from_iterator() -> save() -> load() -> __call__()
    """

    def __init__(
        self,
        vocab_size=32_768,
        min_frequency=2,
        max_length=1024
    ):
        """
        :param vocab_size: Target vocabulary size for BPE training.
        :param min_frequency: Minimum pair frequency kept by the trainer.
        :param max_length: Sequence length used for padding/truncation in __call__.
        """
        self.max_length = max_length

        # Untrained BPE model; [UNK] covers symbols outside the learned vocab.
        self.raw_tokenizer = tokenizers.Tokenizer(
            BPE(unk_token="[UNK]")
        )
        self.raw_tokenizer.normalizer = NFKC()
        self.raw_tokenizer.pre_tokenizer = Whitespace()

        self.trainer = BpeTrainer(
            vocab_size=vocab_size,
            min_frequency=min_frequency,
            special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
        )

        # Populated by .load(); all encoding entry points require it.
        self._hf_tokenizer = None

    def build_iterator(self, dataset, batch_size=1024):
        """Yield lists of up to *batch_size* newline-joined texts for training.

        NOTE(review): assumes item["text"] is a sequence of lines/paragraphs;
        if it were a plain string the join would run character-by-character —
        confirm against the dataset schema.
        """
        batch = []
        for item in dataset:
            # Join segments with single newlines, collapsing doubled newlines.
            batch.append("\n".join(item["text"]).replace("\n\n", "\n"))
            if len(batch) == batch_size:
                yield batch
                batch = []
        # Flush the trailing partial batch.
        if batch:
            yield batch

    def train_from_iterator(self, iterator):
        """Train the raw BPE tokenizer from an iterator of text batches."""
        self.raw_tokenizer.train_from_iterator(
            iterator, trainer=self.trainer
        )

    def save(self, path):
        """Serialize the raw tokenizer to *path* (tokenizers JSON format)."""
        self.raw_tokenizer.save(path)

    def load(self, tokenizer_path):
        """Load a saved tokenizer file into a HuggingFace fast tokenizer.

        :param tokenizer_path: Path to a file produced by .save().
        :return: self, to allow chaining.
        """
        self._hf_tokenizer = PreTrainedTokenizerFast(
            tokenizer_file=tokenizer_path,
            unk_token="[UNK]",
            pad_token="[PAD]",
            cls_token="[CLS]",
            sep_token="[SEP]",
            mask_token="[MASK]"
        )
        return self

    def compute_unk_rate(self, corpus):
        """Return the fraction of [UNK] tokens produced over *corpus*.

        :param corpus: Iterable of strings.
        :return: unk_tokens / total_tokens, or 0.0 for an empty corpus.
        :raises RuntimeError: If .load() has not been called yet.
        """
        # Fail fast with the same message used by __call__ instead of an
        # opaque AttributeError on a missing tokenizer.
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded. Call .load() first.")

        unk_id = self._hf_tokenizer.convert_tokens_to_ids("[UNK]")

        total_tokens = 0
        unk_tokens = 0

        for text in corpus:
            enc = self._hf_tokenizer(
                text,
                add_special_tokens=False
            )["input_ids"]

            total_tokens += len(enc)
            unk_tokens += sum(1 for t in enc if t == unk_id)

        return unk_tokens / total_tokens if total_tokens > 0 else 0.0

    def __call__(
        self,
        text,
        return_tensors="pt",
        padding=True,
        truncation=True
    ):
        """
        text: str or List[str]
        returns: dict with input_ids and attention_mask (torch.long)
        """
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded. Call .load() first.")

        # padding=True means fixed-length padding to max_length, not dynamic
        # longest-in-batch padding.
        enc = self._hf_tokenizer(
            text,
            padding="max_length" if padding else False,
            truncation=truncation,
            max_length=self.max_length,
            return_tensors=return_tensors
        )

        return {
            "input_ids": enc["input_ids"],
            "attention_mask": enc["attention_mask"]
        }

    @property
    def vocab_size(self):
        """Vocabulary size of the loaded tokenizer."""
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded.")
        return self._hf_tokenizer.vocab_size

    def __repr__(self):
        # Reports the *configured* training vocab size, which may differ from
        # the loaded tokenizer's actual size.
        return f"<SegmentationTokenizer vocab_size={self.trainer.vocab_size}>"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SentenceSegmenter:
    """Split articles into sentences with spaCy and emit fixed-size label arrays.

    Sentences are flattened across all input articles and the outputs are
    padded (or cut off) at ``max_sentences`` entries.
    """

    def __init__(
        self,
        max_sentences: int,
        spacy_model: str = "es_core_news_sm",
        logger: logging.Logger | None = None
    ):
        """
        :param max_sentences: Hard cap on the number of sentences returned.
        :param spacy_model: spaCy pipeline name; downloaded on demand if missing.
        :param logger: Optional logger; a NullHandler-backed one is used otherwise.
        """
        self.max_sentences = max_sentences
        self.logger = self._get_logger(logger)
        self.nlp = self.__build_model__(spacy_model, logger=self.logger)

    @staticmethod
    def __build_model__(sentence_tokenizer_model: str, logger: logging.Logger) -> spacy.language.Language:
        """
        Load the pre-trained sentence tokenizer model, downloading it on demand.

        :param sentence_tokenizer_model: The sentence tokenizer model to load.
        :param logger: Logger used for progress/error reporting.
        :return: The spacy language model with the newline segmenter installed.
        :raises RuntimeError: If the download fails or the model has no parser.
        """
        try:
            spacy_model = spacy.load(sentence_tokenizer_model)
        except OSError:
            # Model not installed locally: fetch it via the spaCy CLI,
            # using the current interpreter so the right env is targeted.
            result = subprocess.run(
                [sys.executable, "-m", "spacy", "download", sentence_tokenizer_model],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                logger.error(f'[BEAST-Tokenizer]: Loading {sentence_tokenizer_model} failed.')
                raise RuntimeError(f"[BEAST-Tokenizer]: Error while downloading '{sentence_tokenizer_model}'")

            spacy_model = spacy.load(sentence_tokenizer_model)
            logger.info('[BEAST-Tokenizer]: Successfully downloaded the pre-trained sentence tokenizer model.')

        # The custom newline component must run before the dependency parser,
        # so a parser-less pipeline cannot be used.
        if 'parser' not in spacy_model.pipe_names:
            logger.error('[BEAST-Tokenizer]: The SpaCy model needs a parser installed.')
            raise RuntimeError('[BEAST-Tokenizer]: The SpaCy model needs a parser installed.')
        spacy_model.add_pipe("newline_segmenter_keep_exact", before="parser")

        return spacy_model

    @staticmethod
    def _get_logger(logger):
        """Return *logger* unchanged, or a module-level fallback logger."""
        if logger is None:
            logger = logging.getLogger(__name__)
            # Guard against stacking duplicate NullHandlers when several
            # instances are created without an explicit logger.
            if not any(isinstance(h, logging.NullHandler) for h in logger.handlers):
                logger.addHandler(logging.NullHandler())
        return logger

    def __call__(self, texts: list[str]) -> dict:
        """
        Segment *texts* into sentences and build aligned label arrays.

        :param texts: List of article strings.
        :return: Dict with "sentences" (list[str], padded with ""), and
                 "sentence_candidates", "sentence_boundaries", "sentence_mask"
                 (np.int8 arrays, all of length max_sentences).
        """
        sentences = list()
        sentence_candidates = list()
        sentence_boundaries = list()
        sentence_masking = list()

        for article in texts:
            doc = self.nlp(article)
            for idx, sent in enumerate(doc.sents):
                if idx == 0:
                    # First sentence of an article: flagged in both arrays.
                    sentence_candidates.append(1)
                    sentence_boundaries.append(1)
                elif sent.text.endswith("\n"):
                    # Sentence ends a line: candidate only, not a boundary.
                    sentence_candidates.append(1)
                    sentence_boundaries.append(0)
                else:
                    sentence_candidates.append(0)
                    sentence_boundaries.append(0)

                # Store the sentence text stripped of newlines/outer spaces;
                # mask=1 marks a real (non-padding) entry.
                sentences.append(sent.text.replace('\n', '').strip())
                sentence_masking.append(1)

                if len(sentences) >= self.max_sentences:
                    self.logger.warning(f"Maximum number of sentences reached: {self.max_sentences}")
                    break

            # Propagate the inner break: stop consuming further articles too.
            if len(sentences) >= self.max_sentences:
                break

        # Pad every output up to the fixed length; mask=0 marks padding.
        while len(sentences) < self.max_sentences:
            sentences.append("")
            sentence_candidates.append(0)
            sentence_boundaries.append(0)
            sentence_masking.append(0)

        return {
            "sentences": sentences,
            "sentence_candidates": np.array(sentence_candidates, dtype=np.int8),
            "sentence_boundaries": np.array(sentence_boundaries, dtype=np.int8),
            "sentence_mask": np.array(sentence_masking, dtype=np.int8)
        }
|
|
|
|
|
|
|
|
@spacy.Language.component("newline_segmenter_keep_exact")
def newline_segmenter_keep_exact(doc):
    """Force a sentence start on every token that directly follows a newline.

    Scans all tokens except the last (which has no successor) and marks the
    successor of each bare newline token as a sentence start.
    """
    newline_indices = [tok.i for tok in doc[:-1] if tok.text == "\n"]
    for i in newline_indices:
        doc[i + 1].is_sent_start = True
    return doc
|
|
|
|
|
|
|
|
|
|
|
|