Spaces:
Paused
Paused
File size: 5,372 Bytes
4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from core.plugin_system import PluginContext, PluginInterface, PluginMetadata
logger = logging.getLogger(__name__)
@dataclass
class StructuringConfig:
reporting_limit: float
structuring_threshold: float
time_window_days: int
@dataclass
class StructuringAlert:
entity_name: str
confidence: float
pattern_type: str
transaction_count: int
total_amount: float
transaction_ids: list[str]
def detect_structuring(
transactions: list[dict[str, Any]],
reporting_limit: float = 10000.0,
structuring_threshold: float = 0.9,
time_window_days: int = 30,
) -> list[StructuringAlert]:
"""
Detects structuring (smurfing): multiple transactions just below reporting limit.
"""
alerts = []
# Group by customer
customer_txs = defaultdict(list)
for tx in transactions:
cust = tx.get("customer_id")
if cust:
customer_txs[cust].append(tx)
reporting_limit * (1.0 - structuring_threshold) # e.g. 10% below = 9000
# Actually usually it means just below, e.g. 9000-9999.
# structuring_threshold 0.9 means we care about tx > 90% of limit?
# Or strict just-below? Let's assume txs in range [Limit * Threshold, Limit)
threshold_val = reporting_limit * structuring_threshold
for customer, txs in customer_txs.items():
# Filter for "just below" transactions
suspicious_txs = []
for tx in txs:
amt = float(tx.get("amount", 0))
if threshold_val <= amt < reporting_limit:
suspicious_txs.append(tx)
# If multiple suspicious txs found
if len(suspicious_txs) >= 2:
# Check time window (simplified: max-min date < window)
dates = []
for t in suspicious_txs:
d = t.get("date")
if isinstance(d, str):
try:
d = datetime.fromisoformat(d.replace("Z", "+00:00"))
dates.append(d)
except Exception:
pass
if dates:
dates.sort()
window = (dates[-1] - dates[0]).days
if window <= time_window_days:
total_amt = sum(float(t.get("amount", 0)) for t in suspicious_txs)
# If total exceeds limit, it's classic structuring to avoid reporting
if total_amt > reporting_limit:
alerts.append(
StructuringAlert(
entity_name=customer,
confidence=0.85 + (0.05 * len(suspicious_txs)),
pattern_type="smurfing_just_below_limit",
transaction_count=len(suspicious_txs),
total_amount=total_amt,
transaction_ids=[t.get("id") for t in suspicious_txs],
)
)
return alerts
class StructuringPlugin(PluginInterface):
@property
def metadata(self) -> PluginMetadata:
return PluginMetadata(
name="structuring",
version="1.0.0",
namespace="zenith/detection/fraud/structuring",
author="Zenith Team",
description="Detects transaction structuring (smurfing)",
dependencies={},
capabilities=["fraud_detection"],
security_level="official",
api_version="v1",
)
async def initialize(self, context: PluginContext) -> bool:
self.context = context
config_dict = (
context.config
if context.config
else {
"reporting_limit": 10000.0,
"structuring_threshold": 0.85, # Look for txs > 8500
"time_window_days": 7,
}
)
self.config = StructuringConfig(**config_dict)
return True
async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
transactions = inputs.get("transactions", [])
alerts = detect_structuring(
transactions,
reporting_limit=self.config.reporting_limit,
structuring_threshold=self.config.structuring_threshold,
time_window_days=self.config.time_window_days,
)
results = []
for alert in alerts:
results.append(
{
"entity_name": alert.entity_name,
"is_fraud": True,
"risk_score": 85.0,
"confidence": alert.confidence,
"reason": f"Structuring detected ({alert.pattern_type}). {alert.transaction_count} transactions totaling {alert.total_amount}",
"details": {
"total_amount": alert.total_amount,
"count": alert.transaction_count,
"pattern": alert.pattern_type,
},
}
)
return {"alerts": results}
async def cleanup(self) -> None:
pass
def validate_config(self, config: dict[str, Any]) -> list[str]:
return []
|