File size: 5,731 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import logging
from datetime import datetime, timezone
from typing import Any

logger = logging.getLogger(__name__)


class BehaviorBaselineService:
    """
    Advanced Behavior Analytics for detecting deviations from "normal" patterns.
    Ref: 2025 Forensic Trends (Proactive Monitoring)

    The baseline returned by ``get_account_baseline`` is currently mocked;
    ``detect_anomalies`` and ``track_comingling_ratio`` are simple heuristics
    built on plain transaction dicts.
    """

    # A transaction is flagged as VALUE_SPIKE when its amount exceeds the
    # baseline average multiplied by this factor.
    _VALUE_SPIKE_MULTIPLIER = 5

    # Upper-case substrings looked for in a transaction description to classify
    # it as personal or business. Plain substring matching is deliberately kept
    # as-is (so e.g. "REST" also matches "INTEREST").
    _PERSONAL_TERMS = ("GROCERY", "CINEMA", "REST", "PHARMACY", "RENT")
    _BUSINESS_TERMS = ("PAYROLL", "SUPPLY", "INVOICE", "VENDOR")

    def __init__(self, db_session):
        """
        Store the database session on the instance as ``self.db``.

        None of the methods in this class read ``self.db`` yet.
        """
        self.db = db_session

    @staticmethod
    def _amount_of(tx: dict) -> Any:
        """Return ``tx["amount"]``, treating a missing or ``None`` amount as 0."""
        amount = tx.get("amount")
        return 0 if amount is None else amount

    @staticmethod
    def _chronological_key(tx: dict) -> tuple:
        """
        Sort key ordering transactions by "timestamp", falling back to "date".

        Transactions with neither field sort after all dated ones. The leading
        flag differs between dated and undated keys, so an undated row is never
        compared against a real stamp (which may be a string), and undated rows
        keep their input order because ``sorted`` is stable.
        """
        stamp = tx.get("timestamp") or tx.get("date")
        return (0, stamp) if stamp else (1,)

    @staticmethod
    def _dependency_ratio(illicit: float, business_total: float) -> float:
        """Return illicit / business_total, or 0 when there was no business spend."""
        return illicit / business_total if business_total > 0 else 0

    async def get_account_baseline(self, account_id: str) -> dict[str, Any]:
        """
        Calculates a statistical baseline for an account.

        Returns a dict with the keys "account_id", "avg_transaction_value",
        "volatility_index", "geographic_footprint", "peak_activity_hours",
        "merchant_categories" and "last_calculated". All values except
        "account_id" and "last_calculated" are hard-coded mock data.
        """
        logger.info("Calculating behavior baseline for account %s", account_id)

        # Naive UTC timestamp, same string format datetime.utcnow() produced,
        # without calling the deprecated API.
        calculated_at = datetime.now(timezone.utc).replace(tzinfo=None)

        # Real implementation would run statistical aggregates over the last 90 days
        # Mocking the baseline results
        return {
            "account_id": account_id,
            "avg_transaction_value": 1250.0,
            "volatility_index": 0.15,
            "geographic_footprint": ["USA", "UK"],
            "peak_activity_hours": [9, 10, 11, 14, 15],
            "merchant_categories": ["GROCERY", "UTILITIES", "TECHNOLOGY"],
            "last_calculated": calculated_at.isoformat(),
        }

    async def detect_anomalies(
        self, account_id: str, current_transactions: list[dict]
    ) -> list[dict]:
        """
        Compares current transactions against the baseline.

        Each transaction dict is read through the keys "id", "amount" and
        "location". A transaction can produce up to two anomaly records
        (VALUE_SPIKE and GEO_ANOMALY); records are returned in input order.
        A missing or ``None`` amount is treated as 0 instead of raising.
        """
        baseline = await self.get_account_baseline(account_id)
        spike_threshold = (
            baseline["avg_transaction_value"] * self._VALUE_SPIKE_MULTIPLIER
        )
        footprint = baseline["geographic_footprint"]
        anomalies = []

        for tx in current_transactions:
            tx_value = self._amount_of(tx)
            # Simple threshold check
            if tx_value > spike_threshold:
                anomalies.append(
                    {
                        "transaction_id": tx.get("id"),
                        "type": "VALUE_SPIKE",
                        "severity": "HIGH",
                        "reason": (
                            f"Value {tx_value} is {self._VALUE_SPIKE_MULTIPLIER}x "
                            "higher than baseline average."
                        ),
                    }
                )

            # Simple Geo check
            # NOTE(review): a transaction without a "location" is also flagged
            # here (None is not in the footprint) - confirm that is intended.
            location = tx.get("location")
            if location not in footprint:
                anomalies.append(
                    {
                        "transaction_id": tx.get("id"),
                        "type": "GEO_ANOMALY",
                        "severity": "MEDIUM",
                        "reason": f"Transaction from {location} is outside typical footprint.",
                    }
                )

        return anomalies

    async def track_comingling_ratio(
        self,
        account_id: str,
        transactions: list[dict[str, Any]],
        initial_personal_buffer: float = 1000.0,
    ) -> dict[str, Any]:
        """
        Implementation of the LIBR Engine (Lowest Intermediate Balance Rule).
        Tracks co-mingled personal/business funds to identify illicit source dependency.

        Transactions are processed in chronological order ("timestamp", else
        "date"; undated ones last). Only the magnitude of "amount" is used;
        direction comes from "transaction_type" ("CREDIT" vs anything else).

        * Credits whose description matches a personal keyword top up the
          personal buffer; all other credits are ignored.
        * Debits whose description matches a business keyword are paid from the
          buffer; whatever the buffer cannot cover is counted as illicit
          dependency.
        * All other debits drain the buffer, floored at 0.

        Returns a dict with "account_id", "comingling_ratio" (illicit
        dependency divided by total business debits, 0 when there are none),
        "total_illicit_infusion", "status", "heatmap_data" (one point per
        transaction) and "libr_verdict".
        """
        logger.info("Applying LIBR analysis for account %s", account_id)

        # 1. Classify transactions
        # (Simplified classification for simulation)
        buffer = initial_personal_buffer
        illicit_dependency = 0.0
        business_survival_total = 0.0
        points = []

        for tx in sorted(transactions, key=self._chronological_key):
            amount = abs(self._amount_of(tx))
            is_credit = tx.get("transaction_type") == "CREDIT"

            # Simplified heuristic for personal vs business; a missing or None
            # description simply matches no keyword.
            desc = (tx.get("description") or "").upper()

            if is_credit:
                # Business income - does not count towards LIBR "Personal Buffer"
                if any(term in desc for term in self._PERSONAL_TERMS):
                    buffer += amount
            elif any(term in desc for term in self._BUSINESS_TERMS):
                business_survival_total += amount
                if buffer >= amount:
                    # Business expense covered by personal float (Co-mingling)
                    buffer -= amount
                else:
                    # Buffer exhausted! Illicit/Business funds must be filling the gap
                    illicit_dependency += amount - buffer
                    buffer = 0
            else:  # Personal expense
                buffer = max(0, buffer - amount)

            points.append(
                {
                    "date": tx.get("date"),
                    "buffer": buffer,
                    "dependency_ratio": self._dependency_ratio(
                        illicit_dependency, business_survival_total
                    ),
                }
            )

        ratio = self._dependency_ratio(illicit_dependency, business_survival_total)

        if ratio > 0.5:
            status = "CRITICAL_DEPENDENCY"
        elif ratio > 0.1:
            status = "HIGH_CO_MINGLING"
        else:
            status = "HEALTHY"

        return {
            "account_id": account_id,
            "comingling_ratio": ratio,
            "total_illicit_infusion": illicit_dependency,
            "status": status,
            "heatmap_data": points,
            "libr_verdict": f"Business survived on {ratio:.1%} illicit/external float after personal buffer exhaustion.",
        }


def get_behavior_service(db):
    """Factory: build a BehaviorBaselineService around the given ``db`` object."""
    service = BehaviorBaselineService(db)
    return service