Arabic
arabic
tokenizer
morphology
nlp
dialect
df-arc / tokenization_df_arc.py
fr3on's picture
Release v1.1: PMI Phrase Merging & Smart Morphology
4730d8d verified
raw
history blame
11.5 kB
"""
DF-Arc Tokenizer
Morphology-aware, dialect-inclusive tokenization for Arabic LLMs.
"""
import json
import os
import re
import unicodedata
from typing import List, Dict, Any, Optional, Tuple, Union
from transformers import PreTrainedTokenizerFast
from tokenizers import Tokenizer
class ArabicNormalizer:
    """Normalizes Arabic text with configurable rules.

    Applies, in order: NFKC Unicode normalization, URL/email stripping,
    optional diacritic and tatweel removal, optional letter unification
    (hamzated alef variants, alef maqsura, teh marbuta), optional collapsing
    of characters repeated three or more times, and whitespace collapsing.
    """
    # Arabic short vowels, tanween, shadda and sukun (U+064B..U+0652).
    DIACRITICS_PATTERN = re.compile(r'[\u064B-\u0652]')
    # Kashida/tatweel elongation character (U+0640).
    TATWEEL_PATTERN = re.compile(r'\u0640')
    # Hamzated and madda alef variants, unified to bare alef.
    ALEF_PATTERN = re.compile(r'[أإآ]')
    # Alef maqsura, unified to yeh.
    YEH_PATTERN = re.compile(r'ى')
    # Teh marbuta, unified to heh.
    TEH_MARBUTA_PATTERN = re.compile(r'ة')
    # Any character repeated 3+ times in a row (e.g. elongated laughter).
    REPEATS_PATTERN = re.compile(r'(.)\1{2,}')
    # Fix: 'http\S+' already matches 'https...' URLs, so the former third
    # alternative ('https\S+') was dead; the re.MULTILINE flag was also a
    # no-op (the pattern has no ^/$ anchors). Behavior is unchanged.
    URL_PATTERN = re.compile(r'http\S+|www\S+')
    EMAIL_PATTERN = re.compile(r'\S+@\S+')
    WHITESPACE_PATTERN = re.compile(r'\s+')

    def __init__(self,
                 unify_alef: bool = True,
                 unify_yeh: bool = True,
                 unify_teh_marbuta: bool = True,
                 remove_diacritics: bool = True,
                 remove_tatweel: bool = True,
                 remove_repeats: bool = True):
        # Each flag toggles one normalization rule applied by normalize().
        self.unify_alef = unify_alef
        self.unify_yeh = unify_yeh
        self.unify_teh_marbuta = unify_teh_marbuta
        self.remove_diacritics = remove_diacritics
        self.remove_tatweel = remove_tatweel
        self.remove_repeats = remove_repeats

    def normalize(self, text: str) -> str:
        """Return *text* normalized per the flags set at construction.

        Empty or falsy input yields the empty string. URLs and email
        addresses are always removed regardless of flags.
        """
        if not text:
            return ""
        text = unicodedata.normalize("NFKC", text)
        text = self.URL_PATTERN.sub('', text)
        text = self.EMAIL_PATTERN.sub('', text)
        if self.remove_diacritics:
            text = self.DIACRITICS_PATTERN.sub('', text)
        if self.remove_tatweel:
            text = self.TATWEEL_PATTERN.sub('', text)
        if self.unify_alef:
            text = self.ALEF_PATTERN.sub('ا', text)
        if self.unify_yeh:
            text = self.YEH_PATTERN.sub('ي', text)
        if self.unify_teh_marbuta:
            text = self.TEH_MARBUTA_PATTERN.sub('ه', text)
        if self.remove_repeats:
            text = self.REPEATS_PATTERN.sub(r'\1', text)
        # Collapse all runs of whitespace (including those left by the
        # URL/email deletions above) into single spaces.
        text = self.WHITESPACE_PATTERN.sub(' ', text).strip()
        return text
