| """ |
| DF-Arc Tokenizer |
| Morphology-aware, dialect-inclusive tokenization for Arabic LLMs. |
| """ |
| import json |
| import os |
| import re |
| import unicodedata |
| from typing import List, Dict, Any, Optional, Tuple, Union |
|
|
| from transformers import PreTrainedTokenizerFast |
| from tokenizers import Tokenizer |
|
|
class ArabicNormalizer:
    """Normalizes Arabic text with configurable rules.

    Pipeline (in order): NFKC Unicode normalization, URL/email stripping,
    then each individually-toggleable character rule, and finally
    whitespace collapsing.
    """

    # Arabic combining marks: tanween/short vowels/shadda/sukun
    # (U+064B-U+0652) PLUS the extended Quranic marks (U+0653-U+065F) and
    # the superscript (dagger) alef (U+0670). The previous range stopped
    # at U+0652 and left maddah/hamza marks and dagger alef in the text.
    DIACRITICS_PATTERN = re.compile(r'[\u064B-\u065F\u0670]')
    TATWEEL_PATTERN = re.compile(r'\u0640')
    # Hamzated alef forms plus alef wasla (U+0671, not folded by NFKC),
    # all unified to the bare alef.
    ALEF_PATTERN = re.compile(r'[أإآٱ]')
    YEH_PATTERN = re.compile(r'ى')
    TEH_MARBUTA_PATTERN = re.compile(r'ة')
    # Any character repeated 3+ times in a row collapses to one occurrence.
    REPEATS_PATTERN = re.compile(r'(.)\1{2,}')
    URL_PATTERN = re.compile(r'http\S+|www\S+|https\S+', re.MULTILINE)
    EMAIL_PATTERN = re.compile(r'\S+@\S+')
    WHITESPACE_PATTERN = re.compile(r'\s+')

    def __init__(self,
                 unify_alef: bool = True,
                 unify_yeh: bool = True,
                 unify_teh_marbuta: bool = True,
                 remove_diacritics: bool = True,
                 remove_tatweel: bool = True,
                 remove_repeats: bool = True):
        """Each flag toggles one normalization rule; all default to on."""
        self.unify_alef = unify_alef
        self.unify_yeh = unify_yeh
        self.unify_teh_marbuta = unify_teh_marbuta
        self.remove_diacritics = remove_diacritics
        self.remove_tatweel = remove_tatweel
        self.remove_repeats = remove_repeats

    def normalize(self, text: str) -> str:
        """Return the normalized form of ``text`` ("" for falsy input)."""
        if not text:
            return ""
        text = unicodedata.normalize("NFKC", text)
        # Strip URLs/emails before any character-level rewriting so their
        # contents cannot leak into the normalized output.
        text = self.URL_PATTERN.sub('', text)
        text = self.EMAIL_PATTERN.sub('', text)
        if self.remove_diacritics:
            text = self.DIACRITICS_PATTERN.sub('', text)
        if self.remove_tatweel:
            text = self.TATWEEL_PATTERN.sub('', text)
        if self.unify_alef:
            text = self.ALEF_PATTERN.sub('ا', text)
        if self.unify_yeh:
            text = self.YEH_PATTERN.sub('ي', text)
        if self.unify_teh_marbuta:
            text = self.TEH_MARBUTA_PATTERN.sub('ه', text)
        if self.remove_repeats:
            text = self.REPEATS_PATTERN.sub(r'\1', text)
        return self.WHITESPACE_PATTERN.sub(' ', text).strip()
