Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
| from typing import Tuple, Dict, Optional | |
| import os | |
| import logging | |
# Configure logging for this module.
# basicConfig asks the root logger to emit only ERROR and above (it is a no-op
# if the root logger already has handlers), so the logger.warning(...) calls
# made by DataPreprocessor are not shown under this default configuration.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
class DataPreprocessor:
    """Handles data preprocessing for credit card anomaly detection.

    The class is deliberately best-effort: every public step logs failures and
    falls back to a safe default instead of raising, so a malformed upload
    still yields a frame with the expected columns.

    Attributes:
        scaler: ``StandardScaler`` re-fitted on every ``scale_features`` call.
        label_encoders: Fitted ``LabelEncoder`` objects keyed by source column.
        user_stats: Per-user ``mean``/``std``/``median`` of ``Amount`` from the
            most recent ``normalize_per_user`` call.
    """

    # Column names probed, in priority order, for the merchant category.
    _CATEGORY_COLUMN_CANDIDATES = (
        'Merchant Category',
        'merchant_category',
        'Merchant_Category',
        'category',
        'Category',
    )
    # Placeholder used for generated or unparseable timestamps.
    _DEFAULT_TIMESTAMP = '2024-01-01'
    # Added to the denominator of z-scores to avoid division by ~0.
    _EPSILON = 1e-8
    # Code assigned to labels the fitted encoder has never seen.
    _UNSEEN_LABEL_CODE = -1

    def __init__(self):
        self.scaler = StandardScaler()
        self.label_encoders = {}
        self.user_stats = {}

    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load data from multiple file formats (CSV, Excel, JSON, Parquet).

        Unknown extensions are tried as CSV. Any failure (including an empty
        file) is logged and replaced by an empty frame with the expected
        columns, so this method never raises.

        Args:
            filepath: Path of the file to read.

        Returns:
            The loaded frame, or the empty fallback frame on error.
        """
        readers = {
            '.csv': pd.read_csv,
            '.xlsx': pd.read_excel,
            '.xls': pd.read_excel,
            '.json': pd.read_json,
            '.parquet': pd.read_parquet,
        }
        try:
            file_ext = os.path.splitext(filepath)[1].lower()
            reader = readers.get(file_ext)
            if reader is not None:
                df = reader(filepath)
            else:
                # Try CSV as fallback for unrecognised extensions.
                try:
                    df = pd.read_csv(filepath)
                except Exception as err:
                    raise ValueError(f"Unsupported file format: {file_ext}") from err
            if df.empty:
                raise ValueError("Loaded data is empty")
            return df
        except Exception as e:
            logger.error("Error loading data: %s", e)
            return self._create_empty_dataframe()

    def _create_empty_dataframe(self) -> pd.DataFrame:
        """Create an empty dataframe with expected columns as fallback."""
        columns = ['Transaction ID', 'User ID', 'Amount', 'Timestamp', 'Merchant Category', 'Location']
        return pd.DataFrame(columns=columns)

    def ensure_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure all required columns exist, auto-generating missing ones.

        Also coerces ``Amount`` to numeric (unparseable values become 0) and
        ``Timestamp`` to datetime (unparseable values become 2024-01-01).

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` containing every required column.
        """
        df = df.copy()
        n_rows = len(df)
        default_ts = pd.Timestamp(self._DEFAULT_TIMESTAMP)

        generators = {
            'Transaction ID': lambda: [f'TX{i:06d}' for i in range(n_rows)],
            'User ID': lambda: [f'USER{i % 10 + 1:03d}' for i in range(n_rows)],
            'Amount': lambda: [0.0] * n_rows,
            # A Timedelta frequency avoids the 'H' alias, which newer pandas
            # releases deprecate in favour of 'h'.
            'Timestamp': lambda: pd.date_range(
                start=self._DEFAULT_TIMESTAMP, periods=n_rows, freq=pd.Timedelta(hours=1)
            ),
            'Merchant Category': lambda: ['Unknown'] * n_rows,
            'Location': lambda: ['Unknown'] * n_rows,
        }

        for col, generator in generators.items():
            if col in df.columns:
                continue
            try:
                df[col] = generator()
            except Exception as e:
                logger.error("Error generating column %s: %s", col, e)
                df[col] = 'Unknown' if col in ['Merchant Category', 'Location'] else 0

        if 'Amount' in df.columns:
            try:
                df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce').fillna(0)
            except Exception:
                df['Amount'] = 0

        if 'Timestamp' in df.columns:
            try:
                parsed = pd.to_datetime(df['Timestamp'], errors='coerce')
                df['Timestamp'] = parsed.fillna(default_ts)
            except Exception:
                df['Timestamp'] = default_ts

        return df

    @staticmethod
    def _is_categorical_like(series: pd.Series) -> bool:
        """Return True for object, string or categorical columns."""
        return (
            isinstance(series.dtype, pd.CategoricalDtype)
            or pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        )

    @staticmethod
    def _fill_categorical_safe(series: pd.Series, value) -> pd.Series:
        """Fill NaN with ``value``, registering it as a category if needed."""
        if isinstance(series.dtype, pd.CategoricalDtype) and value not in series.cat.categories:
            # fillna on a categorical raises for values outside its categories.
            series = series.cat.add_categories([value])
        return series.fillna(value)

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with safe operations.

        Numeric columns are filled with their median (0 when the column is all
        NaN); object/string/categorical columns with their mode ('Unknown'
        when there is no mode).

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with missing values filled.
        """
        df = df.copy()
        try:
            for col in df.select_dtypes(include=[np.number]).columns:
                median_val = df[col].median()
                if pd.isna(median_val):
                    median_val = 0
                df[col] = df[col].fillna(median_val)

            for col in df.columns:
                if not self._is_categorical_like(df[col]):
                    continue
                mode_val = df[col].mode()
                fill_value = mode_val.iloc[0] if not mode_val.empty else 'Unknown'
                df[col] = self._fill_categorical_safe(df[col], fill_value)
        except Exception as e:
            logger.error("Error handling missing values: %s", e)
            # Fallback: fill all NaN with appropriate defaults.
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    df[col] = df[col].fillna(0)
                else:
                    df[col] = self._fill_categorical_safe(df[col], 'Unknown')
        return df

    def normalize_per_user(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize spending amounts per user with error handling.

        Adds ``Amount_Normalized``: the z-score of ``Amount`` within each
        ``User ID``. Users whose standard deviation is zero or undefined (a
        single transaction) get 0. Also refreshes ``self.user_stats``.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with the ``Amount_Normalized`` column.
        """
        df = df.copy()
        try:
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
                logger.warning("User ID column missing, using default")
            if 'Amount' not in df.columns:
                df['Amount'] = 0
                logger.warning("Amount column missing, using default")

            amounts_by_user = df.groupby('User ID')['Amount']
            self.user_stats = amounts_by_user.agg(['mean', 'std', 'median']).to_dict('index')

            # Broadcast each user's statistics back onto that user's rows.
            user_mean = amounts_by_user.transform('mean')
            user_std = amounts_by_user.transform('std')
            z_scores = (df['Amount'] - user_mean) / (user_std + self._EPSILON)
            # NaN > 0 is False, so single-transaction users fall back to 0.
            df['Amount_Normalized'] = z_scores.where(user_std > 0, 0.0)
        except Exception as e:
            logger.error("Error normalizing per user: %s", e)
            # Fallback: use a simple z-score over the whole frame.
            try:
                mean = df['Amount'].mean()
                std = df['Amount'].std()
                df['Amount_Normalized'] = (
                    (df['Amount'] - mean) / (std + self._EPSILON) if std > 0 else 0
                )
            except Exception:
                df['Amount_Normalized'] = 0
        return df

    def encode_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Encode the merchant category column with error handling.

        The first column found among the known name variants is encoded into
        ``<column>_Encoded``; if none exists, ``Merchant Category`` is created
        with the value 'Unknown'. The encoder is fitted on first use and reused
        afterwards; labels it has not seen are encoded as -1.

        Args:
            df: Input frame; it is not modified.

        Returns:
            A copy of ``df`` with the encoded column added.
        """
        df = df.copy()
        category_col = next(
            (name for name in self._CATEGORY_COLUMN_CANDIDATES if name in df.columns),
            None,
        )
        if category_col is None:
            category_col = 'Merchant Category'
            df[category_col] = 'Unknown'
        encoded_col = category_col + '_Encoded'

        try:
            labels = df[category_col].astype(str)
            encoder = self.label_encoders.get(category_col)
            if encoder is None:
                encoder = LabelEncoder()
                df[encoded_col] = encoder.fit_transform(labels)
                # Register only after a successful fit so a failure cannot
                # leave an unfitted encoder behind.
                self.label_encoders[category_col] = encoder
            else:
                # Same codes as encoder.transform, but tolerant of new labels.
                code_by_label = {str(label): code for code, label in enumerate(encoder.classes_)}
                codes = labels.map(code_by_label).fillna(self._UNSEEN_LABEL_CODE)
                df[encoded_col] = codes.astype('int64')
        except Exception as e:
            logger.error("Error encoding categorical variables: %s", e)
            # Fallback: add a constant encoded column.
            if encoded_col not in df.columns:
                df[encoded_col] = 0
        return df

    def scale_features(self, df: pd.DataFrame, feature_cols: list) -> Tuple[pd.DataFrame, np.ndarray]:
        """Scale numerical features using StandardScaler with error handling.

        Columns in ``feature_cols`` that are missing from ``df`` are ignored.
        Non-numeric columns are coerced to numeric (unparseable values become
        0). Each scaled column is stored as ``<column>_Scaled``.

        Args:
            df: Input frame; it is not modified.
            feature_cols: Names of the columns to scale.

        Returns:
            ``(frame, scaled)`` where ``scaled`` is the scaled matrix, or an
            empty array when nothing could be scaled.
        """
        df = df.copy()
        try:
            valid_cols = [col for col in feature_cols if col in df.columns]
            if not valid_cols:
                logger.warning("No valid columns to scale")
                return df, np.array([])
            if len(df) == 0:
                return df, np.array([])

            for col in valid_cols:
                if pd.api.types.is_numeric_dtype(df[col]):
                    continue
                try:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                except Exception:
                    df[col] = 0

            scaled_features = self.scaler.fit_transform(df[valid_cols])
            for i, col in enumerate(valid_cols):
                df[col + '_Scaled'] = scaled_features[:, i]
            return df, scaled_features
        except Exception as e:
            logger.error("Error scaling features: %s", e)
            # Fallback: return the frame with empty scaled features.
            return df, np.array([])

    def _select_feature_columns(self, df: pd.DataFrame) -> list:
        """Pick the model feature columns that are present in ``df``."""
        feature_cols = []
        if 'Amount_Normalized' in df.columns:
            feature_cols.append('Amount_Normalized')
        # encode_categorical names its output '<source column>_Encoded', so
        # probe the same candidates it does rather than a hard-coded name.
        encoded_col = next(
            (
                name + '_Encoded'
                for name in self._CATEGORY_COLUMN_CANDIDATES
                if name + '_Encoded' in df.columns
            ),
            None,
        )
        if encoded_col is not None:
            feature_cols.append(encoded_col)
        if 'Amount' in df.columns:
            feature_cols.append('Amount')
        return feature_cols

    def preprocess_pipeline(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray, list]:
        """Complete preprocessing pipeline with comprehensive error handling.

        Runs column completion, missing-value handling, per-user
        normalization, category encoding and feature scaling in that order.

        Args:
            df: Raw transaction frame; it is not modified.

        Returns:
            ``(frame, scaled_features, feature_cols)``.
        """
        try:
            df = self.ensure_required_columns(df)
            df = self.handle_missing_values(df)
            df = self.normalize_per_user(df)
            df = self.encode_categorical(df)

            feature_cols = self._select_feature_columns(df)
            if feature_cols:
                df, scaled_features = self.scale_features(df, feature_cols)
            else:
                scaled_features = np.array([])
            return df, scaled_features, feature_cols
        except Exception as e:
            logger.error("Error in preprocessing pipeline: %s", e)
            # Fallback: minimal processing on the Amount column only.
            try:
                df = self.ensure_required_columns(df)
                feature_cols = ['Amount']
                df, scaled_features = self.scale_features(df, feature_cols)
                return df, scaled_features, feature_cols
            except Exception:
                # Ultimate fallback.
                return df, np.array([]), ['Amount']

    def get_user_statistics(self, user_id: str) -> Dict:
        """Get statistics for a specific user.

        Args:
            user_id: Identifier as it appears in the ``User ID`` column.

        Returns:
            The ``mean``/``std``/``median`` dict recorded by the last
            ``normalize_per_user`` call, or ``{}`` for an unknown user.
        """
        return self.user_stats.get(user_id, {})