|
|
""" |
|
|
Data Cleaning Agent - Handles data preprocessing and cleaning |
|
|
""" |
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from sklearn.preprocessing import LabelEncoder |
|
|
|
|
|
|
|
|
class DataCleaningAgent:
    """Agent responsible for data cleaning and preprocessing.

    Exposes a `clean_data` pipeline (missing values, duplicates, optional
    outlier winsorizing, dtype optimization, infinite values) and a
    standalone `get_data_quality_report` inspection helper.
    """

    def __init__(self):
        # Report of the most recent clean_data() run.
        self.cleaning_report = {}
        # Reserved for fitted LabelEncoder instances keyed by column name;
        # not populated by the current pipeline.
        self.label_encoders = {}

    def clean_data(self, data, aggressive_cleaning=False):
        """Run the full cleaning pipeline on a copy of `data`.

        Args:
            data: Input DataFrame (not mutated; a copy is cleaned).
            aggressive_cleaning: If True, also winsorize numeric outliers
                to their IQR fences.

        Returns:
            Dict with keys 'status', 'data' (cleaned DataFrame) and
            'cleaning_report' (per-step details).
        """
        cleaned_data = data.copy()
        report = {
            'original_shape': data.shape,
            'cleaning_steps': [],
        }

        # 1. Missing values (in place on the copy).
        report['missing_values'] = self._handle_missing_values(cleaned_data)
        report['cleaning_steps'].append('Missing values handled')

        # 2. Exact duplicate rows.
        duplicates_removed = self._remove_duplicates(cleaned_data)
        report['duplicates_removed'] = duplicates_removed
        if duplicates_removed > 0:
            report['cleaning_steps'].append(f'Removed {duplicates_removed} duplicates')

        # 3. Outliers -- only when explicitly requested, since clipping
        # changes values rather than just tidying them.
        if aggressive_cleaning:
            report['outliers'] = self._handle_outliers(cleaned_data)
            report['cleaning_steps'].append('Outliers handled')

        # 4. Memory-friendly dtypes.
        type_changes = self._optimize_dtypes(cleaned_data)
        report['type_changes'] = type_changes
        if type_changes:
            report['cleaning_steps'].append('Data types optimized')

        # 5. +/- infinity in numeric columns.
        if self._handle_infinite_values(cleaned_data):
            report['cleaning_steps'].append('Infinite values handled')

        report['final_shape'] = cleaned_data.shape
        report['rows_removed'] = data.shape[0] - cleaned_data.shape[0]

        # Keep the latest report on the instance (the attribute was
        # previously initialized but never updated).
        self.cleaning_report = report

        return {
            'status': 'success',
            'data': cleaned_data,
            'cleaning_report': report,
        }

    def _handle_missing_values(self, data, strategy='smart'):
        """Fill missing values in place, choosing a fill per column.

        Object/string columns get the mode (or 'Unknown' when no mode
        exists). Numeric columns get the median when heavily skewed
        (|skew| > 1), otherwise the mean; an entirely-missing numeric
        column falls back to 0 so no NaN survives this step.

        Args:
            data: DataFrame mutated in place.
            strategy: Reserved for alternative strategies; currently unused.

        Returns:
            Dict mapping column name -> {'count', 'percentage', 'strategy'}.
        """
        missing_info = {}

        for col in data.columns:
            missing_count = int(data[col].isnull().sum())
            if missing_count == 0:
                continue
            missing_info[col] = {
                'count': missing_count,
                'percentage': (missing_count / len(data)) * 100,
            }

            if data[col].dtype in ['object', 'string']:
                mode_val = data[col].mode()
                # Column assignment instead of Series.fillna(inplace=True):
                # chained in-place fills are deprecated in pandas 2.x and do
                # not propagate to `data` under copy-on-write (pandas 3.0).
                if len(mode_val) > 0:
                    data[col] = data[col].fillna(mode_val[0])
                    missing_info[col]['strategy'] = f'filled_with_mode: {mode_val[0]}'
                else:
                    data[col] = data[col].fillna('Unknown')
                    missing_info[col]['strategy'] = 'filled_with_unknown'
            else:
                # skew() is NaN for columns with fewer than 3 valid values;
                # treat that as "not skewed" and use the mean.
                skewness = data[col].skew()
                if pd.notna(skewness) and abs(skewness) > 1:
                    fill_value = data[col].median()
                    strategy_name = 'median'
                else:
                    fill_value = data[col].mean()
                    strategy_name = 'mean'
                if pd.isna(fill_value):
                    # Entirely-missing column: no statistic exists, so fall
                    # back to 0 rather than leaving NaN behind.
                    fill_value = 0
                    strategy_name = 'zero'
                data[col] = data[col].fillna(fill_value)
                missing_info[col]['strategy'] = f'filled_with_{strategy_name}: {fill_value}'

        return missing_info

    def _remove_duplicates(self, data):
        """Drop exact duplicate rows in place and reindex.

        Returns:
            Number of rows removed.
        """
        initial_count = len(data)
        data.drop_duplicates(inplace=True)
        data.reset_index(drop=True, inplace=True)
        return initial_count - len(data)

    def _handle_outliers(self, data, method='iqr'):
        """Winsorize numeric outliers to the 1.5*IQR fences, in place.

        Args:
            data: DataFrame mutated in place.
            method: Reserved for alternative detectors; only 'iqr' is
                implemented.

        Returns:
            Dict mapping column name -> outlier count, percentage and the
            clip bounds used.
        """
        outlier_info = {}

        for col in data.select_dtypes(include=[np.number]).columns:
            q1 = data[col].quantile(0.25)
            q3 = data[col].quantile(0.75)
            iqr = q3 - q1

            # Constant-ish column: fences collapse onto the data, skip.
            if iqr == 0:
                continue

            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outlier_mask = (data[col] < lower_bound) | (data[col] > upper_bound)
            outlier_count = int(outlier_mask.sum())

            if outlier_count > 0:
                outlier_info[col] = {
                    'count': outlier_count,
                    'percentage': (outlier_count / len(data)) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                }
                # Clip to the fences rather than dropping rows, so row
                # alignment with other columns is preserved.
                data[col] = data[col].clip(lower=lower_bound, upper=upper_bound)

        return outlier_info

    def _optimize_dtypes(self, data):
        """Downcast dtypes in place for memory efficiency.

        Object columns are converted to numeric or datetime ONLY when every
        non-null value parses -- the previous behavior converted whenever at
        least one value parsed, silently coercing the rest to NaN/NaT and
        destroying data. Integer and float columns are downcast when their
        value range fits a smaller dtype.

        Returns:
            Dict mapping column name -> "old_dtype -> new_dtype".
        """
        type_changes = {}

        for col in data.columns:
            original_type = str(data[col].dtype)

            if data[col].dtype == 'object':
                non_null = int(data[col].notna().sum())
                if non_null == 0:
                    continue

                # Numeric conversion: accept only a lossless parse.
                try:
                    numeric_series = pd.to_numeric(data[col], errors='coerce')
                except (ValueError, TypeError):
                    numeric_series = None
                if numeric_series is not None and int(numeric_series.notna().sum()) == non_null:
                    data[col] = numeric_series
                    type_changes[col] = f"{original_type} -> {data[col].dtype}"
                    continue

                # Datetime conversion: same lossless requirement.
                try:
                    datetime_series = pd.to_datetime(data[col], errors='coerce')
                except (ValueError, TypeError):
                    continue
                if int(datetime_series.notna().sum()) == non_null:
                    data[col] = datetime_series
                    type_changes[col] = f"{original_type} -> datetime64[ns]"

            elif data[col].dtype == 'int64':
                # Hoist min/max out of the threshold chain (each call scans
                # the column).
                col_min = data[col].min()
                col_max = data[col].max()
                new_type = None
                if col_min >= 0:
                    if col_max <= 255:
                        new_type = 'uint8'
                    elif col_max <= 65535:
                        new_type = 'uint16'
                    elif col_max <= 4294967295:
                        new_type = 'uint32'
                else:
                    if col_min >= -128 and col_max <= 127:
                        new_type = 'int8'
                    elif col_min >= -32768 and col_max <= 32767:
                        new_type = 'int16'
                    elif col_min >= -2147483648 and col_max <= 2147483647:
                        new_type = 'int32'
                if new_type is not None:
                    data[col] = data[col].astype(new_type)
                    type_changes[col] = f"{original_type} -> {new_type}"

            elif data[col].dtype == 'float64':
                # Range check only (matches previous behavior); precision
                # loss within float32 range is accepted.
                finfo = np.finfo(np.float32)
                if data[col].min() >= finfo.min and data[col].max() <= finfo.max:
                    data[col] = data[col].astype('float32')
                    type_changes[col] = f"{original_type} -> float32"

        return type_changes

    def _handle_infinite_values(self, data):
        """Replace +/-inf in numeric columns with the column median, in place.

        Any NaN in an affected column is also filled with that median (the
        fill happens after inf -> NaN replacement). A column with no finite
        values falls back to 0.

        Returns:
            True if any column contained infinite values.
        """
        inf_cols = []
        for col in data.select_dtypes(include=[np.number]).columns:
            if np.isinf(data[col]).any():
                inf_cols.append(col)
                finite = data[col].replace([np.inf, -np.inf], np.nan)
                fill_value = finite.median()
                if pd.isna(fill_value):
                    # Only infinities/NaN present: no finite median exists.
                    fill_value = 0
                data[col] = finite.fillna(fill_value)

        return len(inf_cols) > 0

    def get_data_quality_report(self, data):
        """Generate a comprehensive data quality report (read-only).

        Returns:
            Dict with shape, dtypes, missing-value stats, duplicate count,
            per-column unique counts and memory usage in MB.
        """
        report = {
            'shape': data.shape,
            'dtypes': data.dtypes.to_dict(),
        }

        missing = data.isnull().sum()
        report['missing_values'] = {
            'total': int(missing.sum()),
            'by_column': missing[missing > 0].to_dict(),
            'percentage': (missing / len(data) * 100)[missing > 0].to_dict(),
        }

        report['duplicates'] = int(data.duplicated().sum())

        report['unique_values'] = {col: int(data[col].nunique()) for col in data.columns}

        # deep=True so object columns report their real string payload size.
        mem_mb = data.memory_usage(deep=True) / 1024**2
        report['memory_usage'] = {
            'total_mb': float(mem_mb.sum()),
            'by_column': mem_mb.to_dict(),
        }

        return report
|
|
|