"""
Text Preprocessor Module
========================

Provides language-specific text preprocessing pipelines for English,
Chinese, and Khmer languages.
"""

import re
import unicodedata
from typing import List, Optional, Tuple, Callable
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class PreprocessingStats:
    """Statistics from a batch preprocessing operation."""

    original_count: int = 0       # number of input texts
    processed_count: int = 0      # texts remaining after cleanup
    empty_removed: int = 0        # texts dropped because they became empty
    duplicates_removed: int = 0   # texts dropped as exact duplicates
    avg_length_before: float = 0.0
    avg_length_after: float = 0.0


class TextPreprocessor:
    """
    Language-aware text preprocessor for multilingual NLP tasks.

    Supports:
    - English (en): Standard NLP preprocessing with stopword removal
    - Chinese (zh): Character-level processing with jieba segmentation
    - Khmer (km): Unicode normalization for Khmer script
    """

    # Common URL pattern. NOTE: fixed from the original, whose class
    # '[!*\\(\\),]' embedded a literal backslash in the character class
    # (double backslash inside a raw string); parens need no escape in a
    # character class.
    URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+')

    # Language-specific patterns; each matches a contiguous *run* of
    # script characters (note the trailing '+').
    CHINESE_CHAR_PATTERN = re.compile(r'[\u4e00-\u9fff]+')
    KHMER_CHAR_PATTERN = re.compile(r'[\u1780-\u17FF]+')

    def __init__(self, language: str = "en"):
        """
        Initialize the preprocessor for a specific language.

        Args:
            language: Language code ('en', 'zh', 'km')
        """
        self.language = language
        self._jieba_initialized = False
        self._setup_language_resources()

    def _setup_language_resources(self):
        """Setup language-specific stopword lists (basic built-in sets)."""
        # English stopwords (basic list)
        self.english_stopwords = {
            'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
            'for', 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are',
            'were', 'been', 'be', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'must',
            'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it',
            'we', 'they', 'what', 'which', 'who', 'when', 'where', 'why', 'how'
        }

        # Chinese stopwords (basic list)
        self.chinese_stopwords = {
            '的', '了', '和', '是', '就', '都', '而', '及', '与', '着',
            '之', '用', '于', '但', '并', '等', '被', '让', '给', '在',
            '也', '很', '只', '又', '这', '那', '些', '把', '比', '去'
        }

        # Khmer stopwords (basic list)
        self.khmer_stopwords = {
            'និង', 'ឬ', 'ដែល', 'នៅ', 'ក្នុង', 'ពី', 'ទៅ', 'មាន', 'ជា',
            'នេះ', 'នោះ', 'គេ', 'ខ្ញុំ', 'យើង', 'គាត់', 'នាង', 'វា'
        }

    def _init_jieba(self):
        """Lazy initialization of jieba for Chinese tokenization.

        Sets self._jieba to the module on success, or to None when jieba
        is not installed (character-level fallback). Runs at most once.
        """
        if not self._jieba_initialized:
            try:
                import jieba
                jieba.setLogLevel(logging.WARNING)
                self._jieba = jieba
                self._jieba_initialized = True
                logger.info("Jieba initialized for Chinese tokenization")
            except ImportError:
                logger.warning(
                    "Jieba not installed. Using character-level tokenization for Chinese."
                )
                self._jieba = None
                self._jieba_initialized = True

    def normalize_unicode(self, text: str) -> str:
        """Normalize Unicode text to NFC form."""
        return unicodedata.normalize('NFC', text)

    def remove_urls(self, text: str) -> str:
        """Replace URLs in text with a ' [URL] ' placeholder."""
        return self.URL_PATTERN.sub(' [URL] ', text)

    def remove_emails(self, text: str) -> str:
        """Replace email addresses in text with a ' [EMAIL] ' placeholder."""
        return self.EMAIL_PATTERN.sub(' [EMAIL] ', text)

    def remove_extra_whitespace(self, text: str) -> str:
        """Collapse all whitespace runs to single spaces and trim ends."""
        return ' '.join(text.split())

    def to_lowercase(self, text: str) -> str:
        """Convert text to lowercase."""
        return text.lower()

    # ==================== English Preprocessing ====================

    def preprocess_english(self, text: str, remove_stopwords: bool = False) -> str:
        """
        Preprocess English text.

        Args:
            text: Input text
            remove_stopwords: Whether to remove common stopwords

        Returns:
            Preprocessed text
        """
        # Normalize unicode
        text = self.normalize_unicode(text)

        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)

        # Lowercase
        text = self.to_lowercase(text)

        # Remove special characters but keep alphanumeric and basic punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\-]', ' ', text)

        # Remove extra whitespace
        text = self.remove_extra_whitespace(text)

        # Optionally remove stopwords
        if remove_stopwords:
            words = text.split()
            words = [w for w in words if w not in self.english_stopwords]
            text = ' '.join(words)

        return text.strip()

    # ==================== Chinese Preprocessing ====================

    def preprocess_chinese(self, text: str,
                           segment_words: bool = True,
                           remove_stopwords: bool = False) -> str:
        """
        Preprocess Chinese text.

        Args:
            text: Input text (can contain both Chinese and English)
            segment_words: Whether to apply word segmentation
            remove_stopwords: Whether to remove common stopwords

        Returns:
            Preprocessed text
        """
        # Normalize unicode
        text = self.normalize_unicode(text)

        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)

        # Convert full-width characters to half-width
        text = self._fullwidth_to_halfwidth(text)

        # Remove non-Chinese and non-alphanumeric characters.
        # Keep Chinese characters, alphanumeric, and basic punctuation.
        text = re.sub(r'[^\u4e00-\u9fff\w\s\。\,\!\?\、]', ' ', text)

        # Word segmentation (no-op when jieba is unavailable)
        if segment_words:
            self._init_jieba()
            if self._jieba:
                words = list(self._jieba.cut(text))
                text = ' '.join(words)

        # Remove stopwords (only effective on whitespace-separated tokens,
        # i.e. after segmentation or for mixed-language input)
        if remove_stopwords:
            words = text.split()
            words = [w for w in words
                     if w not in self.chinese_stopwords
                     and w not in self.english_stopwords]
            text = ' '.join(words)

        # Remove extra whitespace
        text = self.remove_extra_whitespace(text)

        return text.strip()

    def _fullwidth_to_halfwidth(self, text: str) -> str:
        """Convert full-width (zenkaku) characters to half-width ASCII."""
        result = []
        for char in text:
            code = ord(char)
            # Full-width ASCII variants (excluding space): U+FF01..U+FF5E
            # map to U+0021..U+007E by subtracting 0xFEE0.
            if 0xFF01 <= code <= 0xFF5E:
                code -= 0xFEE0
            # Full-width (ideographic) space
            elif code == 0x3000:
                code = 0x0020
            result.append(chr(code))
        return ''.join(result)

    # ==================== Khmer Preprocessing ====================

    def preprocess_khmer(self, text: str, normalize_spacing: bool = True) -> str:
        """
        Preprocess Khmer text.

        Khmer script has unique characteristics:
        - No spaces between words
        - Complex consonant clusters
        - Dependent vowels and signs

        Args:
            text: Input text
            normalize_spacing: Whether to normalize whitespace

        Returns:
            Preprocessed text
        """
        # Normalize unicode (important for Khmer)
        text = self.normalize_unicode(text)

        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)

        # Remove zero-width characters that might interfere
        text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text)

        # Keep Khmer characters, ASCII alphanumeric, and basic punctuation.
        # Khmer Unicode range: U+1780–U+17FF
        text = re.sub(r'[^\u1780-\u17FF\w\s\.\,\!\?]', ' ', text)

        if normalize_spacing:
            text = self.remove_extra_whitespace(text)

        return text.strip()

    # ==================== Main Interface ====================

    def preprocess(self, text: str, **kwargs) -> str:
        """
        Preprocess text according to the configured language.

        Args:
            text: Input text to preprocess
            **kwargs: Additional language-specific options

        Returns:
            Preprocessed text; "" for empty or non-string input
        """
        if not text or not isinstance(text, str):
            return ""

        if self.language == "en":
            return self.preprocess_english(text, **kwargs)
        elif self.language == "zh":
            return self.preprocess_chinese(text, **kwargs)
        elif self.language == "km":
            return self.preprocess_khmer(text, **kwargs)
        else:
            # Default: basic language-agnostic preprocessing
            logger.warning(f"Unknown language '{self.language}', using basic preprocessing")
            text = self.normalize_unicode(text)
            text = self.remove_urls(text)
            text = self.remove_emails(text)
            text = self.remove_extra_whitespace(text)
            return text.strip()

    def preprocess_batch(self, texts: List[str],
                         remove_empty: bool = True,
                         remove_duplicates: bool = False,
                         **kwargs) -> Tuple[List[str], PreprocessingStats]:
        """
        Preprocess a batch of texts.

        Args:
            texts: List of texts to preprocess
            remove_empty: Remove empty strings after preprocessing
            remove_duplicates: Remove duplicate texts
            **kwargs: Additional preprocessing options

        Returns:
            Tuple of (processed texts, preprocessing statistics)
        """
        stats = PreprocessingStats(
            original_count=len(texts),
            # max(..., 1) guards against division by zero on an empty batch
            avg_length_before=sum(len(t) for t in texts) / max(len(texts), 1)
        )

        # Preprocess all texts
        processed = [self.preprocess(text, **kwargs) for text in texts]

        # Remove empty strings
        if remove_empty:
            original_len = len(processed)
            processed = [t for t in processed if t.strip()]
            stats.empty_removed = original_len - len(processed)

        # Remove duplicates while preserving order
        if remove_duplicates:
            seen = set()
            unique = []
            for t in processed:
                if t not in seen:
                    seen.add(t)
                    unique.append(t)
            stats.duplicates_removed = len(processed) - len(unique)
            processed = unique

        stats.processed_count = len(processed)
        stats.avg_length_after = sum(len(t) for t in processed) / max(len(processed), 1)

        return processed, stats

    def detect_language(self, text: str) -> str:
        """
        Simple language detection based on character patterns.

        Args:
            text: Input text

        Returns:
            Detected language code ('en', 'zh', 'km', or 'unknown')
        """
        if not text:
            return "unknown"

        # Count *characters* in each script, not regex matches: the
        # patterns end in '+', so findall() returns contiguous runs, and
        # counting runs would massively undercount scripts written
        # without spaces (Khmer, Chinese) — e.g. an all-Khmer string is
        # a single run, giving a ratio of 1/len(text).
        khmer_chars = sum(len(m) for m in self.KHMER_CHAR_PATTERN.findall(text))
        chinese_chars = sum(len(m) for m in self.CHINESE_CHAR_PATTERN.findall(text))

        # Total character count (includes spaces and punctuation)
        total_chars = len(text)

        if khmer_chars > 0 and khmer_chars / total_chars > 0.3:
            return "km"
        elif chinese_chars > 0 and chinese_chars / total_chars > 0.3:
            return "zh"
        else:
            return "en"


