File size: 4,381 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

from core.plugin_system import PluginContext, PluginInterface, PluginMetadata

logger = logging.getLogger(__name__)


@dataclass
class ShellCompanyConfig:
    min_transaction_volume: float
    pass_through_threshold: float


@dataclass
class ShellCompanyAlert:
    merchant_name: str
    risk_score: float
    indicators: list[str]


def detect_shell_companies(
    transactions: list[dict[str, Any]],
) -> list[ShellCompanyAlert]:
    """
    Detects potential shell companies based on transaction patterns:
    - High velocity of funds (pass-through)
    - Round number transactions
    - Lack of typical business expenses (inferred)
    """
    alerts = []
    merchant_stats = defaultdict(
        lambda: {"inflow": 0.0, "outflow": 0.0, "count": 0, "round_amounts": 0}
    )

    for tx in transactions:
        merchant = tx.get("merchant_name")
        if not merchant:
            continue

        amount = float(tx.get("amount", 0))
        tx_type = tx.get("type")

        stats = merchant_stats[merchant]
        stats["count"] += 1

        if amount % 100 == 0:
            stats["round_amounts"] += 1

        if (
            tx_type == "CREDIT"
        ):  # Income for merchant (assuming merchant view or outgoing from user)
            # NOTE: In transaction list, CREDIT/DEBIT usually refers to the Account Holder.
            # If "merchant_name" is the counterparty.
            # DEBIT = User pays Merchant (Merchant Inflow)
            # CREDIT = Merchant pays User (Merchant Outflow)
            stats["inflow"] += amount
        else:
            stats["outflow"] += amount

    for merchant, stats in merchant_stats.items():
        indicators = []
        risk_score = 0.0

        # High volume of round numbers
        if stats["count"] > 2 and (stats["round_amounts"] / stats["count"]) > 0.8:
            indicators.append("High frequency of round amounts")
            risk_score += 30

        # Pass-through detection (Inflow ~= Outflow within margin)
        total_vol = stats["inflow"] + stats["outflow"]
        if total_vol > 1000:  # Min volume
            net_flow = abs(stats["inflow"] - stats["outflow"])
            if net_flow < (total_vol * 0.05):  # 5% retained only
                indicators.append("Pass-through account behavior")
                risk_score += 50

        if risk_score > 0:
            alerts.append(
                ShellCompanyAlert(
                    merchant_name=merchant,
                    risk_score=min(risk_score, 100),
                    indicators=indicators,
                )
            )

    return alerts


class ShellCompanyPlugin(PluginInterface):
    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="shell_company",
            version="1.0.0",
            namespace="zenith/detection/fraud/shell_company",
            author="Zenith Team",
            description="Detects potential shell companies",
            dependencies={},
            capabilities=["fraud_detection"],
            security_level="official",
            api_version="v1",
        )

    async def initialize(self, context: PluginContext) -> bool:
        self.context = context
        config_dict = (
            context.config
            if context.config
            else {"min_transaction_volume": 1000.0, "pass_through_threshold": 0.05}
        )
        self.config = ShellCompanyConfig(**config_dict)
        return True

    async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
        transactions = inputs.get("transactions", [])

        alerts = detect_shell_companies(transactions)

        results = []
        for alert in alerts:
            results.append(
                {
                    "entity_name": alert.merchant_name,
                    "is_fraud": True,
                    "risk_score": alert.risk_score,
                    "confidence": 0.8,
                    "reason": f"Shell company indicators: {', '.join(alert.indicators)}",
                    "details": {"indicators": alert.indicators},
                }
            )

        return {"alerts": results}

    async def cleanup(self) -> None:
        pass

    def validate_config(self, config: dict[str, Any]) -> list[str]:
        return []