File size: 21,747 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
4ae946d
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
"""
Fraud Detection Engine - Rule-Based Detection System
Task 4.1 from Orchestration Plan

This module implements pattern-based fraud detection algorithms:
- Structuring detection (transactions split to avoid reporting thresholds)
- Round-trip transactions (circular money flow)
- Velocity checks (rapid transaction patterns)
- Risk scoring system (0-100)
"""

import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional


class FraudType(Enum):
    """Categories of fraud pattern that a FraudAlert can be tagged with."""

    # Transactions split up so that each stays under a reporting threshold.
    STRUCTURING = "structuring"
    # Money moving through a chain of accounts and back to where it started.
    ROUND_TRIP = "round_trip"
    # An unusually rapid run of transactions.
    VELOCITY = "velocity"
    # NOTE(review): no detector visible in this file emits the two members below;
    # presumably they are reserved for other rules or callers - confirm.
    UNUSUAL_PATTERN = "unusual_pattern"
    HIGH_RISK_JURISDICTION = "high_risk_jurisdiction"


@dataclass
class Transaction:
    """
    A single movement of money between two accounts.

    The first six fields are required and are accepted positionally in the
    order declared below; the remaining fields are optional.
    """

    id: str  # Identifier; alerts list the transactions they cover by this value
    amount: float
    timestamp: datetime  # When the transaction happened; all time windows are measured from it
    source_account: str  # Account the money leaves
    destination_account: str  # Account the money arrives in
    description: str
    merchant: str = ""
    category: str = ""
    # Free-form extra data. The default stays None (not an empty dict) so existing
    # "metadata is None" checks keep working; the annotation now admits that default.
    metadata: Optional[dict[str, Any]] = None


@dataclass
class FraudAlert:
    """A single finding raised by a fraud detector; every field is required."""

    # Identifier built by the detector that raised the alert; the detectors in this
    # file use prefixes such as "VEL_" and "ROUND_".
    alert_id: str
    fraud_type: FraudType
    risk_score: int  # 0-100
    confidence: float  # 0.0-1.0
    transactions: list[str]  # Transaction IDs
    description: str  # Human-readable summary of what was detected
    detected_at: datetime  # When the alert was created, not when the transactions happened
    details: dict[str, Any]  # Detector-specific context (counts, amounts, thresholds, ...)


