import pandas as pd
import numpy as np
from typing import Dict, List, Any, Tuple, Optional
from data_processor import DataProcessor
from ai_cleaner import AICleaner
from utils.logger import setup_logger
from config import Config
logger = setup_logger(__name__)
class CleaningEngine:
    """Main engine for orchestrating data cleaning operations.

    Coordinates the DataProcessor (quality analysis, dtype detection) and the
    AICleaner (AI-assisted text cleanup and value imputation), applying each
    enabled cleaning step in sequence and recording every modification in
    ``self.cleaning_report`` so callers can audit exactly what changed.
    """

    def __init__(self):
        """Initialize cleaning engine with required components."""
        self.data_processor = DataProcessor()
        self.ai_cleaner = AICleaner()
        # Per-run audit trail; reset at the start of each clean_dataset() call.
        self.cleaning_report = {
            'changes': [],
            'statistics': {},
            'errors': []
        }

    def clean_dataset(self, df: pd.DataFrame, cleaning_options: Dict[str, bool]) -> Tuple[pd.DataFrame, Dict]:
        """
        Main method to clean the entire dataset.

        Args:
            df: Original pandas DataFrame (never modified in place).
            cleaning_options: Flags selecting which cleaning steps to apply.
                Recognized keys: 'remove_duplicates', 'ai_text_cleaning',
                'fill_missing_values', 'fix_data_types', 'handle_outliers',
                'use_ai_for_missing'.

        Returns:
            Tuple of (cleaned_dataframe, cleaning_report). On error the
            partially cleaned frame is returned and the error is recorded
            in the report's 'errors' list.
        """
        logger.info(f"Starting data cleaning process for dataset with shape {df.shape}")
        cleaned_df = df.copy()
        self.cleaning_report = {'changes': [], 'statistics': {}, 'errors': []}

        # Snapshot quality metrics before any mutation so the final summary
        # can compare before/after.
        original_analysis = self.data_processor.analyze_data_quality(df)
        self.cleaning_report['statistics']['original'] = original_analysis

        try:
            # 1. Remove duplicate rows
            if cleaning_options.get('remove_duplicates', True):
                cleaned_df, duplicate_changes = self._remove_duplicates(cleaned_df)
                self.cleaning_report['changes'].extend(duplicate_changes)

            # 2. Clean text columns with AI
            if cleaning_options.get('ai_text_cleaning', True):
                cleaned_df = self._clean_text_columns(cleaned_df)

            # 3. Handle missing values
            if cleaning_options.get('fill_missing_values', True):
                cleaned_df = self._handle_missing_values(cleaned_df, cleaning_options)

            # 4. Fix data types
            if cleaning_options.get('fix_data_types', True):
                cleaned_df = self._fix_data_types(cleaned_df)

            # 5. Handle outliers (off by default: capping is lossy)
            if cleaning_options.get('handle_outliers', False):
                cleaned_df = self._handle_outliers(cleaned_df)

            # Generate final statistics comparing against the pre-clean snapshot.
            final_analysis = self.data_processor.analyze_data_quality(cleaned_df)
            self.cleaning_report['statistics']['cleaned'] = final_analysis
            self.cleaning_report['statistics']['summary'] = self._generate_summary_stats(
                original_analysis, final_analysis
            )
            logger.info(f"Data cleaning completed. Applied {len(self.cleaning_report['changes'])} changes")
        except Exception as e:
            # Top-level boundary: record the failure and return whatever was
            # cleaned so far rather than losing partial progress.
            error_msg = f"Error during data cleaning: {str(e)}"
            logger.error(error_msg)
            self.cleaning_report['errors'].append(error_msg)
        return cleaned_df, self.cleaning_report

    def _remove_duplicates(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Dict]]:
        """Remove exactly-duplicated rows, returning (frame, change records)."""
        changes: List[Dict] = []
        original_count = len(df)

        # int() cast: pandas returns a numpy scalar, which is not
        # JSON-serializable in the report.
        duplicate_count = int(df.duplicated().sum())
        if duplicate_count == 0:
            return df, changes

        cleaned_df = df.drop_duplicates()
        changes.append({
            'type': 'duplicate_removal',
            'original_rows': original_count,
            'removed_rows': duplicate_count,
            'final_rows': len(cleaned_df)
        })
        logger.info(f"Removed {duplicate_count} duplicate rows")
        return cleaned_df, changes

    def _clean_text_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean object-dtype columns via the AI cleaner.

        Columns with more unique values than Config.MAX_ROWS_FOR_AI_PROCESSING
        are skipped (likely free text, not categorical). Per-column failures
        are logged and recorded, leaving that column untouched.
        """
        cleaned_df = df.copy()
        text_columns = [col for col in df.columns if df[col].dtype == 'object']
        for col in text_columns:
            if df[col].nunique() > Config.MAX_ROWS_FOR_AI_PROCESSING:
                logger.info(f"Skipping AI cleaning for column '{col}' - too many unique values")
                continue
            try:
                cleaned_series, changes = self.ai_cleaner.clean_text_column(df[col], col)
                cleaned_df[col] = cleaned_series
                self.cleaning_report['changes'].extend(changes)
            except Exception as e:
                error_msg = f"Error cleaning text column '{col}': {str(e)}"
                logger.error(error_msg)
                self.cleaning_report['errors'].append(error_msg)
        return cleaned_df

    def _handle_missing_values(self, df: pd.DataFrame, options: Dict[str, bool]) -> pd.DataFrame:
        """Fill missing values per column, dispatching on dtype.

        Numeric columns get statistical/AI fills, object columns get mode/AI
        fills, everything else is forward-filled. Per-column failures are
        recorded and the column is left as-is.
        """
        cleaned_df = df.copy()
        for col in df.columns:
            if df[col].isnull().sum() == 0:
                continue
            try:
                # is_numeric_dtype covers int32/float32 etc., which the old
                # ['int64', 'float64'] check missed; bool is excluded since it
                # has no meaningful mean/median fill.
                if pd.api.types.is_numeric_dtype(df[col]) and not pd.api.types.is_bool_dtype(df[col]):
                    cleaned_df[col] = self._fill_numeric_missing(df[col], col, options)
                elif df[col].dtype == 'object':
                    cleaned_df[col] = self._fill_text_missing(df[col], col, options)
                else:
                    # Other types (datetime, categorical, ...) - forward fill.
                    # .ffill() replaces the deprecated fillna(method='ffill').
                    cleaned_df[col] = df[col].ffill()
            except Exception as e:
                error_msg = f"Error handling missing values in column '{col}': {str(e)}"
                logger.error(error_msg)
                self.cleaning_report['errors'].append(error_msg)
        return cleaned_df

    def _fill_numeric_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series:
        """Fill missing values in a numeric column.

        Uses the AI suggester for small gaps when enabled; otherwise chooses
        mean vs. median by the column's coefficient of variation.
        """
        missing_count = int(series.isnull().sum())

        if options.get('use_ai_for_missing', False) and missing_count <= 10:
            # Use AI only for a small number of missing values.
            try:
                filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name)
                self.cleaning_report['changes'].extend(changes)
                return filled_series
            except Exception:
                logger.warning(f"AI filling failed for {col_name}, falling back to statistical method")

        # Statistical fallback: mean for low relative spread, median otherwise.
        # abs() matters here: for negative-mean columns the raw ratio std/mean
        # is always negative (hence always < 0.5), which previously forced the
        # mean branch regardless of spread. Also guard a zero/NaN mean, where
        # the ratio is undefined.
        mean_value = series.mean()
        std_value = series.std()
        if pd.notna(mean_value) and mean_value != 0 and abs(std_value / mean_value) < 0.5:
            fill_value = mean_value
            method = 'mean'
        else:
            fill_value = series.median()
            method = 'median'

        filled_series = series.fillna(fill_value)
        self.cleaning_report['changes'].append({
            'type': 'missing_value_fill',
            'column': col_name,
            'method': method,
            'fill_value': fill_value,
            'count': missing_count
        })
        return filled_series

    def _fill_text_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series:
        """Fill missing values in a text column with AI or the modal value."""
        missing_count = int(series.isnull().sum())

        if options.get('use_ai_for_missing', False) and missing_count <= 5:
            # Use AI only for a small number of missing values.
            try:
                filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name)
                self.cleaning_report['changes'].extend(changes)
                return filled_series
            except Exception:
                logger.warning(f"AI filling failed for {col_name}, falling back to mode")

        # Mode (most frequent value); "Unknown" if the column is all-missing.
        mode_value = series.mode()
        fill_value = mode_value.iloc[0] if len(mode_value) > 0 else "Unknown"

        filled_series = series.fillna(fill_value)
        self.cleaning_report['changes'].append({
            'type': 'missing_value_fill',
            'column': col_name,
            'method': 'mode',
            'fill_value': fill_value,
            'count': missing_count
        })
        return filled_series

    def _fix_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert columns to the dtypes suggested by the DataProcessor.

        Unconvertible values become NaN/NaT (errors='coerce'); a column that
        fails wholesale is left unchanged with a warning.
        """
        cleaned_df = df.copy()
        # Suggested dtypes come from DataProcessor.detect_data_types -
        # assumed to map column name -> dtype string; confirm against that API.
        type_suggestions = self.data_processor.detect_data_types(df)
        for col, suggested_type in type_suggestions.items():
            current_type = str(df[col].dtype)
            if current_type == suggested_type:
                continue
            try:
                if suggested_type == 'int64':
                    # Nullable Int64 keeps integer semantics even with NaNs.
                    cleaned_df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
                elif suggested_type == 'float64':
                    cleaned_df[col] = pd.to_numeric(df[col], errors='coerce')
                elif suggested_type == 'datetime64[ns]':
                    cleaned_df[col] = pd.to_datetime(df[col], errors='coerce')
                elif suggested_type == 'bool':
                    # Unmapped strings become NaN rather than raising.
                    cleaned_df[col] = df[col].astype(str).str.lower().map({
                        'true': True, 'false': False, '1': True, '0': False,
                        'yes': True, 'no': False
                    })
                self.cleaning_report['changes'].append({
                    'type': 'data_type_conversion',
                    'column': col,
                    'from_type': current_type,
                    'to_type': suggested_type
                })
            except Exception as e:
                logger.warning(f"Could not convert column '{col}' to {suggested_type}: {str(e)}")
        return cleaned_df

    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Cap numeric outliers at the Tukey fences (Q1/Q3 +/- 1.5*IQR)."""
        cleaned_df = df.copy()
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound)
            outlier_count = int(outlier_mask.sum())
            if outlier_count == 0:
                continue

            # Cap (winsorize) rather than drop, so row count is preserved.
            cleaned_df.loc[df[col] < lower_bound, col] = lower_bound
            cleaned_df.loc[df[col] > upper_bound, col] = upper_bound
            self.cleaning_report['changes'].append({
                'type': 'outlier_handling',
                'column': col,
                'outliers_capped': outlier_count,
                'lower_bound': lower_bound,
                'upper_bound': upper_bound
            })
        return cleaned_df

    def _generate_summary_stats(self, original: Dict, cleaned: Dict) -> Dict:
        """Summarize before/after quality metrics.

        Both dicts are DataProcessor.analyze_data_quality results - assumed to
        carry 'shape', 'missing_values' (per-column counts) and 'duplicates'.
        """
        summary = {
            'total_changes': len(self.cleaning_report['changes']),
            'rows_before': original['shape'][0],
            'rows_after': cleaned['shape'][0],
            'columns': original['shape'][1],
            'missing_values_before': sum(original['missing_values'].values()),
            'missing_values_after': sum(cleaned['missing_values'].values()),
            'duplicates_removed': original['duplicates']
        }
        # max(..., 1) avoids division by zero when nothing was missing.
        summary['missing_reduction_percentage'] = (
            (summary['missing_values_before'] - summary['missing_values_after']) /
            max(summary['missing_values_before'], 1) * 100
        )
        return summary
|