# Source: Credit-Card-Anomaly / utils/preprocessing.py (uploader avatar: Zayeemk)
# Commit 9a8c220 (verified): "Rename preprocessing.py to utils/preprocessing.py"
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from typing import Tuple, Dict, Optional
import os
import logging
# Configure logging.
# NOTE: basicConfig() runs at import time and sets the root logger to ERROR
# (it does nothing when the root logger already has handlers), so messages
# below ERROR sent through `logger` are not emitted under this default setup.
logging.basicConfig(level=logging.ERROR)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class DataPreprocessor:
    """Handles data preprocessing for credit card anomaly detection.

    Every public step is best-effort: failures are logged and replaced by a
    documented fallback instead of being raised to the caller.

    Attributes:
        scaler: ``StandardScaler`` re-fitted on every ``scale_features`` call.
        label_encoders: Fitted ``LabelEncoder`` objects keyed by the name of
            the column they encode.
        user_stats: ``{user_id: {'mean', 'std', 'median'}}`` of ``Amount``,
            refreshed by ``normalize_per_user``.
    """

    # Columns every transaction frame is expected to carry.
    _EXPECTED_COLUMNS = [
        'Transaction ID', 'User ID', 'Amount', 'Timestamp',
        'Merchant Category', 'Location',
    ]
    # Accepted spellings of the merchant-category column, in priority order.
    _CATEGORY_COLUMN_NAMES = [
        'Merchant Category', 'merchant_category', 'Merchant_Category',
        'category', 'Category',
    ]

    def __init__(self):
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.user_stats = {}

    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load data from multiple file formats (CSV, Excel, JSON, Parquet).

        The reader is chosen from the file extension; unknown extensions are
        tried as CSV.

        Args:
            filepath: Path of the file to read.

        Returns:
            The loaded frame. If reading fails or the file holds no rows, the
            error is logged and an empty frame with the expected columns is
            returned instead (this method does not raise).
        """
        try:
            file_ext = os.path.splitext(filepath)[1].lower()
            if file_ext == '.csv':
                df = pd.read_csv(filepath)
            elif file_ext in ('.xlsx', '.xls'):
                df = pd.read_excel(filepath)
            elif file_ext == '.json':
                df = pd.read_json(filepath)
            elif file_ext == '.parquet':
                df = pd.read_parquet(filepath)
            else:
                # Try CSV as fallback
                try:
                    df = pd.read_csv(filepath)
                except Exception as exc:
                    raise ValueError(f"Unsupported file format: {file_ext}") from exc
            # Ensure dataframe is not empty
            if df.empty:
                raise ValueError("Loaded data is empty")
            return df
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            # Return empty dataframe with expected columns as fallback
            return self._create_empty_dataframe()

    def _create_empty_dataframe(self) -> pd.DataFrame:
        """Create an empty dataframe with expected columns as fallback."""
        return pd.DataFrame(columns=list(self._EXPECTED_COLUMNS))

    def ensure_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure all required columns exist, auto-generate missing ones.

        Missing columns get synthetic defaults (sequential transaction ids,
        ten rotating user ids, zero amounts, hourly timestamps starting at
        2024-01-01, ``'Unknown'`` for category and location). ``Amount`` is
        coerced to numeric (unparseable values become 0) and ``Timestamp``
        to datetime (unparseable values become 2024-01-01).

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with all required columns present.
        """
        df = df.copy()
        n_rows = len(df)
        # Generators are deferred so only the missing columns are built.
        required_columns = {
            'Transaction ID': lambda: [f'TX{i:06d}' for i in range(n_rows)],
            'User ID': lambda: [f'USER{i % 10 + 1:03d}' for i in range(n_rows)],
            'Amount': lambda: [0.0] * n_rows,
            # A Timedelta step avoids pandas' version-dependent frequency
            # aliases ('H' is deprecated in favour of 'h').
            'Timestamp': lambda: pd.date_range(
                start='2024-01-01', periods=n_rows, freq=pd.Timedelta(hours=1)
            ),
            'Merchant Category': lambda: ['Unknown'] * n_rows,
            'Location': lambda: ['Unknown'] * n_rows,
        }
        # Add missing columns with generated defaults
        for col, generator in required_columns.items():
            if col in df.columns:
                continue
            try:
                df[col] = generator()
            except Exception as e:
                logger.error(f"Error generating column {col}: {str(e)}")
                df[col] = 'Unknown' if col in ['Merchant Category', 'Location'] else 0

        # Ensure numeric columns are numeric
        for col in ['Amount']:
            if col in df.columns:
                try:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                except Exception:
                    df[col] = 0

        # Ensure timestamp is datetime
        if 'Timestamp' in df.columns:
            try:
                df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
                df['Timestamp'] = df['Timestamp'].fillna(pd.Timestamp('2024-01-01'))
            except Exception:
                df['Timestamp'] = pd.Timestamp('2024-01-01')
        return df

    @staticmethod
    def _fill_with(series: pd.Series, value) -> pd.Series:
        """Fill NaNs with ``value``, registering it as a category if needed."""
        if isinstance(series.dtype, pd.CategoricalDtype) and value not in series.cat.categories:
            # fillna() on a categorical rejects values that are not categories.
            series = series.cat.add_categories([value])
        return series.fillna(value)

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with safe operations.

        Numeric columns are filled with their median (0 when the column is
        entirely missing); object/category columns with their mode
        (``'Unknown'`` when entirely missing). Other dtypes are left alone.
        If that fails, every column falls back to a constant fill: 0 for
        numeric columns and ``'Unknown'`` otherwise.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with missing values filled.
        """
        df = df.copy()
        try:
            # Fill missing numeric values with median
            for col in df.select_dtypes(include=[np.number]).columns:
                median_val = df[col].median()
                df[col] = df[col].fillna(0 if pd.isna(median_val) else median_val)
            # Fill missing categorical values with mode
            for col in df.select_dtypes(include=['object', 'category']).columns:
                mode_val = df[col].mode()
                fill_value = mode_val.iloc[0] if not mode_val.empty else 'Unknown'
                df[col] = self._fill_with(df[col], fill_value)
        except Exception as e:
            logger.error(f"Error handling missing values: {str(e)}")
            # Fallback: fill all NaN with appropriate defaults. Each column is
            # guarded on its own so one bad column cannot crash the method.
            for col in df.columns:
                try:
                    if pd.api.types.is_numeric_dtype(df[col]):
                        df[col] = df[col].fillna(0)
                    else:
                        df[col] = self._fill_with(df[col], 'Unknown')
                except Exception as col_error:
                    logger.error(f"Error filling column {col}: {str(col_error)}")
        return df

    def normalize_per_user(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize spending amounts per user with error handling.

        Adds ``Amount_Normalized``: the z-score of ``Amount`` within each
        ``User ID`` group. Rows whose group has no positive standard deviation
        (single transaction, constant amounts, or missing user id) get 0.
        Per-user mean/std/median are stored in ``self.user_stats``.

        If the per-user computation fails, a global z-score is used; if that
        fails as well, the column is set to 0.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with the ``Amount_Normalized`` column added.
        """
        df = df.copy()
        try:
            # Check if User ID column exists
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
                logger.warning("User ID column missing, using default")
            # Check if Amount column exists
            if 'Amount' not in df.columns:
                df['Amount'] = 0
                logger.warning("Amount column missing, using default")

            amounts_by_user = df.groupby('User ID')['Amount']
            # Calculate user statistics
            self.user_stats = amounts_by_user.agg(['mean', 'std', 'median']).to_dict('index')

            # Broadcast each group's statistics back onto its rows so the
            # z-score is computed in one vectorised pass.
            user_mean = amounts_by_user.transform('mean')
            user_std = amounts_by_user.transform('std')
            z_score = (df['Amount'] - user_mean) / (user_std + 1e-8)
            # NaN std compares False, so those rows get 0 as well.
            df['Amount_Normalized'] = z_score.where(user_std > 0, 0.0)
        except Exception as e:
            logger.error(f"Error normalizing per user: {str(e)}")
            # Fallback: use simple z-score
            try:
                mean = df['Amount'].mean()
                std = df['Amount'].std()
                df['Amount_Normalized'] = (df['Amount'] - mean) / (std + 1e-8) if std > 0 else 0
            except Exception:
                df['Amount_Normalized'] = 0
        return df

    def encode_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Encode categorical variables with error handling.

        The first existing column among the accepted merchant-category names
        is label-encoded into ``<column>_Encoded``; when none exists a
        ``'Merchant Category'`` column of ``'Unknown'`` is created first.

        The encoder is fitted on the first call for a given column and reused
        afterwards. On later calls, labels the encoder has not seen are
        encoded as -1 rather than invalidating the whole column.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with the encoded column added. If encoding fails
            the error is logged and the encoded column is set to 0.
        """
        df = df.copy()
        # Try to find merchant category column with various possible names
        category_col = next(
            (name for name in self._CATEGORY_COLUMN_NAMES if name in df.columns),
            None,
        )
        # If no category column found, use a default
        if category_col is None:
            category_col = 'Merchant Category'
            df[category_col] = 'Unknown'
        encoded_col = category_col + '_Encoded'

        try:
            labels = df[category_col].astype(str)
            encoder = self.label_encoders.get(category_col)
            if encoder is None:
                encoder = LabelEncoder()
                df[encoded_col] = encoder.fit_transform(labels)
                # Register only after a successful fit so a failure never
                # leaves an unfitted encoder behind for later calls.
                self.label_encoders[category_col] = encoder
            else:
                # A label's code is its position in classes_; mapping by hand
                # lets unseen labels become -1 instead of raising.
                known_codes = {label: code for code, label in enumerate(encoder.classes_)}
                df[encoded_col] = labels.map(known_codes).fillna(-1).astype(int)
        except Exception as e:
            logger.error(f"Error encoding categorical variables: {str(e)}")
            # Fallback: add a simple encoded column
            if encoded_col not in df.columns:
                df[encoded_col] = 0
        return df

    def scale_features(self, df: pd.DataFrame, feature_cols: list) -> Tuple[pd.DataFrame, np.ndarray]:
        """Scale numerical features using StandardScaler with error handling.

        Columns of ``feature_cols`` that are missing from ``df`` are ignored.
        Non-numeric columns are coerced to numeric first (unparseable values
        become 0). The scaler is re-fitted on every call, and each scaled
        column is also stored in the frame as ``<column>_Scaled``.

        Args:
            df: Input frame; it is not modified.
            feature_cols: Names of the columns to scale.

        Returns:
            ``(frame, scaled)`` where ``scaled`` is the 2-D scaled matrix, or
            an empty 1-D array when there is nothing to scale or scaling
            failed.
        """
        df = df.copy()
        scaled_features = np.array([])
        try:
            # Filter to only columns that exist
            valid_cols = [col for col in feature_cols if col in df.columns]
            if not valid_cols:
                logger.warning("No valid columns to scale")
                return df, scaled_features
            if len(df) > 0:
                # Ensure all columns are numeric
                for col in valid_cols:
                    if not pd.api.types.is_numeric_dtype(df[col]):
                        try:
                            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                        except Exception:
                            df[col] = 0
                scaled_features = self.scaler.fit_transform(df[valid_cols])
                for i, col in enumerate(valid_cols):
                    df[col + '_Scaled'] = scaled_features[:, i]
        except Exception as e:
            logger.error(f"Error scaling features: {str(e)}")
            # Fallback: return original dataframe with empty scaled features
            scaled_features = np.array([])
        return df, scaled_features

    @staticmethod
    def _select_feature_columns(df: pd.DataFrame) -> list:
        """Return the model feature columns present in ``df``, in fixed order."""
        candidates = [
            'Amount_Normalized',
            # Name actually produced by encode_categorical().
            'Merchant Category_Encoded',
            # Underscore spelling, honoured if the input already carries it.
            'Merchant_Category_Encoded',
            'Amount',
        ]
        return [col for col in candidates if col in df.columns]

    def preprocess_pipeline(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray, list]:
        """Complete preprocessing pipeline with comprehensive error handling.

        Runs, in order: required-column repair, missing-value filling,
        per-user normalization, category encoding and feature scaling.

        Args:
            df: Raw transaction frame; it is not modified.

        Returns:
            ``(frame, scaled_features, feature_cols)``. ``feature_cols`` lists
            the scaled columns: normalized amount, encoded merchant category
            and raw amount, each only when present. If the pipeline fails it
            degrades to scaling ``Amount`` alone, and finally to an empty
            feature array.
        """
        try:
            # Ensure required columns exist
            df = self.ensure_required_columns(df)
            # Handle missing values
            df = self.handle_missing_values(df)
            # Normalize per user
            df = self.normalize_per_user(df)
            # Encode categorical variables
            df = self.encode_categorical(df)
            # Select feature columns for scaling - only include columns that exist
            feature_cols = self._select_feature_columns(df)
            # Scale features only if we have valid columns
            if feature_cols:
                df, scaled_features = self.scale_features(df, feature_cols)
            else:
                scaled_features = np.array([])
            return df, scaled_features, feature_cols
        except Exception as e:
            logger.error(f"Error in preprocessing pipeline: {str(e)}")
            # Fallback: return original dataframe with minimal processing
            try:
                df = self.ensure_required_columns(df)
                feature_cols = ['Amount']
                df, scaled_features = self.scale_features(df, feature_cols)
                return df, scaled_features, feature_cols
            except Exception:
                # Ultimate fallback
                return df, np.array([]), ['Amount']

    def get_user_statistics(self, user_id: str) -> Dict:
        """Get statistics for a specific user.

        Args:
            user_id: User identifier as it appears in the ``User ID`` column.

        Returns:
            The ``{'mean', 'std', 'median'}`` dict recorded by the last
            ``normalize_per_user`` call, or ``{}`` for an unknown user.
        """
        return self.user_stats.get(user_id, {})