Spaces:
Paused
Paused
| """ | |
| Rule-Based Fraud Detection Engine | |
| Core engine for configurable fraud detection rules | |
| """ | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from typing import Any | |
| class RuleType(Enum): | |
| """Types of fraud detection rules""" | |
| VELOCITY = "velocity" # Transaction frequency | |
| AMOUNT = "amount" # Transaction amount thresholds | |
| PATTERN = "pattern" # Behavioral patterns | |
| GEOGRAPHIC = "geographic" # Location-based rules | |
| TIME = "time" # Time-based rules | |
| ACCOUNT = "account" # Account-based rules | |
| class RiskLevel(Enum): | |
| """Risk levels for flagged transactions""" | |
| LOW = "low" | |
| MEDIUM = "medium" | |
| HIGH = "high" | |
| CRITICAL = "critical" | |
| class FraudRule: | |
| """Base class for fraud detection rules""" | |
| def __init__( | |
| self, | |
| rule_id: str, | |
| name: str, | |
| description: str, | |
| rule_type: RuleType, | |
| risk_level: RiskLevel, | |
| enabled: bool = True, | |
| ): | |
| self.rule_id = rule_id | |
| self.name = name | |
| self.description = description | |
| self.rule_type = rule_type | |
| self.risk_level = risk_level | |
| self.enabled = enabled | |
| self.created_at = datetime.utcnow() | |
| self.triggered_count = 0 | |
| def evaluate( | |
| self, transaction: dict[str, Any], context: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """ | |
| Evaluate if transaction triggers this rule | |
| Returns: | |
| { | |
| "triggered": bool, | |
| "risk_score": int (0-100), | |
| "reason": str, | |
| "details": dict | |
| } | |
| """ | |
| raise NotImplementedError("Subclasses must implement evaluate()") | |
| class VelocityRule(FraudRule): | |
| """Check transaction velocity (frequency) patterns""" | |
| def __init__( | |
| self, | |
| rule_id: str, | |
| max_transactions: int, | |
| time_window_minutes: int, | |
| risk_level: RiskLevel = RiskLevel.MEDIUM, | |
| ): | |
| super().__init__( | |
| rule_id=rule_id, | |
| name=f"Velocity Check: {max_transactions} transactions in {time_window_minutes} minutes", | |
| description=f"Flags if more than {max_transactions} transactions occur within {time_window_minutes} minutes", | |
| rule_type=RuleType.VELOCITY, | |
| risk_level=risk_level, | |
| ) | |
| self.max_transactions = max_transactions | |
| self.time_window_minutes = time_window_minutes | |
| def evaluate( | |
| self, transaction: dict[str, Any], context: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """Check if velocity threshold exceeded""" | |
| account_id = transaction.get("account_id") | |
| timestamp = datetime.fromisoformat( | |
| transaction.get("timestamp", datetime.utcnow().isoformat()) | |
| ) | |
| # Get recent transactions from context | |
| recent_transactions = context.get("recent_transactions", []) | |
| # Filter transactions within time window | |
| cutoff_time = timestamp - timedelta(minutes=self.time_window_minutes) | |
| transactions_in_window = [ | |
| t | |
| for t in recent_transactions | |
| if datetime.fromisoformat(t.get("timestamp", "")) > cutoff_time | |
| and t.get("account_id") == account_id | |
| ] | |
| count = len(transactions_in_window) + 1 # +1 for current transaction | |
| triggered = count > self.max_transactions | |
| if triggered: | |
| self.triggered_count += 1 | |
| risk_score = ( | |
| min(100, int((count / self.max_transactions) * 50)) if triggered else 0 | |
| ) | |
| return { | |
| "triggered": triggered, | |
| "risk_score": risk_score, | |
| "reason": f"Velocity exceeded: {count} transactions in {self.time_window_minutes} minutes (limit: {self.max_transactions})", | |
| "details": { | |
| "transaction_count": count, | |
| "time_window_minutes": self.time_window_minutes, | |
| "threshold": self.max_transactions, | |
| }, | |
| } | |
| class AmountThresholdRule(FraudRule): | |
| """Check if transaction amount exceeds thresholds""" | |
| def __init__( | |
| self, | |
| rule_id: str, | |
| threshold_amount: float, | |
| currency: str = "USD", | |
| risk_level: RiskLevel = RiskLevel.HIGH, | |
| ): | |
| super().__init__( | |
| rule_id=rule_id, | |
| name=f"Amount Threshold: {currency} {threshold_amount}", | |
| description=f"Flags transactions exceeding {currency} {threshold_amount}", | |
| rule_type=RuleType.AMOUNT, | |
| risk_level=risk_level, | |
| ) | |
| self.threshold_amount = threshold_amount | |
| self.currency = currency | |
| def evaluate( | |
| self, transaction: dict[str, Any], context: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """Check if amount exceeds threshold""" | |
| amount = float(transaction.get("amount", 0)) | |
| currency = transaction.get("currency", "USD") | |
| triggered = amount > self.threshold_amount and currency == self.currency | |
| if triggered: | |
| self.triggered_count += 1 | |
| risk_score = ( | |
| min(100, int((amount / self.threshold_amount) * 60)) if triggered else 0 | |
| ) | |
| return { | |
| "triggered": triggered, | |
| "risk_score": risk_score, | |
| "reason": f"Amount {currency} {amount} exceeds threshold {self.currency} {self.threshold_amount}", | |
| "details": { | |
| "amount": amount, | |
| "currency": currency, | |
| "threshold": self.threshold_amount, | |
| "excess_percentage": ( | |
| ((amount / self.threshold_amount) - 1) * 100 if triggered else 0 | |
| ), | |
| }, | |
| } | |
| class GeographicAnomalyRule(FraudRule): | |
| """Detect geographic anomalies (impossible travel, unusual locations)""" | |
| def __init__( | |
| self, | |
| rule_id: str, | |
| max_distance_km: float = 500, | |
| min_time_hours: float = 1, | |
| risk_level: RiskLevel = RiskLevel.CRITICAL, | |
| ): | |
| super().__init__( | |
| rule_id=rule_id, | |
| name=f"Geographic Anomaly: {max_distance_km}km in {min_time_hours}h", | |
| description="Detects impossible travel patterns", | |
| rule_type=RuleType.GEOGRAPHIC, | |
| risk_level=risk_level, | |
| ) | |
| self.max_distance_km = max_distance_km | |
| self.min_time_hours = min_time_hours | |
| def evaluate( | |
| self, transaction: dict[str, Any], context: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """Check for impossible travel""" | |
| location = transaction.get("location", {}) | |
| timestamp = datetime.fromisoformat( | |
| transaction.get("timestamp", datetime.utcnow().isoformat()) | |
| ) | |
| last_transaction = context.get("last_transaction", {}) | |
| if not last_transaction: | |
| return { | |
| "triggered": False, | |
| "risk_score": 0, | |
| "reason": "No previous transaction", | |
| "details": {}, | |
| } | |
| last_location = last_transaction.get("location", {}) | |
| last_timestamp = datetime.fromisoformat(last_transaction.get("timestamp", "")) | |
| # Simplified distance calculation (should use haversine in production) | |
| distance_km = self._calculate_distance(location, last_location) | |
| time_diff_hours = (timestamp - last_timestamp).total_seconds() / 3600 | |
| # Check if travel is physically impossible | |
| required_speed = ( | |
| distance_km / time_diff_hours if time_diff_hours > 0 else float("inf") | |
| ) | |
| triggered = ( | |
| distance_km > self.max_distance_km and time_diff_hours < self.min_time_hours | |
| ) | |
| if triggered: | |
| self.triggered_count += 1 | |
| risk_score = ( | |
| min(100, int((distance_km / self.max_distance_km) * 80)) if triggered else 0 | |
| ) | |
| return { | |
| "triggered": triggered, | |
| "risk_score": risk_score, | |
| "reason": f"Impossible travel: {distance_km:.0f}km in {time_diff_hours:.1f}h (speed: {required_speed:.0f}km/h)", | |
| "details": { | |
| "distance_km": distance_km, | |
| "time_hours": time_diff_hours, | |
| "required_speed_kmh": required_speed, | |
| "from_location": last_location, | |
| "to_location": location, | |
| }, | |
| } | |
| def _calculate_distance(self, loc1: dict, loc2: dict) -> float: | |
| """Simplified distance calculation - use haversine in production""" | |
| if not loc1 or not loc2: | |
| return 0 | |
| lat1, lon1 = loc1.get("lat", 0), loc1.get("lon", 0) | |
| lat2, lon2 = loc2.get("lat", 0), loc2.get("lon", 0) | |
| # Simplified: 1 degree ≈ 111km | |
| return abs(lat1 - lat2) * 111 + abs(lon1 - lon2) * 111 | |
| class RuleEngine: | |
| """Main fraud detection rule engine""" | |
| def __init__(self): | |
| self.rules: list[FraudRule] = [] | |
| self.evaluation_count = 0 | |
| def add_rule(self, rule: FraudRule): | |
| """Add a rule to the engine""" | |
| self.rules.append(rule) | |
| def remove_rule(self, rule_id: str): | |
| """Remove a rule by ID""" | |
| self.rules = [r for r in self.rules if r.rule_id != rule_id] | |
| def get_rule(self, rule_id: str) -> FraudRule | None: | |
| """Get a specific rule""" | |
| return next((r for r in self.rules if r.rule_id == rule_id), None) | |
| def evaluate_transaction( | |
| self, transaction: dict[str, Any], context: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """ | |
| Evaluate a transaction against all enabled rules | |
| Returns: | |
| { | |
| "is_fraud": bool, | |
| "overall_risk_score": int (0-100), | |
| "triggered_rules": list[dict], | |
| "recommendations": list[str] | |
| } | |
| """ | |
| self.evaluation_count += 1 | |
| triggered_rules = [] | |
| total_risk_score = 0 | |
| for rule in self.rules: | |
| if not rule.enabled: | |
| continue | |
| result = rule.evaluate(transaction, context) | |
| if result["triggered"]: | |
| triggered_rules.append( | |
| { | |
| "rule_id": rule.rule_id, | |
| "rule_name": rule.name, | |
| "rule_type": rule.rule_type.value, | |
| "risk_level": rule.risk_level.value, | |
| "risk_score": result["risk_score"], | |
| "reason": result["reason"], | |
| "details": result["details"], | |
| } | |
| ) | |
| total_risk_score += result["risk_score"] | |
| # Cap at 100 | |
| overall_risk_score = min(100, total_risk_score) | |
| is_fraud = overall_risk_score >= 70 or any( | |
| r["risk_level"] == "critical" for r in triggered_rules | |
| ) | |
| # Generate recommendations | |
| recommendations = self._generate_recommendations( | |
| triggered_rules, overall_risk_score | |
| ) | |
| return { | |
| "is_fraud": is_fraud, | |
| "overall_risk_score": overall_risk_score, | |
| "triggered_rules": triggered_rules, | |
| "recommendations": recommendations, | |
| "evaluation_timestamp": datetime.utcnow().isoformat(), | |
| } | |
| def _generate_recommendations( | |
| self, triggered_rules: list[dict], risk_score: int | |
| ) -> list[str]: | |
| """Generate action recommendations based on results""" | |
| recommendations = [] | |
| if risk_score >= 90: | |
| recommendations.append( | |
| "IMMEDIATE ACTION: Block transaction and contact fraud team" | |
| ) | |
| recommendations.append("Freeze account pending investigation") | |
| elif risk_score >= 70: | |
| recommendations.append("HIGH RISK: Manual review required within 1 hour") | |
| recommendations.append("Contact customer for verification") | |
| elif risk_score >= 50: | |
| recommendations.append("MEDIUM RISK: Flag for review within 24 hours") | |
| recommendations.append("Add to watchlist") | |
| elif risk_score >= 30: | |
| recommendations.append("LOW RISK: Automated monitoring") | |
| # Specific recommendations based on rule types | |
| rule_types = {r["rule_type"] for r in triggered_rules} | |
| if "velocity" in rule_types: | |
| recommendations.append("Check for account takeover or card testing") | |
| if "geographic" in rule_types: | |
| recommendations.append("Verify customer location and recent activity") | |
| if "amount" in rule_types: | |
| recommendations.append("Confirm transaction with customer") | |
| return recommendations | |
| def get_stats(self) -> dict[str, Any]: | |
| """Get engine statistics""" | |
| return { | |
| "total_rules": len(self.rules), | |
| "enabled_rules": sum(1 for r in self.rules if r.enabled), | |
| "total_evaluations": self.evaluation_count, | |
| "rules_by_type": { | |
| rule_type.value: sum(1 for r in self.rules if r.rule_type == rule_type) | |
| for rule_type in RuleType | |
| }, | |
| "most_triggered_rules": sorted( | |
| [ | |
| {"rule_id": r.rule_id, "name": r.name, "count": r.triggered_count} | |
| for r in self.rules | |
| ], | |
| key=lambda x: x["count"], | |
| reverse=True, | |
| )[:5], | |
| } | |
| # Default rule set | |
| def create_default_rules() -> RuleEngine: | |
| """Create engine with default fraud detection rules""" | |
| engine = RuleEngine() | |
| # Velocity rules | |
| engine.add_rule( | |
| VelocityRule( | |
| "VEL001", | |
| max_transactions=5, | |
| time_window_minutes=5, | |
| risk_level=RiskLevel.HIGH, | |
| ) | |
| ) | |
| engine.add_rule( | |
| VelocityRule( | |
| "VEL002", | |
| max_transactions=10, | |
| time_window_minutes=60, | |
| risk_level=RiskLevel.MEDIUM, | |
| ) | |
| ) | |
| # Amount rules | |
| engine.add_rule( | |
| AmountThresholdRule( | |
| "AMT001", threshold_amount=10000.0, risk_level=RiskLevel.HIGH | |
| ) | |
| ) | |
| engine.add_rule( | |
| AmountThresholdRule( | |
| "AMT002", threshold_amount=50000.0, risk_level=RiskLevel.CRITICAL | |
| ) | |
| ) | |
| # Geographic rules | |
| engine.add_rule( | |
| GeographicAnomalyRule( | |
| "GEO001", | |
| max_distance_km=500, | |
| min_time_hours=1, | |
| risk_level=RiskLevel.CRITICAL, | |
| ) | |
| ) | |
| return engine | |