Spaces:
Sleeping
Sleeping
| """ | |
| Text Preprocessor Module | |
| ======================== | |
| Provides language-specific text preprocessing pipelines for | |
| English, Chinese, and Khmer languages. | |
| """ | |
| import re | |
| import unicodedata | |
| from typing import List, Optional, Tuple, Callable | |
| from dataclasses import dataclass | |
| import logging | |
| logger = logging.getLogger(__name__) | |
@dataclass
class PreprocessingStats:
    """Statistics from a preprocessing operation.

    Without the @dataclass decorator the annotated fields below are only
    class attributes and the keyword construction used by
    TextPreprocessor.preprocess_batch (e.g.
    ``PreprocessingStats(original_count=...)``) raises TypeError.
    """
    original_count: int = 0        # number of texts before preprocessing
    processed_count: int = 0       # number of texts after preprocessing
    empty_removed: int = 0         # texts dropped for being empty after cleaning
    duplicates_removed: int = 0    # texts dropped as exact duplicates
    avg_length_before: float = 0.0  # mean character length of the input texts
    avg_length_after: float = 0.0   # mean character length of the output texts
| class TextPreprocessor: | |
| """ | |
| Language-aware text preprocessor for multilingual NLP tasks. | |
| Supports: | |
| - English (en): Standard NLP preprocessing with stopword removal | |
| - Chinese (zh): Character-level processing with jieba segmentation | |
| - Khmer (km): Unicode normalization for Khmer script | |
| """ | |
| # Common URL and email patterns | |
| URL_PATTERN = re.compile( | |
| r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+' | |
| ) | |
| EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+') | |
| # Language-specific patterns | |
| CHINESE_CHAR_PATTERN = re.compile(r'[\u4e00-\u9fff]+') | |
| KHMER_CHAR_PATTERN = re.compile(r'[\u1780-\u17FF]+') | |
| def __init__(self, language: str = "en"): | |
| """ | |
| Initialize the preprocessor for a specific language. | |
| Args: | |
| language: Language code ('en', 'zh', 'km') | |
| """ | |
| self.language = language | |
| self._jieba_initialized = False | |
| self._setup_language_resources() | |
| def _setup_language_resources(self): | |
| """Setup language-specific resources.""" | |
| # English stopwords (basic list) | |
| self.english_stopwords = { | |
| 'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', | |
| 'for', 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', | |
| 'were', 'been', 'be', 'have', 'has', 'had', 'do', 'does', 'did', | |
| 'will', 'would', 'could', 'should', 'may', 'might', 'must', | |
| 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', | |
| 'we', 'they', 'what', 'which', 'who', 'when', 'where', 'why', 'how' | |
| } | |
| # Chinese stopwords (basic list) | |
| self.chinese_stopwords = { | |
| '的', '了', '和', '是', '就', '都', '而', '及', '与', '着', | |
| '之', '用', '于', '但', '并', '等', '被', '让', '给', '在', | |
| '也', '很', '只', '又', '这', '那', '些', '把', '比', '去' | |
| } | |
| # Khmer stopwords (basic list) | |
| self.khmer_stopwords = { | |
| 'និង', 'ឬ', 'ដែល', 'នៅ', 'ក្នុង', 'ពី', 'ទៅ', 'មាន', 'ជា', | |
| 'នេះ', 'នោះ', 'គេ', 'ខ្ញុំ', 'យើង', 'គាត់', 'នាង', 'វា' | |
| } | |
| def _init_jieba(self): | |
| """Lazy initialization of jieba for Chinese tokenization.""" | |
| if not self._jieba_initialized: | |
| try: | |
| import jieba | |
| jieba.setLogLevel(logging.WARNING) | |
| self._jieba = jieba | |
| self._jieba_initialized = True | |
| logger.info("Jieba initialized for Chinese tokenization") | |
| except ImportError: | |
| logger.warning("Jieba not installed. Using character-level tokenization for Chinese.") | |
| self._jieba = None | |
| self._jieba_initialized = True | |
| def normalize_unicode(self, text: str) -> str: | |
| """Normalize Unicode text to NFC form.""" | |
| return unicodedata.normalize('NFC', text) | |
| def remove_urls(self, text: str) -> str: | |
| """Remove URLs from text.""" | |
| return self.URL_PATTERN.sub(' [URL] ', text) | |
| def remove_emails(self, text: str) -> str: | |
| """Remove email addresses from text.""" | |
| return self.EMAIL_PATTERN.sub(' [EMAIL] ', text) | |
| def remove_extra_whitespace(self, text: str) -> str: | |
| """Remove extra whitespace and normalize spacing.""" | |
| return ' '.join(text.split()) | |
| def to_lowercase(self, text: str) -> str: | |
| """Convert text to lowercase.""" | |
| return text.lower() | |
| # ==================== English Preprocessing ==================== | |
| def preprocess_english(self, text: str, remove_stopwords: bool = False) -> str: | |
| """ | |
| Preprocess English text. | |
| Args: | |
| text: Input text | |
| remove_stopwords: Whether to remove common stopwords | |
| Returns: | |
| Preprocessed text | |
| """ | |
| # Normalize unicode | |
| text = self.normalize_unicode(text) | |
| # Remove URLs and emails | |
| text = self.remove_urls(text) | |
| text = self.remove_emails(text) | |
| # Lowercase | |
| text = self.to_lowercase(text) | |
| # Remove special characters but keep alphanumeric and basic punctuation | |
| text = re.sub(r'[^\w\s\.\,\!\?\-]', ' ', text) | |
| # Remove extra whitespace | |
| text = self.remove_extra_whitespace(text) | |
| # Optionally remove stopwords | |
| if remove_stopwords: | |
| words = text.split() | |
| words = [w for w in words if w not in self.english_stopwords] | |
| text = ' '.join(words) | |
| return text.strip() | |
| # ==================== Chinese Preprocessing ==================== | |
| def preprocess_chinese(self, text: str, segment_words: bool = True, | |
| remove_stopwords: bool = False) -> str: | |
| """ | |
| Preprocess Chinese text. | |
| Args: | |
| text: Input text (can contain both Chinese and English) | |
| segment_words: Whether to apply word segmentation | |
| remove_stopwords: Whether to remove common stopwords | |
| Returns: | |
| Preprocessed text | |
| """ | |
| # Normalize unicode | |
| text = self.normalize_unicode(text) | |
| # Remove URLs and emails | |
| text = self.remove_urls(text) | |
| text = self.remove_emails(text) | |
| # Convert full-width characters to half-width | |
| text = self._fullwidth_to_halfwidth(text) | |
| # Remove non-Chinese and non-alphanumeric characters | |
| # Keep Chinese characters, alphanumeric, and basic punctuation | |
| text = re.sub(r'[^\u4e00-\u9fff\w\s\。\,\!\?\、]', ' ', text) | |
| # Word segmentation | |
| if segment_words: | |
| self._init_jieba() | |
| if self._jieba: | |
| words = list(self._jieba.cut(text)) | |
| text = ' '.join(words) | |
| # Remove stopwords | |
| if remove_stopwords: | |
| words = text.split() | |
| words = [w for w in words if w not in self.chinese_stopwords | |
| and w not in self.english_stopwords] | |
| text = ' '.join(words) | |
| # Remove extra whitespace | |
| text = self.remove_extra_whitespace(text) | |
| return text.strip() | |
| def _fullwidth_to_halfwidth(self, text: str) -> str: | |
| """Convert full-width characters to half-width.""" | |
| result = [] | |
| for char in text: | |
| code = ord(char) | |
| # Full-width ASCII variants (excluding space) | |
| if 0xFF01 <= code <= 0xFF5E: | |
| code -= 0xFEE0 | |
| # Full-width space | |
| elif code == 0x3000: | |
| code = 0x0020 | |
| result.append(chr(code)) | |
| return ''.join(result) | |
| # ==================== Khmer Preprocessing ==================== | |
| def preprocess_khmer(self, text: str, normalize_spacing: bool = True) -> str: | |
| """ | |
| Preprocess Khmer text. | |
| Khmer script has unique characteristics: | |
| - No spaces between words | |
| - Complex consonant clusters | |
| - Dependent vowels and signs | |
| Args: | |
| text: Input text | |
| normalize_spacing: Whether to normalize whitespace | |
| Returns: | |
| Preprocessed text | |
| """ | |
| # Normalize unicode (important for Khmer) | |
| text = self.normalize_unicode(text) | |
| # Remove URLs and emails | |
| text = self.remove_urls(text) | |
| text = self.remove_emails(text) | |
| # Remove zero-width characters that might interfere | |
| text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text) | |
| # Keep Khmer characters, ASCII alphanumeric, and basic punctuation | |
| # Khmer Unicode range: U+1780–U+17FF | |
| text = re.sub(r'[^\u1780-\u17FF\w\s\.\,\!\?]', ' ', text) | |
| if normalize_spacing: | |
| text = self.remove_extra_whitespace(text) | |
| return text.strip() | |
| # ==================== Main Interface ==================== | |
| def preprocess(self, text: str, **kwargs) -> str: | |
| """ | |
| Preprocess text according to the configured language. | |
| Args: | |
| text: Input text to preprocess | |
| **kwargs: Additional language-specific options | |
| Returns: | |
| Preprocessed text | |
| """ | |
| if not text or not isinstance(text, str): | |
| return "" | |
| if self.language == "en": | |
| return self.preprocess_english(text, **kwargs) | |
| elif self.language == "zh": | |
| return self.preprocess_chinese(text, **kwargs) | |
| elif self.language == "km": | |
| return self.preprocess_khmer(text, **kwargs) | |
| else: | |
| # Default: basic preprocessing | |
| logger.warning(f"Unknown language '{self.language}', using basic preprocessing") | |
| text = self.normalize_unicode(text) | |
| text = self.remove_urls(text) | |
| text = self.remove_emails(text) | |
| text = self.remove_extra_whitespace(text) | |
| return text.strip() | |
| def preprocess_batch(self, texts: List[str], | |
| remove_empty: bool = True, | |
| remove_duplicates: bool = False, | |
| **kwargs) -> Tuple[List[str], PreprocessingStats]: | |
| """ | |
| Preprocess a batch of texts. | |
| Args: | |
| texts: List of texts to preprocess | |
| remove_empty: Remove empty strings after preprocessing | |
| remove_duplicates: Remove duplicate texts | |
| **kwargs: Additional preprocessing options | |
| Returns: | |
| Tuple of (processed texts, preprocessing statistics) | |
| """ | |
| stats = PreprocessingStats( | |
| original_count=len(texts), | |
| avg_length_before=sum(len(t) for t in texts) / max(len(texts), 1) | |
| ) | |
| # Preprocess all texts | |
| processed = [self.preprocess(text, **kwargs) for text in texts] | |
| # Remove empty strings | |
| if remove_empty: | |
| original_len = len(processed) | |
| processed = [t for t in processed if t.strip()] | |
| stats.empty_removed = original_len - len(processed) | |
| # Remove duplicates while preserving order | |
| if remove_duplicates: | |
| seen = set() | |
| unique = [] | |
| for t in processed: | |
| if t not in seen: | |
| seen.add(t) | |
| unique.append(t) | |
| stats.duplicates_removed = len(processed) - len(unique) | |
| processed = unique | |
| stats.processed_count = len(processed) | |
| stats.avg_length_after = sum(len(t) for t in processed) / max(len(processed), 1) | |
| return processed, stats | |
| def detect_language(self, text: str) -> str: | |
| """ | |
| Simple language detection based on character patterns. | |
| Args: | |
| text: Input text | |
| Returns: | |
| Detected language code ('en', 'zh', 'km', or 'unknown') | |
| """ | |
| if not text: | |
| return "unknown" | |
| # Check for Khmer characters | |
| khmer_chars = len(self.KHMER_CHAR_PATTERN.findall(text)) | |
| # Check for Chinese characters | |
| chinese_chars = len(self.CHINESE_CHAR_PATTERN.findall(text)) | |
| # Count total characters | |
| total_chars = len(text) | |
| if khmer_chars > 0 and khmer_chars / total_chars > 0.3: | |
| return "km" | |
| elif chinese_chars > 0 and chinese_chars / total_chars > 0.3: | |
| return "zh" | |
| else: | |
| return "en" | |
class DataValidator:
    """Validate and clean datasets for training.

    Both methods are @staticmethod: they were originally defined without
    `self`, so calling them on a DataValidator instance would have bound
    the DataFrame argument to the instance. Class-level calls
    (DataValidator.validate_dataframe(df)) behave exactly as before.
    """

    @staticmethod
    def validate_dataframe(df, text_column: str = "text",
                           label_column: str = "label") -> Tuple[bool, str]:
        """
        Validate DataFrame structure for training.

        Checks that the required columns exist and, only if they do,
        that neither contains empty texts or missing labels.

        Args:
            df: pandas DataFrame to validate.
            text_column: Name of the column holding input texts.
            label_column: Name of the column holding labels.

        Returns:
            Tuple of (is_valid, status_message); the message joins all
            detected problems with "; ".
        """
        errors = []
        if text_column not in df.columns:
            errors.append(f"Missing required column: '{text_column}'")
        if label_column not in df.columns:
            errors.append(f"Missing required column: '{label_column}'")
        if not errors:
            # Content checks only make sense once both columns exist.
            # Empty means NaN or the empty string.
            empty_texts = df[text_column].isna().sum() + (df[text_column] == "").sum()
            if empty_texts > 0:
                errors.append(f"Found {empty_texts} empty text entries")
            empty_labels = df[label_column].isna().sum()
            if empty_labels > 0:
                errors.append(f"Found {empty_labels} missing labels")
        if not errors:
            return True, "Dataset structure is valid"
        else:
            return False, "; ".join(errors)

    @staticmethod
    def get_label_distribution(df, label_column: str = "label") -> dict:
        """Return {label: count} for the label column, or {} if the column is missing."""
        if label_column not in df.columns:
            return {}
        return df[label_column].value_counts().to_dict()