class DataValidator:
    """Validate and clean datasets for training."""

    @staticmethod
    def validate_dataframe(df, text_column: str = "text",
                           label_column: str = "label") -> Tuple[bool, str]:
        """
        Validate DataFrame structure for training.

        Args:
            df: Input DataFrame (pandas-like; must expose .columns and
                column access with .isna())
            text_column: Name of the column holding text samples
            label_column: Name of the column holding labels

        Returns:
            Tuple of (is_valid, status_message)
        """
        errors = []

        if text_column not in df.columns:
            errors.append(f"Missing required column: '{text_column}'")
        if label_column not in df.columns:
            errors.append(f"Missing required column: '{label_column}'")

        # Only inspect values when both columns exist
        if not errors:
            # Check for empty values (NaN or empty string)
            empty_texts = df[text_column].isna().sum() + (df[text_column] == "").sum()
            if empty_texts > 0:
                errors.append(f"Found {empty_texts} empty text entries")

            empty_labels = df[label_column].isna().sum()
            if empty_labels > 0:
                errors.append(f"Found {empty_labels} missing labels")

        if not errors:
            return True, "Dataset structure is valid"
        else:
            return False, "; ".join(errors)

    @staticmethod
    def get_label_distribution(df, label_column: str = "label") -> dict:
        """Get distribution of labels in dataset as {label: count}."""
        if label_column not in df.columns:
            return {}
        return df[label_column].value_counts().to_dict()