Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Any | |
class AnomalyExplainer:
    """Generate human-readable explanations for detected anomalies.

    All methods read pre-computed feature columns (for example
    ``Amount_ZScore`` or ``Hour_Distance``) from the transaction they are
    given. A feature that is absent or NaN is silently skipped.
    """

    # An hour at/after NIGHT_START_HOUR or at/before NIGHT_END_HOUR is "night".
    NIGHT_START_HOUR = 22
    NIGHT_END_HOUR = 5

    def __init__(self, thresholds: Dict = None):
        """Set up the rule thresholds.

        Args:
            thresholds: Optional overrides merged on top of the defaults
                below. Omit it to get the default configuration.
        """
        self.thresholds = {
            'amount_ratio': 3.0,        # 3x user average
            'z_score': 2.5,             # 2.5 standard deviations
            'hour_distance': 6,         # 6 hours from common hour
            'time_since_last': 48,      # 48 hours since last transaction
            # NOTE(review): the two entries below are not read by any method
            # of this class; they are kept for callers that inspect the dict.
            'night_transaction': True,  # Transaction at night
            'weekend_ratio': 2.0,       # Weekend transaction ratio
        }
        if thresholds:
            self.thresholds.update(thresholds)

    @staticmethod
    def _value(record, key):
        """Return ``record[key]``, or None if the key is absent or the value is missing."""
        if key not in record:
            return None
        value = record[key]
        try:
            missing = bool(pd.isna(value))
        except (TypeError, ValueError):
            # Array-like values have no single truth value; treat as present.
            missing = False
        return None if missing else value

    @staticmethod
    def _clamp_unit(value) -> float:
        """Clamp *value* to the closed interval [0.0, 1.0]."""
        return max(0.0, min(float(value), 1.0))

    @staticmethod
    def _scale_to_percent(value, maximum, fallback: float) -> float:
        """Scale *value* against *maximum* onto 0-100, clamped.

        Returns *fallback* when either input is NaN or *maximum* is not
        strictly positive.
        """
        if pd.isna(value) or pd.isna(maximum) or not maximum > 0:
            return float(fallback)
        return float(min(max(value / maximum * 100, 0.0), 100.0))

    def explain_anomaly(self, transaction: pd.Series, user_stats: Dict = None) -> List[str]:
        """Generate explanations for a single anomalous transaction.

        Args:
            transaction: Row holding the feature values to inspect.
            user_stats: Accepted for interface compatibility; not used.

        Returns:
            A list of explanation strings. The merchant category, when
            known, is included as context. If no rule produced an actual
            reason, a generic explanation is appended so the list is never
            empty and never consists of context alone.
        """
        explanations = []
        context_lines = 0  # entries that describe the transaction but are not reasons

        # Check amount deviation. Ratios above half the configured threshold
        # are reported, with the same wording as ratios above the threshold.
        ratio = self._value(transaction, 'AmountRatio_Mean')
        if ratio is not None and ratio > self.thresholds['amount_ratio'] * 0.5:
            explanations.append(f"Transaction amount is {ratio:.1f}x higher than user's average")

        z_raw = self._value(transaction, 'Amount_ZScore')
        if z_raw is not None:
            z_score = abs(z_raw)
            if z_score > self.thresholds['z_score']:
                explanations.append(
                    f"Transaction amount deviates by {z_score:.1f} standard deviations from user's normal spending"
                )

        # Check time-based anomalies
        hour = self._value(transaction, 'Hour')
        if hour is not None and (hour >= self.NIGHT_START_HOUR or hour <= self.NIGHT_END_HOUR):
            # int() keeps the message as "23:00" even when the hour arrives
            # as a float (rows produced by DataFrame.iterrows are upcast).
            explanations.append(f"Unusual transaction time: {int(hour)}:00 (night hours)")

        hour_dist = self._value(transaction, 'Hour_Distance')
        if hour_dist is not None and hour_dist > self.thresholds['hour_distance']:
            explanations.append(
                f"Transaction time is {hour_dist:.0f} hours away from user's typical transaction hour"
            )

        if self._value(transaction, 'IsWeekend') == 1:
            explanations.append("Transaction occurred on a weekend")

        # Check frequency anomalies
        time_since = self._value(transaction, 'TimeSinceLastTx')
        if time_since is not None:
            if time_since > self.thresholds['time_since_last']:
                explanations.append(
                    f"Unusual transaction pattern: {time_since:.0f} hours since last transaction"
                )
            elif time_since < 1:
                explanations.append("Rapid succession: multiple transactions within 1 hour")

        # Merchant category is context, not a reason; skip it when missing
        # so the output never contains "Merchant category: nan".
        category = self._value(transaction, 'Merchant Category')
        if category is not None:
            explanations.append(f"Merchant category: {category}")
            context_lines += 1

        entropy = self._value(transaction, 'Category_Entropy')
        if entropy is not None and entropy < 1.0:
            explanations.append(
                "User typically has low category diversity - this transaction may be unusual"
            )

        # If no specific reason was found, provide a general one
        if len(explanations) == context_lines:
            explanations.append("Anomaly detected based on combined feature analysis")

        return explanations

    def explain_batch(self, df: pd.DataFrame, user_stats: Dict = None) -> pd.DataFrame:
        """Generate explanations for a batch of transactions.

        Returns a copy of *df* with an added ``Explanation`` column holding
        each row's explanations joined by ``'; '``. The input frame is not
        modified.
        """
        result = df.copy()
        result['Explanation'] = [
            '; '.join(self.explain_anomaly(row, user_stats))
            for _, row in result.iterrows()
        ]
        return result

    def get_feature_importance(self, transaction: pd.Series) -> Dict[str, float]:
        """Calculate relative feature importance for the anomaly.

        Each available feature is mapped to a raw score clamped to [0, 1];
        the scores are then divided by their sum. If the sum is zero the
        (all-zero) raw scores are returned unchanged. Features that are
        absent or NaN do not appear in the result.
        """
        importance = {}

        # Amount importance: |z| of 5 or more saturates at 1.0
        z_raw = self._value(transaction, 'Amount_ZScore')
        if z_raw is not None:
            importance['Amount'] = self._clamp_unit(abs(z_raw) / 5.0)

        # Time importance: 12 hours or more saturates at 1.0
        hour_dist = self._value(transaction, 'Hour_Distance')
        if hour_dist is not None:
            importance['Time'] = self._clamp_unit(hour_dist / 12.0)

        # Frequency importance: 72 hours or more saturates at 1.0
        time_since = self._value(transaction, 'TimeSinceLastTx')
        if time_since is not None:
            importance['Frequency'] = self._clamp_unit(time_since / 72.0)

        # Category importance: lower entropy gives a higher score
        entropy = self._value(transaction, 'Category_Entropy')
        if entropy is not None:
            importance['Category'] = self._clamp_unit(1.0 - entropy / 3.0)

        # Normalize importance scores
        total = sum(importance.values())
        if total > 0:
            importance = {name: score / total for name, score in importance.items()}

        return importance

    def generate_radar_data(self, df: pd.DataFrame) -> Dict[str, List]:
        """Generate data for radar chart visualization.

        Returns a dict with ``labels`` and ``values`` (scores on a 0-100
        scale). For an empty frame both lists are empty and the legacy
        ``datasets`` key is also present, so the result always exposes
        ``labels`` and ``values``.
        """
        if df.empty:
            return {'labels': [], 'values': [], 'datasets': []}

        has_amount = 'Amount' in df.columns
        amount_mean = df['Amount'].mean() if has_amount else 0
        amount_max = df['Amount'].max() if has_amount else 1
        frequency_mean = df.get('TxCount_Window', pd.Series([1])).mean()
        time_variance = df.get('Hour_Variance', pd.Series([0])).mean()
        category_diversity = df.get('Category_Entropy', pd.Series([0])).mean()

        # Normalize to 0-100 scale
        normalized = {
            'Amount': self._scale_to_percent(amount_mean, amount_max, 50),
            # NOTE(review): the mean is scaled against twice itself, so this
            # score is always 50; a fixed reference scale is probably wanted.
            'Frequency': self._scale_to_percent(frequency_mean, frequency_mean * 2, 50),
            'Time Variance': self._scale_to_percent(time_variance, 50, 0),
            'Category Diversity': self._scale_to_percent(category_diversity, 3, 0),
        }

        return {
            'labels': list(normalized),
            'values': list(normalized.values()),
        }

    def compare_with_user_baseline(self, transaction: pd.Series, user_baseline: Dict) -> Dict[str, Any]:
        """Compare transaction with user's baseline behavior.

        Returns a dict that may contain:
            ``amount_vs_avg``: ``Amount`` divided by the baseline ``mean``.
                Omitted when either value is missing/NaN or the mean is zero.
            ``hour``: the transaction's ``Hour`` value, if that key exists.
            ``category``: the ``Merchant Category`` value, if that key exists.
        """
        comparison = {}
        baseline = {} if user_baseline is None else user_baseline

        amount = self._value(transaction, 'Amount')
        baseline_mean = self._value(baseline, 'mean')
        # A zero mean would raise ZeroDivisionError (or yield inf); skip it.
        if amount is not None and baseline_mean is not None and baseline_mean != 0:
            comparison['amount_vs_avg'] = amount / baseline_mean

        if 'Hour' in transaction:
            comparison['hour'] = transaction['Hour']

        if 'Merchant Category' in transaction:
            comparison['category'] = transaction['Merchant Category']

        return comparison