import logging from datetime import datetime from typing import Any logger = logging.getLogger(__name__) class BehaviorBaselineService: """ Advanced Behavior Analytics for detecting deviations from "normal" patterns. Ref: 2025 Forensic Trends (Proactive Monitoring) """ def __init__(self, db_session): self.db = db_session async def get_account_baseline(self, account_id: str) -> dict[str, Any]: """ Calculates a statistical baseline for an account. """ logger.info(f"Calculating behavior baseline for account {account_id}") # Real implementation would run statistical aggregates over the last 90 days # Mocking the baseline results return { "account_id": account_id, "avg_transaction_value": 1250.0, "volatility_index": 0.15, "geographic_footprint": ["USA", "UK"], "peak_activity_hours": [9, 10, 11, 14, 15], "merchant_categories": ["GROCERY", "UTILITIES", "TECHNOLOGY"], "last_calculated": datetime.utcnow().isoformat(), } async def detect_anomalies( self, account_id: str, current_transactions: list[dict] ) -> list[dict]: """ Compares current transactions against the baseline. """ baseline = await self.get_account_baseline(account_id) anomalies = [] for tx in current_transactions: tx_value = tx.get("amount", 0) # Simple threshold check if tx_value > (baseline["avg_transaction_value"] * 5): anomalies.append( { "transaction_id": tx.get("id"), "type": "VALUE_SPIKE", "severity": "HIGH", "reason": f"Value {tx_value} is 5x higher than baseline average.", } ) # Simple Geo check if tx.get("location") not in baseline["geographic_footprint"]: anomalies.append( { "transaction_id": tx.get("id"), "type": "GEO_ANOMALY", "severity": "MEDIUM", "reason": f"Transaction from {tx.get('location')} is outside typical footprint.", } ) return anomalies async def track_comingling_ratio( self, account_id: str, transactions: list[dict[str, Any]], initial_personal_buffer: float = 1000.0, ) -> dict[str, Any]: """ Implementation of the LIBR Engine (Lowest Intermediate Balance Rule). Tracks co-mingled personal/business funds to identify illicit source dependency. """ logger.info(f"Applying LIBR analysis for account {account_id}") # 1. Classify transactions # (Simplified classification for simulation) buffer = initial_personal_buffer illicit_dependency = 0.0 business_survival_total = 0.0 points = [] sorted_txs = sorted( transactions, key=lambda x: x.get("timestamp") or x.get("date") or datetime.now(), ) for tx in sorted_txs: amount = abs(tx.get("amount", 0)) is_credit = tx.get("transaction_type") == "CREDIT" # Simplified heuristic for personal vs business desc = tx.get("description", "").upper() is_personal = any( term in desc for term in ["GROCERY", "CINEMA", "REST", "PHARMACY", "RENT"] ) is_business = any( term in desc for term in ["PAYROLL", "SUPPLY", "INVOICE", "VENDOR"] ) if is_credit: if is_personal: buffer += amount else: # Business income - does not count towards LIBR "Personal Buffer" pass else: # DEBIT if is_business: business_survival_total += amount if buffer >= amount: # Business expense covered by personal float (Co-mingling) buffer -= amount else: # Buffer exhausted! Illicit/Business funds must be filling the gap shortfall = amount - buffer illicit_dependency += shortfall buffer = 0 else: # Personal expense buffer = max(0, buffer - amount) points.append( { "date": tx.get("date"), "buffer": buffer, "dependency_ratio": ( (illicit_dependency / business_survival_total) if business_survival_total > 0 else 0 ), } ) ratio = ( (illicit_dependency / business_survival_total) if business_survival_total > 0 else 0 ) return { "account_id": account_id, "comingling_ratio": ratio, "total_illicit_infusion": illicit_dependency, "status": ( "CRITICAL_DEPENDENCY" if ratio > 0.5 else "HIGH_CO_MINGLING" if ratio > 0.1 else "HEALTHY" ), "heatmap_data": points, "libr_verdict": f"Business survived on {ratio:.1%} illicit/external float after personal buffer exhaustion.", } def get_behavior_service(db): return BehaviorBaselineService(db)