File size: 12,325 Bytes
b90f550
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from typing import Tuple, Dict, Optional
import os
import logging

# Configure logging.
# NOTE(review): basicConfig() runs at import time and, if the root logger has
# no handlers yet, sets the root level to ERROR. Under that configuration the
# logger.warning(...) calls made in this module are not emitted — confirm this
# is intended.
logging.basicConfig(level=logging.ERROR)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class DataPreprocessor:
    """Handles data preprocessing for credit card anomaly detection."""
    
    def __init__(self):
        """Set up a fresh scaler plus empty encoder and per-user caches."""
        # Per-user Amount statistics; filled in by normalize_per_user().
        self.user_stats = {}
        # Label encoders keyed by the name of the column they encode.
        self.label_encoders = {}
        # Scaler used by scale_features().
        self.scaler = StandardScaler()
        
    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load data from multiple file formats (CSV, Excel, JSON, Parquet)."""
        try:
            file_ext = os.path.splitext(filepath)[1].lower()
            
            if file_ext == '.csv':
                df = pd.read_csv(filepath)
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(filepath)
            elif file_ext == '.json':
                df = pd.read_json(filepath)
            elif file_ext == '.parquet':
                df = pd.read_parquet(filepath)
            else:
                # Try CSV as fallback
                try:
                    df = pd.read_csv(filepath)
                except:
                    raise ValueError(f"Unsupported file format: {file_ext}")
            
            # Ensure dataframe is not empty
            if df.empty:
                raise ValueError("Loaded data is empty")
                
            return df
            
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            # Return empty dataframe with expected columns as fallback
            return self._create_empty_dataframe()
    
    def _create_empty_dataframe(self) -> pd.DataFrame:
        """Create an empty dataframe with expected columns as fallback."""
        columns = ['Transaction ID', 'User ID', 'Amount', 'Timestamp', 'Merchant Category', 'Location']
        return pd.DataFrame(columns=columns)
    
    def ensure_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Ensure all required columns exist, auto-generate missing ones."""
        df = df.copy()
        
        # Define required columns and their default generators
        required_columns = {
            'Transaction ID': lambda: [f'TX{i:06d}' for i in range(len(df))],
            'User ID': lambda: [f'USER{i%10+1:03d}' for i in range(len(df))],
            'Amount': lambda: [0.0] * len(df),
            'Timestamp': lambda: pd.date_range(start='2024-01-01', periods=len(df), freq='H'),
            'Merchant Category': lambda: ['Unknown'] * len(df),
            'Location': lambda: ['Unknown'] * len(df)
        }
        
        # Add missing columns with generated defaults
        for col, generator in required_columns.items():
            if col not in df.columns:
                try:
                    df[col] = generator()
                except Exception as e:
                    logger.error(f"Error generating column {col}: {str(e)}")
                    df[col] = 'Unknown' if col in ['Merchant Category', 'Location'] else 0
        
        # Ensure numeric columns are numeric
        numeric_cols = ['Amount']
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                except:
                    df[col] = 0
        
        # Ensure timestamp is datetime
        if 'Timestamp' in df.columns:
            try:
                df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
                df['Timestamp'] = df['Timestamp'].fillna(pd.Timestamp('2024-01-01'))
            except:
                df['Timestamp'] = pd.Timestamp('2024-01-01')
        
        return df
    
    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values in the dataset with safe operations."""
        df = df.copy()
        
        try:
            # Fill missing numeric values with median
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                median_val = df[col].median()
                if pd.isna(median_val):
                    median_val = 0
                df[col] = df[col].fillna(median_val)
            
            # Fill missing categorical values with mode
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            for col in categorical_cols:
                mode_val = df[col].mode()
                if not mode_val.empty:
                    df[col] = df[col].fillna(mode_val[0])
                else:
                    df[col] = df[col].fillna('Unknown')
        except Exception as e:
            logger.error(f"Error handling missing values: {str(e)}")
            # Fallback: fill all NaN with appropriate defaults
            for col in df.columns:
                if df[col].dtype in [np.number, 'int64', 'float64']:
                    df[col] = df[col].fillna(0)
                else:
                    df[col] = df[col].fillna('Unknown')
        
        return df
    
    def normalize_per_user(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize spending amounts per user with error handling."""
        df = df.copy()
        
        try:
            # Check if User ID column exists
            if 'User ID' not in df.columns:
                df['User ID'] = 'USER001'
                logger.warning("User ID column missing, using default")
            
            # Check if Amount column exists
            if 'Amount' not in df.columns:
                df['Amount'] = 0
                logger.warning("Amount column missing, using default")
            
            # Calculate user statistics
            self.user_stats = df.groupby('User ID')['Amount'].agg(['mean', 'std', 'median']).to_dict('index')
            
            # Normalize amount relative to user average
            def normalize_row(row):
                user_id = row.get('User ID', 'USER001')
                amount = row.get('Amount', 0)
                
                if user_id in self.user_stats:
                    mean = self.user_stats[user_id]['mean']
                    std = self.user_stats[user_id]['std']
                    return (amount - mean) / (std + 1e-8) if std > 0 else 0
                else:
                    return 0
            
            df['Amount_Normalized'] = df.apply(normalize_row, axis=1)
            
        except Exception as e:
            logger.error(f"Error normalizing per user: {str(e)}")
            # Fallback: use simple z-score
            try:
                mean = df['Amount'].mean()
                std = df['Amount'].std()
                df['Amount_Normalized'] = (df['Amount'] - mean) / (std + 1e-8) if std > 0 else 0
            except:
                df['Amount_Normalized'] = 0
        
        return df
    
    def encode_categorical(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add an integer-encoded copy of the merchant category column.

        The first column found among several accepted spellings of the
        category name is encoded into ``<column>_Encoded``. When none is
        present, a ``'Merchant Category'`` column filled with ``'Unknown'``
        is created and encoded. An encoder is fitted the first time a column
        name is seen and reused on later calls. On any error a
        ``'Merchant Category_Encoded'`` column of zeros is added if absent.
        """
        df = df.copy()

        try:
            # Accepted spellings, in priority order.
            candidates = ('Merchant Category', 'merchant_category', 'Merchant_Category', 'category', 'Category')
            category_col = next((name for name in candidates if name in df.columns), None)

            # No spelling matched: synthesise a constant category column.
            if category_col is None:
                category_col = 'Merchant Category'
                df[category_col] = 'Unknown'

            # Fit on first sight of this column name, reuse the encoder after.
            if category_col in self.label_encoders:
                encode = self.label_encoders[category_col].transform
            else:
                self.label_encoders[category_col] = LabelEncoder()
                encode = self.label_encoders[category_col].fit_transform

            df[category_col + '_Encoded'] = encode(df[category_col].astype(str))
        except Exception as e:
            logger.error(f"Error encoding categorical variables: {str(e)}")
            # Fallback: add a simple encoded column
            if 'Merchant Category_Encoded' not in df.columns:
                df['Merchant Category_Encoded'] = 0

        return df
    
    def scale_features(self, df: pd.DataFrame, feature_cols: list) -> Tuple[pd.DataFrame, np.ndarray]:
        """Scale numerical features using StandardScaler with error handling."""
        df = df.copy()
        
        try:
            # Filter to only columns that exist
            valid_cols = [col for col in feature_cols if col in df.columns]
            
            if not valid_cols:
                logger.warning("No valid columns to scale")
                return df, np.array([])
            
            if len(df) > 0:
                # Ensure all columns are numeric
                for col in valid_cols:
                    if df[col].dtype not in [np.number, 'int64', 'float64']:
                        try:
                            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                        except:
                            df[col] = 0
                
                scaled_features = self.scaler.fit_transform(df[valid_cols])
                for i, col in enumerate(valid_cols):
                    df[col + '_Scaled'] = scaled_features[:, i]
            else:
                scaled_features = np.array([])
        except Exception as e:
            logger.error(f"Error scaling features: {str(e)}")
            # Fallback: return original dataframe with empty scaled features
            scaled_features = np.array([])
        
        return df, scaled_features
    
    def preprocess_pipeline(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, np.ndarray, list]:
        """Run the full preprocessing chain and scale the selected features.

        Steps, in order: ensure required columns, fill missing values,
        normalize amounts per user, encode the merchant category, then scale
        the feature columns that are present.

        Args:
            df: Raw transactions.

        Returns:
            A tuple ``(frame, scaled, feature_cols)``: the processed
            DataFrame, the scaled feature array (empty when nothing could be
            scaled) and the list of feature column names that were selected.
            If the main chain fails, a reduced run using only ``Amount`` is
            attempted; if that fails too, the frame is returned with an empty
            array and ``['Amount']``.
        """
        try:
            df = self.ensure_required_columns(df)
            df = self.handle_missing_values(df)
            df = self.normalize_per_user(df)
            df = self.encode_categorical(df)

            # Select feature columns for scaling - only include columns that exist
            feature_cols = []

            if 'Amount_Normalized' in df.columns:
                feature_cols.append('Amount_Normalized')

            # The encoder names its output "<source column>_Encoded", so the
            # usual name contains a space ('Merchant Category_Encoded'); the
            # underscore spelling only appears when the source column itself
            # is called 'Merchant_Category'. Accept either.
            for encoded_col in ('Merchant Category_Encoded', 'Merchant_Category_Encoded'):
                if encoded_col in df.columns:
                    feature_cols.append(encoded_col)

            if 'Amount' in df.columns:
                feature_cols.append('Amount')

            # Scale features only if we have valid columns
            if feature_cols:
                df, scaled_features = self.scale_features(df, feature_cols)
            else:
                scaled_features = np.array([])

            return df, scaled_features, feature_cols
        except Exception as e:
            logger.error(f"Error in preprocessing pipeline: {str(e)}")
            # Fallback: return original dataframe with minimal processing
            try:
                df = self.ensure_required_columns(df)
                feature_cols = ['Amount']
                df, scaled_features = self.scale_features(df, feature_cols)
                return df, scaled_features, feature_cols
            except Exception:
                # Ultimate fallback
                return df, np.array([]), ['Amount']
    
    def get_user_statistics(self, user_id: str) -> Dict:
        """Get statistics for a specific user."""
        return self.user_stats.get(user_id, {})