"""
Data Cleaning Agent - Handles data preprocessing and cleaning
"""
import pandas as pd
import numpy as np
class DataCleaningAgent:
"""Agent responsible for data cleaning and preprocessing"""
def __init__(self):
self.cleaning_report = {}
self.label_encoders = {}
def clean_data(self, data, aggressive_cleaning=False):
"""
Comprehensive data cleaning
Args:
data: Input DataFrame
aggressive_cleaning: Whether to apply more aggressive cleaning
Returns:
Dictionary with cleaned data and cleaning report
"""
cleaned_data = data.copy()
report = {
'original_shape': data.shape,
'cleaning_steps': []
}
# Handle missing values
missing_info = self._handle_missing_values(cleaned_data)
report['missing_values'] = missing_info
report['cleaning_steps'].append('Missing values handled')
# Remove duplicates
duplicates_removed = self._remove_duplicates(cleaned_data)
report['duplicates_removed'] = duplicates_removed
if duplicates_removed > 0:
report['cleaning_steps'].append(f'Removed {duplicates_removed} duplicates')
# Handle outliers
if aggressive_cleaning:
outliers_info = self._handle_outliers(cleaned_data)
report['outliers'] = outliers_info
report['cleaning_steps'].append('Outliers handled')
# Data type optimization
type_changes = self._optimize_dtypes(cleaned_data)
report['type_changes'] = type_changes
if type_changes:
report['cleaning_steps'].append('Data types optimized')
# Handle infinite values
inf_handled = self._handle_infinite_values(cleaned_data)
if inf_handled:
report['cleaning_steps'].append('Infinite values handled')
report['final_shape'] = cleaned_data.shape
report['rows_removed'] = data.shape[0] - cleaned_data.shape[0]
return {
'status': 'success',
'data': cleaned_data,
'cleaning_report': report
}
    def _handle_missing_values(self, data):
"""Handle missing values based on column type and distribution"""
missing_info = {}
for col in data.columns:
missing_count = data[col].isnull().sum()
if missing_count > 0:
missing_info[col] = {
'count': missing_count,
'percentage': (missing_count / len(data)) * 100
}
                if data[col].dtype in ['object', 'string']:
                    # Categorical columns: fill with the mode (most frequent value)
                    mode_val = data[col].mode()
                    if len(mode_val) > 0:
                        data[col] = data[col].fillna(mode_val.iloc[0])
                        missing_info[col]['strategy'] = f'filled_with_mode: {mode_val.iloc[0]}'
                    else:
                        data[col] = data[col].fillna('Unknown')
                        missing_info[col]['strategy'] = 'filled_with_unknown'
                else:
                    # Numerical columns: choose between mean and median based on
                    # skewness. |skew| > 1 is a common rule of thumb for marked
                    # asymmetry, where the median is more robust than the mean.
                    skewness = abs(data[col].skew())
                    if pd.notna(skewness) and skewness > 1:
                        fill_value = data[col].median()
                        missing_info[col]['strategy'] = f'filled_with_median: {fill_value}'
                    else:
                        fill_value = data[col].mean()
                        missing_info[col]['strategy'] = f'filled_with_mean: {fill_value}'
                    data[col] = data[col].fillna(fill_value)
return missing_info
def _remove_duplicates(self, data):
"""Remove duplicate rows"""
initial_count = len(data)
data.drop_duplicates(inplace=True)
data.reset_index(drop=True, inplace=True)
return initial_count - len(data)
    def _handle_outliers(self, data):
"""Handle outliers using IQR method"""
outlier_info = {}
for col in data.select_dtypes(include=[np.number]).columns:
Q1 = data[col].quantile(0.25)
Q3 = data[col].quantile(0.75)
IQR = Q3 - Q1
if IQR == 0: # Skip columns with no variance
continue
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
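            # These are Tukey's fences; for roughly normal data only about 0.7%
            # of values fall outside the 1.5 * IQR range.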
outlier_mask = (data[col] < lower_bound) | (data[col] > upper_bound)
outlier_count = outlier_mask.sum()
if outlier_count > 0:
outlier_info[col] = {
'count': outlier_count,
'percentage': (outlier_count / len(data)) * 100,
'lower_bound': lower_bound,
'upper_bound': upper_bound
}
                # Cap (winsorize) outliers at the fences instead of removing rows
                # (more conservative)
                data[col] = data[col].clip(lower=lower_bound, upper=upper_bound)
return outlier_info
def _optimize_dtypes(self, data):
"""Optimize data types for memory efficiency"""
type_changes = {}
for col in data.columns:
original_type = str(data[col].dtype)
            # Object columns: try a lossless conversion to numeric, then datetime
            if data[col].dtype == 'object':
                non_null = data[col].notna().sum()
                # Numeric: accept only if coercion loses no existing values,
                # otherwise a mostly-text column would be destroyed by NaNs
                numeric_series = pd.to_numeric(data[col], errors='coerce')
                if non_null > 0 and numeric_series.notna().sum() == non_null:
                    data[col] = numeric_series
                    type_changes[col] = f"{original_type} -> {data[col].dtype}"
                    continue
                # Datetime: same lossless criterion
                try:
                    datetime_series = pd.to_datetime(data[col], errors='coerce')
                    if non_null > 0 and datetime_series.notna().sum() == non_null:
                        data[col] = datetime_series
                        type_changes[col] = f"{original_type} -> datetime64[ns]"
                        continue
                except (ValueError, TypeError):
                    pass
# Optimize integer types
            elif data[col].dtype == 'int64':
if data[col].min() >= 0:
if data[col].max() <= 255:
data[col] = data[col].astype('uint8')
type_changes[col] = f"{original_type} -> uint8"
elif data[col].max() <= 65535:
data[col] = data[col].astype('uint16')
type_changes[col] = f"{original_type} -> uint16"
elif data[col].max() <= 4294967295:
data[col] = data[col].astype('uint32')
type_changes[col] = f"{original_type} -> uint32"
else:
if data[col].min() >= -128 and data[col].max() <= 127:
data[col] = data[col].astype('int8')
type_changes[col] = f"{original_type} -> int8"
elif data[col].min() >= -32768 and data[col].max() <= 32767:
data[col] = data[col].astype('int16')
type_changes[col] = f"{original_type} -> int16"
elif data[col].min() >= -2147483648 and data[col].max() <= 2147483647:
data[col] = data[col].astype('int32')
type_changes[col] = f"{original_type} -> int32"
# Optimize float types
            elif data[col].dtype == 'float64':
if data[col].min() >= np.finfo(np.float32).min and data[col].max() <= np.finfo(np.float32).max:
data[col] = data[col].astype('float32')
type_changes[col] = f"{original_type} -> float32"
return type_changes
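    # Note: pandas can perform the downcasting in _optimize_dtypes in one call,
    # e.g. pd.to_numeric(series, downcast='integer') or downcast='float'; the
    # explicit range checks are kept so each change can be reported per column.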
def _handle_infinite_values(self, data):
"""Handle infinite values in the dataset"""
inf_cols = []
for col in data.select_dtypes(include=[np.number]).columns:
if np.isinf(data[col]).any():
inf_cols.append(col)
                # Replace infinite values with NaN, then fill with the column median
                data[col] = data[col].replace([np.inf, -np.inf], np.nan)
                data[col] = data[col].fillna(data[col].median())
return len(inf_cols) > 0
def get_data_quality_report(self, data):
"""Generate a comprehensive data quality report"""
report = {}
# Basic info
report['shape'] = data.shape
report['dtypes'] = data.dtypes.to_dict()
# Missing values
missing = data.isnull().sum()
report['missing_values'] = {
'total': missing.sum(),
'by_column': missing[missing > 0].to_dict(),
'percentage': (missing / len(data) * 100)[missing > 0].to_dict()
}
# Duplicates
report['duplicates'] = data.duplicated().sum()
# Unique values
report['unique_values'] = {col: data[col].nunique() for col in data.columns}
# Memory usage
report['memory_usage'] = {
'total_mb': data.memory_usage(deep=True).sum() / 1024**2,
'by_column': (data.memory_usage(deep=True) / 1024**2).to_dict()
}
return report
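
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the agent itself): builds a small synthetic
# DataFrame with missing values, a duplicate row, an extreme outlier, and an
# infinity, then runs the full cleaning pipeline. Column names and values here
# are purely illustrative assumptions for the demo.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo = pd.DataFrame({
        'age': [25, 30, np.nan, 30, 300],                       # NaN + outlier
        'income': [50000.0, np.inf, 62000.0, np.nan, 58000.0],  # inf + NaN
        'city': ['NY', 'LA', None, 'LA', 'NY'],                 # missing categorical
    })
    demo = pd.concat([demo, demo.iloc[[1]]], ignore_index=True)  # duplicate row

    agent = DataCleaningAgent()
    result = agent.clean_data(demo, aggressive_cleaning=True)

    print("Status:", result['status'])
    print("Steps:", result['cleaning_report']['cleaning_steps'])
    print(result['data'])
    print(agent.get_data_quality_report(result['data']))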