import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional


class AnomalyExplainer:
    """Generate human-readable explanations for detected anomalies.

    Each method reads optional, pre-computed feature fields from a transaction
    (for example ``AmountRatio_Mean`` or ``Hour_Distance``). A field that is
    absent or NaN is skipped rather than treated as an error.
    """

    def __init__(self):
        # NOTE: 'night_transaction' and 'weekend_ratio' are not read by any
        # method of this class; they are kept so that code inspecting
        # ``thresholds`` keeps working.
        self.thresholds = {
            'amount_ratio': 3.0,        # 3x user average
            'z_score': 2.5,             # 2.5 standard deviations
            'hour_distance': 6,         # 6 hours from common hour
            'time_since_last': 48,      # 48 hours since last transaction
            'night_transaction': True,  # Transaction at night
            'weekend_ratio': 2.0        # Weekend transaction ratio
        }

    @staticmethod
    def _value(transaction, key: str):
        """Return ``transaction[key]``, or None if the key is absent or NaN."""
        if key not in transaction:
            return None
        value = transaction[key]
        return None if pd.isna(value) else value

    @staticmethod
    def _clip_unit(score):
        """Clamp *score* into the closed interval [0, 1]."""
        return max(0.0, min(score, 1.0))

    def explain_anomaly(self, transaction: pd.Series,
                        user_stats: Optional[Dict] = None) -> List[str]:
        """Generate explanations for a single anomalous transaction.

        Args:
            transaction: Row holding the transaction's feature fields.
            user_stats: Accepted for interface compatibility; not used.

        Returns:
            Explanation strings in a fixed order (amount, time, frequency,
            category). If no rule fired, the generic "combined feature
            analysis" message is included; the merchant category line is
            context only and does not count as a fired rule.
        """
        explanations: List[str] = []
        # Number of purely informational lines (not reasons) added so far.
        context_lines = 0

        # --- Amount deviation ---
        ratio = self._value(transaction, 'AmountRatio_Mean')
        if ratio is not None:
            limit = self.thresholds['amount_ratio']
            # The full threshold and the half threshold share one wording.
            if ratio > limit or ratio > limit * 0.5:
                explanations.append(
                    f"Transaction amount is {ratio:.1f}x higher than user's average")

        raw_z = self._value(transaction, 'Amount_ZScore')
        if raw_z is not None:
            z_score = abs(raw_z)
            if z_score > self.thresholds['z_score']:
                explanations.append(
                    f"Transaction amount deviates by {z_score:.1f} standard "
                    f"deviations from user's normal spending")

        # --- Time-based anomalies ---
        hour = self._value(transaction, 'Hour')
        if hour is not None and (hour >= 22 or hour <= 5):
            # Rows from DataFrame.iterrows() may carry the hour as a float
            # (23.0); convert so the message reads "23:00", not "23.0:00".
            explanations.append(
                f"Unusual transaction time: {int(hour)}:00 (night hours)")

        hour_dist = self._value(transaction, 'Hour_Distance')
        if hour_dist is not None and hour_dist > self.thresholds['hour_distance']:
            explanations.append(
                f"Transaction time is {hour_dist:.0f} hours away from user's "
                f"typical transaction hour")

        if self._value(transaction, 'IsWeekend') == 1:
            explanations.append("Transaction occurred on a weekend")

        # --- Frequency anomalies ---
        time_since = self._value(transaction, 'TimeSinceLastTx')
        if time_since is not None:
            if time_since > self.thresholds['time_since_last']:
                explanations.append(
                    f"Unusual transaction pattern: {time_since:.0f} hours "
                    f"since last transaction")
            elif time_since < 1:
                explanations.append(
                    "Rapid succession: multiple transactions within 1 hour")

        # --- Category ---
        category = self._value(transaction, 'Merchant Category')
        if category is not None:
            # Context for the reader, not evidence of an anomaly.
            explanations.append(f"Merchant category: {category}")
            context_lines += 1

        entropy = self._value(transaction, 'Category_Entropy')
        if entropy is not None and entropy < 1.0:
            explanations.append(
                "User typically has low category diversity - this transaction "
                "may be unusual")

        # No rule fired (only context lines, or nothing): add the generic reason.
        if len(explanations) == context_lines:
            explanations.append("Anomaly detected based on combined feature analysis")

        return explanations

    def explain_batch(self, df: pd.DataFrame,
                      user_stats: Optional[Dict] = None) -> pd.DataFrame:
        """Generate explanations for a batch of transactions.

        Args:
            df: One transaction per row. The input frame is not modified.
            user_stats: Forwarded unchanged to ``explain_anomaly``.

        Returns:
            A copy of *df* with an added ``Explanation`` column containing
            each row's explanations joined with ``'; '``.
        """
        df = df.copy()
        df['Explanation'] = [
            '; '.join(self.explain_anomaly(row, user_stats))
            for _, row in df.iterrows()
        ]
        return df

    def get_feature_importance(self, transaction: pd.Series) -> Dict[str, float]:
        """Calculate relative feature importance for the anomaly.

        Each available feature is scored in [0, 1] and the scores are then
        normalised to sum to 1. If every score is 0 the raw zeros are
        returned. Features whose field is missing or NaN are omitted.
        """
        importance: Dict[str, float] = {}

        z_score = self._value(transaction, 'Amount_ZScore')
        if z_score is not None:
            importance['Amount'] = self._clip_unit(abs(z_score) / 5.0)

        hour_dist = self._value(transaction, 'Hour_Distance')
        if hour_dist is not None:
            # Clipped at 0 as well as 1 so a negative input cannot yield a
            # negative score and distort the normalisation below.
            importance['Time'] = self._clip_unit(hour_dist / 12.0)

        time_since = self._value(transaction, 'TimeSinceLastTx')
        if time_since is not None:
            importance['Frequency'] = self._clip_unit(time_since / 72.0)

        entropy = self._value(transaction, 'Category_Entropy')
        if entropy is not None:
            importance['Category'] = self._clip_unit(1.0 - entropy / 3.0)

        total = sum(importance.values())
        if total > 0:
            importance = {name: score / total for name, score in importance.items()}

        return importance

    def generate_radar_data(self, df: pd.DataFrame) -> Dict[str, List]:
        """Generate data for radar chart visualization.

        Returns:
            A dict with ``'labels'`` and ``'values'`` (values scaled towards
            0-100). For an empty frame both lists are empty; the empty result
            also carries the legacy ``'datasets'`` key for existing callers.
        """
        if df.empty:
            return {'labels': [], 'values': [], 'datasets': []}

        has_amount = 'Amount' in df.columns

        # Mean of each metric; a missing column falls back to a constant.
        metrics = {
            'Amount': df['Amount'].mean() if has_amount else 0,
            'Frequency': df.get('TxCount_Window', pd.Series([1])).mean(),
            'Time Variance': df.get('Hour_Variance', pd.Series([0])).mean(),
            'Category Diversity': df.get('Category_Entropy', pd.Series([0])).mean()
        }

        # Reference maxima used to scale each metric.
        max_vals = {
            'Amount': df['Amount'].max() if has_amount else 1,
            'Frequency': metrics['Frequency'] * 2,
            'Time Variance': 50,
            'Category Diversity': 3
        }

        normalized = {}
        for label, mean_value in metrics.items():
            ceiling = max_vals[label]
            if label in ('Amount', 'Frequency') and not ceiling > 0:
                # Data-derived maximum is zero, negative or NaN: use the midpoint.
                normalized[label] = 50
            else:
                normalized[label] = mean_value / ceiling * 100

        return {
            'labels': list(normalized),
            'values': list(normalized.values())
        }

    def compare_with_user_baseline(self, transaction: pd.Series,
                                   user_baseline: Dict) -> Dict[str, Any]:
        """Compare a transaction with the user's baseline behavior.

        Args:
            transaction: Row that may contain ``Amount``, ``Hour`` and
                ``Merchant Category``.
            user_baseline: Baseline statistics; only ``'mean'`` is read.
                ``None`` is treated as an empty baseline.

        Returns:
            A dict with any of ``amount_vs_avg``, ``hour`` and ``category``.
            ``amount_vs_avg`` is omitted when the baseline mean is missing,
            None, NaN or zero, because the ratio is undefined.
        """
        comparison: Dict[str, Any] = {}
        baseline = user_baseline or {}

        if 'Amount' in transaction and 'mean' in baseline:
            mean = baseline['mean']
            if mean is not None and not pd.isna(mean) and mean != 0:
                comparison['amount_vs_avg'] = transaction['Amount'] / mean

        if 'Hour' in transaction:
            comparison['hour'] = transaction['Hour']

        if 'Merchant Category' in transaction:
            comparison['category'] = transaction['Merchant Category']

        return comparison