File size: 5,372 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from core.plugin_system import PluginContext, PluginInterface, PluginMetadata

logger = logging.getLogger(__name__)


@dataclass
class StructuringConfig:
    """Tunable parameters for structuring (smurfing) detection.

    ``StructuringPlugin`` stores one instance of this class and forwards its
    three values unchanged to ``detect_structuring``.
    """

    # Exclusive upper bound of the suspicious band: only amounts strictly
    # below this value are considered, and an alert additionally requires the
    # combined amount to exceed it.
    reporting_limit: float
    # Fraction of ``reporting_limit`` that forms the inclusive lower bound of
    # the suspicious band (lower bound = reporting_limit * structuring_threshold).
    structuring_threshold: float
    # Maximum number of whole days allowed between the earliest and the latest
    # suspicious transaction of a customer.
    time_window_days: int


@dataclass
class StructuringAlert:
    """One structuring finding for a single customer, as produced by ``detect_structuring``."""

    # Identifier of the flagged customer (the transactions' "customer_id" value).
    entity_name: str
    # Detector-assigned confidence score for this finding.
    confidence: float
    # Short label naming the detected pattern, e.g. "smurfing_just_below_limit".
    pattern_type: str
    # Number of suspicious transactions that make up the finding.
    transaction_count: int
    # Sum of the amounts of those suspicious transactions.
    total_amount: float
    # The "id" value of each suspicious transaction.
    # NOTE(review): populated via ``tx.get("id")``, so entries can be None when
    # a transaction has no "id" key -- confirm whether callers rely on str only.
    transaction_ids: list[str]


def detect_structuring(
    transactions: list[dict[str, Any]],
    reporting_limit: float = 10000.0,
    structuring_threshold: float = 0.9,
    time_window_days: int = 30,
) -> list[StructuringAlert]:
    """
    Detects structuring (smurfing): multiple transactions just below reporting limit.
    """
    alerts = []
    # Group by customer
    customer_txs = defaultdict(list)
    for tx in transactions:
        cust = tx.get("customer_id")
        if cust:
            customer_txs[cust].append(tx)

    reporting_limit * (1.0 - structuring_threshold)  # e.g. 10% below = 9000
    # Actually usually it means just below, e.g. 9000-9999.
    # structuring_threshold 0.9 means we care about tx > 90% of limit?
    # Or strict just-below? Let's assume txs in range [Limit * Threshold, Limit)

    threshold_val = reporting_limit * structuring_threshold

    for customer, txs in customer_txs.items():
        # Filter for "just below" transactions
        suspicious_txs = []
        for tx in txs:
            amt = float(tx.get("amount", 0))
            if threshold_val <= amt < reporting_limit:
                suspicious_txs.append(tx)

        # If multiple suspicious txs found
        if len(suspicious_txs) >= 2:
            # Check time window (simplified: max-min date < window)
            dates = []
            for t in suspicious_txs:
                d = t.get("date")
                if isinstance(d, str):
                    try:
                        d = datetime.fromisoformat(d.replace("Z", "+00:00"))
                        dates.append(d)
                    except Exception:
                        pass

            if dates:
                dates.sort()
                window = (dates[-1] - dates[0]).days
                if window <= time_window_days:
                    total_amt = sum(float(t.get("amount", 0)) for t in suspicious_txs)

                    # If total exceeds limit, it's classic structuring to avoid reporting
                    if total_amt > reporting_limit:
                        alerts.append(
                            StructuringAlert(
                                entity_name=customer,
                                confidence=0.85 + (0.05 * len(suspicious_txs)),
                                pattern_type="smurfing_just_below_limit",
                                transaction_count=len(suspicious_txs),
                                total_amount=total_amt,
                                transaction_ids=[t.get("id") for t in suspicious_txs],
                            )
                        )

    return alerts


