| | import pandas as pd |
| | import numpy as np |
| | from typing import Dict, List, Any, Tuple, Optional |
| | from data_processor import DataProcessor |
| | from ai_cleaner import AICleaner |
| | from utils.logger import setup_logger |
| | from config import Config |
| |
|
| | logger = setup_logger(__name__) |
| |
|
class CleaningEngine:
    """Main engine for orchestrating data cleaning operations"""

    def __init__(self):
        """Set up the processing components and a fresh cleaning report."""
        self.data_processor = DataProcessor()
        self.ai_cleaner = AICleaner()
        # Report shared by all cleaning steps: applied changes,
        # before/after statistics, and non-fatal errors.
        self.cleaning_report = {'changes': [], 'statistics': {}, 'errors': []}
| | |
| | def clean_dataset(self, df: pd.DataFrame, cleaning_options: Dict[str, bool]) -> Tuple[pd.DataFrame, Dict]: |
| | """ |
| | Main method to clean the entire dataset |
| | |
| | Args: |
| | df: Original pandas DataFrame |
| | cleaning_options: Dictionary of cleaning options to apply |
| | |
| | Returns: |
| | Tuple of (cleaned_dataframe, cleaning_report) |
| | """ |
| | logger.info(f"Starting data cleaning process for dataset with shape {df.shape}") |
| | |
| | |
| | cleaned_df = df.copy() |
| | self.cleaning_report = {'changes': [], 'statistics': {}, 'errors': []} |
| | |
| | |
| | original_analysis = self.data_processor.analyze_data_quality(df) |
| | self.cleaning_report['statistics']['original'] = original_analysis |
| | |
| | try: |
| | |
| | if cleaning_options.get('remove_duplicates', True): |
| | cleaned_df, duplicate_changes = self._remove_duplicates(cleaned_df) |
| | self.cleaning_report['changes'].extend(duplicate_changes) |
| | |
| | |
| | if cleaning_options.get('ai_text_cleaning', True): |
| | cleaned_df = self._clean_text_columns(cleaned_df) |
| | |
| | |
| | if cleaning_options.get('fill_missing_values', True): |
| | cleaned_df = self._handle_missing_values(cleaned_df, cleaning_options) |
| | |
| | |
| | if cleaning_options.get('fix_data_types', True): |
| | cleaned_df = self._fix_data_types(cleaned_df) |
| | |
| | |
| | if cleaning_options.get('handle_outliers', False): |
| | cleaned_df = self._handle_outliers(cleaned_df) |
| | |
| | |
| | final_analysis = self.data_processor.analyze_data_quality(cleaned_df) |
| | self.cleaning_report['statistics']['cleaned'] = final_analysis |
| | self.cleaning_report['statistics']['summary'] = self._generate_summary_stats( |
| | original_analysis, final_analysis |
| | ) |
| | |
| | logger.info(f"Data cleaning completed. Applied {len(self.cleaning_report['changes'])} changes") |
| | |
| | except Exception as e: |
| | error_msg = f"Error during data cleaning: {str(e)}" |
| | logger.error(error_msg) |
| | self.cleaning_report['errors'].append(error_msg) |
| | |
| | return cleaned_df, self.cleaning_report |
| | |
| | def _remove_duplicates(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Dict]]: |
| | """Remove duplicate rows from dataframe""" |
| | changes = [] |
| | original_count = len(df) |
| | |
| | |
| | duplicate_mask = df.duplicated() |
| | duplicate_count = duplicate_mask.sum() |
| | |
| | if duplicate_count > 0: |
| | |
| | cleaned_df = df.drop_duplicates() |
| | changes.append({ |
| | 'type': 'duplicate_removal', |
| | 'original_rows': original_count, |
| | 'removed_rows': duplicate_count, |
| | 'final_rows': len(cleaned_df) |
| | }) |
| | logger.info(f"Removed {duplicate_count} duplicate rows") |
| | return cleaned_df, changes |
| | |
| | return df, changes |
| | |
| | def _clean_text_columns(self, df: pd.DataFrame) -> pd.DataFrame: |
| | """Clean text columns using AI""" |
| | cleaned_df = df.copy() |
| | |
| | |
| | text_columns = [col for col in df.columns if df[col].dtype == 'object'] |
| | |
| | for col in text_columns: |
| | |
| | if df[col].nunique() > Config.MAX_ROWS_FOR_AI_PROCESSING: |
| | logger.info(f"Skipping AI cleaning for column '{col}' - too many unique values") |
| | continue |
| | |
| | try: |
| | cleaned_series, changes = self.ai_cleaner.clean_text_column(df[col], col) |
| | cleaned_df[col] = cleaned_series |
| | self.cleaning_report['changes'].extend(changes) |
| | |
| | except Exception as e: |
| | error_msg = f"Error cleaning text column '{col}': {str(e)}" |
| | logger.error(error_msg) |
| | self.cleaning_report['errors'].append(error_msg) |
| | |
| | return cleaned_df |
| | |
| | def _handle_missing_values(self, df: pd.DataFrame, options: Dict[str, bool]) -> pd.DataFrame: |
| | """Handle missing values using various strategies""" |
| | cleaned_df = df.copy() |
| | |
| | for col in df.columns: |
| | if df[col].isnull().sum() == 0: |
| | continue |
| | |
| | try: |
| | if df[col].dtype in ['int64', 'float64']: |
| | |
| | cleaned_df[col] = self._fill_numeric_missing(df[col], col, options) |
| | elif df[col].dtype == 'object': |
| | |
| | cleaned_df[col] = self._fill_text_missing(df[col], col, options) |
| | else: |
| | |
| | cleaned_df[col] = df[col].fillna(method='ffill') |
| | |
| | except Exception as e: |
| | error_msg = f"Error handling missing values in column '{col}': {str(e)}" |
| | logger.error(error_msg) |
| | self.cleaning_report['errors'].append(error_msg) |
| | |
| | return cleaned_df |
| | |
| | def _fill_numeric_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series: |
| | """Fill missing values in numeric columns""" |
| | missing_count = series.isnull().sum() |
| | |
| | if options.get('use_ai_for_missing', False) and missing_count <= 10: |
| | |
| | try: |
| | filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name) |
| | self.cleaning_report['changes'].extend(changes) |
| | return filled_series |
| | except Exception as e: |
| | logger.warning(f"AI filling failed for {col_name}, falling back to statistical method") |
| | |
| | |
| | if series.std() / series.mean() < 0.5: |
| | fill_value = series.mean() |
| | method = 'mean' |
| | else: |
| | fill_value = series.median() |
| | method = 'median' |
| | |
| | filled_series = series.fillna(fill_value) |
| | |
| | self.cleaning_report['changes'].append({ |
| | 'type': 'missing_value_fill', |
| | 'column': col_name, |
| | 'method': method, |
| | 'fill_value': fill_value, |
| | 'count': missing_count |
| | }) |
| | |
| | return filled_series |
| | |
| | def _fill_text_missing(self, series: pd.Series, col_name: str, options: Dict) -> pd.Series: |
| | """Fill missing values in text columns""" |
| | missing_count = series.isnull().sum() |
| | |
| | if options.get('use_ai_for_missing', False) and missing_count <= 5: |
| | |
| | try: |
| | filled_series, changes = self.ai_cleaner.suggest_missing_values(series, col_name) |
| | self.cleaning_report['changes'].extend(changes) |
| | return filled_series |
| | except Exception as e: |
| | logger.warning(f"AI filling failed for {col_name}, falling back to mode") |
| | |
| | |
| | mode_value = series.mode() |
| | if len(mode_value) > 0: |
| | fill_value = mode_value.iloc[0] |
| | else: |
| | fill_value = "Unknown" |
| | |
| | filled_series = series.fillna(fill_value) |
| | |
| | self.cleaning_report['changes'].append({ |
| | 'type': 'missing_value_fill', |
| | 'column': col_name, |
| | 'method': 'mode', |
| | 'fill_value': fill_value, |
| | 'count': missing_count |
| | }) |
| | |
| | return filled_series |
| | |
| | def _fix_data_types(self, df: pd.DataFrame) -> pd.DataFrame: |
| | """Fix and optimize data types""" |
| | cleaned_df = df.copy() |
| | type_suggestions = self.data_processor.detect_data_types(df) |
| | |
| | for col, suggested_type in type_suggestions.items(): |
| | current_type = str(df[col].dtype) |
| | |
| | if current_type != suggested_type: |
| | try: |
| | if suggested_type == 'int64': |
| | cleaned_df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64') |
| | elif suggested_type == 'float64': |
| | cleaned_df[col] = pd.to_numeric(df[col], errors='coerce') |
| | elif suggested_type == 'datetime64[ns]': |
| | cleaned_df[col] = pd.to_datetime(df[col], errors='coerce') |
| | elif suggested_type == 'bool': |
| | cleaned_df[col] = df[col].astype(str).str.lower().map({ |
| | 'true': True, 'false': False, '1': True, '0': False, |
| | 'yes': True, 'no': False |
| | }) |
| | |
| | self.cleaning_report['changes'].append({ |
| | 'type': 'data_type_conversion', |
| | 'column': col, |
| | 'from_type': current_type, |
| | 'to_type': suggested_type |
| | }) |
| | |
| | except Exception as e: |
| | logger.warning(f"Could not convert column '{col}' to {suggested_type}: {str(e)}") |
| | |
| | return cleaned_df |
| | |
| | def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame: |
| | """Handle outliers in numeric columns""" |
| | cleaned_df = df.copy() |
| | |
| | numeric_columns = df.select_dtypes(include=[np.number]).columns |
| | |
| | for col in numeric_columns: |
| | Q1 = df[col].quantile(0.25) |
| | Q3 = df[col].quantile(0.75) |
| | IQR = Q3 - Q1 |
| | lower_bound = Q1 - 1.5 * IQR |
| | upper_bound = Q3 + 1.5 * IQR |
| | |
| | outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound) |
| | outlier_count = outlier_mask.sum() |
| | |
| | if outlier_count > 0: |
| | |
| | cleaned_df.loc[df[col] < lower_bound, col] = lower_bound |
| | cleaned_df.loc[df[col] > upper_bound, col] = upper_bound |
| | |
| | self.cleaning_report['changes'].append({ |
| | 'type': 'outlier_handling', |
| | 'column': col, |
| | 'outliers_capped': outlier_count, |
| | 'lower_bound': lower_bound, |
| | 'upper_bound': upper_bound |
| | }) |
| | |
| | return cleaned_df |
| | |
| | def _generate_summary_stats(self, original: Dict, cleaned: Dict) -> Dict: |
| | """Generate summary statistics comparing original and cleaned data""" |
| | summary = { |
| | 'total_changes': len(self.cleaning_report['changes']), |
| | 'rows_before': original['shape'][0], |
| | 'rows_after': cleaned['shape'][0], |
| | 'columns': original['shape'][1], |
| | 'missing_values_before': sum(original['missing_values'].values()), |
| | 'missing_values_after': sum(cleaned['missing_values'].values()), |
| | 'duplicates_removed': original['duplicates'] |
| | } |
| | |
| | summary['missing_reduction_percentage'] = ( |
| | (summary['missing_values_before'] - summary['missing_values_after']) / |
| | max(summary['missing_values_before'], 1) * 100 |
| | ) |
| | |
| | return summary |
| |
|