# Viewer metadata (not part of the program): Spaces: Sleeping; File size: 12,325 Bytes; b90f550
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from typing import Tuple, Dict, Optional
import os
import logging
# Configure logging.
# NOTE: basicConfig() configures the root logger, and only when the root logger
# has no handlers yet. With level=ERROR, the logger.warning(...) calls made in
# this module are not emitted unless the application reconfigures logging.
logging.basicConfig(level=logging.ERROR)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class DataPreprocessor:
    """Handles data preprocessing for credit card anomaly detection.

    The class is deliberately best-effort: every public step catches its own
    failures, logs them and falls back to a safe default instead of raising,
    so a malformed upload never aborts the calling application.

    Attributes are populated as the steps run:
        scaler: ``StandardScaler`` refitted on every ``scale_features`` call.
        label_encoders: fitted ``LabelEncoder`` objects keyed by column name.
        user_stats: per-user ``mean``/``std``/``median`` of ``Amount``.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.user_stats = {}

    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load data from multiple file formats (CSV, Excel, JSON, Parquet).

        The reader is chosen from the file extension; an unknown extension is
        tried as CSV before being rejected.

        Args:
            filepath: Path of the file to read.

        Returns:
            The loaded frame. On any failure (unreadable file, unsupported
            format, empty data) the error is logged and an empty frame with
            the expected columns is returned instead.
        """
        try:
            file_ext = os.path.splitext(filepath)[1].lower()
            if file_ext == '.csv':
                df = pd.read_csv(filepath)
            elif file_ext in ('.xlsx', '.xls'):
                df = pd.read_excel(filepath)
            elif file_ext == '.json':
                df = pd.read_json(filepath)
            elif file_ext == '.parquet':
                df = pd.read_parquet(filepath)
            else:
                # Try CSV as fallback
                try:
                    df = pd.read_csv(filepath)
                except Exception as err:
                    raise ValueError(f"Unsupported file format: {file_ext}") from err

            # Ensure dataframe is not empty
            if df.empty:
                raise ValueError("Loaded data is empty")
            return df
        except Exception as e:
            logger.error("Error loading data: %s", e)
            # Return empty dataframe with expected columns as fallback
            return self._create_empty_dataframe()

    def _create_empty_dataframe(self) -> pd.DataFrame:
        """Create an empty dataframe with expected columns as fallback."""
        columns = ['Transaction ID', 'User ID', 'Amount', 'Timestamp', 'Merchant Category', 'Location']
        return pd.DataFrame(columns=columns)

    def ensure_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure all required columns exist, auto-generate missing ones.

        Missing columns are filled with synthetic defaults, ``Amount`` is
        coerced to numeric (unparseable values become 0) and ``Timestamp`` is
        coerced to datetime (unparseable values become 2024-01-01).

        Args:
            df: Input frame; it is copied, never modified in place.

        Returns:
            A copy of ``df`` that carries every required column.
        """
        df = df.copy()
        n_rows = len(df)

        # Define required columns and their default generators
        default_builders = {
            'Transaction ID': lambda: [f'TX{i:06d}' for i in range(n_rows)],
            'User ID': lambda: [f'USER{i % 10 + 1:03d}' for i in range(n_rows)],
            'Amount': lambda: [0.0] * n_rows,
            # A Timedelta step is used instead of the 'H' alias, which newer
            # pandas releases deprecate.
            'Timestamp': lambda: pd.date_range(
                start='2024-01-01', periods=n_rows, freq=pd.Timedelta(hours=1)
            ),
            'Merchant Category': lambda: ['Unknown'] * n_rows,
            'Location': lambda: ['Unknown'] * n_rows,
        }

        # Add missing columns with generated defaults
        for col, build_default in default_builders.items():
            if col in df.columns:
                continue
            try:
                df[col] = build_default()
            except Exception as e:
                logger.error("Error generating column %s: %s", col, e)
                df[col] = 'Unknown' if col in ('Merchant Category', 'Location') else 0

        # Ensure numeric columns are numeric
        for col in ('Amount',):
            try:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
            except Exception:
                df[col] = 0

        # Ensure timestamp is datetime
        try:
            df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
            df['Timestamp'] = df['Timestamp'].fillna(pd.Timestamp('2024-01-01'))
        except Exception:
            df['Timestamp'] = pd.Timestamp('2024-01-01')

        return df

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with safe operations.

        Numeric columns are filled with their median (0 when the column is
        entirely missing); object/category columns are filled with their mode
        (``'Unknown'`` when no mode exists).

        Args:
            df: Input frame; it is copied, never modified in place.

        Returns:
            A copy of ``df`` with the missing values filled.
        """
        df = df.copy()
        try:
            # Fill missing numeric values with median
            for col in df.select_dtypes(include=[np.number]).columns:
                median_val = df[col].median()
                if pd.isna(median_val):
                    median_val = 0
                df[col] = df[col].fillna(median_val)

            # Fill missing categorical values with mode
            for col in df.select_dtypes(include=['object', 'category']).columns:
                mode_val = df[col].mode()
                fill_value = mode_val[0] if not mode_val.empty else 'Unknown'
                df[col] = df[col].fillna(fill_value)
        except Exception as e:
            logger.error("Error handling missing values: %s", e)
            # Fallback: fill all NaN with appropriate defaults
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    df[col] = df[col].fillna(0)
                else:
                    df[col] = df[col].fillna('Unknown')
        return df

    def normalize_per_user(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize spending amounts per user with error handling.

        Adds ``Amount_Normalized``: the z-score of ``Amount`` within each
        ``User ID`` group. Users whose standard deviation is zero or undefined
        (for example a single transaction) get 0. Per-user statistics are
        stored in ``self.user_stats``.

        Args:
            df: Input frame; it is copied, never modified in place.

        Returns:
            A copy of ``df`` with the ``Amount_Normalized`` column added.
        """
        df = df.copy()
        try:
            # Check if User ID column exists
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
                logger.warning("User ID column missing, using default")

            # Check if Amount column exists
            if 'Amount' not in df.columns:
                df['Amount'] = 0
                logger.warning("Amount column missing, using default")

            # Calculate user statistics
            amounts_by_user = df.groupby('User ID')['Amount']
            self.user_stats = amounts_by_user.agg(['mean', 'std', 'median']).to_dict('index')

            # Normalize amount relative to user average. transform() broadcasts
            # each group's statistic back onto its rows, which avoids a slow
            # row-by-row apply.
            user_mean = amounts_by_user.transform('mean')
            user_std = amounts_by_user.transform('std')
            z_score = (df['Amount'] - user_mean) / (user_std + 1e-8)
            # NaN > 0 is False, so undefined spreads also fall back to 0.
            df['Amount_Normalized'] = z_score.where(user_std > 0, 0.0)
        except Exception as e:
            logger.error("Error normalizing per user: %s", e)
            # Fallback: use simple z-score
            try:
                mean = df['Amount'].mean()
                std = df['Amount'].std()
                df['Amount_Normalized'] = (df['Amount'] - mean) / (std + 1e-8) if std > 0 else 0
            except Exception:
                df['Amount_Normalized'] = 0
        return df

    def encode_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Encode categorical variables with error handling.

        The merchant category column is located under several common
        spellings (``'Merchant Category'`` is created with ``'Unknown'`` when
        none exists) and label-encoded into ``<column>_Encoded``. The first
        call for a column fits the encoder; later calls reuse it, and labels
        the encoder has never seen are encoded as -1.

        Args:
            df: Input frame; it is copied, never modified in place.

        Returns:
            A copy of ``df`` with the encoded column added. If encoding fails
            the error is logged and the encoded column is set to 0.
        """
        df = df.copy()

        # Try to find merchant category column with various possible names
        possible_names = ['Merchant Category', 'merchant_category', 'Merchant_Category', 'category', 'Category']
        category_col = next((name for name in possible_names if name in df.columns), None)

        # If no category column found, use a default
        if category_col is None:
            category_col = 'Merchant Category'
            df[category_col] = 'Unknown'

        encoded_col = category_col + '_Encoded'
        try:
            labels = df[category_col].astype(str)
            encoder = self.label_encoders.get(category_col)
            if encoder is None:
                encoder = LabelEncoder()
                df[encoded_col] = encoder.fit_transform(labels)
                # Register only after a successful fit so a failure cannot
                # leave an unfitted encoder behind for later calls.
                self.label_encoders[category_col] = encoder
            else:
                # LabelEncoder.transform raises on unseen labels; map through
                # the fitted classes instead and mark unseen labels with -1.
                code_by_label = {label: code for code, label in enumerate(encoder.classes_)}
                df[encoded_col] = labels.map(code_by_label).fillna(-1).astype('int64')
        except Exception as e:
            logger.error("Error encoding categorical variables: %s", e)
            # Fallback: add a simple encoded column
            if encoded_col not in df.columns:
                df[encoded_col] = 0
        return df

    def scale_features(self, df: pd.DataFrame, feature_cols: list) -> Tuple[pd.DataFrame, np.ndarray]:
        """Scale numerical features using StandardScaler with error handling.

        Only the requested columns that exist in ``df`` are scaled. Each one
        gets a companion ``<column>_Scaled`` column. The scaler is refitted on
        every call.

        Args:
            df: Input frame; it is copied, never modified in place.
            feature_cols: Names of the columns to scale.

        Returns:
            ``(frame, scaled)`` where ``scaled`` is the scaled matrix, with
            one column per existing requested column, in request order. It is
            an empty array when there is nothing to scale or scaling fails.
        """
        df = df.copy()
        try:
            # Filter to only columns that exist
            valid_cols = [col for col in feature_cols if col in df.columns]
            if not valid_cols:
                logger.warning("No valid columns to scale")
                return df, np.array([])

            if len(df) == 0:
                return df, np.array([])

            # Ensure all columns are numeric
            for col in valid_cols:
                if pd.api.types.is_numeric_dtype(df[col]):
                    continue
                try:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                except Exception:
                    df[col] = 0

            scaled_features = self.scaler.fit_transform(df[valid_cols])
            for i, col in enumerate(valid_cols):
                df[col + '_Scaled'] = scaled_features[:, i]
        except Exception as e:
            logger.error("Error scaling features: %s", e)
            # Fallback: return original dataframe with empty scaled features
            scaled_features = np.array([])
        return df, scaled_features

    def preprocess_pipeline(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray, list]:
        """Complete preprocessing pipeline with comprehensive error handling.

        Runs, in order: required-column repair, missing-value filling,
        per-user normalisation, categorical encoding and feature scaling.

        Args:
            df: Raw transaction frame.

        Returns:
            ``(frame, scaled_features, feature_cols)``: the processed frame,
            the scaled feature matrix and the names of the scaled columns.
        """
        try:
            # Ensure required columns exist
            df = self.ensure_required_columns(df)
            # Handle missing values
            df = self.handle_missing_values(df)
            # Normalize per user
            df = self.normalize_per_user(df)
            # Encode categorical variables
            df = self.encode_categorical(df)

            # Select feature columns for scaling - only include columns that
            # exist. encode_categorical names its output '<column>_Encoded',
            # i.e. 'Merchant Category_Encoded' here; the underscore spelling
            # is kept for frames that already carry such a column.
            candidate_cols = [
                'Amount_Normalized',
                'Merchant Category_Encoded',
                'Merchant_Category_Encoded',
                'Amount',
            ]
            feature_cols = [col for col in candidate_cols if col in df.columns]

            # Scale features only if we have valid columns
            if feature_cols:
                df, scaled_features = self.scale_features(df, feature_cols)
            else:
                scaled_features = np.array([])
            return df, scaled_features, feature_cols
        except Exception as e:
            logger.error("Error in preprocessing pipeline: %s", e)
            # Fallback: return original dataframe with minimal processing
            try:
                df = self.ensure_required_columns(df)
                feature_cols = ['Amount']
                df, scaled_features = self.scale_features(df, feature_cols)
                return df, scaled_features, feature_cols
            except Exception:
                # Ultimate fallback
                return df, np.array([]), ['Amount']

    def get_user_statistics(self, user_id: str) -> Dict:
        """Get statistics for a specific user.

        Returns:
            The ``mean``/``std``/``median`` mapping recorded by the last
            ``normalize_per_user`` call, or an empty dict for an unknown user.
        """
        return self.user_stats.get(user_id, {})
|