|
|
class MorphologicalPreTokenizer:
    """
    Rule-based Arabic morphological pre-tokenizer.
    Splits an Arabic word into at most three pieces — clitic prefix, stem,
    clitic suffix — using longest-match affix stripping, while protecting a
    closed list of proper nouns from segmentation.
    """

    PREFIXES = ['و', 'ف', 'ب', 'ك', 'ل', 'ال', 'س', 'وال', 'بال', 'كال', 'لل', 'فال']
    SUFFIXES = ['ني', 'نا', 'ك', 'كم', 'ه', 'ها', 'هم', 'هن', 'ي', 'ون', 'ين', 'ان', 'ت', 'وا', 'ة']


    DEFAULT_EXCEPTIONS = {
        "الله", "محمد", "عبدالله", "عبدالرحمن", "مكة", "بغداد", "دمشق", "القاهرة", "بيروت", "عمان",
        "الرياض", "جدة", "الكويت", "دبي", "أبوظبي", "المنامة", "الدوحة", "مسقط", "ليبيا", "تونس",
        "الجزائر", "المغرب", "فلسطين", "الأردن", "لبنان", "سوريا", "العراق", "مصر", "السودان", "اليمن",
        "أمريكا", "أوروبا", "آسيا", "أفريقيا", "ترامب", "بايدن", "جوجل", "فيسبوك", "أمازون", "مايكروسوفت",
        "أبل", "سامسونج", "سوني", "هواوي", "مرسيدس", "بي إم دبليو", "تويوتا", "هوندا", "فورد", "شيفروليه",
        "تسلا", "ناسا", "إيلون ماسك", "مارك زوكربيرج", "بيل جيتس", "ستيف جوبز", "ألبرت أينشتاين",
        "إسحاق نيوتن", "داروين", "بيتهوفن", "موتزارت", "شكسبير", "دوستويفسكي", "تولستوي", "نجيب محفوظ",
        "طه حسين", "العقاد", "المنفلوطي", "جبران خليل جبران", "محمود درويش", "نزار قباني"
    }

    def __init__(self, min_stem_length: int = 2, exceptions: Optional[List[str]] = None):
        """Build affix tables (longest-first) and the merged exception set."""
        self.min_stem_length = min_stem_length
        # Built-in exceptions plus any caller-provided additions.
        self.exceptions = frozenset(self.DEFAULT_EXCEPTIONS | set(exceptions or []))
        # Longest affixes first so e.g. 'وال' wins over 'و'.
        self.prefixes = sorted(self.PREFIXES, key=len, reverse=True)
        self.suffixes = sorted(self.SUFFIXES, key=len, reverse=True)
        self.arabic_pattern = re.compile(r'[\u0600-\u06FF]+')

    def segment_word(self, word: str) -> List[str]:
        """Return [prefix?, stem, suffix?] for an Arabic word, or [word] untouched."""
        # Non-Arabic tokens and protected proper nouns pass through whole.
        if not word or not self.arabic_pattern.fullmatch(word):
            return [word]
        if word in self.exceptions:
            return [word]

        stem = word

        # Longest matching prefix whose removal leaves a viable stem.
        prefix = next(
            (p for p in self.prefixes
             if stem.startswith(p) and len(stem) - len(p) >= self.min_stem_length),
            "",
        )
        stem = stem[len(prefix):]

        # Longest matching suffix, checked against the prefix-stripped stem.
        suffix = next(
            (s for s in self.suffixes
             if stem.endswith(s) and len(stem) - len(s) >= self.min_stem_length),
            "",
        )
        if suffix:
            stem = stem[:-len(suffix)]

        parts = []
        if prefix:
            parts.append(prefix)
        parts.append(stem)
        if suffix:
            parts.append(suffix)

        # Guard: if the remaining stem is still too short, keep the word whole.
        if len(stem) < self.min_stem_length:
            return [word]
        return parts

    def segment_text(self, text: str) -> str:
        """Segment every whitespace-separated word, joining morphemes with '_'."""
        return ' '.join(
            '_'.join(self.segment_word(token)) for token in text.split()
        )
|
|
class PhraseMerger:
    """Detects and merges common word n-grams.

    ``phrase_vocab`` maps word tuples to frequencies; ``merge_phrases``
    performs a greedy left-to-right, longest-match-first replacement,
    fusing matched words with ``merge_char`` (empty string by default).
    """

    def __init__(self, phrases_file: Optional[str] = None):
        self.phrase_vocab = {}
        self.max_ngram = 3
        self.merge_char = ""
        if phrases_file:
            self.load_phrases(phrases_file)

    def load_phrases(self, path: str) -> None:
        """Load a {"phrase string": frequency} JSON file; missing file is a no-op."""
        try:
            with open(path, 'r', encoding='utf-8') as handle:
                raw = json.load(handle)
        except FileNotFoundError:
            return
        vocab = {}
        for phrase, count in raw.items():
            key = tuple(phrase.split())
            vocab[key] = count
            # Track the longest phrase so merging scans wide enough.
            if len(key) > self.max_ngram:
                self.max_ngram = len(key)
        self.phrase_vocab = vocab

    def merge_phrases(self, text: str) -> str:
        """Replace known n-grams in ``text`` with their fused form."""
        if not self.phrase_vocab:
            return text

        tokens = text.split()
        merged = []
        pos = 0
        while pos < len(tokens):
            # Try the widest window first; unigrams are never merged.
            for width in range(self.max_ngram, 1, -1):
                candidate = tuple(tokens[pos:pos + width])
                if len(candidate) == width and candidate in self.phrase_vocab:
                    merged.append(self.merge_char.join(candidate))
                    pos += width
                    break
            else:
                merged.append(tokens[pos])
                pos += 1
        return ' '.join(merged)
