""" Rule-Based Fraud Detection Engine Core engine for configurable fraud detection rules """ from datetime import datetime, timedelta from enum import Enum from typing import Any class RuleType(Enum): """Types of fraud detection rules""" VELOCITY = "velocity" # Transaction frequency AMOUNT = "amount" # Transaction amount thresholds PATTERN = "pattern" # Behavioral patterns GEOGRAPHIC = "geographic" # Location-based rules TIME = "time" # Time-based rules ACCOUNT = "account" # Account-based rules class RiskLevel(Enum): """Risk levels for flagged transactions""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class FraudRule: """Base class for fraud detection rules""" def __init__( self, rule_id: str, name: str, description: str, rule_type: RuleType, risk_level: RiskLevel, enabled: bool = True, ): self.rule_id = rule_id self.name = name self.description = description self.rule_type = rule_type self.risk_level = risk_level self.enabled = enabled self.created_at = datetime.utcnow() self.triggered_count = 0 def evaluate( self, transaction: dict[str, Any], context: dict[str, Any] ) -> dict[str, Any]: """ Evaluate if transaction triggers this rule Returns: { "triggered": bool, "risk_score": int (0-100), "reason": str, "details": dict } """ raise NotImplementedError("Subclasses must implement evaluate()") class VelocityRule(FraudRule): """Check transaction velocity (frequency) patterns""" def __init__( self, rule_id: str, max_transactions: int, time_window_minutes: int, risk_level: RiskLevel = RiskLevel.MEDIUM, ): super().__init__( rule_id=rule_id, name=f"Velocity Check: {max_transactions} transactions in {time_window_minutes} minutes", description=f"Flags if more than {max_transactions} transactions occur within {time_window_minutes} minutes", rule_type=RuleType.VELOCITY, risk_level=risk_level, ) self.max_transactions = max_transactions self.time_window_minutes = time_window_minutes def evaluate( self, transaction: dict[str, Any], context: dict[str, Any] ) -> dict[str, Any]: """Check if velocity threshold exceeded""" account_id = transaction.get("account_id") timestamp = datetime.fromisoformat( transaction.get("timestamp", datetime.utcnow().isoformat()) ) # Get recent transactions from context recent_transactions = context.get("recent_transactions", []) # Filter transactions within time window cutoff_time = timestamp - timedelta(minutes=self.time_window_minutes) transactions_in_window = [ t for t in recent_transactions if datetime.fromisoformat(t.get("timestamp", "")) > cutoff_time and t.get("account_id") == account_id ] count = len(transactions_in_window) + 1 # +1 for current transaction triggered = count > self.max_transactions if triggered: self.triggered_count += 1 risk_score = ( min(100, int((count / self.max_transactions) * 50)) if triggered else 0 ) return { "triggered": triggered, "risk_score": risk_score, "reason": f"Velocity exceeded: {count} transactions in {self.time_window_minutes} minutes (limit: {self.max_transactions})", "details": { "transaction_count": count, "time_window_minutes": self.time_window_minutes, "threshold": self.max_transactions, }, } class AmountThresholdRule(FraudRule): """Check if transaction amount exceeds thresholds""" def __init__( self, rule_id: str, threshold_amount: float, currency: str = "USD", risk_level: RiskLevel = RiskLevel.HIGH, ): super().__init__( rule_id=rule_id, name=f"Amount Threshold: {currency} {threshold_amount}", description=f"Flags transactions exceeding {currency} {threshold_amount}", rule_type=RuleType.AMOUNT, risk_level=risk_level, ) self.threshold_amount = threshold_amount self.currency = currency def evaluate( self, transaction: dict[str, Any], context: dict[str, Any] ) -> dict[str, Any]: """Check if amount exceeds threshold""" amount = float(transaction.get("amount", 0)) currency = transaction.get("currency", "USD") triggered = amount > self.threshold_amount and currency == self.currency if triggered: self.triggered_count += 1 risk_score = ( min(100, int((amount / self.threshold_amount) * 60)) if triggered else 0 ) return { "triggered": triggered, "risk_score": risk_score, "reason": f"Amount {currency} {amount} exceeds threshold {self.currency} {self.threshold_amount}", "details": { "amount": amount, "currency": currency, "threshold": self.threshold_amount, "excess_percentage": ( ((amount / self.threshold_amount) - 1) * 100 if triggered else 0 ), }, } class GeographicAnomalyRule(FraudRule): """Detect geographic anomalies (impossible travel, unusual locations)""" def __init__( self, rule_id: str, max_distance_km: float = 500, min_time_hours: float = 1, risk_level: RiskLevel = RiskLevel.CRITICAL, ): super().__init__( rule_id=rule_id, name=f"Geographic Anomaly: {max_distance_km}km in {min_time_hours}h", description="Detects impossible travel patterns", rule_type=RuleType.GEOGRAPHIC, risk_level=risk_level, ) self.max_distance_km = max_distance_km self.min_time_hours = min_time_hours def evaluate( self, transaction: dict[str, Any], context: dict[str, Any] ) -> dict[str, Any]: """Check for impossible travel""" location = transaction.get("location", {}) timestamp = datetime.fromisoformat( transaction.get("timestamp", datetime.utcnow().isoformat()) ) last_transaction = context.get("last_transaction", {}) if not last_transaction: return { "triggered": False, "risk_score": 0, "reason": "No previous transaction", "details": {}, } last_location = last_transaction.get("location", {}) last_timestamp = datetime.fromisoformat(last_transaction.get("timestamp", "")) # Simplified distance calculation (should use haversine in production) distance_km = self._calculate_distance(location, last_location) time_diff_hours = (timestamp - last_timestamp).total_seconds() / 3600 # Check if travel is physically impossible required_speed = ( distance_km / time_diff_hours if time_diff_hours > 0 else float("inf") ) triggered = ( distance_km > self.max_distance_km and time_diff_hours < self.min_time_hours ) if triggered: self.triggered_count += 1 risk_score = ( min(100, int((distance_km / self.max_distance_km) * 80)) if triggered else 0 ) return { "triggered": triggered, "risk_score": risk_score, "reason": f"Impossible travel: {distance_km:.0f}km in {time_diff_hours:.1f}h (speed: {required_speed:.0f}km/h)", "details": { "distance_km": distance_km, "time_hours": time_diff_hours, "required_speed_kmh": required_speed, "from_location": last_location, "to_location": location, }, } def _calculate_distance(self, loc1: dict, loc2: dict) -> float: """Simplified distance calculation - use haversine in production""" if not loc1 or not loc2: return 0 lat1, lon1 = loc1.get("lat", 0), loc1.get("lon", 0) lat2, lon2 = loc2.get("lat", 0), loc2.get("lon", 0) # Simplified: 1 degree ≈ 111km return abs(lat1 - lat2) * 111 + abs(lon1 - lon2) * 111 class RuleEngine: """Main fraud detection rule engine""" def __init__(self): self.rules: list[FraudRule] = [] self.evaluation_count = 0 def add_rule(self, rule: FraudRule): """Add a rule to the engine""" self.rules.append(rule) def remove_rule(self, rule_id: str): """Remove a rule by ID""" self.rules = [r for r in self.rules if r.rule_id != rule_id] def get_rule(self, rule_id: str) -> FraudRule | None: """Get a specific rule""" return next((r for r in self.rules if r.rule_id == rule_id), None) def evaluate_transaction( self, transaction: dict[str, Any], context: dict[str, Any] ) -> dict[str, Any]: """ Evaluate a transaction against all enabled rules Returns: { "is_fraud": bool, "overall_risk_score": int (0-100), "triggered_rules": list[dict], "recommendations": list[str] } """ self.evaluation_count += 1 triggered_rules = [] total_risk_score = 0 for rule in self.rules: if not rule.enabled: continue result = rule.evaluate(transaction, context) if result["triggered"]: triggered_rules.append( { "rule_id": rule.rule_id, "rule_name": rule.name, "rule_type": rule.rule_type.value, "risk_level": rule.risk_level.value, "risk_score": result["risk_score"], "reason": result["reason"], "details": result["details"], } ) total_risk_score += result["risk_score"] # Cap at 100 overall_risk_score = min(100, total_risk_score) is_fraud = overall_risk_score >= 70 or any( r["risk_level"] == "critical" for r in triggered_rules ) # Generate recommendations recommendations = self._generate_recommendations( triggered_rules, overall_risk_score ) return { "is_fraud": is_fraud, "overall_risk_score": overall_risk_score, "triggered_rules": triggered_rules, "recommendations": recommendations, "evaluation_timestamp": datetime.utcnow().isoformat(), } def _generate_recommendations( self, triggered_rules: list[dict], risk_score: int ) -> list[str]: """Generate action recommendations based on results""" recommendations = [] if risk_score >= 90: recommendations.append( "IMMEDIATE ACTION: Block transaction and contact fraud team" ) recommendations.append("Freeze account pending investigation") elif risk_score >= 70: recommendations.append("HIGH RISK: Manual review required within 1 hour") recommendations.append("Contact customer for verification") elif risk_score >= 50: recommendations.append("MEDIUM RISK: Flag for review within 24 hours") recommendations.append("Add to watchlist") elif risk_score >= 30: recommendations.append("LOW RISK: Automated monitoring") # Specific recommendations based on rule types rule_types = {r["rule_type"] for r in triggered_rules} if "velocity" in rule_types: recommendations.append("Check for account takeover or card testing") if "geographic" in rule_types: recommendations.append("Verify customer location and recent activity") if "amount" in rule_types: recommendations.append("Confirm transaction with customer") return recommendations def get_stats(self) -> dict[str, Any]: """Get engine statistics""" return { "total_rules": len(self.rules), "enabled_rules": sum(1 for r in self.rules if r.enabled), "total_evaluations": self.evaluation_count, "rules_by_type": { rule_type.value: sum(1 for r in self.rules if r.rule_type == rule_type) for rule_type in RuleType }, "most_triggered_rules": sorted( [ {"rule_id": r.rule_id, "name": r.name, "count": r.triggered_count} for r in self.rules ], key=lambda x: x["count"], reverse=True, )[:5], } # Default rule set def create_default_rules() -> RuleEngine: """Create engine with default fraud detection rules""" engine = RuleEngine() # Velocity rules engine.add_rule( VelocityRule( "VEL001", max_transactions=5, time_window_minutes=5, risk_level=RiskLevel.HIGH, ) ) engine.add_rule( VelocityRule( "VEL002", max_transactions=10, time_window_minutes=60, risk_level=RiskLevel.MEDIUM, ) ) # Amount rules engine.add_rule( AmountThresholdRule( "AMT001", threshold_amount=10000.0, risk_level=RiskLevel.HIGH ) ) engine.add_rule( AmountThresholdRule( "AMT002", threshold_amount=50000.0, risk_level=RiskLevel.CRITICAL ) ) # Geographic rules engine.add_rule( GeographicAnomalyRule( "GEO001", max_distance_km=500, min_time_hours=1, risk_level=RiskLevel.CRITICAL, ) ) return engine