import logging

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# NOTE(review): basicConfig() at import time configures the root logger for
# the whole process (it is a no-op if the root logger already has handlers).
# Kept because existing callers may rely on INFO output being enabled.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AnomalyDetector:
    """Flag unusual amounts in a DataFrame with an Isolation Forest.

    Features are derived from a single amount column (see
    ``prepare_features``), standardised with ``StandardScaler`` and scored by
    ``IsolationForest``.  Both the scaler and the model are refit on every
    ``detect_anomalies`` call.
    """

    # Number of consecutive date-ordered rows used by the rolling statistics.
    ROLLING_WINDOW = 7
    # Column names probed, in this order, to find a date column.
    DATE_COLUMN_CANDIDATES = ('InvoiceDate', 'date', 'Date')

    def __init__(self, contamination=0.05):
        """Create the detector.

        Args:
            contamination: Forwarded to ``IsolationForest`` as its
                ``contamination`` parameter.
        """
        self.contamination = contamination
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=42,
            n_estimators=100,
        )
        self.scaler = StandardScaler()

    def prepare_features(self, df, amount_col):
        """Build the feature columns used for anomaly scoring.

        The input frame is not modified; a copy is returned whose rows are in
        the same order (and carry the same index) as ``df``.

        Features produced:
            * ``amount_col`` itself, ``amount_log`` (``log1p(|amount|)``) and
              ``amount_sign``;
            * ``amount_zscore`` when ``df`` has more than one row;
            * ``amount_rolling_mean`` / ``amount_rolling_std`` over
              ``ROLLING_WINDOW`` date-ordered rows, when one of
              ``DATE_COLUMN_CANDIDATES`` is present.  That date column is
              converted to datetime in the returned frame (unparseable
              values become ``NaT``).

        Args:
            df: Source DataFrame containing ``amount_col``.
            amount_col: Name of the numeric amount column.

        Returns:
            ``(features_df, feature_columns)``: the augmented copy of ``df``
            and the list of column names to feed to the model.  Missing
            values in the feature columns are replaced with 0; other columns
            are left untouched.
        """
        features_df = df.copy()
        amounts = features_df[amount_col]
        filled_amounts = amounts.fillna(0)

        features_df['amount_log'] = np.log1p(np.abs(filled_amounts))
        features_df['amount_sign'] = np.sign(filled_amounts)
        feature_columns = [amount_col, 'amount_log', 'amount_sign']

        # A z-score needs at least two rows for a sample standard deviation;
        # the epsilon avoids division by zero when all amounts are equal.
        if len(df) > 1:
            features_df['amount_zscore'] = (
                (amounts - amounts.mean()) / (amounts.std() + 1e-9)
            )
            feature_columns.append('amount_zscore')

        date_col = next(
            (col for col in self.DATE_COLUMN_CANDIDATES
             if col in features_df.columns),
            None,
        )
        if date_col is not None:
            dates = pd.to_datetime(features_df[date_col], errors='coerce')
            features_df[date_col] = dates

            # Compute the rolling statistics on a positional (RangeIndex)
            # frame.  Restoring the original row order then never depends on
            # the caller's index being unique or sorted, so the features stay
            # aligned with the rows of ``df``.
            by_position = pd.DataFrame({
                'date': dates.reset_index(drop=True),
                'amount': amounts.reset_index(drop=True),
            })
            # Stable sort keeps the input order for equal dates; NaT sorts last.
            chronological = by_position.sort_values('date', kind='mergesort')
            window = chronological['amount'].rolling(
                self.ROLLING_WINDOW, min_periods=1
            )
            rolling_mean = window.mean().sort_index()
            # The std of a single observation is NaN; treat it as no spread.
            rolling_std = window.std().fillna(0).sort_index()

            features_df['amount_rolling_mean'] = rolling_mean.to_numpy()
            features_df['amount_rolling_std'] = rolling_std.to_numpy()
            feature_columns.extend(['amount_rolling_mean', 'amount_rolling_std'])

        # Only the model inputs must be NaN-free; leave unrelated columns alone
        # (a blanket fillna(0) fails on e.g. categorical columns).
        features_df[feature_columns] = features_df[feature_columns].fillna(0)
        return features_df, feature_columns

    @staticmethod
    def _mark_no_anomalies(df):
        """Set the result columns on ``df`` to "nothing flagged" and return it."""
        df['IsAnomaly'] = False
        df['AnomalyScore'] = 0.0
        return df

    def detect_anomalies(self, df, amount_col='Amount'):
        """Detect anomalies in ``df`` based on the given amount column.

        ``df`` is modified in place: the columns ``IsAnomaly`` (bool) and
        ``AnomalyScore`` (float, higher means more anomalous) are added or
        overwritten.

        If ``df`` is empty, ``amount_col`` is missing, or any step of the
        detection raises, the problem is logged and every row is marked
        ``IsAnomaly=False`` with ``AnomalyScore=0.0`` instead of raising.

        Args:
            df: DataFrame to score.
            amount_col: Name of the amount column to analyse.

        Returns:
            The same ``df`` object with the two result columns set.
        """
        logger.info("Running advanced anomaly detection on column: %s", amount_col)

        if df.empty or amount_col not in df.columns:
            logger.warning("DataFrame is empty or amount column not found.")
            return self._mark_no_anomalies(df)

        # Deliberate best-effort boundary: a failure here must not break the
        # caller, so any error falls back to "no anomalies".
        try:
            features_df, feature_cols = self.prepare_features(df, amount_col)
            X_scaled = self.scaler.fit_transform(features_df[feature_cols].to_numpy())

            # fit_predict labels outliers as -1 and inliers as 1.
            predictions = self.model.fit_predict(X_scaled)
            scores = self.model.decision_function(X_scaled)

            df['IsAnomaly'] = predictions == -1
            # decision_function is lower for more abnormal rows; negate it so
            # that a larger AnomalyScore means "more anomalous".
            df['AnomalyScore'] = -scores

            logger.info(
                "Anomaly detection complete. Found %s anomalies.",
                df['IsAnomaly'].sum(),
            )
        except Exception as exc:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Error during advanced anomaly detection: %s", exc)
            return self._mark_no_anomalies(df)

        return df