# MLOps-Platforms — src/mlops/preprocessor.py
"""
Text Preprocessor Module
========================
Provides language-specific text preprocessing pipelines for
English, Chinese, and Khmer languages.
"""
import re
import unicodedata
from typing import List, Optional, Tuple, Callable
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class PreprocessingStats:
    """Statistics from preprocessing operation."""
    original_count: int = 0         # number of texts received by preprocess_batch
    processed_count: int = 0        # number of texts remaining after all filtering
    empty_removed: int = 0          # texts dropped for being empty after cleaning
    duplicates_removed: int = 0     # exact-duplicate texts dropped (order preserved)
    avg_length_before: float = 0.0  # mean character length of the input texts
    avg_length_after: float = 0.0   # mean character length of the surviving texts
class TextPreprocessor:
    """
    Language-aware text preprocessor for multilingual NLP tasks.

    Supports:
    - English (en): Standard NLP preprocessing with stopword removal
    - Chinese (zh): Character-level processing with jieba segmentation
    - Khmer (km): Unicode normalization for Khmer script
    """

    # URLs/emails are replaced with ' [URL] ' / ' [EMAIL] ' placeholders
    # rather than silently dropped.
    # NOTE(review): inside the class, '[$-_@.&+]' contains a character
    # *range* $..._ (0x24-0x5F); this is the widely-circulated URL recipe
    # and is deliberately kept as-is for compatibility.
    URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+')

    # Script-detection patterns. The trailing '+' means findall() yields
    # *runs* of consecutive script characters, not single characters.
    CHINESE_CHAR_PATTERN = re.compile(r'[\u4e00-\u9fff]+')  # CJK Unified Ideographs
    KHMER_CHAR_PATTERN = re.compile(r'[\u1780-\u17FF]+')    # Khmer block

    def __init__(self, language: str = "en"):
        """
        Initialize the preprocessor for a specific language.

        Args:
            language: Language code ('en', 'zh', 'km')
        """
        self.language = language
        self._jieba_initialized = False  # jieba is imported lazily on first use
        self._setup_language_resources()

    def _setup_language_resources(self):
        """Populate the basic per-language stopword sets."""
        # English stopwords (basic list)
        self.english_stopwords = {
            'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
            'for', 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are',
            'were', 'been', 'be', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'must',
            'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it',
            'we', 'they', 'what', 'which', 'who', 'when', 'where', 'why', 'how'
        }
        # Chinese stopwords (basic list)
        self.chinese_stopwords = {
            '的', '了', '和', '是', '就', '都', '而', '及', '与', '着',
            '之', '用', '于', '但', '并', '等', '被', '让', '给', '在',
            '也', '很', '只', '又', '这', '那', '些', '把', '比', '去'
        }
        # Khmer stopwords (basic list)
        self.khmer_stopwords = {
            'និង', 'ឬ', 'ដែល', 'នៅ', 'ក្នុង', 'ពី', 'ទៅ', 'មាន', 'ជា',
            'នេះ', 'នោះ', 'គេ', 'ខ្ញុំ', 'យើង', 'គាត់', 'នាង', 'វា'
        }

    def _init_jieba(self):
        """Lazy initialization of jieba for Chinese tokenization.

        On ImportError, falls back to character-level handling
        (``self._jieba`` is set to None); the ``_jieba_initialized`` flag
        prevents repeated import attempts.
        """
        if not self._jieba_initialized:
            try:
                import jieba
                jieba.setLogLevel(logging.WARNING)  # silence jieba's own logger
                self._jieba = jieba
                self._jieba_initialized = True
                logger.info("Jieba initialized for Chinese tokenization")
            except ImportError:
                logger.warning("Jieba not installed. Using character-level tokenization for Chinese.")
                self._jieba = None
                self._jieba_initialized = True

    def normalize_unicode(self, text: str) -> str:
        """Normalize Unicode text to NFC form (canonical composition)."""
        return unicodedata.normalize('NFC', text)

    def remove_urls(self, text: str) -> str:
        """Replace URLs with a ' [URL] ' placeholder."""
        return self.URL_PATTERN.sub(' [URL] ', text)

    def remove_emails(self, text: str) -> str:
        """Replace email addresses with a ' [EMAIL] ' placeholder."""
        return self.EMAIL_PATTERN.sub(' [EMAIL] ', text)

    def remove_extra_whitespace(self, text: str) -> str:
        """Collapse all runs of whitespace to single spaces and trim ends."""
        return ' '.join(text.split())

    def to_lowercase(self, text: str) -> str:
        """Convert text to lowercase."""
        return text.lower()

    # ==================== English Preprocessing ====================

    def preprocess_english(self, text: str, remove_stopwords: bool = False) -> str:
        """
        Preprocess English text.

        Pipeline: NFC normalize -> URL/email placeholders -> lowercase ->
        strip special characters -> collapse whitespace -> optional
        stopword removal.

        Args:
            text: Input text
            remove_stopwords: Whether to remove common stopwords

        Returns:
            Preprocessed text
        """
        # Normalize unicode
        text = self.normalize_unicode(text)
        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)
        # Lowercase
        text = self.to_lowercase(text)
        # Remove special characters but keep alphanumeric and basic punctuation.
        # NOTE(review): \w is Unicode-aware in Python 3, so non-Latin word
        # characters (e.g. CJK) also survive this filter.
        text = re.sub(r'[^\w\s\.\,\!\?\-]', ' ', text)
        # Remove extra whitespace
        text = self.remove_extra_whitespace(text)
        # Optionally remove stopwords
        if remove_stopwords:
            words = text.split()
            words = [w for w in words if w not in self.english_stopwords]
            text = ' '.join(words)
        return text.strip()

    # ==================== Chinese Preprocessing ====================

    def preprocess_chinese(self, text: str, segment_words: bool = True,
                           remove_stopwords: bool = False) -> str:
        """
        Preprocess Chinese text.

        Args:
            text: Input text (can contain both Chinese and English)
            segment_words: Whether to apply jieba word segmentation
            remove_stopwords: Whether to remove common stopwords.
                NOTE: stopword removal splits on whitespace, so it is only
                effective when segmentation has inserted spaces (jieba
                available and segment_words=True), or for space-separated
                Latin tokens.

        Returns:
            Preprocessed text
        """
        # Normalize unicode
        text = self.normalize_unicode(text)
        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)
        # Convert full-width characters to half-width
        text = self._fullwidth_to_halfwidth(text)
        # Keep Chinese characters, alphanumeric, whitespace, and basic
        # punctuation (。 and 、 are outside the full-width->half-width
        # range and so still appear here).
        text = re.sub(r'[^\u4e00-\u9fff\w\s\。\,\!\?\、]', ' ', text)
        # Word segmentation (no-op fallback when jieba is unavailable)
        if segment_words:
            self._init_jieba()
            if self._jieba:
                words = list(self._jieba.cut(text))
                text = ' '.join(words)
        # Remove stopwords (both Chinese and English, since mixed text is allowed)
        if remove_stopwords:
            words = text.split()
            words = [w for w in words if w not in self.chinese_stopwords
                     and w not in self.english_stopwords]
            text = ' '.join(words)
        # Remove extra whitespace
        text = self.remove_extra_whitespace(text)
        return text.strip()

    def _fullwidth_to_halfwidth(self, text: str) -> str:
        """Convert full-width (U+FF01–U+FF5E) characters and the
        ideographic space (U+3000) to their ASCII half-width equivalents."""
        result = []
        for char in text:
            code = ord(char)
            # Full-width ASCII variants (excluding space): offset by 0xFEE0
            if 0xFF01 <= code <= 0xFF5E:
                code -= 0xFEE0
            # Full-width (ideographic) space
            elif code == 0x3000:
                code = 0x0020
            result.append(chr(code))
        return ''.join(result)

    # ==================== Khmer Preprocessing ====================

    def preprocess_khmer(self, text: str, normalize_spacing: bool = True) -> str:
        """
        Preprocess Khmer text.

        Khmer script has unique characteristics:
        - No spaces between words
        - Complex consonant clusters
        - Dependent vowels and signs

        Args:
            text: Input text
            normalize_spacing: Whether to normalize whitespace

        Returns:
            Preprocessed text
        """
        # Normalize unicode (important for Khmer: composes vowel/sign sequences)
        text = self.normalize_unicode(text)
        # Remove URLs and emails
        text = self.remove_urls(text)
        text = self.remove_emails(text)
        # Remove zero-width characters (ZWSP/ZWNJ/ZWJ/BOM) that might interfere
        text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text)
        # Keep Khmer characters, word characters, whitespace, and basic punctuation
        # Khmer Unicode range: U+1780–U+17FF
        text = re.sub(r'[^\u1780-\u17FF\w\s\.\,\!\?]', ' ', text)
        if normalize_spacing:
            text = self.remove_extra_whitespace(text)
        return text.strip()

    # ==================== Main Interface ====================

    def preprocess(self, text: str, **kwargs) -> str:
        """
        Preprocess text according to the configured language.

        Args:
            text: Input text to preprocess
            **kwargs: Additional language-specific options (forwarded to the
                per-language method; see preprocess_english / preprocess_chinese /
                preprocess_khmer)

        Returns:
            Preprocessed text ("" for empty or non-string input)
        """
        if not text or not isinstance(text, str):
            return ""
        if self.language == "en":
            return self.preprocess_english(text, **kwargs)
        elif self.language == "zh":
            return self.preprocess_chinese(text, **kwargs)
        elif self.language == "km":
            return self.preprocess_khmer(text, **kwargs)
        else:
            # Default: basic language-agnostic preprocessing
            logger.warning(f"Unknown language '{self.language}', using basic preprocessing")
            text = self.normalize_unicode(text)
            text = self.remove_urls(text)
            text = self.remove_emails(text)
            text = self.remove_extra_whitespace(text)
            return text.strip()

    def preprocess_batch(self, texts: List[str],
                         remove_empty: bool = True,
                         remove_duplicates: bool = False,
                         **kwargs) -> Tuple[List[str], 'PreprocessingStats']:
        """
        Preprocess a batch of texts.

        Args:
            texts: List of texts to preprocess
            remove_empty: Remove empty strings after preprocessing
            remove_duplicates: Remove duplicate texts (order preserved)
            **kwargs: Additional preprocessing options (see preprocess)

        Returns:
            Tuple of (processed texts, preprocessing statistics)
        """
        stats = PreprocessingStats(
            original_count=len(texts),
            # max(..., 1) guards against division by zero on an empty batch
            avg_length_before=sum(len(t) for t in texts) / max(len(texts), 1)
        )
        # Preprocess all texts
        processed = [self.preprocess(text, **kwargs) for text in texts]
        # Remove empty strings
        if remove_empty:
            original_len = len(processed)
            processed = [t for t in processed if t.strip()]
            stats.empty_removed = original_len - len(processed)
        # Remove duplicates while preserving first-occurrence order
        if remove_duplicates:
            seen = set()
            unique = []
            for t in processed:
                if t not in seen:
                    seen.add(t)
                    unique.append(t)
            stats.duplicates_removed = len(processed) - len(unique)
            processed = unique
        stats.processed_count = len(processed)
        stats.avg_length_after = sum(len(t) for t in processed) / max(len(processed), 1)
        return processed, stats

    def detect_language(self, text: str) -> str:
        """
        Simple language detection based on script character ratios.

        A language is detected when more than 30% of all characters
        (including spaces and punctuation) belong to its script block.

        Args:
            text: Input text

        Returns:
            Detected language code ('en', 'zh', 'km', or 'unknown')
        """
        if not text:
            return "unknown"
        # BUG FIX: the patterns end in '+', so findall() returns *runs* of
        # script characters. len(findall(...)) counted runs, not characters —
        # a fully-Chinese sentence was a single run, giving ratio 1/len(text)
        # and a misdetection as 'en'. Sum the run lengths instead.
        khmer_chars = sum(len(run) for run in self.KHMER_CHAR_PATTERN.findall(text))
        chinese_chars = sum(len(run) for run in self.CHINESE_CHAR_PATTERN.findall(text))
        # Count total characters (whitespace/punctuation included)
        total_chars = len(text)
        if khmer_chars > 0 and khmer_chars / total_chars > 0.3:
            return "km"
        elif chinese_chars > 0 and chinese_chars / total_chars > 0.3:
            return "zh"
        else:
            return "en"
