""" Data Preprocessing & Cleaning Module Author: AI Generated Created: 2025-11-24 Purpose: Clean and preprocess data before AI processing """ import re from typing import List, Dict import numpy as np from pyvi import ViTokenizer from sklearn.preprocessing import StandardScaler class VietnameseTextCleaner: """ Clean and preprocess Vietnamese text for NLP tasks. """ # Vietnamese stopwords STOP_WORDS = { 'và', 'của', 'có', 'là', 'được', 'này', 'cho', 'với', 'các', 'đã', 'trong', 'không', 'rất', 'một', 'để', 'những', 'cũng', 'về', 'từ', 'hay', 'bị', 'như', 'làm', 'đó', 'lại', 'sẽ', 'thì', 'nếu', 'khi', 'mà', 'hoặc', 'nên', 'trên', 'dưới' } def __init__(self): self.tokenizer = ViTokenizer def clean_text(self, text: str) -> str: """ Clean Vietnamese text: - Remove HTML tags - Remove special characters - Normalize whitespace - Lowercase """ if not text: return "" # Remove HTML tags text = re.sub(r'<[^>]+>', '', text) # Remove URLs text = re.sub(r'http\S+|www\.\S+', '', text) # Remove emails text = re.sub(r'\S+@\S+', '', text) # Remove special characters (keep Vietnamese) text = re.sub(r'[^a-zA-ZàáảãạăắằẵặẳâấầẩẫậèéẻẽẹêếềểễệìíỉĩịòóỏõọôốồổỗộơớờởỡợùúủũụưứừửữựỳýỷỹỵđĐ\s]', ' ', text) # Normalize whitespace text = re.sub(r'\s+', ' ', text).strip() # Lowercase text = text.lower() return text def tokenize(self, text: str) -> List[str]: """ Tokenize Vietnamese text using pyvi. Returns list of words. """ text = self.clean_text(text) if not text: return [] # Use pyvi for Vietnamese word segmentation tokenized = self.tokenizer.tokenize(text) words = tokenized.split() return words def remove_stopwords(self, words: List[str]) -> List[str]: """ Remove Vietnamese stopwords. """ return [w for w in words if w not in self.STOP_WORDS] def preprocess_for_sentiment(self, text: str) -> str: """ Preprocess text for PhoBERT sentiment analysis. PhoBERT expects word-segmented text. """ # Clean and tokenize words = self.tokenize(text) # Join back with spaces (word-segmented format) return ' '.join(words) def extract_keywords(self, text: str, top_n: int = 5) -> List[str]: """ Extract keywords from text. Simple TF approach without stopwords. """ words = self.tokenize(text) words = self.remove_stopwords(words) # Count frequency word_freq = {} for word in words: if len(word) > 2: # Filter very short words word_freq[word] = word_freq.get(word, 0) + 1 # Get top N top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:top_n] return [word[0] for word in top_words] class DataCleaner: """ Clean and validate user feature data for clustering. """ def __init__(self): self.scaler = StandardScaler() def remove_outliers(self, data: np.ndarray, threshold: float = 3.0) -> tuple: """ Remove outliers using Z-score method. Returns: (cleaned_data, valid_indices) """ # Calculate z-scores z_scores = np.abs((data - data.mean(axis=0)) / data.std(axis=0)) # Find rows without extreme outliers valid_indices = np.where(np.all(z_scores < threshold, axis=1))[0] cleaned_data = data[valid_indices] removed_count = len(data) - len(cleaned_data) if removed_count > 0: print(f" ⚠ Removed {removed_count} outliers ({removed_count/len(data)*100:.1f}%)") return cleaned_data, valid_indices def handle_missing_values(self, data: np.ndarray) -> np.ndarray: """ Handle missing values (NaN, inf) by replacing with median. """ # Replace inf with NaN data = np.where(np.isinf(data), np.nan, data) # Replace NaN with column median col_median = np.nanmedian(data, axis=0) inds = np.where(np.isnan(data)) data[inds] = np.take(col_median, inds[1]) return data def normalize_features(self, data: np.ndarray, fit: bool = True) -> np.ndarray: """ Standardize features using StandardScaler. Args: data: Feature matrix fit: If True, fit scaler. If False, use existing scaler. """ if fit: normalized = self.scaler.fit_transform(data) else: normalized = self.scaler.transform(data) return normalized def clean_user_features(self, feature_matrix: np.ndarray, remove_outliers: bool = True) -> tuple: """ Complete cleaning pipeline for user features. Returns: (cleaned_features, valid_indices) """ print("🔄 Cleaning user feature data...") # Step 1: Handle missing values data = self.handle_missing_values(feature_matrix) print(f" ✓ Handled missing values") # Step 2: Remove outliers (optional) if remove_outliers: data, valid_indices = self.remove_outliers(data) else: valid_indices = np.arange(len(data)) # Step 3: Normalize data = self.normalize_features(data, fit=True) print(f" ✓ Normalized {data.shape[0]} samples") return data, valid_indices