# ReconAI / anomaly.py
# ACA050's picture
# Upload 14 files
# 64e5ee2 verified
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import logging
# Configure the root logger at import time so INFO-level records are emitted.
# NOTE(review): basicConfig at module import affects the whole process; confirm
# this module is meant to own logging configuration rather than the application.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by AnomalyDetector below.
logger = logging.getLogger(__name__)
class AnomalyDetector:
    """Flag unusual amounts in a DataFrame using an Isolation Forest.

    Features are derived from a single numeric amount column (and, when one
    of ``DATE_COLUMN_CANDIDATES`` is present, from date-ordered rolling
    statistics), standardised with ``StandardScaler`` and scored with
    ``IsolationForest``. The model and scaler are re-fitted on every call to
    :meth:`detect_anomalies`.
    """

    # Column names probed, in this order, to locate a date column.
    DATE_COLUMN_CANDIDATES = ('InvoiceDate', 'date', 'Date')
    # Number of date-ordered rows covered by each rolling window.
    ROLLING_WINDOW = 7

    def __init__(self, contamination=0.05):
        """Create the detector.

        Args:
            contamination: Value forwarded to ``IsolationForest`` as its
                ``contamination`` parameter.
        """
        self.contamination = contamination
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=42,
            n_estimators=100,
        )
        self.scaler = StandardScaler()

    def prepare_features(self, df: pd.DataFrame, amount_col: str) -> tuple:
        """Build the feature matrix used for anomaly scoring.

        The input frame is copied, never modified. The returned frame keeps
        exactly the same row order and index as ``df`` so that callers can
        map results back positionally.

        Args:
            df: Input data containing ``amount_col``.
            amount_col: Name of the numeric amount column.

        Returns:
            A ``(features_df, feature_columns)`` tuple. ``features_df`` is a
            copy of ``df`` with the derived columns added; ``feature_columns``
            lists the columns to feed to the model. Always present:
            ``amount_col``, ``amount_log``, ``amount_sign``. With more than
            one row ``amount_zscore`` is added, plus ``amount_rolling_mean``
            and ``amount_rolling_std`` when a date column is found. Missing
            values in the feature columns are replaced by 0.
        """
        features_df = df.copy()
        amounts = features_df[amount_col]
        filled_amounts = amounts.fillna(0)

        # Basic amount features.
        features_df['amount_log'] = np.log1p(np.abs(filled_amounts))
        features_df['amount_sign'] = np.sign(filled_amounts)
        feature_columns = [amount_col, 'amount_log', 'amount_sign']

        # Statistical features need at least two observations.
        if len(features_df) > 1:
            # The epsilon keeps the division finite when all amounts are equal.
            features_df['amount_zscore'] = (
                (amounts - amounts.mean()) / (amounts.std() + 1e-9)
            )
            feature_columns.append('amount_zscore')

            date_col = next(
                (col for col in self.DATE_COLUMN_CANDIDATES
                 if col in features_df.columns),
                None,
            )
            if date_col is not None:
                parsed_dates = pd.to_datetime(
                    features_df[date_col], errors='coerce'
                )
                features_df[date_col] = parsed_dates

                # Positional permutation that orders rows by date. Working
                # with positions (not index labels) keeps this correct for
                # duplicated or unsorted indexes; the stable sort keeps ties
                # in input order and places unparseable dates (NaT) last.
                order = (
                    parsed_dates.reset_index(drop=True)
                    .sort_values(kind='stable')
                    .index.to_numpy()
                )
                window = (
                    amounts.iloc[order]
                    .reset_index(drop=True)
                    .rolling(self.ROLLING_WINDOW, min_periods=1)
                )

                # Scatter the date-ordered statistics back to the original
                # row positions instead of re-sorting the whole frame.
                rolling_mean = np.empty(len(order), dtype=float)
                rolling_std = np.empty(len(order), dtype=float)
                rolling_mean[order] = window.mean().to_numpy()
                # The first window holds a single value, whose std is NaN.
                rolling_std[order] = window.std().fillna(0).to_numpy()

                features_df['amount_rolling_mean'] = rolling_mean
                features_df['amount_rolling_std'] = rolling_std
                feature_columns.extend(
                    ['amount_rolling_mean', 'amount_rolling_std']
                )

        # Only the model inputs are filled; other columns (e.g. NaT dates)
        # are left as they are.
        features_df = features_df.fillna(dict.fromkeys(feature_columns, 0))
        return features_df, feature_columns

    def detect_anomalies(self, df: pd.DataFrame, amount_col: str = 'Amount') -> pd.DataFrame:
        """Score every row of ``df`` and mark anomalies.

        ``df`` is modified in place: the columns ``IsAnomaly`` (bool) and
        ``AnomalyScore`` (float) are added or overwritten, and the same
        object is returned.

        Args:
            df: Data to analyse.
            amount_col: Name of the amount column to build features from.

        Returns:
            ``df`` with ``IsAnomaly`` and ``AnomalyScore`` set. If ``df`` is
            empty, ``amount_col`` is missing, or any step fails, every row
            gets ``IsAnomaly=False`` and ``AnomalyScore=0.0``; failures are
            logged rather than raised.
        """
        logger.info("Running advanced anomaly detection on column: %s", amount_col)

        if df.empty or amount_col not in df.columns:
            logger.warning("DataFrame is empty or amount column not found.")
            df['IsAnomaly'] = False
            df['AnomalyScore'] = 0.0
            return df

        try:
            # Feature rows come back in the same order as df, so the arrays
            # produced below can be assigned to df positionally.
            features_df, feature_cols = self.prepare_features(df, amount_col)
            X = features_df[feature_cols].values

            X_scaled = self.scaler.fit_transform(X)

            predictions = self.model.fit_predict(X_scaled)
            scores = self.model.decision_function(X_scaled)

            # fit_predict returns -1 for anomalies and 1 for normal rows.
            df['IsAnomaly'] = predictions == -1
            # Lower decision_function values are more anomalous; negate so
            # that a higher score means a higher anomaly risk.
            df['AnomalyScore'] = -scores

            logger.info(
                "Anomaly detection complete. Found %s anomalies.",
                df['IsAnomaly'].sum(),
            )
        except Exception as e:
            # Deliberate best-effort boundary: log with traceback and fall
            # back to "no anomalies" instead of propagating the failure.
            logger.exception("Error during advanced anomaly detection: %s", e)
            df['IsAnomaly'] = False
            df['AnomalyScore'] = 0.0

        return df