Spaces:
Paused
Paused
"""
Fraud Detection Engine - Rule-Based Detection System

Task 4.1 from Orchestration Plan

This module implements pattern-based fraud detection algorithms:
- Structuring detection (transactions split to avoid reporting thresholds)
- Round-trip transactions (circular money flow)
- Velocity checks (rapid transaction patterns)
- Risk scoring system (0-100)
"""
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
class FraudType(Enum):
    """Types of fraud patterns detected.

    The string values are the stable identifiers exposed on alerts
    (for example via ``alert.fraud_type.value``).
    """

    # Several transactions kept just below a reporting threshold.
    STRUCTURING = "structuring"
    # Money that travels through other accounts and returns to its origin.
    ROUND_TRIP = "round_trip"
    # An unusually dense burst of transactions from one account.
    VELOCITY = "velocity"
    # Declared for callers; no detector in this module emits it.
    UNUSUAL_PATTERN = "unusual_pattern"
    # Declared for callers; no detector in this module emits it.
    HIGH_RISK_JURISDICTION = "high_risk_jurisdiction"
@dataclass
class Transaction:
    """Transaction data model.

    Declared as a dataclass so that instances can be built positionally or
    by keyword in field order: ``Transaction(id, amount, timestamp,
    source_account, destination_account, description, ...)``.
    """

    id: str  # Identifier referenced from FraudAlert.transactions
    amount: float
    timestamp: datetime
    source_account: str  # Account the money leaves
    destination_account: str  # Account the money arrives at
    description: str
    merchant: str = ""
    category: str = ""
    # Free-form extra data; stays None when the caller supplies nothing.
    metadata: Optional[dict[str, Any]] = None
@dataclass
class FraudAlert:
    """Fraud detection alert.

    Declared as a dataclass so the detectors can build alerts with keyword
    arguments matching the fields below. All fields are required.
    """

    alert_id: str
    fraud_type: FraudType
    risk_score: int  # 0-100
    confidence: float  # 0.0-1.0
    transactions: list[str]  # Transaction IDs
    description: str
    detected_at: datetime
    details: dict[str, Any]  # Detector-specific context (accounts, counts, amounts, ...)
