Spaces:
Runtime error
Runtime error
| import random | |
| import re | |
| from typing import List, Union | |
| from interfaces import IProcess | |
| from helpers import get_freq_dict, load_text_file, remove_long_spaces | |
| from transformers import AutoTokenizer | |
class LoadFile(IProcess):
    """Pipeline step that reads a file through ``load_text_file``."""

    def execute(self, file_path: str):
        """Return the result of ``load_text_file`` for ``file_path``."""
        content = load_text_file(file_path)
        return content
class LinesSplitter(IProcess):
    """Split text on a fixed separator and return the pieces as a flat list."""

    def __init__(self, sep: str) -> None:
        """Store the separator string used for every split."""
        super().__init__()
        self.sep = sep

    def split(self, line):
        """Split a single string on the configured separator."""
        return line.split(self.sep)

    def execute(self, data: Union[List[str], str]) -> List[str]:
        """Split ``data`` on the separator.

        A single string is split directly; an iterable of strings is split
        item by item and the pieces are concatenated in order.
        """
        if not isinstance(data, str):
            return [piece for item in data for piece in self.split(item)]
        return data.split(self.sep)
class LengthFilter(IProcess):
    """Keep only lines whose character length lies in an inclusive range."""

    def __init__(
            self, min_length: int, max_length: int
            ) -> None:
        """Store the inclusive lower and upper length bounds."""
        super().__init__()
        self.min_length = min_length
        self.max_length = max_length

    def execute(self, lines: List[str]):
        """Return the lines with ``min_length <= len(line) <= max_length``."""
        return [
            line for line in lines
            if self.min_length <= len(line) <= self.max_length
        ]
class WordsNumberFilter(IProcess):
    """Filter lines by how many space characters they contain.

    NOTE(review): the check counts ``' '`` characters (not words) and both
    bounds are exclusive -- confirm this matches the intended word limits.
    """

    def __init__(self, min_words: int, max_words: int) -> None:
        """Store the exclusive lower and upper bounds."""
        super().__init__()
        self.min_words = min_words
        self.max_words = max_words

    def _is_valid(self, line: str) -> bool:
        """Return True when the space count is strictly between the bounds."""
        n_spaces = line.count(' ')
        return self.min_words < n_spaces and n_spaces < self.max_words

    def execute(self, lines: List[str]):
        """Return the lines accepted by ``_is_valid``, in original order."""
        return [line for line in lines if self._is_valid(line)]
class TokenizerLengthFilter(IProcess):
    """Keep only lines whose tokenization fits within ``max_length`` tokens."""

    def __init__(self, max_length: int = 1024) -> None:
        """Store the token limit and load the tokenizer from ``./tokenizer``."""
        super().__init__()
        self.max_length = max_length
        self.tokenizer = AutoTokenizer.from_pretrained("./tokenizer")

    def _is_valid(self, line: str) -> bool:
        """Return True when ``line`` does not overflow ``max_length`` tokens.

        The line is encoded with truncation and ``return_overflowing_tokens``
        enabled. More than one entry in ``input_ids`` is taken to mean the
        line had to be split, i.e. it is longer than ``max_length``.
        NOTE(review): this relies on the tokenizer returning overflow as
        extra ``input_ids`` rows (fast-tokenizer behaviour) -- confirm for
        the tokenizer stored in ``./tokenizer``.
        """
        data = self.tokenizer.batch_encode_plus(
            [line],
            max_length=self.max_length,
            truncation=True,
            return_overflowing_tokens=True,
        )
        # The previous check (`> 1`) was inverted: it accepted only the
        # lines that overflowed and rejected every line that fit.
        return len(data["input_ids"]) <= 1

    def execute(self, lines: List[str]):
        """Return the lines accepted by ``_is_valid``, in original order."""
        return [line for line in lines if self._is_valid(line)]
class WordsFilter(IProcess):
    """Drop every line that contains any of the given strings."""

    def __init__(self, words: List[str]) -> None:
        """Store the banned strings as a set (duplicates are discarded)."""
        super().__init__()
        self.words = set(words)

    def _not_contain(self, line: str) -> bool:
        """Return True when no banned string occurs as a substring of ``line``."""
        for word in self.words:
            if word in line:
                return False
        return True

    def execute(self, lines: List[str]):
        """Return the lines that contain none of the banned strings."""
        return [line for line in lines if self._not_contain(line)]