class MorphologicalPreTokenizer:
    """
    Rule-based Arabic morphological pre-tokenizer.
    Segments Arabic words into prefix-stem-suffix units, skipping
    non-Arabic tokens and a protected exception list of proper nouns.
    """
    PREFIXES = ['و', 'ف', 'ب', 'ك', 'ل', 'ال', 'س', 'وال', 'بال', 'كال', 'لل', 'فال']
    SUFFIXES = ['ني', 'نا', 'ك', 'كم', 'ه', 'ها', 'هم', 'هن', 'ي', 'ون', 'ين', 'ان', 'ت', 'وا', 'ة']
    # Common entities/words to protect from segmentation (embedded fallback)
    DEFAULT_EXCEPTIONS = {
        "الله", "محمد", "عبدالله", "عبدالرحمن", "مكة", "بغداد", "دمشق", "القاهرة", "بيروت", "عمان",
        "الرياض", "جدة", "الكويت", "دبي", "أبوظبي", "المنامة", "الدوحة", "مسقط", "ليبيا", "تونس",
        "الجزائر", "المغرب", "فلسطين", "الأردن", "لبنان", "سوريا", "العراق", "مصر", "السودان", "اليمن",
        "أمريكا", "أوروبا", "آسيا", "أفريقيا", "ترامب", "بايدن", "جوجل", "فيسبوك", "أمازون", "مايكروسوفت",
        "أبل", "سامسونج", "سوني", "هواوي", "مرسيدس", "بي إم دبليو", "تويوتا", "هوندا", "فورد", "شيفروليه",
        "تسلا", "ناسا", "إيلون ماسك", "مارك زوكربيرج", "بيل جيتس", "ستيف جوبز", "ألبرت أينشتاين",
        "إسحاق نيوتن", "داروين", "بيتهوفن", "موتزارت", "شكسبير", "دوستويفسكي", "تولستوي", "نجيب محفوظ",
        "طه حسين", "العقاد", "المنفلوطي", "جبران خليل جبران", "محمود درويش", "نزار قباني"
    }

    def __init__(self, min_stem_length: int = 2, exceptions: Optional[List[str]] = None):
        # Minimum number of characters a stem must retain after stripping.
        self.min_stem_length = min_stem_length
        # Merge caller-supplied exceptions into the embedded defaults;
        # frozenset gives immutability plus O(1) membership tests.
        extra = frozenset(exceptions) if exceptions else frozenset()
        self.exceptions = frozenset(self.DEFAULT_EXCEPTIONS) | extra
        # Longest affixes first so greedy matching prefers 'وال' over 'و'.
        self.prefixes = sorted(self.PREFIXES, key=len, reverse=True)
        self.suffixes = sorted(self.SUFFIXES, key=len, reverse=True)
        self.arabic_pattern = re.compile(r'[\u0600-\u06FF]+')

    def segment_word(self, word: str) -> List[str]:
        """Split one word into [prefix?, stem, suffix?]; returns the word
        untouched when it is non-Arabic, protected, or too short to split."""
        if not word or not self.arabic_pattern.fullmatch(word):
            return [word]
        if word in self.exceptions:
            return [word]
        untouched = word
        # Greedily strip at most one prefix, keeping the stem long enough.
        found_prefix = next(
            (p for p in self.prefixes
             if word.startswith(p) and len(word) - len(p) >= self.min_stem_length),
            "",
        )
        word = word[len(found_prefix):]
        # Likewise strip at most one suffix.
        found_suffix = next(
            (s for s in self.suffixes
             if word.endswith(s) and len(word) - len(s) >= self.min_stem_length),
            "",
        )
        stem = word[:len(word) - len(found_suffix)]
        # Bail out if the stem ended up shorter than allowed.
        if len(stem) < self.min_stem_length:
            return [untouched]
        parts: List[str] = []
        if found_prefix:
            parts.append(found_prefix)
        parts.append(stem)
        if found_suffix:
            parts.append(found_suffix)
        return parts

    def segment_text(self, text: str) -> str:
        """Segment every whitespace-delimited word, joining the pieces of
        each word with '_' and the words themselves with spaces."""
        joined = ('_'.join(self.segment_word(token)) for token in text.split())
        return ' '.join(joined)
class PhraseMerger:
    """Detects and merges common word n-grams.

    The n-gram vocabulary maps word tuples to frequencies; matched phrases
    are joined with ``merge_char`` (empty string, i.e. concatenation).
    """

    def __init__(self, phrases_file: Optional[str] = None):
        # Maps tuple-of-words -> frequency; empty means merging is a no-op.
        self.phrase_vocab: Dict[Tuple[str, ...], int] = {}
        self.max_ngram = 3
        self.merge_char = ""
        if phrases_file:
            self.load_phrases(phrases_file)

    def load_phrases(self, path: str) -> None:
        """Load a JSON {"word word": freq} vocabulary; a missing file is
        silently tolerated (best-effort), other errors propagate."""
        try:
            with open(path, 'r', encoding='utf-8') as handle:
                raw_vocab = json.load(handle)
        except FileNotFoundError:
            return
        self.phrase_vocab = {}
        for phrase, count in raw_vocab.items():
            key = tuple(phrase.split())
            self.phrase_vocab[key] = count
            # Track the longest loaded n-gram so merge_phrases scans far enough.
            self.max_ngram = max(self.max_ngram, len(key))

    def merge_phrases(self, text: str) -> str:
        """Greedily replace the longest known n-gram starting at each
        position with its merged form; other words pass through."""
        if not self.phrase_vocab:
            return text
        tokens = text.split()
        merged: List[str] = []
        pos = 0
        while pos < len(tokens):
            # Try the longest window first, down to bigrams.
            for size in range(self.max_ngram, 1, -1):
                window = tuple(tokens[pos:pos + size])
                if len(window) == size and window in self.phrase_vocab:
                    merged.append(self.merge_char.join(window))
                    pos += size
                    break
            else:
                merged.append(tokens[pos])
                pos += 1
        return ' '.join(merged)