class FraudDetectionEngine:
    """
    Rule-based fraud detection engine.

    Detects common fraud patterns in transaction data:
    - Structuring: Multiple transactions just below reporting threshold
    - Round-trip: Money flowing in a circle back to origin
    - Velocity: Too many transactions in short time period

    Thresholds are read from ``self.config``, a per-instance copy of
    ``DEFAULT_CONFIG`` that may be overridden by active ``FraudRule`` rows
    when a database session is supplied to the constructor.
    """

    # Default thresholds (fallback if DB fails or empty)
    DEFAULT_CONFIG = {
        "STRUCTURING_THRESHOLD": 10000,
        "STRUCTURING_WINDOW_HOURS": 24,
        "STRUCTURING_MIN_TRANSACTIONS": 3,
        "VELOCITY_MAX_TRANSACTIONS": 10,
        "VELOCITY_WINDOW_MINUTES": 60,
        "ROUND_TRIP_MAX_HOPS": 5,
        "ROUND_TRIP_TIME_WINDOW_HOURS": 72,
    }

    # Fraction of the structuring threshold from which an amount counts as
    # "just below" it. Kept out of DEFAULT_CONFIG so the set of rule ids
    # accepted from the database is unchanged.
    STRUCTURING_LOWER_BOUND_RATIO = 0.8

    def __init__(self, db_session=None):
        """
        Create an engine.

        Args:
            db_session: Optional database session used to load rule
                overrides; when falsy the defaults are used as-is.
        """
        self.alerts: list[FraudAlert] = []
        self.db = db_session
        self._load_config()

    def _load_config(self):
        """
        Load configuration from DB or use defaults.

        Only rules whose ``rule_id`` is a key of ``DEFAULT_CONFIG`` and whose
        ``value_type`` is ``"int"``, ``"float"`` or ``"json"`` are applied.
        A rule whose value cannot be converted is skipped with a warning.
        If the database cannot be queried at all, the defaults are kept
        unchanged (no partially applied overrides).
        """
        self.config = dict(self.DEFAULT_CONFIG)
        if not self.db:
            return

        logger = logging.getLogger(__name__)
        casters = {"int": int, "float": float, "json": json.loads}
        overrides: dict[str, Any] = {}
        try:
            # Imported lazily so the engine stays usable without the DB layer.
            from core.database import FraudRule

            rules = self.db.query(FraudRule).filter(FraudRule.is_active).all()
            for rule in rules:
                caster = casters.get(rule.value_type)
                if rule.rule_id not in self.config or caster is None:
                    continue
                try:
                    overrides[rule.rule_id] = caster(rule.value)
                except (TypeError, ValueError):
                    # One malformed rule must not discard the valid ones.
                    logger.warning(
                        "Ignoring fraud rule %r: cannot parse value %r as %s",
                        rule.rule_id,
                        rule.value,
                        rule.value_type,
                    )
        except Exception:
            # Deliberately best-effort: a DB problem must never stop detection
            # from running, but it should be visible in the logs.
            logger.warning("Could not load fraud rules from DB; using defaults", exc_info=True)
            return
        self.config.update(overrides)

    def structuring_threshold(self):
        """Return the configured reporting threshold used by structuring detection."""
        return self.config["STRUCTURING_THRESHOLD"]

    def structuring_window_hours(self):
        """Return the configured structuring time window, in hours."""
        return self.config["STRUCTURING_WINDOW_HOURS"]

    def structuring_min_transactions(self):
        """Return the minimum number of transactions that form a structuring pattern."""
        return self.config["STRUCTURING_MIN_TRANSACTIONS"]

    def velocity_max_transactions(self):
        """Return the transaction count at which a velocity alert is raised."""
        return self.config["VELOCITY_MAX_TRANSACTIONS"]

    def velocity_window_minutes(self):
        """Return the configured velocity time window, in minutes."""
        return self.config["VELOCITY_WINDOW_MINUTES"]

    def round_trip_max_hops(self):
        """Return the maximum number of transactions allowed in a round-trip cycle."""
        return self.config["ROUND_TRIP_MAX_HOPS"]

    def round_trip_time_window_hours(self):
        """Return the configured round-trip time window, in hours."""
        return self.config["ROUND_TRIP_TIME_WINDOW_HOURS"]

    def analyze_transactions(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Analyze a batch of transactions for fraud patterns.

        Replaces ``self.alerts`` with the alerts found in this batch and
        returns that same list.

        Args:
            transactions: List of transactions to analyze

        Returns:
            List of fraud alerts detected

        Raises:
            ValueError: If transactions is empty or None
            TypeError: If transactions is not a list or contains invalid items
        """
        # Input validation
        if transactions is None:
            raise ValueError("Transactions list cannot be None")
        if not isinstance(transactions, list):
            raise TypeError(f"Transactions must be a list, got {type(transactions).__name__}")
        if not transactions:
            raise ValueError("Transactions list cannot be empty")

        # Validate each transaction
        for i, tx in enumerate(transactions):
            if not isinstance(tx, Transaction):
                raise TypeError(f"Transaction at index {i} must be a Transaction instance, got {type(tx).__name__}")

        # Run all detection algorithms
        self.alerts = [
            *self._detect_structuring(transactions),
            *self._detect_velocity(transactions),
            *self._detect_round_trips(transactions),
        ]
        return self.alerts

    def _detect_structuring(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect structuring: Multiple transactions just below reporting threshold.
        """
        account_groups = self._group_transactions_by_account_pairs(transactions)
        return self._analyze_account_groups_for_structuring(account_groups)

    def _group_transactions_by_account_pairs(
        self, transactions: list[Transaction]
    ) -> dict[tuple[str, str], list[Transaction]]:
        """
        Group transactions by account pairs for structuring analysis.

        The pair is sorted, so A->B and B->A transactions share one group.
        """
        account_groups: dict[tuple[str, str], list[Transaction]] = {}
        for tx in transactions:
            account_pair = tuple(sorted([tx.source_account, tx.destination_account]))
            account_groups.setdefault(account_pair, []).append(tx)
        return account_groups

    def _analyze_account_groups_for_structuring(
        self, account_groups: dict[tuple[str, str], list[Transaction]]
    ) -> list[FraudAlert]:
        """
        Analyze grouped transactions for structuring patterns.

        Windows are anchored at every suspicious transaction, so a long run
        produces nested windows. A window whose transactions were all already
        reported for the same account pair is skipped to avoid duplicates.
        """
        alerts = []
        for account_pair, txs in account_groups.items():
            reported_ids: set[str] = set()
            for window_data in self._find_structuring_windows(txs):
                if not self._is_significant_structuring(window_data):
                    continue
                window_ids = {tx.id for tx in window_data["transactions"]}
                if window_ids <= reported_ids:
                    continue
                reported_ids |= window_ids
                alerts.append(self._create_structuring_alert(account_pair, window_data))
        return alerts

    def _find_structuring_windows(self, transactions: list[Transaction]) -> list[dict[str, Any]]:
        """
        Find windows of transactions that may indicate structuring.

        Each window starts at a suspicious transaction and collects every
        suspicious transaction that follows within the configured time window.

        Returns:
            One dict per window with keys ``transactions``, ``total_amount``,
            ``start_time`` and ``end_time``.
        """
        # Sort here so the early `break` below is valid whatever the caller passed.
        ordered = sorted(transactions, key=lambda tx: tx.timestamp)
        suspicious = [tx for tx in ordered if self._is_suspicious_amount(tx.amount)]

        windows = []
        for i, anchor in enumerate(suspicious):
            window_txs = []
            for tx in suspicious[i:]:
                if not self._is_within_structuring_window(tx, anchor.timestamp):
                    break
                window_txs.append(tx)
            if window_txs:
                windows.append(
                    {
                        "transactions": window_txs,
                        "total_amount": sum(tx.amount for tx in window_txs),
                        "start_time": anchor.timestamp,
                        "end_time": window_txs[-1].timestamp,
                    }
                )
        return windows

    def _is_within_structuring_window(self, transaction: Transaction, window_start) -> bool:
        """Check if transaction falls between window_start and the end of the structuring window."""
        elapsed_hours = (transaction.timestamp - window_start).total_seconds() / 3600
        return 0 <= elapsed_hours <= self.config["STRUCTURING_WINDOW_HOURS"]

    def _is_suspicious_amount(self, amount: float) -> bool:
        """Check if transaction amount is suspiciously close to (but below) the threshold."""
        threshold = self.config["STRUCTURING_THRESHOLD"]
        return self.STRUCTURING_LOWER_BOUND_RATIO * threshold <= amount < threshold

    def _is_significant_structuring(self, window_data: dict[str, Any]) -> bool:
        """Determine if a transaction window represents significant structuring."""
        enough_transactions = len(window_data["transactions"]) >= self.config["STRUCTURING_MIN_TRANSACTIONS"]
        return enough_transactions and window_data["total_amount"] >= self.config["STRUCTURING_THRESHOLD"]

    def _create_structuring_alert(self, account_pair: tuple[str, str], window_data: dict[str, Any]) -> FraudAlert:
        """
        Create a structuring fraud alert.

        Args:
            account_pair: The two accounts the transactions move between.
            window_data: A window dict as produced by ``_find_structuring_windows``.
        """
        txs = window_data["transactions"]
        total_amount = window_data["total_amount"]

        # Calculate risk score based on various factors
        base_score = 70  # Structuring is inherently suspicious
        proximity_bonus = self._calculate_proximity_bonus(txs)
        count_bonus = min(len(txs) * 5, 30)  # Up to 30 points for many transactions
        risk_score = min(base_score + proximity_bonus + count_bonus, 100)

        return FraudAlert(
            alert_id=f"STRUCTURING_{account_pair[0]}_{account_pair[1]}_{int(window_data['start_time'].timestamp())}",
            fraud_type=FraudType.STRUCTURING,
            risk_score=risk_score,
            confidence=0.85,
            transactions=[tx.id for tx in txs],
            description=self._generate_structuring_description(txs, total_amount),
            detected_at=datetime.now(),
            details={
                "title": "Potential Structuring Detected",
                "severity": self._calculate_severity(risk_score),
                "accounts": list(account_pair),
                "entities": [
                    {"type": "account", "value": account_pair[0], "confidence": 0.9},
                    {"type": "account", "value": account_pair[1], "confidence": 0.9},
                ],
                "transaction_count": len(txs),
                "total_amount": total_amount,
                "time_window_hours": self.config["STRUCTURING_WINDOW_HOURS"],
                "threshold": self.config["STRUCTURING_THRESHOLD"],
                "detection_method": "transaction_pattern_analysis",
            },
        )

    def _calculate_proximity_bonus(self, transactions: list[Transaction]) -> int:
        """
        Calculate bonus based on how close amounts are to threshold.

        Each amount below the threshold is scored from 0.0 (at or under the
        lower edge of the suspicious band) to 1.0 (at the threshold); the
        average is scaled to 0-40 points, so closer amounts score higher.
        Amounts at or above the threshold are ignored.
        """
        threshold = self.config["STRUCTURING_THRESHOLD"]
        lower_bound = self.STRUCTURING_LOWER_BOUND_RATIO * threshold
        band_width = threshold - lower_bound
        if not transactions or band_width <= 0:
            return 0

        closeness = [
            max(0.0, (tx.amount - lower_bound) / band_width)
            for tx in transactions
            if tx.amount < threshold
        ]
        if not closeness:
            return 0
        return int(sum(closeness) / len(closeness) * 40)

    def _generate_structuring_description(self, transactions: list[Transaction], total_amount: float) -> str:
        """Generate human-readable description of structuring pattern."""
        count = len(transactions)
        avg_amount = total_amount / count if count else 0.0
        return (
            f"Detected {count} transactions totaling ${total_amount:,.2f} "
            f"(avg: ${avg_amount:,.2f}) within {self.config['STRUCTURING_WINDOW_HOURS']} hours. "
            f"All amounts suspiciously close to ${self.config['STRUCTURING_THRESHOLD']:,.0f} CTR threshold."
        )

    def _calculate_severity(self, risk_score: int) -> str:
        """Convert risk score to severity level ("critical", "high", "medium" or "low")."""
        if risk_score >= 90:
            return "critical"
        if risk_score >= 75:
            return "high"
        if risk_score >= 60:
            return "medium"
        return "low"

    def _detect_velocity(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect velocity fraud: Too many transactions in short time period.

        Pattern: Unusual burst of transaction activity that may indicate:
        - Account takeover
        - Card testing
        - Automated fraud

        Transactions are grouped by source account; at most one alert (the
        earliest qualifying burst) is produced per account.
        """
        max_transactions = self.config["VELOCITY_MAX_TRANSACTIONS"]
        window_minutes = self.config["VELOCITY_WINDOW_MINUTES"]

        # Group by source account
        by_source: dict[str, list[Transaction]] = {}
        for tx in transactions:
            by_source.setdefault(tx.source_account, []).append(tx)

        alerts = []
        for account, account_txs in by_source.items():
            ordered = sorted(account_txs, key=lambda tx: tx.timestamp)
            # Sliding window to detect bursts
            for i, anchor in enumerate(ordered):
                window_txs = []
                for tx in ordered[i:]:
                    if (tx.timestamp - anchor.timestamp).total_seconds() / 60 > window_minutes:
                        break
                    window_txs.append(tx)
                # Alert if too many transactions in window
                if window_txs and len(window_txs) >= max_transactions:
                    alerts.append(self._create_velocity_alert(account, window_txs))
                    break  # One alert per account
        return alerts

    def _create_velocity_alert(self, account: str, window_txs: list[Transaction]) -> FraudAlert:
        """Build the velocity alert for a burst of time-ordered transactions from one account."""
        max_transactions = self.config["VELOCITY_MAX_TRANSACTIONS"]
        window_minutes = self.config["VELOCITY_WINDOW_MINUTES"]

        # Calculate risk score based on velocity
        base_score = 50
        velocity_bonus = min((len(window_txs) - max_transactions) * 5, 40)
        # Bonus for very short time window (skipped for a zero-length configured window)
        actual_window_minutes = (window_txs[-1].timestamp - window_txs[0].timestamp).total_seconds() / 60
        time_bonus = int((1 - actual_window_minutes / window_minutes) * 10) if window_minutes > 0 else 0
        risk_score = min(base_score + velocity_bonus + time_bonus, 100)

        return FraudAlert(
            alert_id=f"VEL_{account}_{window_txs[0].id}",
            fraud_type=FraudType.VELOCITY,
            risk_score=risk_score,
            confidence=0.75,
            transactions=[tx.id for tx in window_txs],
            description=f"High velocity detected: {len(window_txs)} transactions from account {account} "
            f"in {actual_window_minutes:.1f} minutes",
            detected_at=datetime.now(),
            details={
                "account": account,
                "transaction_count": len(window_txs),
                "window_minutes": actual_window_minutes,
                "threshold": max_transactions,
            },
        )

    def _detect_round_trips(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect round-trip transactions: Money flowing in a circle.

        Pattern: A → B → C → A (money returns to origin)
        Often used for:
        - Money laundering (layering phase)
        - Creating false business activity
        - Tax evasion

        At most one cycle is searched for per starting account, and a cycle
        found from several starting points is reported once.
        """
        # Build transaction graph: source account -> outgoing (destination, transaction) edges
        graph: dict[str, list[tuple[str, Transaction]]] = {}
        for tx in transactions:
            graph.setdefault(tx.source_account, []).append((tx.destination_account, tx))

        alerts = []
        checked_cycles = set()
        for start_account in graph:
            cycle = self._find_round_trip_cycle(graph, start_account, start_account, [], set())
            if not cycle:
                continue
            # Create unique cycle identifier (independent of the starting account)
            cycle_id = "_".join(sorted(tx.id for tx in cycle))
            if cycle_id in checked_cycles:
                continue
            checked_cycles.add(cycle_id)
            alerts.append(self._create_round_trip_alert(cycle_id, cycle))
        return alerts

    def _find_round_trip_cycle(
        self,
        graph: dict[str, list[tuple[str, Transaction]]],
        start: str,
        current: str,
        path: list[Transaction],
        visited: set,
    ) -> list[Transaction]:
        """
        Depth-first search for a cycle from ``current`` back to ``start``.

        A valid cycle has at least two and at most ROUND_TRIP_MAX_HOPS
        transactions, each hop happens no earlier than the previous one, and
        every hop lies within ROUND_TRIP_TIME_WINDOW_HOURS of the first.

        Returns:
            The transactions forming the cycle, or an empty list if none.
        """
        if current == start and len(path) > 1:
            return path  # Found cycle!
        # Stop before extending a path that already uses every allowed hop.
        if current in visited or len(path) >= self.config["ROUND_TRIP_MAX_HOPS"]:
            return []

        window_hours = self.config["ROUND_TRIP_TIME_WINDOW_HOURS"]
        # Each branch gets its own copy so sibling branches do not block each other.
        seen = visited | {current}
        for next_account, tx in graph.get(current, []):
            if path:
                # Money cannot be forwarded before it has arrived.
                if tx.timestamp < path[-1].timestamp:
                    continue
                # Check if within time window
                if (tx.timestamp - path[0].timestamp).total_seconds() / 3600 > window_hours:
                    continue
            result = self._find_round_trip_cycle(graph, start, next_account, [*path, tx], seen)
            if result:
                return result
        return []

    def _create_round_trip_alert(self, cycle_id: str, cycle: list[Transaction]) -> FraudAlert:
        """Build the round-trip alert for an ordered list of transactions forming a cycle."""
        total_amount = sum(tx.amount for tx in cycle)

        # Calculate risk score
        base_score = 70  # Round trips are inherently suspicious
        hop_bonus = len(cycle) * 5  # More hops = more suspicious
        amount_bonus = min(int(total_amount / 10000) * 5, 20)
        risk_score = min(base_score + hop_bonus + amount_bonus, 100)

        # Build path description
        hops = [cycle[0].source_account, *(tx.destination_account for tx in cycle)]
        path_desc = " → ".join(hops)

        return FraudAlert(
            alert_id=f"ROUND_{cycle_id}",
            fraud_type=FraudType.ROUND_TRIP,
            risk_score=risk_score,
            confidence=0.90,
            transactions=[tx.id for tx in cycle],
            description=f"Round-trip detected: {len(cycle)} transactions forming cycle {path_desc}, "
            f"total amount: ${total_amount:,.2f}",
            detected_at=datetime.now(),
            details={
                "cycle_length": len(cycle),
                "total_amount": total_amount,
                "path": path_desc,
                # Distinct accounts on the cycle, in order of first appearance.
                "accounts": list(dict.fromkeys(hops)),
                "time_span_hours": (cycle[-1].timestamp - cycle[0].timestamp).total_seconds() / 3600,
            },
        )

    @staticmethod
    def _alert_involves_account(alert: FraudAlert, account: str) -> bool:
        """Return True if the alert's details name ``account`` exactly (no substring matching)."""
        details = alert.details or {}
        return details.get("account") == account or account in details.get("accounts", ())

    def calculate_overall_risk(self, account: str, transactions: list[Transaction]) -> int:
        """
        Calculate overall risk score for an account (0-100).

        Considers:
        - Number of fraud alerts
        - Severity of alerts
        - Transaction patterns

        Runs ``analyze_transactions`` on the account's transactions, so
        ``self.alerts`` is replaced as a side effect.

        Returns:
            0 if the account has no transactions, 10 if it has transactions
            but no alerts naming it, otherwise the confidence-weighted
            average alert score plus a bonus for distinct fraud types,
            capped at 100.
        """
        account_txs = [tx for tx in transactions if account in (tx.source_account, tx.destination_account)]
        if not account_txs:
            return 0

        # Analyze account's transactions
        alerts = self.analyze_transactions(account_txs)
        account_alerts = [a for a in alerts if self._alert_involves_account(a, account)]
        if not account_alerts:
            return 10  # Base risk for any active account

        # Weighted average of alert scores
        total_score = sum(alert.risk_score * alert.confidence for alert in account_alerts)
        avg_score = int(total_score / len(account_alerts))

        # Bonus for multiple different fraud types
        fraud_types = len({a.fraud_type for a in account_alerts})
        diversity_bonus = min(fraud_types * 10, 30)
        return min(avg_score + diversity_bonus, 100)
# Example usage and testing
if __name__ == "__main__":
    # Capture the clock once so every example timestamp shares one reference point.
    now = datetime.now()

    # Create test transactions
    test_txs = [
        # Structuring example: 3 transactions just below $10k threshold
        Transaction(
            "tx1",
            9900,
            now - timedelta(hours=2),
            "ACC001",
            "ACC002",
            "Payment 1",
        ),
        Transaction(
            "tx2",
            9800,
            now - timedelta(hours=1),
            "ACC001",
            "ACC002",
            "Payment 2",
        ),
        Transaction("tx3", 9700, now, "ACC001", "ACC002", "Payment 3"),
        # Velocity example: 12 transactions from one account, two minutes apart
        *[
            Transaction(
                f"vtx{i}",
                100,
                now - timedelta(minutes=30 - i * 2),
                "ACC003",
                f"SHOP{i}",
                f"Purchase {i}",
            )
            for i in range(12)
        ],
        # Round trip example: A → B → C → A
        Transaction(
            "rtx1",
            5000,
            now - timedelta(hours=48),
            "ACC004",
            "ACC005",
            "Transfer",
        ),
        Transaction(
            "rtx2",
            4800,
            now - timedelta(hours=24),
            "ACC005",
            "ACC006",
            "Payment",
        ),
        Transaction("rtx3", 4600, now, "ACC006", "ACC004", "Return"),
    ]

    # Run detection
    engine = FraudDetectionEngine()
    alerts = engine.analyze_transactions(test_txs)

    # Print a short report for every alert
    print(f"\n🚨 Detected {len(alerts)} fraud alerts:\n")
    for alert in alerts:
        print(f"Alert ID: {alert.alert_id}")
        print(f"Type: {alert.fraud_type.value}")
        print(f"Risk Score: {alert.risk_score}/100")
        print(f"Confidence: {alert.confidence:.0%}")
        print(f"Description: {alert.description}")
        print(f"Transactions: {len(alert.transactions)}")
        print("-" * 80)