class DataValidator:
    """Validate and clean datasets for training."""

    @staticmethod
    def validate_dataframe(df, text_column: str = "text",
                           label_column: str = "label") -> Tuple[bool, str]:
        """
        Validate that a DataFrame has the columns and non-empty values
        required for training.

        Args:
            df: Input DataFrame (pandas-like: .columns, .isna(), equality)
            text_column: Name of the column holding input texts
            label_column: Name of the column holding labels

        Returns:
            Tuple of (is_valid, status_message); the message joins all
            detected problems with '; '.
        """
        problems = []
        # Missing-column checks come first; value checks only make sense
        # once both columns are known to exist.
        absent = [col for col in (text_column, label_column) if col not in df.columns]
        problems.extend(f"Missing required column: '{col}'" for col in absent)
        if not absent:
            blank_count = df[text_column].isna().sum() + (df[text_column] == "").sum()
            if blank_count > 0:
                problems.append(f"Found {blank_count} empty text entries")
            unlabeled_count = df[label_column].isna().sum()
            if unlabeled_count > 0:
                problems.append(f"Found {unlabeled_count} missing labels")
        if problems:
            return False, "; ".join(problems)
        return True, "Dataset structure is valid"

    @staticmethod
    def get_label_distribution(df, label_column: str = "label") -> dict:
        """Return a {label: count} mapping, or {} if the column is absent."""
        if label_column in df.columns:
            return df[label_column].value_counts().to_dict()
        return {}