import logging
import os
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Configure logging
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)


class DataPreprocessor:
    """Handles data preprocessing for credit card anomaly detection.

    The instance keeps state between calls:

    * ``scaler`` -- the ``StandardScaler`` that ``scale_features`` re-fits
      on every call.
    * ``label_encoders`` -- one fitted ``LabelEncoder`` per category column
      name, created the first time that column is encoded.
    * ``user_stats`` -- per-user ``mean`` / ``std`` / ``median`` of
      ``Amount`` as computed by the last ``normalize_per_user`` call.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.user_stats = {}

    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load data from multiple file formats (CSV, Excel, JSON, Parquet).

        The reader is chosen from the file extension; an unknown extension
        is attempted as CSV before giving up.

        Args:
            filepath: Path of the file to read.

        Returns:
            The loaded dataframe.  If reading fails for any reason, or the
            file contains no rows, the error is logged and an empty
            dataframe with the expected columns is returned instead (this
            method never raises for load errors).
        """
        readers = {
            '.csv': pd.read_csv,
            '.xlsx': pd.read_excel,
            '.xls': pd.read_excel,
            '.json': pd.read_json,
            '.parquet': pd.read_parquet,
        }
        try:
            file_ext = os.path.splitext(filepath)[1].lower()
            reader = readers.get(file_ext)
            if reader is not None:
                df = reader(filepath)
            else:
                # Try CSV as fallback
                try:
                    df = pd.read_csv(filepath)
                except Exception as err:
                    raise ValueError(
                        f"Unsupported file format: {file_ext}"
                    ) from err

            # Ensure dataframe is not empty
            if df.empty:
                raise ValueError("Loaded data is empty")

            return df
        except Exception as e:
            logger.error("Error loading data: %s", e)
            # Return empty dataframe with expected columns as fallback
            return self._create_empty_dataframe()

    def _create_empty_dataframe(self) -> pd.DataFrame:
        """Create an empty dataframe with expected columns as fallback."""
        columns = [
            'Transaction ID',
            'User ID',
            'Amount',
            'Timestamp',
            'Merchant Category',
            'Location',
        ]
        return pd.DataFrame(columns=columns)

    def ensure_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure all required columns exist, auto-generate missing ones.

        Missing columns are filled with synthetic defaults (sequential
        transaction ids, user ids cycling through ten users, zero amounts,
        hourly timestamps starting 2024-01-01, ``'Unknown'`` for merchant
        category and location).  ``Amount`` is coerced to numeric with
        unparseable values set to 0, and ``Timestamp`` is coerced to
        datetime with unparseable values set to 2024-01-01.

        Args:
            df: Input dataframe; it is not modified.

        Returns:
            A copy of ``df`` that contains every required column.
        """
        df = df.copy()
        n_rows = len(df)

        # Define required columns and their default generators
        required_columns = {
            'Transaction ID': lambda: [f'TX{i:06d}' for i in range(n_rows)],
            'User ID': lambda: [
                f'USER{i % 10 + 1:03d}' for i in range(n_rows)
            ],
            'Amount': lambda: [0.0] * n_rows,
            # A Timedelta is used instead of the 'H' alias, which newer
            # pandas releases deprecate; Timedelta works on old and new.
            'Timestamp': lambda: pd.date_range(
                start='2024-01-01',
                periods=n_rows,
                freq=pd.Timedelta(hours=1),
            ),
            'Merchant Category': lambda: ['Unknown'] * n_rows,
            'Location': lambda: ['Unknown'] * n_rows,
        }

        # Add missing columns with generated defaults
        for col, generator in required_columns.items():
            if col in df.columns:
                continue
            try:
                df[col] = generator()
            except Exception as e:
                logger.error("Error generating column %s: %s", col, e)
                if col in ('Merchant Category', 'Location'):
                    df[col] = 'Unknown'
                else:
                    df[col] = 0

        # Ensure numeric columns are numeric
        numeric_cols = ['Amount']
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = pd.to_numeric(
                        df[col], errors='coerce'
                    ).fillna(0)
                except Exception:
                    df[col] = 0

        # Ensure timestamp is datetime
        if 'Timestamp' in df.columns:
            default_timestamp = pd.Timestamp('2024-01-01')
            try:
                df['Timestamp'] = pd.to_datetime(
                    df['Timestamp'], errors='coerce'
                )
                df['Timestamp'] = df['Timestamp'].fillna(default_timestamp)
            except Exception:
                df['Timestamp'] = default_timestamp

        return df

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with safe operations.

        Numeric columns are filled with their median (0 when the median is
        undefined); object/category columns are filled with their most
        frequent value (``'Unknown'`` when there is none).  If that fails,
        every column is filled with 0 (numeric) or ``'Unknown'`` (other).

        Args:
            df: Input dataframe; it is not modified.

        Returns:
            A copy of ``df`` with missing values filled.
        """
        df = df.copy()

        try:
            # Fill missing numeric values with median
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                median_val = df[col].median()
                if pd.isna(median_val):
                    median_val = 0
                df[col] = df[col].fillna(median_val)

            # Fill missing categorical values with mode
            categorical_cols = df.select_dtypes(
                include=['object', 'category']
            ).columns
            for col in categorical_cols:
                mode_val = df[col].mode()
                fill_value = 'Unknown' if mode_val.empty else mode_val.iloc[0]
                df[col] = df[col].fillna(fill_value)
        except Exception as e:
            logger.error("Error handling missing values: %s", e)
            # Fallback: fill all NaN with appropriate defaults.
            # is_numeric_dtype covers every numeric width (float32, int8,
            # ...), not only the int64/float64 names.
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    df[col] = df[col].fillna(0)
                else:
                    df[col] = df[col].fillna('Unknown')

        return df

    def normalize_per_user(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize spending amounts per user with error handling.

        Adds ``Amount_Normalized``: the z-score of ``Amount`` relative to
        the mean/std of the same ``User ID``.  Rows whose user has no
        positive standard deviation (e.g. a single transaction) get 0.
        Also refreshes ``self.user_stats`` with each user's ``mean``,
        ``std`` and ``median``.

        If per-user normalization fails, a global z-score is used, and if
        that fails too the column is set to 0.

        Args:
            df: Input dataframe; it is not modified.  Missing ``User ID`` /
                ``Amount`` columns are replaced by ``'USER001'`` / 0.

        Returns:
            A copy of ``df`` with the ``Amount_Normalized`` column added.
        """
        df = df.copy()

        try:
            # Check if User ID column exists
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
                logger.warning("User ID column missing, using default")

            # Check if Amount column exists
            if 'Amount' not in df.columns:
                df['Amount'] = 0
                logger.warning("Amount column missing, using default")

            # Calculate user statistics
            amounts_by_user = df.groupby('User ID')['Amount']
            self.user_stats = amounts_by_user.agg(
                ['mean', 'std', 'median']
            ).to_dict('index')

            # Normalize amount relative to user average.  transform()
            # broadcasts each user's statistic back onto that user's rows,
            # avoiding a Python-level call per row.
            user_mean = amounts_by_user.transform('mean')
            user_std = amounts_by_user.transform('std')
            z_scores = (df['Amount'] - user_mean) / (user_std + 1e-8)
            # NaN std compares False here, so such rows receive 0.
            df['Amount_Normalized'] = z_scores.where(user_std > 0, 0.0)
        except Exception as e:
            logger.error("Error normalizing per user: %s", e)
            # Fallback: use simple z-score
            try:
                mean = df['Amount'].mean()
                std = df['Amount'].std()
                if std > 0:
                    df['Amount_Normalized'] = (
                        (df['Amount'] - mean) / (std + 1e-8)
                    )
                else:
                    df['Amount_Normalized'] = 0
            except Exception:
                df['Amount_Normalized'] = 0

        return df

    def encode_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Encode categorical variables with error handling.

        The first column found among several common spellings of
        "merchant category" is label-encoded into ``<column>_Encoded``.
        When none is present a ``'Merchant Category'`` column filled with
        ``'Unknown'`` is created and encoded.  The encoder for a column is
        fitted on first use and reused (``transform`` only) afterwards.

        On any failure the error is logged and
        ``'Merchant Category_Encoded'`` is set to 0 if it does not exist.

        Args:
            df: Input dataframe; it is not modified.

        Returns:
            A copy of ``df`` with the encoded column added.
        """
        df = df.copy()

        try:
            # Try to find merchant category column with various possible names
            possible_names = [
                'Merchant Category',
                'merchant_category',
                'Merchant_Category',
                'category',
                'Category',
            ]
            category_col = next(
                (name for name in possible_names if name in df.columns),
                None,
            )

            # If no category column found, use a default
            if category_col is None:
                category_col = 'Merchant Category'
                df[category_col] = 'Unknown'

            # Encode the category column
            encoded_col = category_col + '_Encoded'
            labels = df[category_col].astype(str)
            encoder = self.label_encoders.get(category_col)
            if encoder is None:
                encoder = LabelEncoder()
                df[encoded_col] = encoder.fit_transform(labels)
                # Register only after a successful fit so a failed fit
                # cannot leave an unfitted encoder cached for later calls.
                self.label_encoders[category_col] = encoder
            else:
                # NOTE: labels unseen at fit time make transform() raise,
                # which lands in the fallback below.
                df[encoded_col] = encoder.transform(labels)
        except Exception as e:
            logger.error("Error encoding categorical variables: %s", e)
            # Fallback: add a simple encoded column
            if 'Merchant Category_Encoded' not in df.columns:
                df['Merchant Category_Encoded'] = 0

        return df

    def scale_features(
        self, df: pd.DataFrame, feature_cols: list
    ) -> Tuple[pd.DataFrame, np.ndarray]:
        """Scale numerical features using StandardScaler with error handling.

        Columns from ``feature_cols`` that are absent from ``df`` are
        ignored.  Non-numeric columns are coerced to numeric (unparseable
        values become 0).  ``self.scaler`` is re-fitted on every call, and
        a ``<column>_Scaled`` column is added for each scaled column.

        Args:
            df: Input dataframe; it is not modified.
            feature_cols: Names of the columns to scale.

        Returns:
            ``(df_copy, scaled_features)`` where ``scaled_features`` is the
            array returned by the scaler, or an empty array when there is
            nothing to scale, ``df`` has no rows, or scaling failed.
        """
        df = df.copy()
        scaled_features = np.array([])

        try:
            # Filter to only columns that exist
            valid_cols = [col for col in feature_cols if col in df.columns]

            if not valid_cols:
                logger.warning("No valid columns to scale")
                return df, scaled_features

            if len(df) > 0:
                # Ensure all columns are numeric
                for col in valid_cols:
                    if pd.api.types.is_numeric_dtype(df[col]):
                        continue
                    try:
                        df[col] = pd.to_numeric(
                            df[col], errors='coerce'
                        ).fillna(0)
                    except Exception:
                        df[col] = 0

                scaled_features = self.scaler.fit_transform(df[valid_cols])
                for i, col in enumerate(valid_cols):
                    df[col + '_Scaled'] = scaled_features[:, i]
        except Exception as e:
            logger.error("Error scaling features: %s", e)
            # Fallback: return original dataframe with empty scaled features
            scaled_features = np.array([])

        return df, scaled_features

    def preprocess_pipeline(
        self, df: pd.DataFrame
    ) -> Tuple[pd.DataFrame, np.ndarray, list]:
        """Complete preprocessing pipeline with comprehensive error handling.

        Runs, in order: ``ensure_required_columns``,
        ``handle_missing_values``, ``normalize_per_user``,
        ``encode_categorical`` and ``scale_features``.

        Args:
            df: Raw transaction dataframe.

        Returns:
            ``(processed_df, scaled_features, feature_cols)``.  If the main
            pipeline raises, a minimal run that only scales ``Amount`` is
            attempted; if that also fails, the dataframe as it stood is
            returned with an empty array and ``['Amount']``.
        """
        try:
            # Ensure required columns exist
            df = self.ensure_required_columns(df)

            # Handle missing values
            df = self.handle_missing_values(df)

            # Normalize per user
            df = self.normalize_per_user(df)

            # Encode categorical variables
            df = self.encode_categorical(df)

            # Select feature columns for scaling - only include columns that
            # exist.  encode_categorical names its output
            # '<column>_Encoded', i.e. 'Merchant Category_Encoded' (with a
            # space); the underscore spelling is still accepted for inputs
            # that already carry such a column.
            candidate_cols = (
                'Amount_Normalized',
                'Merchant Category_Encoded',
                'Merchant_Category_Encoded',
                'Amount',
            )
            feature_cols = [
                col for col in candidate_cols if col in df.columns
            ]

            # Scale features only if we have valid columns
            if feature_cols:
                df, scaled_features = self.scale_features(df, feature_cols)
            else:
                scaled_features = np.array([])

            return df, scaled_features, feature_cols
        except Exception as e:
            logger.error("Error in preprocessing pipeline: %s", e)
            # Fallback: return original dataframe with minimal processing
            try:
                df = self.ensure_required_columns(df)
                feature_cols = ['Amount']
                df, scaled_features = self.scale_features(df, feature_cols)
                return df, scaled_features, feature_cols
            except Exception:
                # Ultimate fallback
                return df, np.array([]), ['Amount']

    def get_user_statistics(self, user_id: str) -> Dict:
        """Get statistics for a specific user.

        Args:
            user_id: Value of ``User ID`` to look up.

        Returns:
            The ``mean`` / ``std`` / ``median`` dict recorded by the last
            ``normalize_per_user`` call, or ``{}`` for an unknown user.
        """
        return self.user_stats.get(user_id, {})