class FraudDetectionEngine:
    """
    Rule-based fraud detection engine

    Detects common fraud patterns in transaction data:
    - Structuring: Multiple transactions just below reporting threshold
    - Round-trip: Money flowing in a circle back to origin
    - Velocity: Too many transactions in short time period

    Thresholds start from ``DEFAULT_CONFIG`` and can be overridden per engine
    by active ``FraudRule`` rows when a database session is supplied. Detectors
    always read them through the lowercase properties (for example
    ``structuring_threshold``) so that overrides are honoured.
    """

    # Default thresholds (fallback if DB fails or empty)
    DEFAULT_CONFIG = {
        "STRUCTURING_THRESHOLD": 10000,
        "STRUCTURING_WINDOW_HOURS": 24,
        "STRUCTURING_MIN_TRANSACTIONS": 3,
        "VELOCITY_MAX_TRANSACTIONS": 10,
        "VELOCITY_WINDOW_MINUTES": 60,
        "ROUND_TRIP_MAX_HOPS": 5,
        "ROUND_TRIP_TIME_WINDOW_HOURS": 72,
    }

    # Amounts from this fraction of the structuring threshold up to (but excluding)
    # the threshold itself count as "just below" it.
    SUSPICIOUS_AMOUNT_RATIO = 0.8

    def __init__(self, db_session=None):
        """
        Create an engine and load its thresholds.

        Args:
            db_session: Optional session object exposing ``query(...).filter(...).all()``,
                used to look up active ``FraudRule`` overrides. When omitted (or
                falsy) the engine runs on ``DEFAULT_CONFIG``.
        """
        self.alerts: list[FraudAlert] = []
        self.db = db_session
        self._load_config()

    def _load_config(self):
        """
        Load configuration from DB or use defaults

        A failure to reach the rule table leaves the defaults in place. A rule
        whose value cannot be parsed is skipped on its own, so one bad row does
        not discard the other overrides. Both cases are logged rather than
        silently swallowed. Rules with an unknown ``rule_id`` or ``value_type``
        are ignored.
        """
        self.config = dict(self.DEFAULT_CONFIG)

        if not self.db:
            return

        log = logging.getLogger(__name__)
        casters = {"int": int, "float": float, "json": json.loads}

        try:
            # Imported lazily so the engine still works without the database layer.
            from core.database import FraudRule

            rules = self.db.query(FraudRule).filter(FraudRule.is_active).all()
        except Exception:
            # Deliberate best-effort: any import/DB problem means "use the defaults".
            log.warning("Could not load fraud rules from the database; using default thresholds", exc_info=True)
            return

        for rule in rules:
            caster = casters.get(rule.value_type)
            if rule.rule_id not in self.config or caster is None:
                continue
            try:
                self.config[rule.rule_id] = caster(rule.value)
            except (TypeError, ValueError):
                log.warning(
                    "Ignoring fraud rule %s: cannot parse %r as %s", rule.rule_id, rule.value, rule.value_type
                )

    @property
    def structuring_threshold(self):
        """Reporting threshold that structured amounts stay just below."""
        return self.config["STRUCTURING_THRESHOLD"]

    @property
    def structuring_window_hours(self):
        """Length, in hours, of the window in which structured transactions are grouped."""
        return self.config["STRUCTURING_WINDOW_HOURS"]

    @property
    def structuring_min_transactions(self):
        """Minimum number of suspicious transactions in a window before alerting."""
        return self.config["STRUCTURING_MIN_TRANSACTIONS"]

    @property
    def velocity_max_transactions(self):
        """Number of transactions in one velocity window at which an alert is raised."""
        return self.config["VELOCITY_MAX_TRANSACTIONS"]

    @property
    def velocity_window_minutes(self):
        """Length, in minutes, of the velocity window."""
        return self.config["VELOCITY_WINDOW_MINUTES"]

    @property
    def round_trip_max_hops(self):
        """Maximum number of transactions a reported round trip may consist of."""
        return self.config["ROUND_TRIP_MAX_HOPS"]

    @property
    def round_trip_time_window_hours(self):
        """Maximum time span, in hours, that a round trip may cover."""
        return self.config["ROUND_TRIP_TIME_WINDOW_HOURS"]

    def analyze_transactions(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Analyze a batch of transactions for fraud patterns

        The result is also stored on ``self.alerts``, replacing the alerts of
        any previous call.

        Args:
            transactions: List of transactions to analyze

        Returns:
            List of fraud alerts detected

        Raises:
            ValueError: If transactions is empty or None
            TypeError: If transactions is not a list or contains invalid items
        """
        self._validate_transactions(transactions)
        self.alerts = self._run_detectors(transactions)
        return self.alerts

    @staticmethod
    def _validate_transactions(transactions) -> None:
        """Raise ValueError/TypeError unless *transactions* is a non-empty list of Transaction."""
        if transactions is None:
            raise ValueError("Transactions list cannot be None")

        if not isinstance(transactions, list):
            raise TypeError(f"Transactions must be a list, got {type(transactions).__name__}")

        if not transactions:
            raise ValueError("Transactions list cannot be empty")

        for i, tx in enumerate(transactions):
            if not isinstance(tx, Transaction):
                raise TypeError(f"Transaction at index {i} must be a Transaction instance, got {type(tx).__name__}")

    def _run_detectors(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """Run every detection algorithm and return the combined alerts without touching ``self.alerts``."""
        return [
            *self._detect_structuring(transactions),
            *self._detect_velocity(transactions),
            *self._detect_round_trips(transactions),
        ]

    # ------------------------------------------------------------------ structuring

    def _detect_structuring(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect structuring: Multiple transactions just below reporting threshold
        """
        account_groups = self._group_transactions_by_account_pairs(transactions)
        return self._analyze_account_groups_for_structuring(account_groups)

    def _group_transactions_by_account_pairs(
        self, transactions: list[Transaction]
    ) -> dict[tuple[str, str], list[Transaction]]:
        """Group transactions by account pairs for structuring analysis"""
        account_groups: dict[tuple[str, str], list[Transaction]] = {}

        for tx in transactions:
            # Sort the pair so transfers in either direction land in the same group.
            account_pair = tuple(sorted((tx.source_account, tx.destination_account)))
            account_groups.setdefault(account_pair, []).append(tx)

        return account_groups

    def _analyze_account_groups_for_structuring(
        self, account_groups: dict[tuple[str, str], list[Transaction]]
    ) -> list[FraudAlert]:
        """Analyze grouped transactions for structuring patterns"""
        return [
            self._create_structuring_alert(account_pair, window_data)
            for account_pair, txs in account_groups.items()
            for window_data in self._find_structuring_windows(txs)
            if self._is_significant_structuring(window_data)
        ]

    def _find_structuring_windows(self, transactions: list[Transaction]) -> list[dict[str, Any]]:
        """
        Find windows of transactions that may indicate structuring

        Only suspicious-amount transactions are considered. A window starts at
        one of them and takes in every later one within the structuring time
        window. A window is reported only if it reaches past the previously
        reported one: otherwise it is a subset of that window and would merely
        produce a duplicate, weaker alert for the same burst.

        Returns:
            One dict per window with the keys ``transactions``, ``total_amount``,
            ``start_time`` and ``end_time``, ordered by start time.
        """
        suspicious = sorted(
            (tx for tx in transactions if self._is_suspicious_amount(tx.amount)),
            key=lambda tx: tx.timestamp,
        )

        windows = []
        end = 0  # exclusive end index of the current window; never moves backwards
        reported_end = 0  # exclusive end index of the last window that was reported

        for start, first in enumerate(suspicious):
            end = max(end, start)
            while end < len(suspicious) and self._is_within_structuring_window(suspicious[end], first.timestamp):
                end += 1

            if end <= reported_end:
                continue
            reported_end = end

            window_txs = suspicious[start:end]
            windows.append(
                {
                    "transactions": window_txs,
                    "total_amount": sum(tx.amount for tx in window_txs),
                    "start_time": first.timestamp,
                    "end_time": window_txs[-1].timestamp,
                }
            )

        return windows

    def _is_within_structuring_window(self, transaction: Transaction, window_start) -> bool:
        """Check if transaction is within structuring time window"""
        return (transaction.timestamp - window_start).total_seconds() / 3600 <= self.structuring_window_hours

    def _is_suspicious_amount(self, amount: float) -> bool:
        """Check if transaction amount is suspiciously close to threshold"""
        threshold = self.structuring_threshold
        return self.SUSPICIOUS_AMOUNT_RATIO * threshold <= amount < threshold

    def _is_significant_structuring(self, window_data: dict[str, Any]) -> bool:
        """Determine if a transaction window represents significant structuring"""
        txs = window_data["transactions"]
        total_amount = window_data["total_amount"]

        return len(txs) >= self.structuring_min_transactions and total_amount >= self.structuring_threshold

    def _create_structuring_alert(self, account_pair: tuple[str, str], window_data: dict[str, Any]) -> FraudAlert:
        """
        Create a structuring fraud alert

        Severity, detection method and the accounts involved are carried in
        ``details`` because ``FraudAlert`` has no dedicated fields for them.
        """
        txs = window_data["transactions"]
        total_amount = window_data["total_amount"]

        # Calculate risk score based on various factors
        base_score = 70  # Structuring is inherently suspicious
        proximity_bonus = self._calculate_proximity_bonus(txs)
        count_bonus = min(len(txs) * 5, 30)  # Up to 30 points for many transactions

        risk_score = min(base_score + proximity_bonus + count_bonus, 100)

        return FraudAlert(
            alert_id=f"STRUCTURING_{account_pair[0]}_{account_pair[1]}_{int(window_data['start_time'].timestamp())}",
            fraud_type=FraudType.STRUCTURING,
            risk_score=risk_score,
            confidence=0.85,
            transactions=[tx.id for tx in txs],
            description=self._generate_structuring_description(txs, total_amount),
            detected_at=datetime.now(),
            details={
                "accounts": list(account_pair),
                "transaction_count": len(txs),
                "total_amount": total_amount,
                "time_window_hours": self.structuring_window_hours,
                "threshold": self.structuring_threshold,
                "severity": self._calculate_severity(risk_score),
                "detection_method": "transaction_pattern_analysis",
            },
        )

    def _calculate_proximity_bonus(self, transactions: list[Transaction]) -> int:
        """
        Calculate bonus based on how close amounts are to threshold

        Each amount below the threshold is mapped onto the suspicious band:
        0.0 at ``SUSPICIOUS_AMOUNT_RATIO * threshold`` (or lower), approaching
        1.0 just under the threshold. The average is scaled to 0-40 points, so
        amounts hugging the threshold earn the larger bonus.
        """
        threshold = self.structuring_threshold
        floor = self.SUSPICIOUS_AMOUNT_RATIO * threshold
        band = threshold - floor
        if band <= 0:
            return 0

        closeness = [
            min(max((tx.amount - floor) / band, 0.0), 1.0) for tx in transactions if tx.amount < threshold
        ]
        if not closeness:
            return 0

        return int(sum(closeness) / len(closeness) * 40)

    def _generate_structuring_description(self, transactions: list[Transaction], total_amount: float) -> str:
        """Generate human-readable description of structuring pattern"""
        count = len(transactions)
        avg_amount = total_amount / count if count else 0.0

        return (
            f"Detected {count} transactions totaling ${total_amount:,.2f} "
            f"(avg: ${avg_amount:,.2f}) within {self.structuring_window_hours} hours. "
            f"All amounts suspiciously close to ${self.structuring_threshold:,.0f} CTR threshold."
        )

    def _calculate_severity(self, risk_score: int) -> str:
        """Convert risk score to severity level"""
        if risk_score >= 90:
            return "critical"
        if risk_score >= 75:
            return "high"
        if risk_score >= 60:
            return "medium"
        return "low"

    # --------------------------------------------------------------------- velocity

    def _detect_velocity(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect velocity fraud: Too many transactions in short time period

        Pattern: Unusual burst of transaction activity that may indicate:
        - Account takeover
        - Card testing
        - Automated fraud

        At most one alert is raised per source account: the earliest window
        that reaches the configured transaction count.
        """
        window = timedelta(minutes=self.velocity_window_minutes)
        limit = self.velocity_max_transactions

        # Group by source account
        account_txs: dict[str, list[Transaction]] = {}
        for tx in transactions:
            account_txs.setdefault(tx.source_account, []).append(tx)

        alerts = []
        for account, txs in account_txs.items():
            ordered = sorted(txs, key=lambda tx: tx.timestamp)

            # Sliding window: "end" only ever advances because the list is time-ordered.
            end = 0
            for start, first in enumerate(ordered):
                end = max(end, start)
                while end < len(ordered) and ordered[end].timestamp - first.timestamp <= window:
                    end += 1

                if end - start >= limit:
                    alerts.append(self._create_velocity_alert(account, ordered[start:end]))
                    break  # One alert per account

        return alerts

    def _create_velocity_alert(self, account: str, window_txs: list[Transaction]) -> FraudAlert:
        """Build the velocity alert for *window_txs*, a time-ordered burst sent from *account*."""
        limit = self.velocity_max_transactions
        window_minutes = self.velocity_window_minutes
        actual_window_minutes = (window_txs[-1].timestamp - window_txs[0].timestamp).total_seconds() / 60

        base_score = 50
        velocity_bonus = min((len(window_txs) - limit) * 5, 40)
        # Bonus for packing the burst into a small part of the window (no bonus for a zero-length window).
        time_bonus = int((1 - actual_window_minutes / window_minutes) * 10) if window_minutes else 0

        risk_score = min(base_score + velocity_bonus + time_bonus, 100)

        return FraudAlert(
            alert_id=f"VEL_{account}_{window_txs[0].id}",
            fraud_type=FraudType.VELOCITY,
            risk_score=risk_score,
            confidence=0.75,
            transactions=[tx.id for tx in window_txs],
            description=f"High velocity detected: {len(window_txs)} transactions from account {account} "
            f"in {actual_window_minutes:.1f} minutes",
            detected_at=datetime.now(),
            details={
                "account": account,
                "accounts": [account],
                "transaction_count": len(window_txs),
                "window_minutes": actual_window_minutes,
                "threshold": limit,
            },
        )

    # ------------------------------------------------------------------ round trips

    def _detect_round_trips(self, transactions: list[Transaction]) -> list[FraudAlert]:
        """
        Detect round-trip transactions: Money flowing in a circle

        Pattern: A -> B -> C -> A (money returns to origin)
        Often used for:
        - Money laundering (layering phase)
        - Creating false business activity
        - Tax evasion

        The first cycle found from each source account is reported; a cycle
        reached from several of its own accounts is reported once.
        """
        # Build transaction graph: source account -> [(destination account, transaction), ...]
        graph: dict[str, list[tuple[str, Transaction]]] = {}
        for tx in transactions:
            graph.setdefault(tx.source_account, []).append((tx.destination_account, tx))

        alerts = []
        checked_cycles = set()
        for start_account in graph:
            cycle = self._find_cycle(graph, start_account)
            if not cycle:
                continue

            # The same cycle is found from each account on it; identify it by its transactions.
            cycle_id = "_".join(sorted(tx.id for tx in cycle))
            if cycle_id in checked_cycles:
                continue
            checked_cycles.add(cycle_id)

            alerts.append(self._create_round_trip_alert(cycle, cycle_id))

        return alerts

    def _find_cycle(self, graph: dict[str, list[tuple[str, Transaction]]], start: str) -> list[Transaction]:
        """
        Depth-first search for the first cycle that leaves *start* and returns to it.

        A cycle needs at least two transactions (a self-transfer is not a round
        trip), uses at most ``round_trip_max_hops`` transactions, visits no
        intermediate account twice, and all of its transactions must fall
        within ``round_trip_time_window_hours`` of each other.

        Returns:
            The transactions of the cycle in path order, or an empty list.
        """
        max_hops = self.round_trip_max_hops
        window = timedelta(hours=self.round_trip_time_window_hours)

        def walk(current: str, path: list[Transaction], on_path: frozenset) -> list[Transaction]:
            for next_account, tx in graph.get(current, ()):
                candidate = [*path, tx]

                # Measure the span across the whole path so that a hop dated long
                # before the first one is rejected as well as one dated long after.
                stamps = [hop.timestamp for hop in candidate]
                if max(stamps) - min(stamps) > window:
                    continue

                if next_account == start:
                    if len(candidate) > 1:
                        return candidate  # Found cycle!
                    continue

                if next_account in on_path or len(candidate) >= max_hops:
                    continue

                found = walk(next_account, candidate, on_path | {next_account})
                if found:
                    return found

            return []

        return walk(start, [], frozenset({start}))

    def _create_round_trip_alert(self, cycle: list[Transaction], cycle_id: str) -> FraudAlert:
        """Build the round-trip alert for *cycle*, given in path order."""
        total_amount = sum(tx.amount for tx in cycle)

        # Calculate risk score
        base_score = 70  # Round trips are inherently suspicious
        hop_bonus = len(cycle) * 5  # More hops = more suspicious
        amount_bonus = min(int(total_amount / 10000) * 5, 20)

        risk_score = min(base_score + hop_bonus + amount_bonus, 100)

        # Build path description
        route = [cycle[0].source_account, *(tx.destination_account for tx in cycle)]
        path_desc = " → ".join(route)

        stamps = [tx.timestamp for tx in cycle]

        return FraudAlert(
            alert_id=f"ROUND_{cycle_id}",
            fraud_type=FraudType.ROUND_TRIP,
            risk_score=risk_score,
            confidence=0.90,
            transactions=[tx.id for tx in cycle],
            description=f"Round-trip detected: {len(cycle)} transactions forming cycle {path_desc}, "
            f"total amount: ${total_amount:,.2f}",
            detected_at=datetime.now(),
            details={
                "accounts": list(dict.fromkeys(route)),
                "cycle_length": len(cycle),
                "total_amount": total_amount,
                "path": path_desc,
                "time_span_hours": (max(stamps) - min(stamps)).total_seconds() / 3600,
            },
        )

    # ----------------------------------------------------------------- account risk

    def calculate_overall_risk(self, account: str, transactions: list[Transaction]) -> int:
        """
        Calculate overall risk score for an account (0-100)

        Considers:
        - Number of fraud alerts
        - Severity of alerts
        - Transaction patterns

        Only transactions that the account sends or receives are analysed, and
        only alerts that name the account exactly are counted. ``self.alerts``
        is left untouched by this call.

        Returns:
            0 when the account has no transactions, 10 when it has transactions
            but no alerts, otherwise the confidence-weighted average alert score
            plus 10 points per distinct fraud type (bonus capped at 30), capped at 100.

        Raises:
            TypeError: If one of the account's transactions is not a Transaction.
        """
        account_txs = [tx for tx in transactions if tx.source_account == account or tx.destination_account == account]

        if not account_txs:
            return 0

        # Analyze account's transactions
        self._validate_transactions(account_txs)
        alerts = self._run_detectors(account_txs)
        account_alerts = [a for a in alerts if account in a.details.get("accounts", ())]

        if not account_alerts:
            return 10  # Base risk for any active account

        # Weighted average of alert scores
        total_score = sum(alert.risk_score * alert.confidence for alert in account_alerts)
        avg_score = int(total_score / len(account_alerts))

        # Bonus for multiple different fraud types
        fraud_types = len({a.fraud_type for a in account_alerts})
        diversity_bonus = min(fraud_types * 10, 30)

        return min(avg_score + diversity_bonus, 100)


# Example usage and testing
if __name__ == "__main__":
    # Sample the clock once so every demo timestamp is relative to the same instant;
    # calling datetime.now() per transaction makes the offsets drift between rows.
    now = datetime.now()

    # Create test transactions
    test_txs = [
        # Structuring example: 3 transactions just below $10k threshold
        Transaction("tx1", 9900, now - timedelta(hours=2), "ACC001", "ACC002", "Payment 1"),
        Transaction("tx2", 9800, now - timedelta(hours=1), "ACC001", "ACC002", "Payment 2"),
        Transaction("tx3", 9700, now, "ACC001", "ACC002", "Payment 3"),
        # Velocity example: 12 transactions in 30 minutes
        *[
            Transaction(
                f"vtx{i}",
                100,
                now - timedelta(minutes=30 - i * 2),
                "ACC003",
                f"SHOP{i}",
                f"Purchase {i}",
            )
            for i in range(12)
        ],
        # Round trip example: A → B → C → A
        Transaction("rtx1", 5000, now - timedelta(hours=48), "ACC004", "ACC005", "Transfer"),
        Transaction("rtx2", 4800, now - timedelta(hours=24), "ACC005", "ACC006", "Payment"),
        Transaction("rtx3", 4600, now, "ACC006", "ACC004", "Return"),
    ]

    # Run detection
    engine = FraudDetectionEngine()
    alerts = engine.analyze_transactions(test_txs)

    # The banner emoji had been mis-decoded into mojibake; restored to the intended character.
    print(f"\n🚨 Detected {len(alerts)} fraud alerts:\n")
    for alert in alerts:
        print(f"Alert ID: {alert.alert_id}")
        print(f"Type: {alert.fraud_type.value}")
        print(f"Risk Score: {alert.risk_score}/100")
        print(f"Confidence: {alert.confidence:.0%}")
        print(f"Description: {alert.description}")
        print(f"Transactions: {len(alert.transactions)}")
        print("-" * 80)