|
|
"""
|
|
|
Data Preprocessing & Cleaning Module
|
|
|
Author: AI Generated
|
|
|
Created: 2025-11-24
|
|
|
Purpose: Clean and preprocess data before AI processing
|
|
|
"""
|
|
|
|
|
|
import re
|
|
|
from typing import List, Dict
|
|
|
import numpy as np
|
|
|
from pyvi import ViTokenizer
|
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
|
|
|
|
|
class VietnameseTextCleaner:
    """Clean and preprocess Vietnamese text for NLP tasks.

    Provides regex-based cleaning, pyvi word segmentation, stopword
    removal, and simple term-frequency keyword extraction.
    """

    # Common Vietnamese function words, dropped before keyword extraction.
    STOP_WORDS = {
        'và', 'của', 'có', 'là', 'được', 'này', 'cho', 'với', 'các',
        'đã', 'trong', 'không', 'rất', 'một', 'để', 'những', 'cũng',
        'về', 'từ', 'hay', 'bị', 'như', 'làm', 'đó', 'lại', 'sẽ',
        'thì', 'nếu', 'khi', 'mà', 'hoặc', 'nên', 'trên', 'dưới'
    }

    def __init__(self):
        # pyvi's ViTokenizer module performs Vietnamese word segmentation
        # (joins syllables of compound words with underscores).
        self.tokenizer = ViTokenizer

    def clean_text(self, text: str) -> str:
        """Clean Vietnamese text and return the normalized string.

        Steps: lowercase, strip HTML tags, strip URLs and e-mail
        addresses, drop characters outside the ASCII/Vietnamese letter
        set, collapse whitespace.

        BUG FIX: lowercasing now happens *before* the character filter.
        The filter's character class lists only lowercase accented
        letters, so uppercase Vietnamese letters (e.g. 'Ệ' in 'VIỆT')
        were previously stripped instead of lowercased. Lowercasing
        first also lets the case-sensitive URL pattern catch 'HTTP://'.
        """
        if not text:
            return ""

        # Lowercase first so accented uppercase survives the filter below.
        text = text.lower()

        # Remove HTML tags.
        text = re.sub(r'<[^>]+>', '', text)

        # Remove URLs.
        text = re.sub(r'http\S+|www\.\S+', '', text)

        # Remove e-mail addresses.
        text = re.sub(r'\S+@\S+', '', text)

        # Keep only ASCII letters, Vietnamese accented letters, whitespace.
        text = re.sub(r'[^a-zA-ZàáảãạăắằẵặẳâấầẩẫậèéẻẽẹêếềểễệìíỉĩịòóỏõọôốồổỗộơớờởỡợùúủũụưứừửữựỳýỷỹỵđĐ\s]', ' ', text)

        # Collapse runs of whitespace.
        text = re.sub(r'\s+', ' ', text).strip()

        return text

    def tokenize(self, text: str) -> List[str]:
        """Clean then word-segment *text* with pyvi.

        Returns a list of word tokens (compound words joined by '_'),
        or an empty list when cleaning leaves nothing.
        """
        text = self.clean_text(text)
        if not text:
            return []

        # pyvi returns a single space-separated string of segments.
        tokenized = self.tokenizer.tokenize(text)
        return tokenized.split()

    def remove_stopwords(self, words: List[str]) -> List[str]:
        """Return *words* with Vietnamese stopwords filtered out."""
        return [w for w in words if w not in self.STOP_WORDS]

    def preprocess_for_sentiment(self, text: str) -> str:
        """Preprocess text for PhoBERT sentiment analysis.

        PhoBERT expects word-segmented input, so this returns the
        segmented tokens re-joined with single spaces.
        """
        words = self.tokenize(text)
        return ' '.join(words)

    def extract_keywords(self, text: str, top_n: int = 5) -> List[str]:
        """Extract up to *top_n* keywords by simple term frequency.

        Stopwords and very short tokens (<= 2 chars) are ignored.
        """
        words = self.remove_stopwords(self.tokenize(text))

        # Count frequencies of sufficiently long tokens.
        word_freq: Dict[str, int] = {}
        for word in words:
            if len(word) > 2:
                word_freq[word] = word_freq.get(word, 0) + 1

        # Highest-frequency tokens first.
        top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:top_n]
        return [word for word, _ in top_words]
|
|
|
|
|
|
|
|
|
class DataCleaner:
    """Clean and validate user feature data for clustering."""

    def __init__(self):
        # Fitted in normalize_features(fit=True); reused when fit=False.
        self.scaler = StandardScaler()

    def remove_outliers(self, data: np.ndarray, threshold: float = 3.0) -> tuple:
        """Remove rows whose Z-score exceeds *threshold* in any column.

        BUG FIX: a zero-variance (constant) column previously produced
        0/0 = NaN z-scores, and ``NaN < threshold`` is False, so every
        row was discarded. Constant columns now contribute a z-score of
        0 (their deviation from the mean is 0 anyway).

        Returns: (cleaned_data, valid_indices)
        """
        col_std = data.std(axis=0)
        # Substitute 1.0 for zero std; numerator is 0 there, so z = 0.
        safe_std = np.where(col_std == 0, 1.0, col_std)
        z_scores = np.abs((data - data.mean(axis=0)) / safe_std)

        # Keep rows where every feature is within the threshold.
        valid_indices = np.where(np.all(z_scores < threshold, axis=1))[0]
        cleaned_data = data[valid_indices]

        removed_count = len(data) - len(cleaned_data)
        if removed_count > 0:
            print(f" ⚠ Removed {removed_count} outliers ({removed_count/len(data)*100:.1f}%)")

        return cleaned_data, valid_indices

    def handle_missing_values(self, data: np.ndarray) -> np.ndarray:
        """Replace NaN/inf entries with the per-column median.

        Returns a new array; the caller's array is not modified.
        NOTE(review): an all-NaN column would still yield NaN medians
        (np.nanmedian warns) — assumed not to occur upstream; confirm.
        """
        # Treat +/-inf as missing (np.where allocates a new float array).
        data = np.where(np.isinf(data), np.nan, data)

        # Fill each NaN with its column's median over the finite values.
        col_median = np.nanmedian(data, axis=0)
        inds = np.where(np.isnan(data))
        data[inds] = np.take(col_median, inds[1])

        return data

    def normalize_features(self, data: np.ndarray, fit: bool = True) -> np.ndarray:
        """Standardize features using StandardScaler.

        Args:
            data: Feature matrix (samples x features).
            fit: If True, fit the scaler on *data*. If False, reuse the
                previously fitted scaler (e.g. for inference-time data).

        Returns:
            The standardized feature matrix.
        """
        if fit:
            return self.scaler.fit_transform(data)
        return self.scaler.transform(data)

    def clean_user_features(self, feature_matrix: np.ndarray, remove_outliers: bool = True) -> tuple:
        """Complete cleaning pipeline: impute, optionally de-outlier, scale.

        Args:
            feature_matrix: Raw feature matrix (samples x features).
            remove_outliers: If True, drop Z-score outlier rows.

        Returns: (cleaned_features, valid_indices) — valid_indices maps
        the returned rows back to positions in *feature_matrix*.
        """
        print("🔄 Cleaning user feature data...")

        data = self.handle_missing_values(feature_matrix)
        print(f" ✓ Handled missing values")

        if remove_outliers:
            data, valid_indices = self.remove_outliers(data)
        else:
            valid_indices = np.arange(len(data))

        data = self.normalize_features(data, fit=True)
        print(f" ✓ Normalized {data.shape[0]} samples")

        return data, valid_indices
|
|
|
|