Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime | |
| from typing import Dict, List | |
| import logging | |
logger = logging.getLogger(__name__)


class FeatureEngineer:
    """Build per-transaction and per-user features for credit card anomaly detection.

    Every ``calculate_*`` / ``extract_*`` step works on a copy of its input,
    creates neutral defaults for missing input columns, and falls back to
    constant feature values (after logging the error) instead of raising.

    Attributes:
        user_profiles: Per-user statistics from the most recent successful
            ``calculate_user_statistics`` call, keyed by ``'User ID'`` value.
    """

    # Column names probed, in priority order, when locating the transaction time.
    _TIMESTAMP_COLUMNS = ('Timestamp', 'timestamp', 'Time', 'time', 'Date', 'date', 'datetime')
    # Substitute for missing or unparseable timestamps.
    _DEFAULT_TIMESTAMP = '2024-01-01'
    # Per-user aggregations computed on 'Amount' (become 'Amount_<stat>' columns).
    _AMOUNT_STATS = ('mean', 'std', 'median', 'min', 'max', 'count')
    # Model feature columns (in output order) with the neutral value used when a
    # feature cannot be computed. Single source of truth for get_feature_columns
    # and for the pipeline-level fallback in engineer_features.
    _FEATURE_DEFAULTS = {
        'Amount': 0,
        'Hour': 12,
        'DayOfWeek': 0,
        'IsWeekend': 0,
        'IsNight': 0,
        'TimeSinceLastTx': 999,
        'TxCount_Window': 1,
        'AmountRatio_Mean': 1.0,
        'AmountRatio_Median': 1.0,
        'Amount_ZScore': 0,
        'Category_Entropy': 0,
        'Hour_Variance': 0,
        'Hour_Distance': 0,
    }

    def __init__(self):
        self.user_profiles: Dict = {}

    def extract_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add calendar/clock features derived from the transaction timestamp.

        The first column found among ``_TIMESTAMP_COLUMNS`` is parsed into a
        normalized ``'Timestamp'`` column; if none exists, an hourly sequence
        starting at ``_DEFAULT_TIMESTAMP`` is synthesized. Unparseable values
        are replaced by ``_DEFAULT_TIMESTAMP``.

        Args:
            df: Transactions; not modified.

        Returns:
            A copy of ``df`` with ``Timestamp``, ``Hour``, ``DayOfWeek``,
            ``DayOfMonth``, ``Month``, ``IsWeekend`` and ``IsNight`` columns.
            On any error the six derived features get constant defaults.
        """
        df = df.copy()
        try:
            timestamp_col = next(
                (name for name in self._TIMESTAMP_COLUMNS if name in df.columns),
                None,
            )
            if timestamp_col is None:
                logger.warning("No timestamp column found, creating default")
                # A Timedelta step avoids the 'H' frequency alias, which newer
                # pandas releases deprecate in favour of lowercase 'h'.
                df['Timestamp'] = pd.date_range(
                    start=self._DEFAULT_TIMESTAMP,
                    periods=len(df),
                    freq=pd.Timedelta(hours=1),
                )
                timestamp_col = 'Timestamp'

            timestamps = pd.to_datetime(df[timestamp_col], errors='coerce')
            fill_value = pd.Timestamp(self._DEFAULT_TIMESTAMP)
            # A naive fill value cannot be mixed into a tz-aware column.
            if timestamps.dt.tz is not None:
                fill_value = fill_value.tz_localize(timestamps.dt.tz)
            df['Timestamp'] = timestamps.fillna(fill_value)

            df['Hour'] = df['Timestamp'].dt.hour
            df['DayOfWeek'] = df['Timestamp'].dt.dayofweek
            df['DayOfMonth'] = df['Timestamp'].dt.day
            df['Month'] = df['Timestamp'].dt.month
            # dayofweek: Monday=0 ... Sunday=6.
            df['IsWeekend'] = (df['DayOfWeek'] >= 5).astype(int)
            # Night is 22:00-05:59.
            df['IsNight'] = ((df['Hour'] >= 22) | (df['Hour'] <= 5)).astype(int)
        except Exception as exc:
            logger.exception("Error extracting time features: %s", exc)
            df['Hour'] = 12
            df['DayOfWeek'] = 0
            df['DayOfMonth'] = 1
            df['Month'] = 1
            df['IsWeekend'] = 0
            df['IsNight'] = 0
        return df

    def calculate_transaction_frequency(self, df: pd.DataFrame, window_hours: int = 24) -> pd.DataFrame:
        """Add per-user recency and trailing-window frequency features.

        Args:
            df: Transactions with a datetime ``'Timestamp'`` column and a
                ``'User ID'`` column (constant defaults are created for either
                if missing); not modified.
            window_hours: Length of the trailing window, in hours, used for
                ``TxCount_Window``.

        Returns:
            A copy of ``df`` **sorted by Timestamp** with:

            * ``TimeSinceLastTx`` - hours since the same user's previous
              transaction (999 for a user's first transaction).
            * ``TxCount_Window`` - number of the user's transactions in the
              half-open interval ``(t - window_hours, t]``, counting the
              current one, so the value is always >= 1.

            On any error both columns get constant defaults (999 and 1).
        """
        df = df.copy()
        try:
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
            if 'Timestamp' not in df.columns:
                df['Timestamp'] = pd.Timestamp(self._DEFAULT_TIMESTAMP)

            # Stable sort keeps the input order of simultaneous transactions.
            df = df.sort_values('Timestamp', kind='mergesort')

            hours_since_prev = df.groupby('User ID')['Timestamp'].diff().dt.total_seconds() / 3600
            df['TimeSinceLastTx'] = hours_since_prev.fillna(999)

            tx_counts = np.ones(len(df), dtype=np.int64)
            # .indices maps each user to the positional row numbers of that
            # user's transactions; rows are already in chronological order.
            for positions in df.groupby('User ID', sort=False).indices.values():
                stamps = df['Timestamp'].iloc[positions]
                elapsed = (stamps - stamps.iloc[0]).dt.total_seconds().to_numpy() / 3600.0
                # Index of the first transaction strictly newer than t - window.
                window_start = np.searchsorted(elapsed, elapsed - window_hours, side='right')
                running = np.arange(1, len(positions) + 1)
                # Clamp so NaT rows / non-positive windows still count themselves.
                tx_counts[positions] = np.maximum(running - window_start, 1)
            df['TxCount_Window'] = tx_counts
        except Exception as exc:
            logger.exception("Error calculating transaction frequency: %s", exc)
            df['TimeSinceLastTx'] = 999
            df['TxCount_Window'] = 1
        return df

    @staticmethod
    def _overall_amount_stat(df: pd.DataFrame, stat: str) -> float:
        """Return a dataset-wide Amount statistic, or 0 when it cannot be computed."""
        if 'Amount' not in df.columns:
            return 0
        try:
            value = getattr(pd.to_numeric(df['Amount'], errors='coerce'), stat)()
        except (TypeError, ValueError):
            return 0
        return 0 if pd.isna(value) else value

    def calculate_user_statistics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Attach per-user Amount statistics to every transaction.

        Adds ``Amount_mean``, ``Amount_std``, ``Amount_median``, ``Amount_min``,
        ``Amount_max`` and ``Amount_count`` (rounded to 2 decimals) and, when a
        ``'Merchant Category'`` column exists, ``Merchant Category_nunique``.
        ``Amount_std`` is 0 for users with a single transaction so downstream
        z-scores stay finite. Existing columns with these names are
        overwritten, which makes the step safe to re-run.

        Side effects:
            On success, ``self.user_profiles`` is replaced with the per-user
            statistics as ``{user_id: {stat_name: value}}``.

        Args:
            df: Transactions; not modified. Missing ``'User ID'`` / ``'Amount'``
                columns are created with constant defaults.

        Returns:
            A copy of ``df`` with the statistic columns. On any error,
            dataset-wide statistics are used for every row instead.
        """
        df = df.copy()
        try:
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
            if 'Amount' not in df.columns:
                df['Amount'] = 0

            per_user = df.groupby('User ID')
            user_stats = per_user['Amount'].agg(list(self._AMOUNT_STATS))
            user_stats.columns = [f'Amount_{stat}' for stat in user_stats.columns]
            if 'Merchant Category' in df.columns:
                user_stats['Merchant Category_nunique'] = per_user['Merchant Category'].nunique()
            user_stats = user_stats.round(2)
            # Sample std of a single observation is NaN; treat it as "no spread".
            user_stats['Amount_std'] = user_stats['Amount_std'].fillna(0)

            self.user_profiles = user_stats.to_dict('index')
            # Mapping (rather than merging) preserves row order and index and
            # cannot create suffixed duplicate columns.
            for column in user_stats.columns:
                df[column] = df['User ID'].map(user_stats[column])
        except Exception as exc:
            logger.exception("Error calculating user statistics: %s", exc)
            # The fallback must not raise on the same bad data that broke the
            # aggregation, hence the coercing helper.
            df['Amount_mean'] = self._overall_amount_stat(df, 'mean')
            df['Amount_std'] = 0
            df['Amount_median'] = self._overall_amount_stat(df, 'median')
            df['Amount_min'] = self._overall_amount_stat(df, 'min')
            df['Amount_max'] = self._overall_amount_stat(df, 'max')
            df['Amount_count'] = len(df)
            if 'Merchant Category' in df.columns:
                df['Merchant Category_nunique'] = df['Merchant Category'].nunique()
        return df

    def calculate_amount_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Relate each transaction amount to its user's typical amounts.

        Uses the ``Amount_mean`` / ``Amount_median`` / ``Amount_std`` columns
        when present (see ``calculate_user_statistics``); a small epsilon in
        each denominator avoids division by zero.

        Args:
            df: Transactions; not modified. A missing ``'Amount'`` column is
                created as 0.

        Returns:
            A copy of ``df`` with ``AmountRatio_Mean``, ``AmountRatio_Median``
            and ``Amount_ZScore``. Features whose inputs are missing, or all
            three on error, get neutral defaults (1.0, 1.0, 0).
        """
        df = df.copy()
        eps = 1e-8
        try:
            if 'Amount' not in df.columns:
                df['Amount'] = 0

            has_mean = 'Amount_mean' in df.columns
            has_median = 'Amount_median' in df.columns
            has_std = 'Amount_std' in df.columns

            df['AmountRatio_Mean'] = (
                df['Amount'] / (df['Amount_mean'] + eps) if has_mean else 1.0
            )
            df['AmountRatio_Median'] = (
                df['Amount'] / (df['Amount_median'] + eps) if has_median else 1.0
            )
            df['Amount_ZScore'] = (
                (df['Amount'] - df['Amount_mean']) / (df['Amount_std'] + eps)
                if has_mean and has_std
                else 0
            )
        except Exception as exc:
            logger.exception("Error calculating amount features: %s", exc)
            df['AmountRatio_Mean'] = 1.0
            df['AmountRatio_Median'] = 1.0
            df['Amount_ZScore'] = 0
        return df

    @staticmethod
    def _category_entropy(categories: pd.Series) -> float:
        """Return the Shannon entropy (bits) of a categorical series; 0 if empty."""
        probs = categories.value_counts(normalize=True).to_numpy(dtype=float)
        # value_counts never reports unseen categories, but guard log2(0) anyway.
        probs = probs[probs > 0]
        # max() also normalizes the -0.0 produced for single-category users.
        return max(0.0, float(-(probs * np.log2(probs)).sum()))

    def calculate_category_diversity(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the entropy of each user's merchant-category distribution.

        Args:
            df: Transactions; not modified. Missing ``'User ID'`` /
                ``'Merchant Category'`` columns are created with constant
                defaults.

        Returns:
            A copy of ``df`` with ``Category_Entropy`` (bits, >= 0; 0 for a
            user who only ever uses one category, and 0 for all rows on error).
        """
        df = df.copy()
        try:
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
            if 'Merchant Category' not in df.columns:
                df['Merchant Category'] = 'Unknown'

            entropy_by_user = df.groupby('User ID')['Merchant Category'].apply(self._category_entropy)
            df['Category_Entropy'] = df['User ID'].map(entropy_by_user).fillna(0)
        except Exception as exc:
            logger.exception("Error calculating category diversity: %s", exc)
            df['Category_Entropy'] = 0
        return df

    @staticmethod
    def _most_common_hour(hours: pd.Series):
        """Return the smallest modal hour of ``hours``, or 12 when there is none."""
        modes = hours.mode()
        return modes.iloc[0] if not modes.empty else 12

    def calculate_time_variance(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add features describing how unusual the transaction hour is for the user.

        Args:
            df: Transactions; not modified. Missing ``'User ID'`` / ``'Hour'``
                columns are created with constant defaults.

        Returns:
            A copy of ``df`` with:

            * ``Hour_Variance`` - variance of the user's transaction hours
              (0 when undefined, e.g. a single transaction).
            * ``Common_Hour`` - the user's most frequent hour (12 if unknown).
            * ``Hour_Distance`` - circular distance on the 24-hour clock
              between ``Hour`` and ``Common_Hour``.

            On any error the columns get constant defaults (0, 12, 0).
        """
        df = df.copy()
        try:
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
            if 'Hour' not in df.columns:
                df['Hour'] = 12

            hours_by_user = df.groupby('User ID')['Hour']
            df['Hour_Variance'] = df['User ID'].map(hours_by_user.var()).fillna(0)

            common_hour = hours_by_user.apply(self._most_common_hour)
            df['Common_Hour'] = df['User ID'].map(common_hour).fillna(12)

            # Wrap around midnight: 23:00 and 01:00 are 2 hours apart, not 22.
            gap = (df['Hour'] - df['Common_Hour']).abs()
            df['Hour_Distance'] = np.minimum(gap, 24 - gap)
        except Exception as exc:
            logger.exception("Error calculating time variance: %s", exc)
            df['Hour_Variance'] = 0
            df['Common_Hour'] = 12
            df['Hour_Distance'] = 0
        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run the complete feature engineering pipeline.

        Steps, in order: time features, transaction frequency, user
        statistics, amount features, category diversity, time variance.

        Args:
            df: Raw transactions; not modified.

        Returns:
            A new DataFrame (sorted by Timestamp) containing at least every
            column listed by ``get_feature_columns``. If a step raises despite
            its own fallback, the error is logged and any feature column still
            missing is filled with its neutral default.
        """
        pipeline = (
            self.extract_time_features,
            self.calculate_transaction_frequency,
            self.calculate_user_statistics,
            self.calculate_amount_features,
            self.calculate_category_diversity,
            self.calculate_time_variance,
        )
        try:
            for step in pipeline:
                df = step(df)
        except Exception as exc:
            logger.exception("Error in feature engineering pipeline: %s", exc)
            # Copy so a failure in the very first step cannot mutate the
            # caller's frame, then guarantee the full advertised feature set.
            df = df.copy()
            for column, default in self._FEATURE_DEFAULTS.items():
                if column not in df.columns:
                    df[column] = default
        return df

    def get_feature_columns(self) -> List[str]:
        """Return the names of the engineered feature columns, in model input order."""
        return list(self._FEATURE_DEFAULTS)