"""
Data Cleaning Agent - Handles data preprocessing and cleaning
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
class DataCleaningAgent:
"""Agent responsible for data cleaning and preprocessing"""
def __init__(self):
self.cleaning_report = {}
self.label_encoders = {}
    def clean_data(self, data, aggressive_cleaning=False):
        """
        Comprehensive data cleaning.

        Args:
            data: Input DataFrame
            aggressive_cleaning: Whether to apply more aggressive cleaning
                (currently enables outlier capping)

        Returns:
            Dictionary with cleaned data and cleaning report
        """
        cleaned_data = data.copy()
        report = {
            'original_shape': data.shape,
            'cleaning_steps': []
        }

        # Handle missing values
        missing_info = self._handle_missing_values(cleaned_data)
        report['missing_values'] = missing_info
        report['cleaning_steps'].append('Missing values handled')

        # Remove duplicates
        duplicates_removed = self._remove_duplicates(cleaned_data)
        report['duplicates_removed'] = duplicates_removed
        if duplicates_removed > 0:
            report['cleaning_steps'].append(f'Removed {duplicates_removed} duplicates')

        # Handle outliers
        if aggressive_cleaning:
            outliers_info = self._handle_outliers(cleaned_data)
            report['outliers'] = outliers_info
            report['cleaning_steps'].append('Outliers handled')

        # Data type optimization
        type_changes = self._optimize_dtypes(cleaned_data)
        report['type_changes'] = type_changes
        if type_changes:
            report['cleaning_steps'].append('Data types optimized')

        # Handle infinite values
        inf_handled = self._handle_infinite_values(cleaned_data)
        if inf_handled:
            report['cleaning_steps'].append('Infinite values handled')

        report['final_shape'] = cleaned_data.shape
        report['rows_removed'] = data.shape[0] - cleaned_data.shape[0]

        return {
            'status': 'success',
            'data': cleaned_data,
            'cleaning_report': report
        }
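
    # Illustrative usage (a sketch; assumes a pandas DataFrame `df` is in scope):
    #
    #   agent = DataCleaningAgent()
    #   result = agent.clean_data(df, aggressive_cleaning=True)
    #   cleaned_df = result['data']
    #   print(result['cleaning_report']['cleaning_steps'])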
    def _handle_missing_values(self, data, strategy='smart'):
        """Handle missing values based on column type and distribution"""
        missing_info = {}

        for col in data.columns:
            missing_count = data[col].isnull().sum()
            if missing_count > 0:
                missing_info[col] = {
                    'count': int(missing_count),
                    'percentage': (missing_count / len(data)) * 100
                }

                if data[col].dtype in ['object', 'string']:
                    # Fill with mode for categorical columns
                    mode_val = data[col].mode()
                    if len(mode_val) > 0:
                        data[col] = data[col].fillna(mode_val.iloc[0])
                        missing_info[col]['strategy'] = f'filled_with_mode: {mode_val.iloc[0]}'
                    else:
                        data[col] = data[col].fillna('Unknown')
                        missing_info[col]['strategy'] = 'filled_with_unknown'
                else:
                    # For numerical columns, choose between mean/median based on skewness
                    skewness = abs(data[col].skew())
                    if skewness > 1:  # Highly skewed, use median
                        fill_value = data[col].median()
                        data[col] = data[col].fillna(fill_value)
                        missing_info[col]['strategy'] = f'filled_with_median: {fill_value}'
                    else:  # Relatively normal, use mean
                        fill_value = data[col].mean()
                        data[col] = data[col].fillna(fill_value)
                        missing_info[col]['strategy'] = f'filled_with_mean: {fill_value}'

        return missing_info
    def _remove_duplicates(self, data):
        """Remove duplicate rows"""
        initial_count = len(data)
        data.drop_duplicates(inplace=True)
        data.reset_index(drop=True, inplace=True)
        return initial_count - len(data)
    def _handle_outliers(self, data, method='iqr'):
        """Handle outliers using the IQR (Tukey fences) method"""
        outlier_info = {}

        for col in data.select_dtypes(include=[np.number]).columns:
            Q1 = data[col].quantile(0.25)
            Q3 = data[col].quantile(0.75)
            IQR = Q3 - Q1
            if IQR == 0:  # Skip columns with no variance
                continue

            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outlier_mask = (data[col] < lower_bound) | (data[col] > upper_bound)
            outlier_count = outlier_mask.sum()

            if outlier_count > 0:
                outlier_info[col] = {
                    'count': int(outlier_count),
                    'percentage': (outlier_count / len(data)) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound
                }
                # Cap (winsorize) outliers instead of removing them (more conservative)
                data[col] = data[col].clip(lower=lower_bound, upper=upper_bound)

        return outlier_info
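
    # Worked example of the fences above: for Q1 = 10 and Q3 = 20, IQR = 10,
    # so values below 10 - 1.5*10 = -5 or above 20 + 1.5*10 = 35 are capped.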
    def _optimize_dtypes(self, data):
        """Optimize data types for memory efficiency"""
        type_changes = {}

        for col in data.columns:
            original_type = str(data[col].dtype)

            # Try to convert object columns to numeric, then datetime
            if data[col].dtype == 'object':
                try:
                    numeric_series = pd.to_numeric(data[col], errors='coerce')
                    # Only convert when the coercion loses no non-null values
                    if (numeric_series.notna().sum() == data[col].notna().sum()
                            and not numeric_series.isnull().all()):
                        data[col] = numeric_series
                        type_changes[col] = f"{original_type} -> {data[col].dtype}"
                        continue
                except (ValueError, TypeError):
                    pass

                try:
                    datetime_series = pd.to_datetime(data[col], errors='coerce')
                    if (datetime_series.notna().sum() == data[col].notna().sum()
                            and not datetime_series.isnull().all()):
                        data[col] = datetime_series
                        type_changes[col] = f"{original_type} -> datetime64[ns]"
                        continue
                except (ValueError, TypeError):
                    pass

            # Downcast integer columns to the smallest type that fits their range
            elif data[col].dtype == 'int64':
                if data[col].min() >= 0:
                    if data[col].max() <= 255:
                        data[col] = data[col].astype('uint8')
                        type_changes[col] = f"{original_type} -> uint8"
                    elif data[col].max() <= 65535:
                        data[col] = data[col].astype('uint16')
                        type_changes[col] = f"{original_type} -> uint16"
                    elif data[col].max() <= 4294967295:
                        data[col] = data[col].astype('uint32')
                        type_changes[col] = f"{original_type} -> uint32"
                else:
                    if data[col].min() >= -128 and data[col].max() <= 127:
                        data[col] = data[col].astype('int8')
                        type_changes[col] = f"{original_type} -> int8"
                    elif data[col].min() >= -32768 and data[col].max() <= 32767:
                        data[col] = data[col].astype('int16')
                        type_changes[col] = f"{original_type} -> int16"
                    elif data[col].min() >= -2147483648 and data[col].max() <= 2147483647:
                        data[col] = data[col].astype('int32')
                        type_changes[col] = f"{original_type} -> int32"

            # Downcast float64 columns when all values fit in float32
            elif data[col].dtype == 'float64':
                if data[col].min() >= np.finfo(np.float32).min and data[col].max() <= np.finfo(np.float32).max:
                    data[col] = data[col].astype('float32')
                    type_changes[col] = f"{original_type} -> float32"

        return type_changes
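
    # For scale: downcasting a 1,000,000-row int64 column (8 bytes/value) to
    # uint8 (1 byte/value) shrinks that column from ~8 MB to ~1 MB.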
    def _handle_infinite_values(self, data):
        """Handle infinite values in the dataset"""
        inf_cols = []

        for col in data.select_dtypes(include=[np.number]).columns:
            if np.isinf(data[col]).any():
                inf_cols.append(col)
                # Replace infinite values with NaN, then fill with the column median
                data[col] = data[col].replace([np.inf, -np.inf], np.nan)
                data[col] = data[col].fillna(data[col].median())

        return len(inf_cols) > 0
    def get_data_quality_report(self, data):
        """Generate a comprehensive data quality report"""
        report = {}

        # Basic info
        report['shape'] = data.shape
        report['dtypes'] = data.dtypes.to_dict()

        # Missing values
        missing = data.isnull().sum()
        report['missing_values'] = {
            'total': missing.sum(),
            'by_column': missing[missing > 0].to_dict(),
            'percentage': (missing / len(data) * 100)[missing > 0].to_dict()
        }

        # Duplicates
        report['duplicates'] = data.duplicated().sum()

        # Unique values
        report['unique_values'] = {col: data[col].nunique() for col in data.columns}

        # Memory usage
        report['memory_usage'] = {
            'total_mb': data.memory_usage(deep=True).sum() / 1024**2,
            'by_column': (data.memory_usage(deep=True) / 1024**2).to_dict()
        }

        return report
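

# Minimal usage sketch (illustrative, not part of the original module): builds a
# small DataFrame with missing values, duplicate rows, and an infinity, then
# runs the agent end to end. Column names here are made up for the demo.
if __name__ == "__main__":
    df = pd.DataFrame({
        'age': [25, 30, np.nan, 45, 30],
        'income': [50000.0, 60000.0, 75000.0, np.inf, 60000.0],
        'city': ['NY', 'LA', None, 'NY', 'LA'],
    })
    df = pd.concat([df, df.iloc[[1]]], ignore_index=True)  # add a duplicate row

    agent = DataCleaningAgent()
    result = agent.clean_data(df, aggressive_cleaning=True)

    print(result['cleaning_report']['cleaning_steps'])
    print(agent.get_data_quality_report(result['data']))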