class DFArcTokenizer(PreTrainedTokenizerFast):
    """
    DF-Arc: Morphology-aware Arabic Tokenizer.
    Wrapper around PreTrainedTokenizerFast that applies custom normalization,
    morphological segmentation, and phrase merging before tokenization.
    """
    vocab_files_names = {
        "vocab_file": "tokenizer.json",
        "tokenizer_file": "tokenizer.json",
        "phrases_file": "phrase_vocab.json"
    }

    def __init__(
        self,
        vocab_file: Optional[str] = None,
        tokenizer_file: Optional[str] = None,
        phrases_file: Optional[str] = None,
        normalization_config: Optional[Dict[str, bool]] = None,
        min_stem_length: int = 2,
        exceptions_file: Optional[str] = None,
        **kwargs
    ):
        self.normalizer_helper = ArabicNormalizer(**(normalization_config or {}))
        # Optionally read one exception word per line from the user's file;
        # a read failure just means we rely on the embedded defaults inside
        # MorphologicalPreTokenizer.
        extra_exceptions: List[str] = []
        if exceptions_file and os.path.exists(exceptions_file):
            try:
                with open(exceptions_file, 'r', encoding='utf-8') as handle:
                    extra_exceptions = [entry.strip() for entry in handle if entry.strip()]
            except OSError:
                pass
        self.morph_helper = MorphologicalPreTokenizer(
            min_stem_length=min_stem_length,
            exceptions=extra_exceptions
        )
        self.phrase_helper = PhraseMerger(phrases_file=phrases_file)
        super().__init__(
            vocab_file=vocab_file,
            tokenizer_file=tokenizer_file,
            **kwargs
        )

    def _preprocess_text(self, text: str) -> str:
        """Normalize -> morphologically segment -> merge phrases."""
        if not text:
            return ""
        stage = self.normalizer_helper.normalize(text)
        stage = self.morph_helper.segment_text(stage)
        return self.phrase_helper.merge_phrases(stage)

    def _batch_encode_plus(self, batch_text_or_text_pairs: Union[str, List[str], List[Tuple[str, str]]], *args, **kwargs):
        # Run the DF-Arc preprocessing pipeline over every string (or pair
        # of strings) in the batch before deferring to the fast tokenizer.
        prep = self._preprocess_text
        if isinstance(batch_text_or_text_pairs, str):
            batch_text_or_text_pairs = prep(batch_text_or_text_pairs)
        elif isinstance(batch_text_or_text_pairs, (list, tuple)):
            converted = []
            for entry in batch_text_or_text_pairs:
                if isinstance(entry, str):
                    converted.append(prep(entry))
                elif isinstance(entry, (list, tuple)):
                    converted.append((prep(entry[0]), prep(entry[1])))
                else:
                    # Pass through anything unexpected (e.g. pre-tokenized input).
                    converted.append(entry)
            batch_text_or_text_pairs = converted
        return super()._batch_encode_plus(batch_text_or_text_pairs, *args, **kwargs)

    def encode(self, text, *args, **kwargs):
        # Single-text entry point: preprocess strings, pass through otherwise.
        if isinstance(text, str):
            text = self._preprocess_text(text)
        return super().encode(text, *args, **kwargs)

    def decode(self, token_ids, skip_special_tokens=False, clean_up_tokenization_spaces=None, **kwargs):
        """
        Override decode to force use of convert_tokens_to_string for readable output.
        """
        # Accept a bare int id as a one-element sequence.
        ids = [token_ids] if isinstance(token_ids, int) else token_ids
        tokens = self.convert_ids_to_tokens(ids, skip_special_tokens=skip_special_tokens)
        # Our custom joining removes the morphological '_' connectors.
        return self.convert_tokens_to_string(tokens)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Converts a sequence of tokens into a single string."""
        joined = " ".join(tokens)
        # Remove only those underscores that connect Arabic segments
        # (morphological joints), preserving e.g. Latin snake_case.
        arabic = r'[\u0600-\u06FF]'
        return re.sub(rf'(?<={arabic})_|_(?={arabic})', '', joined)