Spaces:
Paused
Paused
File size: 5,731 Bytes
4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | import logging
from datetime import datetime
from typing import Any
logger = logging.getLogger(__name__)


class BehaviorBaselineService:
    """
    Advanced Behavior Analytics for detecting deviations from "normal" patterns.
    Ref: 2025 Forensic Trends (Proactive Monitoring)

    Offers three async operations: a (currently mocked) per-account baseline,
    threshold/geography anomaly checks against that baseline, and a
    buffer-based co-mingling analysis of a transaction list.
    """

    # A transaction is flagged as VALUE_SPIKE when its amount exceeds the
    # baseline average multiplied by this factor.
    VALUE_SPIKE_MULTIPLIER = 5

    # Description keywords for the simplified personal/business heuristic.
    # Matching is by substring on the upper-cased description.
    PERSONAL_TERMS = ("GROCERY", "CINEMA", "REST", "PHARMACY", "RENT")
    BUSINESS_TERMS = ("PAYROLL", "SUPPLY", "INVOICE", "VENDOR")

    def __init__(self, db_session):
        """Store the database session; the mocked methods below do not query it."""
        self.db = db_session

    async def get_account_baseline(self, account_id: str) -> dict[str, Any]:
        """
        Calculates a statistical baseline for an account.

        The figures are currently hard-coded mock values; only ``account_id``
        and ``last_calculated`` vary between calls.

        Args:
            account_id: Identifier echoed back in the result.

        Returns:
            A dict with the keys ``account_id``, ``avg_transaction_value``,
            ``volatility_index``, ``geographic_footprint``,
            ``peak_activity_hours``, ``merchant_categories`` and
            ``last_calculated`` (naive UTC timestamp in ISO-8601 format).
        """
        # Function-scope import so this block does not depend on a change to
        # the module's import lines.
        from datetime import timezone

        logger.info("Calculating behavior baseline for account %s", account_id)
        # Real implementation would run statistical aggregates over the last 90 days
        # Mocking the baseline results
        # ``datetime.utcnow()`` is deprecated; dropping tzinfo from an aware
        # "now" keeps the emitted string format exactly as before.
        calculated_at = datetime.now(timezone.utc).replace(tzinfo=None)
        return {
            "account_id": account_id,
            "avg_transaction_value": 1250.0,
            "volatility_index": 0.15,
            "geographic_footprint": ["USA", "UK"],
            "peak_activity_hours": [9, 10, 11, 14, 15],
            "merchant_categories": ["GROCERY", "UTILITIES", "TECHNOLOGY"],
            "last_calculated": calculated_at.isoformat(),
        }

    async def detect_anomalies(
        self, account_id: str, current_transactions: list[dict]
    ) -> list[dict]:
        """
        Compares current transactions against the baseline.

        Two independent checks run per transaction, so one transaction can
        produce two entries:

        * ``VALUE_SPIKE`` (HIGH): ``amount`` exceeds the baseline average
          times ``VALUE_SPIKE_MULTIPLIER``.
        * ``GEO_ANOMALY`` (MEDIUM): ``location`` is not in the baseline's
          geographic footprint. A transaction without a ``location`` is
          flagged as well, because ``None`` is not in the footprint.

        Args:
            account_id: Account whose baseline is used.
            current_transactions: Dicts read via the keys ``id``, ``amount``
                and ``location``.

        Returns:
            A list of anomaly dicts with the keys ``transaction_id``,
            ``type``, ``severity`` and ``reason``.
        """
        baseline = await self.get_account_baseline(account_id)
        spike_threshold = (
            baseline["avg_transaction_value"] * self.VALUE_SPIKE_MULTIPLIER
        )
        footprint = baseline["geographic_footprint"]

        anomalies = []
        for tx in current_transactions:
            # ``or 0`` also covers an explicit ``"amount": None``, which would
            # otherwise raise TypeError in the comparison below.
            tx_value = tx.get("amount") or 0
            # Simple threshold check
            if tx_value > spike_threshold:
                anomalies.append(
                    {
                        "transaction_id": tx.get("id"),
                        "type": "VALUE_SPIKE",
                        "severity": "HIGH",
                        "reason": (
                            f"Value {tx_value} is {self.VALUE_SPIKE_MULTIPLIER}x "
                            "higher than baseline average."
                        ),
                    }
                )
            # Simple Geo check
            location = tx.get("location")
            if location not in footprint:
                anomalies.append(
                    {
                        "transaction_id": tx.get("id"),
                        "type": "GEO_ANOMALY",
                        "severity": "MEDIUM",
                        "reason": f"Transaction from {location} is outside typical footprint.",
                    }
                )
        return anomalies

    @staticmethod
    def _chronological_key(tx: dict[str, Any]) -> tuple[bool, Any]:
        """Sort key: dated transactions by stamp, undated ones last in input order."""
        stamp = tx.get("timestamp") or tx.get("date") or None
        # The leading flag decides the order whenever exactly one side is
        # undated, so ``None`` is never compared against a real stamp.
        return (stamp is None, stamp)

    @staticmethod
    def _dependency_ratio(illicit: float, business_total: float) -> float:
        """Share of business spend not covered by the buffer; 0 if no business spend."""
        return (illicit / business_total) if business_total > 0 else 0

    async def track_comingling_ratio(
        self,
        account_id: str,
        transactions: list[dict[str, Any]],
        initial_personal_buffer: float = 1000.0,
    ) -> dict[str, Any]:
        """
        Implementation of the LIBR Engine (Lowest Intermediate Balance Rule).
        Tracks co-mingled personal/business funds to identify illicit source dependency.

        Transactions are processed in chronological order (``timestamp``,
        falling back to ``date``; transactions with neither are processed
        last, in input order). A running personal buffer is updated as follows:

        * CREDIT whose description matches ``PERSONAL_TERMS``: added to the
          buffer. Other credits are ignored.
        * Non-credit whose description matches ``BUSINESS_TERMS``: drawn from
          the buffer; any amount the buffer cannot cover is accumulated as
          illicit dependency.
        * Any other non-credit: drawn from the buffer, floored at zero.

        Args:
            account_id: Identifier echoed back in the result.
            transactions: Dicts read via the keys ``timestamp``, ``date``,
                ``amount``, ``transaction_type`` and ``description``. The
                absolute value of ``amount`` is used.
            initial_personal_buffer: Starting value of the personal buffer.

        Returns:
            A dict with ``account_id``, ``comingling_ratio`` (uncovered
            business spend / total business spend, 0 when there is none),
            ``total_illicit_infusion``, ``status`` (``CRITICAL_DEPENDENCY``
            above 0.5, ``HIGH_CO_MINGLING`` above 0.1, else ``HEALTHY``),
            ``heatmap_data`` (one point per transaction) and ``libr_verdict``.
        """
        logger.info("Applying LIBR analysis for account %s", account_id)
        # 1. Classify transactions
        # (Simplified classification for simulation)
        buffer = initial_personal_buffer
        illicit_dependency = 0.0
        business_survival_total = 0.0
        points = []

        # The previous key fell back to ``datetime.now()``, which raised
        # TypeError as soon as an undated transaction was sorted against
        # string-dated ones.
        sorted_txs = sorted(transactions, key=self._chronological_key)

        for tx in sorted_txs:
            # ``or`` guards against keys that are present but hold ``None``.
            amount = abs(tx.get("amount") or 0)
            desc = (tx.get("description") or "").upper()
            is_credit = tx.get("transaction_type") == "CREDIT"
            # Simplified heuristic for personal vs business
            is_personal = any(term in desc for term in self.PERSONAL_TERMS)
            is_business = any(term in desc for term in self.BUSINESS_TERMS)

            if is_credit:
                # Business income does not count towards the LIBR "Personal Buffer".
                if is_personal:
                    buffer += amount
            elif is_business:
                business_survival_total += amount
                if buffer >= amount:
                    # Business expense covered by personal float (Co-mingling)
                    buffer -= amount
                else:
                    # Buffer exhausted! Illicit/Business funds must be filling the gap
                    illicit_dependency += amount - buffer
                    buffer = 0
            else:
                # Personal expense
                buffer = max(0, buffer - amount)

            points.append(
                {
                    "date": tx.get("date"),
                    "buffer": buffer,
                    "dependency_ratio": self._dependency_ratio(
                        illicit_dependency, business_survival_total
                    ),
                }
            )

        ratio = self._dependency_ratio(illicit_dependency, business_survival_total)
        if ratio > 0.5:
            status = "CRITICAL_DEPENDENCY"
        elif ratio > 0.1:
            status = "HIGH_CO_MINGLING"
        else:
            status = "HEALTHY"

        return {
            "account_id": account_id,
            "comingling_ratio": ratio,
            "total_illicit_infusion": illicit_dependency,
            "status": status,
            "heatmap_data": points,
            "libr_verdict": f"Business survived on {ratio:.1%} illicit/external float after personal buffer exhaustion.",
        }
def get_behavior_service(db):
    """Build a BehaviorBaselineService around the given database session."""
    service = BehaviorBaselineService(db)
    return service
|