# zenith-backend/app/services/intelligence/fraud_detection_engine.py
# Hugging Face page header (scrape residue, kept for provenance):
#   teoat's picture | "Upload folder using huggingface_hub" | 4ae946d verified
"""
Fraud Detection Engine - Rule-Based Detection System
Task 4.1 from Orchestration Plan
This module implements pattern-based fraud detection algorithms:
- Structuring detection (transactions split to avoid reporting thresholds)
- Round-trip transactions (circular money flow)
- Velocity checks (rapid transaction patterns)
- Risk scoring system (0-100)
"""
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
class FraudType(Enum):
    """Fraud pattern categories an alert can be tagged with.

    Every member's value is the lowercase string form of its name.
    """

    STRUCTURING = "structuring"
    ROUND_TRIP = "round_trip"
    VELOCITY = "velocity"
    UNUSUAL_PATTERN = "unusual_pattern"
    HIGH_RISK_JURISDICTION = "high_risk_jurisdiction"
@dataclass
class Transaction:
    """Transaction data model

    One transfer of ``amount`` from ``source_account`` to
    ``destination_account`` at ``timestamp``. The first six fields are
    required; ``merchant``, ``category`` and ``metadata`` are optional.
    """

    id: str
    amount: float
    timestamp: datetime
    source_account: str
    destination_account: str
    description: str
    merchant: str = ""
    category: str = ""
    # The default really is None (not an empty dict), so the annotation must
    # say so; callers have to handle the None case before indexing.
    metadata: Optional[dict[str, Any]] = None
@dataclass
class FraudAlert:
    """A single finding produced by the fraud detection engine.

    All fields are required; none has a default.
    """

    alert_id: str
    fraud_type: FraudType
    # Intended range: 0-100
    risk_score: int
    # Intended range: 0.0-1.0
    confidence: float
    # IDs of the transactions that make up the detected pattern
    transactions: list[str]
    description: str
    detected_at: datetime
    details: dict[str, Any]
