# Credit-Card-Anomaly / model/explain.py
# Zayeemk's picture
# Rename explain.py to model/explain.py
# bc64e76 verified
import pandas as pd
import numpy as np
from typing import Dict, List, Any
class AnomalyExplainer:
    """Generate human-readable explanations for detected anomalies.

    The explainer is purely rule based: it inspects engineered feature
    columns of a transaction (for example ``Amount_ZScore`` or
    ``Hour_Distance``) and turns values beyond fixed cut-offs into sentences.
    Every feature is optional; absent or NaN features are skipped.
    """

    def __init__(self):
        # Cut-offs used by ``explain_anomaly``. 'night_transaction' and
        # 'weekend_ratio' are not read by any method of this class; they are
        # kept so code that inspects ``thresholds`` keeps working.
        self.thresholds = {
            'amount_ratio': 3.0,        # 3x user average
            'z_score': 2.5,             # 2.5 standard deviations
            'hour_distance': 6,         # 6 hours from common hour
            'time_since_last': 48,      # 48 hours since last transaction
            'night_transaction': True,  # Transaction at night
            'weekend_ratio': 2.0        # Weekend transaction ratio
        }

    @staticmethod
    def _has_value(transaction: pd.Series, column: str) -> bool:
        """Return True when *column* is present in *transaction* and not NaN/None."""
        return column in transaction and not pd.isna(transaction[column])

    @staticmethod
    def _clamp(value: float, low: float, high: float) -> float:
        """Clamp *value* into ``[low, high]`` and return it as a plain float."""
        return float(min(max(value, low), high))

    @classmethod
    def _to_percent(cls, value: float, max_value: float, fallback: float) -> float:
        """Scale *value* against *max_value* onto 0-100.

        Returns *fallback* when the ratio is undefined (NaN input or a
        non-positive maximum) so the result never contains NaN or inf.
        """
        if pd.isna(value) or pd.isna(max_value) or max_value <= 0:
            return float(fallback)
        return cls._clamp(value / max_value * 100.0, 0.0, 100.0)

    def explain_anomaly(self, transaction: pd.Series, user_stats: Dict = None) -> List[str]:
        """Generate explanations for a single anomalous transaction.

        Args:
            transaction: One row of engineered features. Only the columns
                checked below are read; any of them may be missing.
            user_stats: Accepted for interface compatibility; not read here.

        Returns:
            A non-empty list of sentences. When no rule fires, a single
            generic sentence is returned.
        """
        explanations = []

        # --- Amount deviation -------------------------------------------
        if self._has_value(transaction, 'AmountRatio_Mean'):
            ratio = transaction['AmountRatio_Mean']
            cutoff = self.thresholds['amount_ratio']
            # Ratios above half the configured cut-off are reported with the
            # same wording as ratios above the full cut-off.
            if ratio > cutoff or ratio > cutoff * 0.5:
                explanations.append(f"Transaction amount is {ratio:.1f}x higher than user's average")

        if self._has_value(transaction, 'Amount_ZScore'):
            z_score = abs(transaction['Amount_ZScore'])
            if z_score > self.thresholds['z_score']:
                explanations.append(f"Transaction amount deviates by {z_score:.1f} standard deviations from user's normal spending")

        # --- Time-based anomalies ---------------------------------------
        if self._has_value(transaction, 'Hour'):
            hour = transaction['Hour']
            if hour >= 22 or hour <= 5:
                # Rows taken from a mixed-dtype DataFrame arrive as floats;
                # int() keeps the message as "23:00" rather than "23.0:00".
                explanations.append(f"Unusual transaction time: {int(hour)}:00 (night hours)")

        if self._has_value(transaction, 'Hour_Distance'):
            hour_dist = transaction['Hour_Distance']
            if hour_dist > self.thresholds['hour_distance']:
                explanations.append(f"Transaction time is {hour_dist:.0f} hours away from user's typical transaction hour")

        if self._has_value(transaction, 'IsWeekend') and transaction['IsWeekend'] == 1:
            explanations.append("Transaction occurred on a weekend")

        # --- Frequency anomalies ----------------------------------------
        if self._has_value(transaction, 'TimeSinceLastTx'):
            time_since = transaction['TimeSinceLastTx']
            if time_since > self.thresholds['time_since_last']:
                explanations.append(f"Unusual transaction pattern: {time_since:.0f} hours since last transaction")
            elif time_since < 1:
                explanations.append("Rapid succession: multiple transactions within 1 hour")

        # --- Category information ---------------------------------------
        # A missing category is skipped instead of being reported as "nan".
        if self._has_value(transaction, 'Merchant Category'):
            category = transaction['Merchant Category']
            explanations.append(f"Merchant category: {category}")

        if self._has_value(transaction, 'Category_Entropy'):
            if transaction['Category_Entropy'] < 1.0:
                explanations.append("User typically has low category diversity - this transaction may be unusual")

        # If no specific explanations were found, provide a general one.
        if not explanations:
            explanations.append("Anomaly detected based on combined feature analysis")
        return explanations

    def explain_batch(self, df: pd.DataFrame, user_stats: Dict = None) -> pd.DataFrame:
        """Generate explanations for a batch of transactions.

        Args:
            df: Transactions, one per row. The input is not modified.
            user_stats: Forwarded unchanged to ``explain_anomaly``.

        Returns:
            A copy of *df* with an added ``Explanation`` column holding the
            per-row explanations joined with ``'; '``.
        """
        df = df.copy()
        df['Explanation'] = [
            '; '.join(self.explain_anomaly(row, user_stats))
            for _, row in df.iterrows()
        ]
        return df

    def get_feature_importance(self, transaction: pd.Series) -> Dict[str, float]:
        """Calculate relative feature importance for the anomaly.

        Each available feature is mapped to a raw score in ``[0, 1]`` and
        the scores are then divided by their sum.

        Returns:
            Mapping with any of the keys ``Amount``, ``Time``, ``Frequency``
            and ``Category``. Features that are absent or NaN are omitted.
            When all raw scores are zero they are returned unnormalized.
        """
        importance = {}

        # Amount: |z-score|, saturating at 5 standard deviations.
        if self._has_value(transaction, 'Amount_ZScore'):
            importance['Amount'] = self._clamp(abs(transaction['Amount_ZScore']) / 5.0, 0.0, 1.0)

        # Time: distance from the usual hour, saturating at 12 hours.
        if self._has_value(transaction, 'Hour_Distance'):
            importance['Time'] = self._clamp(transaction['Hour_Distance'] / 12.0, 0.0, 1.0)

        # Frequency: gap since the previous transaction, saturating at 72 hours.
        if self._has_value(transaction, 'TimeSinceLastTx'):
            importance['Frequency'] = self._clamp(transaction['TimeSinceLastTx'] / 72.0, 0.0, 1.0)

        # Category: lower entropy gives a higher score; entropy >= 3 gives 0.
        if self._has_value(transaction, 'Category_Entropy'):
            importance['Category'] = self._clamp(1.0 - transaction['Category_Entropy'] / 3.0, 0.0, 1.0)

        # Normalize so the scores sum to 1 (skipped when the sum is zero).
        total = sum(importance.values()) if importance else 1
        if total > 0:
            importance = {name: score / total for name, score in importance.items()}
        return importance

    def generate_radar_data(self, df: pd.DataFrame) -> Dict[str, List]:
        """Generate data for radar chart visualization.

        Returns:
            ``{'labels': [...], 'values': [...]}`` with one value per label,
            each in ``[0, 100]``. For an empty frame both lists are empty;
            the legacy ``'datasets'`` key is still included in that case.
        """
        if df.empty:
            # 'values' matches the non-empty result shape; 'datasets' is kept
            # for callers written against the earlier empty-case result.
            return {'labels': [], 'values': [], 'datasets': []}

        # Column means; absent columns fall back to a constant default.
        metrics = {
            'Amount': df['Amount'].mean() if 'Amount' in df.columns else 0,
            'Frequency': df.get('TxCount_Window', pd.Series([1])).mean(),
            'Time Variance': df.get('Hour_Variance', pd.Series([0])).mean(),
            'Category Diversity': df.get('Category_Entropy', pd.Series([0])).mean()
        }

        # Reference maxima used to scale each metric.
        # NOTE(review): 'Frequency' is scaled against twice its own mean, so
        # it is always 50 whenever the mean is positive - confirm intent.
        max_vals = {
            'Amount': df['Amount'].max() if 'Amount' in df.columns else 1,
            'Frequency': metrics['Frequency'] * 2,
            'Time Variance': 50,
            'Category Diversity': 3
        }

        # Fallbacks: 50 where the maximum can be non-positive (as before),
        # 0 where only a NaN mean can make the ratio undefined.
        fallbacks = {
            'Amount': 50,
            'Frequency': 50,
            'Time Variance': 0,
            'Category Diversity': 0
        }

        labels = list(metrics)
        values = [
            self._to_percent(metrics[label], max_vals[label], fallbacks[label])
            for label in labels
        ]
        return {'labels': labels, 'values': values}

    def compare_with_user_baseline(self, transaction: pd.Series, user_baseline: Dict) -> Dict[str, Any]:
        """Compare transaction with user's baseline behavior.

        Args:
            transaction: Row that may carry ``Amount``, ``Hour`` and
                ``Merchant Category``.
            user_baseline: Mapping that may carry the user's ``mean`` amount.

        Returns:
            Mapping with any of ``amount_vs_avg`` (amount divided by the
            baseline mean), ``hour`` and ``category``. ``amount_vs_avg`` is
            omitted when the baseline mean is missing, NaN or zero, because
            the ratio is undefined there.
        """
        comparison = {}
        if 'Amount' in transaction and 'mean' in user_baseline:
            baseline_mean = user_baseline['mean']
            if not pd.isna(baseline_mean) and baseline_mean != 0:
                comparison['amount_vs_avg'] = transaction['Amount'] / baseline_mean
        if 'Hour' in transaction:
            comparison['hour'] = transaction['Hour']
        if 'Merchant Category' in transaction:
            comparison['category'] = transaction['Merchant Category']
        return comparison