class SoloCharFilter(IProcess):
    """Drop lines that contain a single character standing on its own."""

    def _not_contain(self, line: str) -> bool:
        """Return True when ``line`` has no isolated single character.

        The pattern looks for one character followed by a space at the
        start, one character between two spaces, or one character preceded
        by a space at the end.
        """
        found = re.search('^. | . | .$', line)
        return found is None

    def execute(self, lines: List[str]):
        """Return the lines accepted by ``_not_contain``, in original order."""
        return [line for line in lines if self._not_contain(line)]
class NumbersFilter(IProcess):
    """Drop every line that contains an ASCII digit."""

    def _not_contain(self, line: str) -> bool:
        """Return True when ``line`` has no character in ``0``-``9``."""
        found = re.search('[0-9]+', line)
        return found is None

    def execute(self, lines: List[str]):
        """Return the digit-free lines, in original order."""
        return [line for line in lines if self._not_contain(line)]
class OOVFilter(IProcess):
    """Drop lines that contain too many words with a frequency of exactly 1.

    Frequencies come from ``get_freq_dict`` applied to the lines passed to
    ``execute``.
    NOTE(review): this assumes ``get_freq_dict`` keys are the same
    space-separated tokens used in ``_is_valid`` -- confirm in ``helpers``.
    """

    def __init__(self, max_oov: int) -> None:
        """Store the exclusive upper bound on frequency-1 words per line."""
        super().__init__()
        self.max_oov = max_oov
        self.__freq = {}

    def _is_valid(self, line: str):
        """Return True when fewer than ``max_oov`` words have frequency 1."""
        singletons = sum(
            1 for word in line.split(' ') if self.__freq[word] == 1
        )
        return singletons < self.max_oov

    def execute(self, lines: List[str]):
        """Rebuild the frequency table from ``lines`` and filter them with it."""
        self.__freq = get_freq_dict(lines)
        return [line for line in lines if self._is_valid(line)]
| # text = ["کوردستان وڵاتی کوردانە هەی هەی هەی هەی", "کورد بوون گەوادیە", "ژیان سەختە"] | |
| # result = OOVFilter(5).execute(text) | |
| # print(result) | |
class CharsRemover(IProcess):
    """Delete every occurrence of a given set of characters from each line."""

    def __init__(self, chars: str) -> None:
        """Build a regex character class from ``chars``.

        ``chars`` is interpolated unescaped, so regex character-class syntax
        inside it (ranges, ``^``, ``]``) is interpreted by ``re``.
        """
        super().__init__()
        self.pat = '[' + f'{chars}' + ']'

    def remove(self, line: str) -> str:
        """Return ``line`` with all characters matching the class removed."""
        cleaned = re.sub(self.pat, '', line)
        return cleaned

    def execute(self, lines: List[str]) -> List[str]:
        """Apply ``remove`` to every line and return the results as a list."""
        return [self.remove(line) for line in lines]
class RepeatedCharsCollapsor(IProcess):
    """Collapse runs of three or more identical characters to one character.

    NOTE(review): ``max_repeteion`` is accepted but never used; the
    threshold and the replacement length are hard-coded -- confirm whether
    the parameter was meant to drive them.
    """

    def __init__(self, max_repeteion: int) -> None:
        """Set the fixed pattern: a character followed by 2+ copies of itself."""
        super().__init__()
        self.pat = r"(.)\1{2,}"

    def collaps(self, line: str) -> str:
        """Replace each matching run in ``line`` with a single copy of its character."""
        collapsed = re.sub(self.pat, r"\1", line)
        return collapsed

    def execute(self, lines: List[str]) -> List[str]:
        """Apply ``collaps`` to every line and return the results as a list."""
        return [self.collaps(line) for line in lines]
class ValidCharsKeeper(IProcess):
    """Replace every character outside an allowed set with a filler string."""

    def __init__(self, valid_chars: str, rep_with=' ') -> None:
        """Build the negated character class.

        Args:
            valid_chars: Characters to keep. Interpolated unescaped into a
                regex character class, so class syntax inside it is
                interpreted by ``re``.
            rep_with: String substituted for each disallowed character
                (a single space by default).
        """
        super().__init__()
        self.valid_chars = valid_chars
        self.rep_with = rep_with
        self.pat = f'[^{self.valid_chars}]'

    def __keep(self, line: str) -> str:
        """Return ``line`` with each disallowed character replaced by ``rep_with``."""
        # Previously the replacement was hard-coded to ' ' and `rep_with`
        # was ignored. A callable is used so the replacement is inserted
        # literally, without backslash-escape processing by `re.sub`.
        return re.sub(self.pat, lambda _match: self.rep_with, line)

    def execute(self, lines: List[str]) -> List[str]:
        """Apply the replacement to every line and return the results as a list."""
        return [self.__keep(line) for line in lines]