|
|
class DFArcTokenizer(PreTrainedTokenizerFast):
    """
    DF-Arc: Morphology-aware Arabic Tokenizer.
    Wrapper around PreTrainedTokenizerFast that applies custom normalization,
    morphological segmentation, and phrase merging before tokenization.

    The preprocessing pipeline (normalize -> morphological segmentation ->
    phrase merging) is applied inside ``encode`` and ``_batch_encode_plus``.
    NOTE(review): ``_encode_plus`` is NOT overridden, so base-class call
    paths that route single texts through it may bypass the preprocessing —
    confirm against the installed transformers version.
    """

    def __init__(
        self,
        vocab_file: Optional[str] = None,
        tokenizer_file: Optional[str] = None,
        phrases_file: Optional[str] = None,
        normalization_config: Optional[Dict[str, bool]] = None,
        min_stem_length: int = 2,
        exceptions_file: Optional[str] = None,
        **kwargs
    ):
        """
        Args:
            vocab_file: Optional vocabulary file, forwarded to the base class.
            tokenizer_file: Optional serialized ``tokenizers`` JSON file, forwarded to the base class.
            phrases_file: Optional JSON file of {"phrase string": frequency} for PhraseMerger.
            normalization_config: Keyword flags forwarded to ArabicNormalizer (e.g. ``unify_alef``).
            min_stem_length: Minimum stem length kept by MorphologicalPreTokenizer.
            exceptions_file: Optional text file (one word per line) of words never segmented.
            **kwargs: Passed through to PreTrainedTokenizerFast.
        """
        # Helpers are created BEFORE super().__init__ — presumably so they
        # already exist if the base constructor triggers any of the
        # overridden encode paths below. TODO(review): confirm.
        self.normalizer_helper = ArabicNormalizer(**(normalization_config or {}))

        # Load user-supplied segmentation exceptions (blank lines skipped).
        user_exceptions = []
        if exceptions_file and os.path.exists(exceptions_file):
            try:
                with open(exceptions_file, 'r', encoding='utf-8') as f:
                    user_exceptions = [line.strip() for line in f if line.strip()]
            except OSError:
                # Best-effort: an unreadable exceptions file is silently
                # ignored and only the built-in defaults are used.
                pass

        self.morph_helper = MorphologicalPreTokenizer(
            min_stem_length=min_stem_length,
            exceptions=user_exceptions
        )
        self.phrase_helper = PhraseMerger(phrases_file=phrases_file)

        super().__init__(
            vocab_file=vocab_file,
            tokenizer_file=tokenizer_file,
            **kwargs
        )

    def _batch_encode_plus(self, batch_text_or_text_pairs: Union[str, List[str], List[Tuple[str, str]]], *args, **kwargs):
        """Apply the DF-Arc preprocessing pipeline, then defer to the base encoder."""
        def preprocess(text: str) -> str:
            # normalize -> segment morphemes (joined by '_') -> merge phrases
            if not text:
                return ""
            t = self.normalizer_helper.normalize(text)
            t = self.morph_helper.segment_text(t)
            t = self.phrase_helper.merge_phrases(t)
            return t

        # Accepts a bare string, a list of strings, or a list of
        # (text, text_pair) tuples; anything else passes through untouched.
        if isinstance(batch_text_or_text_pairs, str):
            batch_text_or_text_pairs = preprocess(batch_text_or_text_pairs)
        elif isinstance(batch_text_or_text_pairs, (list, tuple)):
            processed = []
            for item in batch_text_or_text_pairs:
                if isinstance(item, str):
                    processed.append(preprocess(item))
                elif isinstance(item, (list, tuple)):
                    # Text pair: preprocess both sides independently.
                    processed.append((preprocess(item[0]), preprocess(item[1])))
                else:
                    # Unknown item type (e.g. pre-tokenized input): leave as-is.
                    processed.append(item)
            batch_text_or_text_pairs = processed

        return super()._batch_encode_plus(batch_text_or_text_pairs, *args, **kwargs)

    def encode(self, text, *args, **kwargs):
        """Preprocess a single string through the DF-Arc pipeline before base encoding."""
        if isinstance(text, str):
            text = self.normalizer_helper.normalize(text)
            text = self.morph_helper.segment_text(text)
            text = self.phrase_helper.merge_phrases(text)
        return super().encode(text, *args, **kwargs)

    def decode(self, token_ids, skip_special_tokens=False, clean_up_tokenization_spaces=None, **kwargs) -> str:
        """
        Override decode to force use of convert_tokens_to_string for readable output.

        NOTE(review): ``clean_up_tokenization_spaces`` and extra ``kwargs`` are
        accepted for API compatibility but intentionally not forwarded.
        """
        # Accept a single id as well as a sequence of ids.
        if isinstance(token_ids, int):
            token_ids = [token_ids]

        tokens = self.convert_ids_to_tokens(token_ids, skip_special_tokens=skip_special_tokens)

        return self.convert_tokens_to_string(tokens)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Converts a sequence of tokens into a single string."""
        text = " ".join(tokens)
        # Strip the '_' morpheme joiner inserted by segment_text, but only
        # when it is adjacent to an Arabic character on either side, so
        # literal underscores in non-Arabic text survive decoding.
        arabic_range = r'[\u0600-\u06FF]'
        return re.sub(rf'(?<={arabic_range})_|_(?={arabic_range})', '', text)
|
|
|
|
|
|
|
|