# AI-Based-Data-Cleaner — src/cleaning_engine.py
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Tuple, Optional
from data_processor import DataProcessor
from ai_cleaner import AICleaner
from utils.logger import setup_logger
from config import Config
logger = setup_logger(__name__)
class CleaningEngine:
    """Main engine for orchestrating data cleaning operations.

    Runs a fixed pipeline over a DataFrame — duplicate removal, AI-assisted
    text cleaning, missing-value imputation, data-type fixing and optional
    outlier capping — and accumulates a structured report (changes made,
    before/after statistics, errors) in ``self.cleaning_report``.
    """

    def __init__(self):
        """Initialize cleaning engine with required components."""
        self.data_processor = DataProcessor()
        self.ai_cleaner = AICleaner()
        # Reset at the start of every clean_dataset() call.
        self.cleaning_report = {
            'changes': [],
            'statistics': {},
            'errors': []
        }

    def clean_dataset(self, df: pd.DataFrame, cleaning_options: Dict[str, bool]) -> Tuple[pd.DataFrame, Dict]:
        """
        Main method to clean the entire dataset.

        Args:
            df: Original pandas DataFrame (not modified in place)
            cleaning_options: Dictionary of cleaning options to apply.
                Recognized keys: remove_duplicates, ai_text_cleaning,
                fill_missing_values, fix_data_types, handle_outliers,
                use_ai_for_missing.

        Returns:
            Tuple of (cleaned_dataframe, cleaning_report). On failure the
            partially cleaned frame is returned and the error is recorded
            in the report rather than raised.
        """
        logger.info(f"Starting data cleaning process for dataset with shape {df.shape}")

        cleaned_df = df.copy()
        self.cleaning_report = {'changes': [], 'statistics': {}, 'errors': []}

        # Snapshot original quality metrics for the before/after summary.
        original_analysis = self.data_processor.analyze_data_quality(df)
        self.cleaning_report['statistics']['original'] = original_analysis

        try:
            # 1. Remove duplicate rows
            if cleaning_options.get('remove_duplicates', True):
                cleaned_df, duplicate_changes = self._remove_duplicates(cleaned_df)
                self.cleaning_report['changes'].extend(duplicate_changes)

            # 2. Clean text columns with AI
            if cleaning_options.get('ai_text_cleaning', True):
                cleaned_df = self._clean_text_columns(cleaned_df)

            # 3. Handle missing values
            if cleaning_options.get('fill_missing_values', True):
                cleaned_df = self._handle_missing_values(cleaned_df, cleaning_options)

            # 4. Fix data types
            if cleaning_options.get('fix_data_types', True):
                cleaned_df = self._fix_data_types(cleaned_df)

            # 5. Handle outliers (opt-in: capping mutates legitimate extremes)
            if cleaning_options.get('handle_outliers', False):
                cleaned_df = self._handle_outliers(cleaned_df)

            # Generate final statistics and the before/after comparison.
            final_analysis = self.data_processor.analyze_data_quality(cleaned_df)
            self.cleaning_report['statistics']['cleaned'] = final_analysis
            self.cleaning_report['statistics']['summary'] = self._generate_summary_stats(
                original_analysis, final_analysis
            )

            logger.info(f"Data cleaning completed. Applied {len(self.cleaning_report['changes'])} changes")
        except Exception as e:
            # Deliberate best-effort: keep whatever was cleaned so far and
            # surface the failure through the report instead of raising.
            error_msg = f"Error during data cleaning: {str(e)}"
            logger.error(error_msg)
            self.cleaning_report['errors'].append(error_msg)

        return cleaned_df, self.cleaning_report

    def _remove_duplicates(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Dict]]:
        """Remove exact duplicate rows; return (frame, change records)."""
        changes: List[Dict] = []
        original_count = len(df)

        # int(...) casts the numpy scalar so the report stays JSON-friendly.
        duplicate_count = int(df.duplicated().sum())

        if duplicate_count > 0:
            cleaned_df = df.drop_duplicates()
            changes.append({
                'type': 'duplicate_removal',
                'original_rows': original_count,
                'removed_rows': duplicate_count,
                'final_rows': len(cleaned_df)
            })
            logger.info(f"Removed {duplicate_count} duplicate rows")
            return cleaned_df, changes

        return df, changes

    def _clean_text_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean object-dtype columns via the AI cleaner, column by column."""
        cleaned_df = df.copy()
        text_columns = [col for col in df.columns if df[col].dtype == 'object']

        for col in text_columns:
            # Skip if column has too many unique values (likely free text or
            # identifiers rather than categorical data worth AI-normalizing).
            if df[col].nunique() > Config.MAX_ROWS_FOR_AI_PROCESSING:
                logger.info(f"Skipping AI cleaning for column '{col}' - too many unique values")
                continue

            try:
                cleaned_series, changes = self.ai_cleaner.clean_text_column(df[col], col)
                cleaned_df[col] = cleaned_series
                self.cleaning_report['changes'].extend(changes)
            except Exception as e:
                # One bad column must not abort the rest of the pipeline.
                error_msg = f"Error cleaning text column '{col}': {str(e)}"
                logger.error(error_msg)
                self.cleaning_report['errors'].append(error_msg)

        return cleaned_df

    def _handle_missing_values(self, df: pd.DataFrame, options: Dict[str, bool]) -> pd.DataFrame:
        """Fill missing values per column using a dtype-appropriate strategy."""
        cleaned_df = df.copy()

        for col in df.columns:
            if df[col].isnull().sum() == 0:
                continue

            try:
                if df[col].dtype in ['int64', 'float64']:
                    # Numeric columns: mean/median (optionally AI-assisted).
                    cleaned_df[col] = self._fill_numeric_missing(df[col], col, options)
                elif df[col].dtype == 'object':
                    # Text columns: mode (optionally AI-assisted).
                    cleaned_df[col] = self._fill_text_missing(df[col], col, options)
                else:
                    # Other dtypes (dates, bools, ...): forward fill.
                    # .ffill() replaces fillna(method='ffill'), which is
                    # deprecated in pandas 2.x and removed in 3.0.
                    cleaned_df[col] = df[col].ffill()
            except Exception as e:
                error_msg = f"Error handling missing values in column '{col}': {str(e)}"
                logger.error(error_msg)
                self.cleaning_report['errors'].append(error_msg)

        return cleaned_df

    def _fill_numeric_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series:
        """Fill missing numeric values via AI (small gaps) or mean/median."""
        missing_count = int(series.isnull().sum())

        if options.get('use_ai_for_missing', False) and missing_count <= 10:
            # Use AI only for a handful of gaps; otherwise too slow/costly.
            try:
                filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name)
                self.cleaning_report['changes'].extend(changes)
                return filled_series
            except Exception as e:
                logger.warning(
                    f"AI filling failed for {col_name}, falling back to statistical method: {e}"
                )

        # Choose mean vs median by coefficient of variation. abs(mean) keeps
        # the comparison meaningful for negative-mean columns, and the
        # zero/NaN-mean cases fall through to the robust median instead of
        # dividing by zero.
        mean = series.mean()
        std = series.std()
        if pd.notna(mean) and mean != 0 and pd.notna(std) and std / abs(mean) < 0.5:
            fill_value = mean
            method = 'mean'
        else:
            fill_value = series.median()
            method = 'median'

        if pd.isna(fill_value):
            # Entirely-null column: nothing sensible to impute with.
            return series

        filled_series = series.fillna(fill_value)
        self.cleaning_report['changes'].append({
            'type': 'missing_value_fill',
            'column': col_name,
            'method': method,
            'fill_value': fill_value,
            'count': missing_count
        })
        return filled_series

    def _fill_text_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series:
        """Fill missing text values via AI (small gaps) or the column mode."""
        missing_count = int(series.isnull().sum())

        if options.get('use_ai_for_missing', False) and missing_count <= 5:
            try:
                filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name)
                self.cleaning_report['changes'].extend(changes)
                return filled_series
            except Exception as e:
                logger.warning(f"AI filling failed for {col_name}, falling back to mode: {e}")

        # Mode (most frequent value); "Unknown" when the column has no
        # non-null values at all.
        mode_value = series.mode()
        fill_value = mode_value.iloc[0] if len(mode_value) > 0 else "Unknown"

        filled_series = series.fillna(fill_value)
        self.cleaning_report['changes'].append({
            'type': 'missing_value_fill',
            'column': col_name,
            'method': 'mode',
            'fill_value': fill_value,
            'count': missing_count
        })
        return filled_series

    def _fix_data_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert columns to the dtypes suggested by the data processor."""
        cleaned_df = df.copy()
        type_suggestions = self.data_processor.detect_data_types(df)

        for col, suggested_type in type_suggestions.items():
            current_type = str(df[col].dtype)
            if current_type == suggested_type:
                continue

            try:
                if suggested_type == 'int64':
                    # Nullable Int64 so coerced NaNs don't force float.
                    cleaned_df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
                elif suggested_type == 'float64':
                    cleaned_df[col] = pd.to_numeric(df[col], errors='coerce')
                elif suggested_type == 'datetime64[ns]':
                    cleaned_df[col] = pd.to_datetime(df[col], errors='coerce')
                elif suggested_type == 'bool':
                    # Unmapped values become NaN by design.
                    cleaned_df[col] = df[col].astype(str).str.lower().map({
                        'true': True, 'false': False, '1': True, '0': False,
                        'yes': True, 'no': False
                    })
                self.cleaning_report['changes'].append({
                    'type': 'data_type_conversion',
                    'column': col,
                    'from_type': current_type,
                    'to_type': suggested_type
                })
            except Exception as e:
                logger.warning(f"Could not convert column '{col}' to {suggested_type}: {str(e)}")

        return cleaned_df

    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Cap numeric outliers to the Tukey 1.5*IQR fences."""
        cleaned_df = df.copy()
        numeric_columns = df.select_dtypes(include=[np.number]).columns

        for col in numeric_columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound)
            outlier_count = int(outlier_mask.sum())

            if outlier_count > 0:
                # Cap (winsorize) rather than drop, preserving row count.
                cleaned_df.loc[df[col] < lower_bound, col] = lower_bound
                cleaned_df.loc[df[col] > upper_bound, col] = upper_bound
                self.cleaning_report['changes'].append({
                    'type': 'outlier_handling',
                    'column': col,
                    'outliers_capped': outlier_count,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound
                })

        return cleaned_df

    def _generate_summary_stats(self, original: Dict, cleaned: Dict) -> Dict:
        """Build the before/after summary from two data-quality analyses."""
        summary = {
            'total_changes': len(self.cleaning_report['changes']),
            'rows_before': original['shape'][0],
            'rows_after': cleaned['shape'][0],
            'columns': original['shape'][1],
            'missing_values_before': sum(original['missing_values'].values()),
            'missing_values_after': sum(cleaned['missing_values'].values()),
            'duplicates_removed': original['duplicates']
        }
        # max(..., 1) guards the division when there were no missing values.
        summary['missing_reduction_percentage'] = (
            (summary['missing_values_before'] - summary['missing_values_after']) /
            max(summary['missing_values_before'], 1) * 100
        )
        return summary