class SpacesRemover(IProcess):
    """Pass each line through ``remove_long_spaces`` and strip its ends."""

    def __remove(self, line: str) -> str:
        """Return ``line`` after ``remove_long_spaces`` and ``str.strip``."""
        squeezed = remove_long_spaces(line)
        return squeezed.strip()

    def execute(self, lines: List[str]):
        """Clean every line and return the results as a list."""
        return [self.__remove(line) for line in lines]
class RandomCharsInjector(IProcess):
    """Insert one randomly chosen character at a random position in a line."""

    def __init__(self, chars: str) -> None:
        """Store the pool of characters to draw from."""
        super().__init__()
        self.chars = chars

    def get_char(self) -> str:
        """Return one character drawn uniformly from the pool."""
        return random.choice(self.chars)

    def execute(self, line: str):
        """Return ``line`` with one pool character inserted.

        The insertion point is drawn from ``0..len(line)`` inclusive, so the
        character may also be appended after the last one. The previous
        upper bound of ``len(line) - 1`` could never append and raised
        ``ValueError`` on an empty line.
        """
        idx = random.randint(0, len(line))
        return line[:idx] + self.get_char() + line[idx:]
class PunctuationRemover(IProcess):
    """Strip punctuation marks that are not adjacent to a digit."""

    def __init__(self) -> None:
        """Compile the punctuation pattern once for reuse."""
        super().__init__()
        self.clean_punctuation = re.compile(r"(?<!\d)[.,;:'?!،.؟؛:»«](?!\d)")

    def __remove_punctuation(self, text: str):
        """Delete listed punctuation that has no digit directly before or after it."""
        stripped = re.sub(self.clean_punctuation, "", text)
        return stripped

    def execute(self, line: str):
        """Return ``line`` with the matching punctuation removed."""
        result = self.__remove_punctuation(line)
        return result
class RandomCharsSwapper(IProcess):
    """Swap one randomly chosen pair of adjacent characters in a line."""

    def execute(self, line: str) -> str:
        """Return ``line`` with two neighbouring characters exchanged.

        Lines shorter than two characters have no pair to swap and are
        returned unchanged (previously ``random.randint`` raised
        ``ValueError`` on the empty range).
        """
        length = len(line)
        if length < 2:
            return line
        # idx is the left character of the pair, so it stops at length - 2.
        idx = random.randint(0, length - 2)
        return line[:idx] + line[idx + 1] + line[idx] + line[idx + 2:]
class RandomCharRemover(IProcess):
    """Delete one randomly chosen character from a line."""

    def execute(self, line: str) -> str:
        """Return ``line`` with a single character removed.

        An empty line is returned unchanged (previously ``random.randint``
        raised ``ValueError`` on the empty range).
        """
        if not line:
            return line
        idx = random.randint(0, len(line) - 1)
        return line[:idx] + line[idx + 1:]
class RandomWordsCollapsor(IProcess):
    """Join two neighbouring words by deleting one randomly chosen space."""

    def execute(self, line: str) -> str:
        """Return ``line`` with one space removed, or unchanged if it has none."""
        space_positions = []
        for position, symbol in enumerate(line):
            if symbol == ' ':
                space_positions.append(position)
        if not space_positions:
            return line
        cut = random.choice(space_positions)
        head, tail = line[:cut], line[cut + 1:]
        return head + tail