class StructuringPlugin(PluginInterface):
    """Plugin that runs ``detect_structuring`` and reports its findings as alert dicts."""

    # Values used for every setting the host configuration does not supply.
    _DEFAULT_CONFIG = {
        "reporting_limit": 10000.0,
        "structuring_threshold": 0.85,  # Look for txs >= 8500
        "time_window_days": 7,
    }

    # Fixed risk score attached to every structuring alert.
    _RISK_SCORE = 85.0

    @property
    def metadata(self) -> PluginMetadata:
        """Return the static descriptor that identifies this plugin."""
        return PluginMetadata(
            name="structuring",
            version="1.0.0",
            namespace="zenith/detection/fraud/structuring",
            author="Zenith Team",
            description="Detects transaction structuring (smurfing)",
            dependencies={},
            capabilities=["fraud_detection"],
            security_level="official",
            api_version="v1",
        )

    async def initialize(self, context: PluginContext) -> bool:
        """Store the context and build the detector configuration.

        Settings supplied in ``context.config`` override the defaults key by
        key; missing settings fall back to the defaults and unrecognised keys
        are ignored with a warning, so a partial or extended configuration no
        longer raises ``TypeError``.

        Args:
            context: Plugin context; ``context.config`` is treated as a
                mapping of setting names to values (may be empty or None).

        Returns:
            True when the configuration is usable, False when a supplied
            value fails ``validate_config`` (``self.config`` is then not set).
        """
        self.context = context
        supplied = dict(context.config) if context.config else {}

        unknown = [key for key in supplied if key not in self._DEFAULT_CONFIG]
        if unknown:
            logger.warning("Ignoring unknown structuring config keys: %s", unknown)

        errors = self.validate_config(supplied)
        if errors:
            logger.error("Invalid structuring config: %s", "; ".join(errors))
            return False

        settings = dict(self._DEFAULT_CONFIG)
        settings.update(
            (key, value)
            for key, value in supplied.items()
            if key in self._DEFAULT_CONFIG
        )
        self.config = StructuringConfig(**settings)
        return True

    async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
        """Run structuring detection over ``inputs["transactions"]``.

        Args:
            inputs: Mapping with an optional ``"transactions"`` list of
                transaction dicts; a missing or None value is treated as empty.

        Returns:
            ``{"alerts": [...]}`` with one dict per flagged customer.
        """
        # ``or []`` also covers an explicit ``"transactions": None``.
        transactions = inputs.get("transactions") or []

        alerts = detect_structuring(
            transactions,
            reporting_limit=self.config.reporting_limit,
            structuring_threshold=self.config.structuring_threshold,
            time_window_days=self.config.time_window_days,
        )

        results = [
            {
                "entity_name": alert.entity_name,
                "is_fraud": True,
                "risk_score": self._RISK_SCORE,
                "confidence": alert.confidence,
                "reason": f"Structuring detected ({alert.pattern_type}). {alert.transaction_count} transactions totaling {alert.total_amount}",
                "details": {
                    "total_amount": alert.total_amount,
                    "count": alert.transaction_count,
                    "pattern": alert.pattern_type,
                    # Expose the offending transactions so the alert is actionable.
                    "transaction_ids": alert.transaction_ids,
                },
            }
            for alert in alerts
        ]

        return {"alerts": results}

    async def cleanup(self) -> None:
        """Release resources; this plugin holds none, so there is nothing to do."""
        pass

    def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Check the recognised settings in ``config`` and describe any problems.

        Only keys that are present are checked, because absent settings fall
        back to defaults during ``initialize``.

        Args:
            config: Candidate configuration mapping (may be empty or None).

        Returns:
            Human-readable error messages; an empty list means the
            configuration is acceptable.
        """
        errors: list[str] = []
        if not config:
            return errors

        def is_number(value: Any) -> bool:
            # bool is a subclass of int but is never a meaningful setting here.
            return isinstance(value, (int, float)) and not isinstance(value, bool)

        if "reporting_limit" in config:
            limit = config["reporting_limit"]
            if not is_number(limit) or not limit > 0:
                errors.append("reporting_limit must be a positive number")

        if "structuring_threshold" in config:
            threshold = config["structuring_threshold"]
            # At 1.0 or above the band [limit * threshold, limit) is empty.
            if not is_number(threshold) or not 0 <= threshold < 1:
                errors.append("structuring_threshold must be a number in [0, 1)")

        if "time_window_days" in config:
            window = config["time_window_days"]
            if not isinstance(window, int) or isinstance(window, bool) or window < 0:
                errors.append("time_window_days must be a non-negative integer")

        return errors