import logging

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AnomalyDetector:
    """Flag unusual values of a numeric "amount" column with an Isolation Forest.

    The detector derives a handful of features from the amount column (log
    magnitude, sign, z-score and, when a date column is present, rolling
    statistics over date-ordered rows), standardises them and fits an
    ``IsolationForest`` on them. The model and scaler are refitted on every
    call to :meth:`detect_anomalies`.
    """

    # Column names probed, in this order, for a date used to order the rows
    # before computing rolling statistics. The first one present wins.
    DATE_COLUMN_CANDIDATES = ('InvoiceDate', 'date', 'Date')

    # Number of consecutive date-ordered rows in each rolling window.
    ROLLING_WINDOW = 7

    def __init__(self, contamination=0.05):
        """Create the detector.

        Args:
            contamination: Passed straight to ``IsolationForest`` as its
                ``contamination`` argument (expected share of anomalies).
        """
        self.contamination = contamination
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=42,  # fixed seed so repeated runs give the same result
            n_estimators=100,
        )
        self.scaler = StandardScaler()

    def prepare_features(self, df: pd.DataFrame, amount_col: str) -> tuple:
        """Build the feature frame used for anomaly detection.

        The returned frame is a copy of ``df`` with the same rows in the same
        order and the same index; ``df`` itself is not modified.

        Args:
            df: Input data. Must contain ``amount_col``.
            amount_col: Name of the numeric column to analyse.

        Returns:
            A ``(features_df, feature_columns)`` tuple. ``features_df`` is the
            copy of ``df`` with the feature columns added (and the detected
            date column, if any, parsed to datetimes). ``feature_columns``
            lists the columns to feed to the model; missing values in those
            columns are replaced by 0. Other columns are left untouched.
        """
        features_df = df.copy()
        amounts = features_df[amount_col]
        amounts_filled = amounts.fillna(0)

        # Basic amount features: magnitude on a log scale plus the sign, so
        # that refunds/credits remain distinguishable from charges.
        features_df['amount_log'] = np.log1p(np.abs(amounts_filled))
        features_df['amount_sign'] = np.sign(amounts_filled)
        feature_columns = [amount_col, 'amount_log', 'amount_sign']

        # A z-score needs at least two rows to have a standard deviation.
        if len(df) > 1:
            # The small epsilon avoids division by zero for constant columns.
            features_df['amount_zscore'] = (
                (amounts - amounts.mean()) / (amounts.std() + 1e-9)
            )
            feature_columns.append('amount_zscore')

        date_col = next(
            (col for col in self.DATE_COLUMN_CANDIDATES
             if col in features_df.columns),
            None,
        )
        if date_col is not None:
            features_df[date_col] = pd.to_datetime(
                features_df[date_col], errors='coerce'
            )

            # Work on row *positions* rather than index labels: the caller's
            # index may be unordered or contain duplicates, and the results
            # must line up with the rows of ``df`` one-to-one.
            dates_by_position = features_df[date_col].reset_index(drop=True)
            chronological = (
                dates_by_position
                .sort_values(kind='mergesort')  # stable; unparseable dates go last
                .index
                .to_numpy()
            )
            ordered_amounts = amounts.reset_index(drop=True).iloc[chronological]
            window = ordered_amounts.rolling(self.ROLLING_WINDOW, min_periods=1)

            # ``sort_index`` puts the results back into original row order
            # because the index of ``ordered_amounts`` is the row position.
            features_df['amount_rolling_mean'] = (
                window.mean().sort_index().to_numpy()
            )
            # The standard deviation of a single observation is undefined.
            features_df['amount_rolling_std'] = (
                window.std().fillna(0).sort_index().to_numpy()
            )
            feature_columns.extend(['amount_rolling_mean', 'amount_rolling_std'])

        # Only the model inputs need to be free of missing values; filling the
        # whole frame would corrupt date, text or categorical columns.
        features_df[feature_columns] = features_df[feature_columns].fillna(0)
        return features_df, feature_columns

    @staticmethod
    def _mark_no_anomalies(df: pd.DataFrame) -> pd.DataFrame:
        """Add the result columns to ``df`` with every row marked as normal."""
        df['IsAnomaly'] = False
        df['AnomalyScore'] = 0.0
        return df

    def detect_anomalies(self, df: pd.DataFrame, amount_col: str = 'Amount') -> pd.DataFrame:
        """Detect anomalies in ``df`` based on the given amount column.

        The columns ``'IsAnomaly'`` (bool) and ``'AnomalyScore'`` (float,
        higher means more anomalous) are added to ``df`` in place and the same
        object is returned.

        This is a best-effort operation: if ``df`` is empty, ``amount_col`` is
        missing, or feature preparation / model fitting fails, every row is
        marked as not anomalous with a score of 0.0 and the problem is logged
        instead of being raised.

        Args:
            df: Data to analyse. Modified in place.
            amount_col: Name of the numeric column to analyse.

        Returns:
            ``df`` with ``'IsAnomaly'`` and ``'AnomalyScore'`` appended.
        """
        logger.info("Running advanced anomaly detection on column: %s", amount_col)

        if df.empty or amount_col not in df.columns:
            logger.warning("DataFrame is empty or amount column not found.")
            return self._mark_no_anomalies(df)

        try:
            features_df, feature_cols = self.prepare_features(df, amount_col)
            X = features_df[feature_cols].values
            X_scaled = self.scaler.fit_transform(X)
            predictions = self.model.fit_predict(X_scaled)
            scores = self.model.decision_function(X_scaled)
        except Exception as exc:
            # Deliberate catch-all: detection is advisory and must not break
            # the caller. The traceback is logged so failures stay diagnosable.
            logger.exception("Error during advanced anomaly detection: %s", exc)
            return self._mark_no_anomalies(df)

        # fit_predict returns -1 for anomalies and 1 for normal rows.
        df['IsAnomaly'] = predictions == -1
        # A lower IsolationForest decision score means more anomalous; negate
        # it so that a higher AnomalyScore means higher anomaly risk.
        df['AnomalyScore'] = -scores
        logger.info(
            "Anomaly detection complete. Found %s anomalies.",
            df['IsAnomaly'].sum(),
        )
        return df