class RandomNeighborReplacer(IProcess):
    """Replace one random character of a line with a keyboard neighbour.

    The keyboard is given as rows of keys. For each key the neighbours are
    the keys directly left and right in the same row, plus the keys at
    columns ``col - 1 .. col + 1`` in the rows above and below. Keys equal
    to ``blank`` are never offered as replacements.
    """

    def __init__(self, keyboard_rows: List[str], blank: str) -> None:
        """Store the layout and precompute the key -> neighbours table.

        Args:
            keyboard_rows: Keyboard rows; each row is indexed by column.
            blank: Placeholder key excluded from the neighbour lists.
        """
        super().__init__()
        self.lines = keyboard_rows
        self.blank = blank
        self.n_rows = len(keyboard_rows)
        self._mapper = {}
        self.set_mapper()

    def __get_left(
            self, row_idx: int, col_idx: int
            ) -> List[str]:
        """Return the key to the left in the same row, or ``[]`` at the edge."""
        if col_idx == 0:
            return []
        return [self.lines[row_idx][col_idx - 1]]

    def __get_right(
            self, row_idx: int, col_idx: int
            ) -> List[str]:
        """Return the key to the right in the same row, or ``[]`` at the edge."""
        if col_idx == (len(self.lines[row_idx]) - 1):
            return []
        # Wrapped in a list to match the annotation and `__get_left`; the
        # bare element used to be iterated by `extend`, which would split a
        # multi-character key into separate characters.
        return [self.lines[row_idx][col_idx + 1]]

    def __get_upper(
            self, row_idx: int, col_idx: int
            ) -> List[str]:
        """Return up to three keys from the row above, or ``[]`` for the top row."""
        if row_idx == 0:
            return []
        line = self.lines[row_idx - 1]
        start = max(0, col_idx - 1)
        end = min(len(line), col_idx + 2)
        return list(line[start: end])

    def __get_lower(
            self, row_idx: int, col_idx: int
            ) -> List[str]:
        """Return up to three keys from the row below, or ``[]`` for the last row."""
        if row_idx == (self.n_rows - 1):
            return []
        line = self.lines[row_idx + 1]
        start = max(0, col_idx - 1)
        end = min(len(line), col_idx + 2)
        return list(line[start: end])

    def set_mapper(self) -> None:
        """Fill ``_mapper`` with the non-blank neighbours of every key.

        A key that occurs more than once in the layout keeps the neighbours
        of its last occurrence.
        """
        funcs = [
            self.__get_left,
            self.__get_right,
            self.__get_upper,
            self.__get_lower
        ]
        for row_idx in range(self.n_rows):
            for col_idx, char in enumerate(self.lines[row_idx]):
                neighbors = []
                for func in funcs:
                    neighbors.extend(func(row_idx, col_idx))
                self._mapper[char] = [
                    item for item in neighbors if item != self.blank
                ]

    def get_char(self, char: str) -> str:
        """Return a random neighbour of ``char``.

        ``char`` itself is returned when it is not on the keyboard or has no
        non-blank neighbour (the latter used to raise ``IndexError`` from
        ``random.choice`` on an empty list).
        """
        neighbors = self._mapper.get(char)
        if not neighbors:
            return char
        return random.choice(neighbors)

    def execute(self, line: str) -> str:
        """Return ``line`` with one random character replaced by a neighbour.

        An empty line is returned unchanged (previously ``random.randint``
        raised ``ValueError`` on the empty range).
        """
        if not line:
            return line
        idx = random.randint(0, len(line) - 1)
        return line[:idx] + self.get_char(line[idx]) + line[idx + 1:]
class CharsNormalizer(IProcess):
    """Rewrite each line by applying a table of substring replacements."""

    def __init__(self, mapper: dict) -> None:
        """Store the ``{old: new}`` replacement table."""
        super().__init__()
        self.mapper = mapper

    def _normalize(self, line: str) -> str:
        """Return ``line`` after applying every replacement in table order.

        Replacements are applied sequentially, so the output of an earlier
        entry can be rewritten by a later one.
        """
        for key, value in self.mapper.items():
            line = line.replace(key, value)
        return line

    def execute(self, lines: List[str]):
        """Return the normalised version of every line.

        The previous implementation used ``filter`` here, which returned the
        original, un-normalised lines and silently dropped any line whose
        normalised form was empty.
        """
        return [self._normalize(line) for line in lines]
class SentencePermutation(IProcess):
    """Augment a line by appending randomly sampled sentences to it."""

    def __init__(self, sentences: List[str], augmentation_probability: float = 1) -> None:
        """Store the sentence pool and the augmentation probability.

        Args:
            sentences: Pool the extra sentences are sampled from.
            augmentation_probability: Chance that a given line is augmented;
                compared against ``random.random()``.
        """
        super().__init__()
        self.sentences = sentences
        self.augmentation_probability = augmentation_probability

    def _combine(self, text: str) -> str:
        """Return ``text``, possibly followed by up to 10 sampled sentences.

        The number of sentences is drawn uniformly from 0 to 10, capped at
        the pool size so ``random.sample`` cannot raise ``ValueError`` on a
        small pool. When nothing is sampled the text is returned as-is,
        without the trailing space the old concatenation produced.
        """
        if random.random() >= self.augmentation_probability:
            return text
        max_sentences = min(10, len(self.sentences))
        sentences_to_sample = random.randint(0, max_sentences)
        augmentation_sentences = random.sample(self.sentences, sentences_to_sample)
        if not augmentation_sentences:
            return text
        return text + " " + " ".join(augmentation_sentences)

    def execute(self, line: str) -> str:
        """Return the (possibly augmented) version of ``line``."""
        return self._combine(line)