class FraudDetectionEngine:
    """
    Rule-based fraud detection engine

    Detects common fraud patterns in transaction data:
    - Structuring: Multiple transactions just below reporting threshold
    - Round-trip: Money flowing in a circle back to origin
    - Velocity: Too many transactions in short time period

    Thresholds live in ``self.config``: a copy of ``DEFAULT_CONFIG`` that may
    be overridden from the database. Detection code must read them through
    the lowercase properties (``structuring_threshold`` etc.) so that
    overrides take effect.
    """

    # Default thresholds (fallback if DB fails or empty)
    DEFAULT_CONFIG = {
        "STRUCTURING_THRESHOLD": 10000,
        "STRUCTURING_WINDOW_HOURS": 24,
        "STRUCTURING_MIN_TRANSACTIONS": 3,
        "VELOCITY_MAX_TRANSACTIONS": 10,
        "VELOCITY_WINDOW_MINUTES": 60,
        "ROUND_TRIP_MAX_HOPS": 5,
        "ROUND_TRIP_TIME_WINDOW_HOURS": 72,
    }

    def __init__(self, db_session=None):
        """
        Create an engine and load its thresholds.

        Args:
            db_session: Optional object exposing ``query(...).filter(...).all()``
                used to read ``FraudRule`` overrides. When falsy, the defaults
                are used unchanged.
        """
        self.alerts: list[FraudAlert] = []
        self.db = db_session
        self._load_config()

    def _load_config(self):
        """Load configuration from DB or use defaults

        ``self.config`` starts as a copy of ``DEFAULT_CONFIG``. With a DB
        session, each active ``FraudRule`` whose ``rule_id`` is a known config
        key overrides that key, cast according to ``rule.value_type``
        ("int", "float" or "json"; any other type is ignored).

        Loading is best-effort: failures are logged and never raised. If the
        rules cannot be fetched the defaults are kept; a rule whose value
        cannot be cast is skipped without affecting the other rules.
        """
        self.config = dict(self.DEFAULT_CONFIG)
        if not self.db:
            return

        log = logging.getLogger(__name__)
        try:
            # Imported lazily so the engine stays usable without the DB layer.
            from core.database import FraudRule

            rules = self.db.query(FraudRule).filter(FraudRule.is_active).all()
        except Exception:
            # Deliberately broad: configuration loading must never prevent
            # the engine from starting, but the failure should be visible.
            log.warning("Could not load fraud rules from DB; using default thresholds", exc_info=True)
            return

        casters = {"int": int, "float": float, "json": json.loads}
        for rule in rules:
            try:
                if rule.rule_id not in self.config:
                    continue
                caster = casters.get(rule.value_type)
                if caster is not None:
                    self.config[rule.rule_id] = caster(rule.value)
            except Exception:
                log.warning(
                    "Ignoring invalid fraud rule %r; keeping previous value",
                    getattr(rule, "rule_id", None),
                    exc_info=True,
                )

    @property
    def structuring_threshold(self):
        return self.config["STRUCTURING_THRESHOLD"]

    @property
    def structuring_window_hours(self):
        return self.config["STRUCTURING_WINDOW_HOURS"]

    @property
    def structuring_min_transactions(self):
        return self.config["STRUCTURING_MIN_TRANSACTIONS"]

    @property
    def velocity_max_transactions(self):
        return self.config["VELOCITY_MAX_TRANSACTIONS"]

    @property
    def velocity_window_minutes(self):
        return self.config["VELOCITY_WINDOW_MINUTES"]

    @property
    def round_trip_max_hops(self):
        return self.config["ROUND_TRIP_MAX_HOPS"]

    @property
    def round_trip_time_window_hours(self):
        return self.config["ROUND_TRIP_TIME_WINDOW_HOURS"]

    def analyze_transactions(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Analyze a batch of transactions for fraud patterns

        Replaces ``self.alerts`` with the alerts found in this batch
        (structuring first, then velocity, then round trips).

        Args:
            transactions: List of transactions to analyze

        Returns:
            List of fraud alerts detected

        Raises:
            ValueError: If transactions is empty or None
            TypeError: If transactions is not a list or contains invalid items
        """
        # Input validation
        if transactions is None:
            raise ValueError("Transactions list cannot be None")
        if not isinstance(transactions, list):
            raise TypeError(f"Transactions must be a list, got {type(transactions).__name__}")
        if not transactions:
            raise ValueError("Transactions list cannot be empty")

        # Validate each transaction
        for i, tx in enumerate(transactions):
            if not isinstance(tx, Transaction):
                raise TypeError(f"Transaction at index {i} must be a Transaction instance, got {type(tx).__name__}")

        self.alerts = []
        # Run all detection algorithms
        self.alerts.extend(self._detect_structuring(transactions))
        self.alerts.extend(self._detect_velocity(transactions))
        self.alerts.extend(self._detect_round_trips(transactions))
        return self.alerts

    def _detect_structuring(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect structuring: Multiple transactions just below reporting threshold
        """
        account_groups = self._group_transactions_by_account_pairs(transactions)
        return self._analyze_account_groups_for_structuring(account_groups)

    def _group_transactions_by_account_pairs(
        self, transactions: list[Transaction]
    ) -> dict[tuple[str, str], list[Transaction]]:
        """Group transactions by account pairs for structuring analysis

        The pair is sorted, so A -> B and B -> A land in the same group.
        """
        account_groups: dict[tuple[str, str], list[Transaction]] = {}
        for tx in transactions:
            account_pair = tuple(sorted([tx.source_account, tx.destination_account]))
            account_groups.setdefault(account_pair, []).append(tx)
        return account_groups

    def _analyze_account_groups_for_structuring(
        self, account_groups: dict[tuple[str, str], list[Transaction]]
    ) -> list[FraudAlert]:
        """Analyze grouped transactions for structuring patterns

        Candidate windows start at every transaction, so a later window is
        often just a tail of an earlier one. A window whose transaction IDs
        are all contained in a window already reported for the same account
        pair is skipped, so one pattern yields one alert.
        """
        alerts = []
        for account_pair, txs in account_groups.items():
            ordered = sorted(txs, key=lambda tx: tx.timestamp)
            reported: list[set[str]] = []
            for window_data in self._find_structuring_windows(ordered):
                if not self._is_significant_structuring(window_data):
                    continue
                window_ids = {tx.id for tx in window_data["transactions"]}
                if any(window_ids <= seen for seen in reported):
                    continue
                reported.append(window_ids)
                alerts.append(self._create_structuring_alert(account_pair, window_data))
        return alerts

    def _find_structuring_windows(self, transactions: list[Transaction]) -> list[dict[str, Any]]:
        """Find windows of transactions that may indicate structuring

        ``transactions`` must already be sorted by timestamp. For each
        transaction, a window opens at its timestamp and collects the
        suspicious-amount transactions that follow within the structuring
        time window. Windows with no suspicious transaction are dropped.

        Returns:
            One dict per window with keys ``transactions``, ``total_amount``,
            ``start_time`` and ``end_time``.
        """
        windows = []
        for i, first_tx in enumerate(transactions):
            window_start = first_tx.timestamp
            window_txs = []
            total_amount = 0
            # Collect transactions within time window
            for tx in transactions[i:]:
                if not self._is_within_structuring_window(tx, window_start):
                    break  # input is sorted, so everything after is outside too
                if self._is_suspicious_amount(tx.amount):
                    window_txs.append(tx)
                    total_amount += tx.amount
            if window_txs:
                windows.append(
                    {
                        "transactions": window_txs,
                        "total_amount": total_amount,
                        "start_time": window_start,
                        "end_time": window_txs[-1].timestamp,
                    }
                )
        return windows

    def _is_within_structuring_window(self, transaction: Transaction, window_start) -> bool:
        """Check if transaction is within structuring time window"""
        elapsed_hours = (transaction.timestamp - window_start).total_seconds() / 3600
        return elapsed_hours <= self.structuring_window_hours

    def _is_suspicious_amount(self, amount: float) -> bool:
        """Check if transaction amount is suspiciously close to threshold

        Suspicious means at least 80% of the threshold but strictly below it.
        """
        threshold = self.structuring_threshold
        return 0.8 * threshold <= amount < threshold

    def _is_significant_structuring(self, window_data: dict[str, Any]) -> bool:
        """Determine if a transaction window represents significant structuring"""
        txs = window_data["transactions"]
        total_amount = window_data["total_amount"]
        return len(txs) >= self.structuring_min_transactions and total_amount >= self.structuring_threshold

    def _create_structuring_alert(self, account_pair: tuple[str, str], window_data: dict[str, Any]) -> FraudAlert:
        """Create a structuring fraud alert

        Only the fields ``FraudAlert`` actually declares are passed to it;
        title, severity, involved accounts and detection method are carried
        in ``details``.
        """
        txs = window_data["transactions"]
        total_amount = window_data["total_amount"]

        # Calculate risk score based on various factors
        base_score = 70  # Structuring is inherently suspicious
        proximity_bonus = self._calculate_proximity_bonus(txs)
        count_bonus = min(len(txs) * 5, 30)  # Up to 30 points for many transactions
        risk_score = min(base_score + proximity_bonus + count_bonus, 100)

        started_at = int(window_data["start_time"].timestamp())
        return FraudAlert(
            alert_id=f"STRUCTURING_{account_pair[0]}_{account_pair[1]}_{started_at}",
            fraud_type=FraudType.STRUCTURING,
            risk_score=risk_score,
            confidence=0.85,
            transactions=[tx.id for tx in txs],
            description=self._generate_structuring_description(txs, total_amount),
            detected_at=datetime.now(),
            details={
                "title": "Potential Structuring Detected",
                "severity": self._calculate_severity(risk_score),
                "accounts": list(account_pair),
                "transaction_count": len(txs),
                "total_amount": total_amount,
                "time_window_hours": self.structuring_window_hours,
                "threshold": self.structuring_threshold,
                "detection_method": "transaction_pattern_analysis",
            },
        )

    def _calculate_proximity_bonus(self, transactions: list[Transaction]) -> int:
        """Calculate bonus from the amounts' relative gap to the threshold

        Returns ``int(average_gap * 40)`` where the gap of one transaction is
        ``(threshold - amount) / threshold``; transactions at or above the
        threshold are ignored. Returns 0 when nothing qualifies.
        """
        if not transactions:
            return 0
        threshold = self.structuring_threshold
        # NOTE(review): this is the relative *distance* below the threshold,
        # so amounts nearer the threshold earn a smaller bonus. The name
        # suggests the opposite was intended -- confirm before changing the
        # scoring.
        gaps = [(threshold - tx.amount) / threshold for tx in transactions if tx.amount < threshold]
        if not gaps:
            return 0
        return int(sum(gaps) / len(gaps) * 40)

    def _generate_structuring_description(self, transactions: list[Transaction], total_amount: float) -> str:
        """Generate human-readable description of structuring pattern

        ``transactions`` must be non-empty (the average divides by its length).
        """
        count = len(transactions)
        avg_amount = total_amount / count
        return (
            f"Detected {count} transactions totaling ${total_amount:,.2f} "
            f"(avg: ${avg_amount:,.2f}) within {self.structuring_window_hours} hours. "
            f"All amounts suspiciously close to ${self.structuring_threshold:,.0f} CTR threshold."
        )

    def _calculate_severity(self, risk_score: int) -> str:
        """Convert risk score to severity level"""
        if risk_score >= 90:
            return "critical"
        if risk_score >= 75:
            return "high"
        if risk_score >= 60:
            return "medium"
        return "low"

    def _detect_velocity(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect velocity fraud: Too many transactions in short time period

        Pattern: Unusual burst of transaction activity that may indicate:
        - Account takeover
        - Card testing
        - Automated fraud

        At most one alert is raised per source account: scanning stops at the
        first window that reaches the configured transaction count.
        """
        alerts = []
        max_transactions = self.velocity_max_transactions
        window_minutes = self.velocity_window_minutes

        # Group by source account
        account_txs: dict[str, list[Transaction]] = {}
        for tx in transactions:
            account_txs.setdefault(tx.source_account, []).append(tx)

        # Check each account for velocity issues
        for account, txs in account_txs.items():
            txs.sort(key=lambda x: x.timestamp)
            # Sliding window to detect bursts
            for i, first_tx in enumerate(txs):
                window_start = first_tx.timestamp
                window_txs = []
                for tx in txs[i:]:
                    if (tx.timestamp - window_start).total_seconds() / 60 <= window_minutes:
                        window_txs.append(tx)
                    else:
                        break

                # Alert if too many transactions in window
                if len(window_txs) < max_transactions:
                    continue

                # Calculate risk score based on velocity
                base_score = 50
                velocity_bonus = min((len(window_txs) - max_transactions) * 5, 40)
                # Bonus for very short time window
                actual_window_minutes = (window_txs[-1].timestamp - window_txs[0].timestamp).total_seconds() / 60
                time_bonus = int((1 - actual_window_minutes / window_minutes) * 10)
                risk_score = min(base_score + velocity_bonus + time_bonus, 100)

                alerts.append(
                    FraudAlert(
                        alert_id=f"VEL_{account}_{window_txs[0].id}",
                        fraud_type=FraudType.VELOCITY,
                        risk_score=risk_score,
                        confidence=0.75,
                        transactions=[tx.id for tx in window_txs],
                        description=f"High velocity detected: {len(window_txs)} transactions from account {account} "
                        f"in {actual_window_minutes:.1f} minutes",
                        detected_at=datetime.now(),
                        details={
                            "account": account,
                            "accounts": [account],
                            "transaction_count": len(window_txs),
                            "window_minutes": actual_window_minutes,
                            "threshold": max_transactions,
                        },
                    )
                )
                break  # One alert per account per window
        return alerts

    def _detect_round_trips(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect round-trip transactions: Money flowing in a circle

        Pattern: A → B → C → A (money returns to origin)
        Often used for:
        - Money laundering (layering phase)
        - Creating false business activity
        - Tax evasion

        The same cycle found from different start accounts is reported once
        (cycles are keyed by their sorted transaction IDs).
        """
        alerts = []
        max_hops = self.round_trip_max_hops
        window_hours = self.round_trip_time_window_hours

        # Build transaction graph
        graph: dict[str, list[tuple[str, Transaction]]] = {}
        for tx in transactions:
            graph.setdefault(tx.source_account, []).append((tx.destination_account, tx))

        # Find cycles using DFS
        def find_cycle(start: str, current: str, path: list[Transaction], visited: set) -> list[Transaction]:
            """Find cycle from current node back to start"""
            if current == start and len(path) > 1:
                return path  # Found cycle!
            # NOTE(review): because the cycle check above runs first, a cycle
            # of max_hops + 1 transactions is still accepted -- confirm
            # whether the limit is meant to be inclusive.
            if current in visited or len(path) > max_hops:
                return []
            visited.add(current)
            for next_account, tx in graph.get(current, []):
                # Check if within time window (measured from the first hop)
                if path and (tx.timestamp - path[0].timestamp).total_seconds() / 3600 > window_hours:
                    continue
                result = find_cycle(start, next_account, [*path, tx], visited.copy())
                if result:
                    return result
            return []

        # Check each account for round trips
        checked_cycles = set()
        for start_account in graph:
            cycle = find_cycle(start_account, start_account, [], set())
            if not cycle:
                continue

            # Create unique cycle identifier
            cycle_id = "_".join(sorted([tx.id for tx in cycle]))
            if cycle_id in checked_cycles:
                continue
            checked_cycles.add(cycle_id)

            total_amount = sum(tx.amount for tx in cycle)
            # Calculate risk score
            base_score = 70  # Round trips are inherently suspicious
            hop_bonus = len(cycle) * 5  # More hops = more suspicious
            amount_bonus = min(int(total_amount / 10000) * 5, 20)
            risk_score = min(base_score + hop_bonus + amount_bonus, 100)

            # Build path description; the last account equals the first
            path_accounts = [cycle[0].source_account] + [tx.destination_account for tx in cycle]
            path_desc = " → ".join(path_accounts)

            alerts.append(
                FraudAlert(
                    alert_id=f"ROUND_{cycle_id}",
                    fraud_type=FraudType.ROUND_TRIP,
                    risk_score=risk_score,
                    confidence=0.90,
                    transactions=[tx.id for tx in cycle],
                    description=f"Round-trip detected: {len(cycle)} transactions forming cycle {path_desc}, "
                    f"total amount: ${total_amount:,.2f}",
                    detected_at=datetime.now(),
                    details={
                        "accounts": path_accounts[:-1],
                        "cycle_length": len(cycle),
                        "total_amount": total_amount,
                        "path": path_desc,
                        "time_span_hours": (cycle[-1].timestamp - cycle[0].timestamp).total_seconds() / 3600,
                    },
                )
            )
        return alerts

    @staticmethod
    def _alert_involves_account(alert: FraudAlert, account: str) -> bool:
        """Return True when ``alert`` is about ``account``.

        Uses the exact ``details["accounts"]`` list when present. Alerts
        without that key fall back to the older substring match on the
        stringified details, which can over-match (e.g. "10" matches a
        numeric threshold of 10).
        """
        accounts = alert.details.get("accounts")
        if accounts is not None:
            return account in accounts
        return account in str(alert.details)

    def calculate_overall_risk(self, account: str, transactions: list[Transaction]) -> int:
        """
        Calculate overall risk score for an account (0-100)

        Considers:
        - Number of fraud alerts
        - Severity of alerts
        - Transaction patterns

        Runs ``analyze_transactions`` on the transactions that touch
        ``account`` (which also replaces ``self.alerts``), keeps the alerts
        that are about the account, and returns the confidence-weighted
        average risk score plus 10 points per distinct fraud type (capped at
        30), capped at 100.

        Returns:
            0 if the account has no transactions, 10 if it has transactions
            but no alerts, otherwise the combined score.
        """
        account_txs = [tx for tx in transactions if tx.source_account == account or tx.destination_account == account]
        if not account_txs:
            return 0

        # Analyze account's transactions
        alerts = self.analyze_transactions(account_txs)
        account_alerts = [a for a in alerts if self._alert_involves_account(a, account)]
        if not account_alerts:
            return 10  # Base risk for any active account

        # Weighted average of alert scores
        total_score = sum(alert.risk_score * alert.confidence for alert in account_alerts)
        avg_score = int(total_score / len(account_alerts))

        # Bonus for multiple different fraud types
        fraud_types = len({a.fraud_type for a in account_alerts})
        diversity_bonus = min(fraud_types * 10, 30)
        return min(avg_score + diversity_bonus, 100)
# Example usage and testing
if __name__ == "__main__":
    # Structuring fixture: three payments just under the $10k threshold
    # between the same two accounts, one hour apart.
    structuring_demo = [
        Transaction(
            id="tx1",
            amount=9900,
            timestamp=datetime.now() - timedelta(hours=2),
            source_account="ACC001",
            destination_account="ACC002",
            description="Payment 1",
        ),
        Transaction(
            id="tx2",
            amount=9800,
            timestamp=datetime.now() - timedelta(hours=1),
            source_account="ACC001",
            destination_account="ACC002",
            description="Payment 2",
        ),
        Transaction(
            id="tx3",
            amount=9700,
            timestamp=datetime.now(),
            source_account="ACC001",
            destination_account="ACC002",
            description="Payment 3",
        ),
    ]

    # Velocity fixture: twelve small purchases from one account, two minutes
    # apart, starting 30 minutes ago.
    velocity_demo = [
        Transaction(
            id=f"vtx{n}",
            amount=100,
            timestamp=datetime.now() - timedelta(minutes=30 - n * 2),
            source_account="ACC003",
            destination_account=f"SHOP{n}",
            description=f"Purchase {n}",
        )
        for n in range(12)
    ]

    # Round-trip fixture: ACC004 → ACC005 → ACC006 → ACC004 over 48 hours.
    round_trip_demo = [
        Transaction(
            id="rtx1",
            amount=5000,
            timestamp=datetime.now() - timedelta(hours=48),
            source_account="ACC004",
            destination_account="ACC005",
            description="Transfer",
        ),
        Transaction(
            id="rtx2",
            amount=4800,
            timestamp=datetime.now() - timedelta(hours=24),
            source_account="ACC005",
            destination_account="ACC006",
            description="Payment",
        ),
        Transaction(
            id="rtx3",
            amount=4600,
            timestamp=datetime.now(),
            source_account="ACC006",
            destination_account="ACC004",
            description="Return",
        ),
    ]

    # Same order as before: structuring, velocity, round trip.
    test_txs = structuring_demo + velocity_demo + round_trip_demo

    # Run detection
    engine = FraudDetectionEngine()
    alerts = engine.analyze_transactions(test_txs)

    print(f"\n🚨 Detected {len(alerts)} fraud alerts:\n")
    for alert in alerts:
        report_lines = [
            f"Alert ID: {alert.alert_id}",
            f"Type: {alert.fraud_type.value}",
            f"Risk Score: {alert.risk_score}/100",
            f"Confidence: {alert.confidence:.0%}",
            f"Description: {alert.description}",
            f"Transactions: {len(alert.transactions)}",
            "-" * 80,
        ]
        print("\n".join(report_lines))