File size: 11,476 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
 
4a2ab42
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# services/fraud/engine.py
import asyncio
import inspect
import logging
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any

from core.plugin_system.registry import plugin_registry_service

# Imports for deleted rules removed


# Module-level logger used by the rule classes and the engine in this module.
logger = logging.getLogger(__name__)


class AlertSeverity(Enum):
    """Severity levels attached to fraud rules and alerts.

    Members are listed from least to most severe; each value is the
    lowercase string form of the member name.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class FraudAlert:
    """Base alert class for all fraud detection alerts.

    The first seven fields are required. The remaining fields default to
    ``None`` or to a fresh empty container per instance (``default_factory``),
    so alerts never share list/dict state. The numeric ranges noted on
    ``confidence`` and ``risk_score`` are documentation only; nothing in this
    class validates them.
    """

    # Identifier assigned by whichever rule creates the alert.
    alert_id: str
    # Name of the rule that raised the alert.
    rule_name: str
    severity: AlertSeverity
    confidence: float  # 0.0 to 1.0
    risk_score: float  # 0.0 to 100.0
    description: str
    detected_at: datetime
    case_id: str | None = None
    transaction_ids: list[str] = field(default_factory=list)
    entities: list[str] = field(
        default_factory=list
    )  # customer IDs, merchant names, etc.
    # Free-form extra details supplied by the rule.
    metadata: dict[str, Any] = field(default_factory=dict)
    recommendations: list[str] = field(default_factory=list)


class FraudRule(ABC):
    """Abstract base class for fraud detection rules.

    Concrete rules must implement ``execute`` and ``get_config_schema``.
    """

    def __init__(self, name: str, severity: AlertSeverity, enabled: bool = True):
        # RuleEngine.register_rule uses the name as the registry key.
        self.name = name
        self.severity = severity
        # RuleEngine.execute_rules skips rules whose ``enabled`` flag is False.
        self.enabled = enabled
        # RuleEngine.execute_rules stores the finish time of the latest
        # successful run here; None until the rule has run once.
        self.last_run = None

    @abstractmethod
    async def execute(
        self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None
    ) -> list[FraudAlert]:
        """Execute the rule and return alerts.

        Args:
            transactions: Transaction records, each given as a dict.
            context: Optional extra data supplied by the caller.

        Returns:
            The alerts raised by this rule for the given transactions.
        """

    @abstractmethod
    def get_config_schema(self) -> dict[str, Any]:
        """Return configuration schema for this rule."""


class PluginAdapterRule(FraudRule):
    """Adapter that exposes a plugin through the ``FraudRule`` interface.

    The wrapped plugin is expected to provide ``metadata`` (with ``name`` and
    ``namespace`` attributes) and an awaitable ``execute(input_data)`` whose
    result is a mapping that may contain an ``"alerts"`` list of dicts.
    """

    def __init__(self, plugin_instance: Any):
        """Wrap ``plugin_instance``, taking the rule name from its metadata."""
        metadata = plugin_instance.metadata
        # The rule-level severity is only a default: every alert produced by
        # ``execute`` gets its own severity derived from its risk score.
        super().__init__(metadata.name, AlertSeverity.HIGH)
        self.plugin = plugin_instance
        self.namespace = metadata.namespace

    @staticmethod
    def _severity_for_score(risk_score: float, settings: Any) -> AlertSeverity:
        """Map a risk score to a severity using the configured thresholds."""
        if risk_score >= settings.FRAUD_SCORE_CRITICAL:
            return AlertSeverity.CRITICAL
        if risk_score >= settings.FRAUD_SCORE_HIGH:
            return AlertSeverity.HIGH
        if risk_score >= settings.FRAUD_SCORE_MEDIUM:
            return AlertSeverity.MEDIUM
        return AlertSeverity.LOW

    async def execute(
        self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None
    ) -> list[FraudAlert]:
        """Run the plugin and convert its alert dicts into ``FraudAlert`` objects.

        Args:
            transactions: Transaction records handed to the plugin unchanged.
            context: Optional extra data handed to the plugin unchanged.

        Returns:
            One ``FraudAlert`` per entry in the plugin's ``"alerts"`` list.
            An empty list is returned when the plugin reports no alerts or
            when anything goes wrong (the error is logged, never raised).
        """
        # Plugins take a single dict payload rather than positional arguments.
        input_data = {"transactions": transactions, "context": context}

        try:
            result = await self.plugin.execute(input_data)
            if not result or "alerts" not in result:
                return []

            plugin_alerts = result["alerts"]
            if not plugin_alerts:
                return []

            # Resolved at call time rather than at module import, and hoisted
            # out of the loop so it runs once per call instead of per alert.
            from core.config import settings

            alerts: list[FraudAlert] = []
            for index, plugin_alert in enumerate(plugin_alerts):
                risk_score = plugin_alert.get("risk_score", 0.0)
                # One timestamp per alert so the ID and detected_at agree.
                detected_at = datetime.now(UTC)
                # The timestamp only has second resolution and the index
                # restarts at 0 for every call, so a random suffix is needed
                # to keep IDs unique across plugins and across calls.
                alert_id = (
                    f"PL_{detected_at:%Y%m%d_%H%M%S}_{index}_{uuid.uuid4().hex[:8]}"
                )
                alerts.append(
                    FraudAlert(
                        alert_id=alert_id,
                        rule_name=self.name,
                        severity=self._severity_for_score(risk_score, settings),
                        confidence=plugin_alert.get("confidence", 0.8),
                        risk_score=risk_score,
                        description=plugin_alert.get("reason", "Plugin detected fraud"),
                        detected_at=detected_at,
                        metadata=plugin_alert.get("details", {}),
                        recommendations=["Review plugin findings"],
                    )
                )
            return alerts

        except Exception as e:
            # Best effort: a failing plugin must not break the engine run, but
            # keep the traceback so the failure can be diagnosed.
            logger.exception("Error executing plugin %s: %s", self.name, e)
            return []

    def get_config_schema(self) -> dict[str, Any]:
        """Return the plugin's ``config_schema``, or ``{}`` if it has none."""
        return getattr(self.plugin.metadata, "config_schema", {})


class RuleEngine:
    """Main fraud detection rule engine"""

    def __init__(self):
        self.rules: dict[str, FraudRule] = {}
        self.rule_registry: dict[str, type[FraudRule]] = {}
        self.execution_history: list[dict[str, Any]] = []
        self._register_builtin_rules()
        # Plugins loaded via async initialize() call

    def _register_builtin_rules(self):
        """Register built-in fraud detection rules (Legacy)"""
        # We keeping these for backward compatibility / shadow mode if needed,
        # but User asked to delete unused files.
        # We will remove them from here if implementation is now fully plugin-based.
        # But for safety, we might mistakenly successfully run plugins that are NOT yet fully working.
        # However, plan says "Move to plugins".
        # Let's rely on _load_plugin_rules primarily.

    async def initialize(self):
        """Async initialization to load plugin rules"""
        await self._load_plugin_rules()

    async def _load_plugin_rules(self):
        """Load rules from PluginRegistry"""
        # In a synchronous init, we can't await.
        # We might need a start() method or similar.
        # Or just run loop here for init.
        try:
            # Helper to fetch active detection plugins
            from core.database import SessionLocal
            from core.plugin_system.models import PluginRegistry as PluginRegistryModel

            db = SessionLocal()
            plugin_instances = []
            try:
                # Fetch all active plugins
                db_plugins = (
                    db.query(PluginRegistryModel)
                    .filter(PluginRegistryModel.status == "active")
                    .all()
                )

                for p in db_plugins:
                    if "fraud_detection" in p.metadata_json.get("capabilities", []):
                        # Load instance
                        instance = await plugin_registry_service.get_plugin(
                            p.plugin_id, db
                        )
                        plugin_instances.append(instance)
            except Exception as e:
                logger.error(f"Failed to load plugins from DB: {e}")
            finally:
                db.close()

            for p in plugin_instances:
                adapter = PluginAdapterRule(p)
                self.register_rule(adapter)

        except Exception as e:
            logger.error(f"Failed to load plugin rules: {e}")

    def register_rule(self, rule: FraudRule):
        """Register a fraud detection rule"""
        self.rules[rule.name] = rule
        logger.info(f"Registered fraud rule: {rule.name}")

    def unregister_rule(self, rule_name: str):
        """Unregister a fraud detection rule"""
        if rule_name in self.rules:
            del self.rules[rule_name]
            logger.info(f"Unregistered fraud rule: {rule_name}")

    def enable_rule(self, rule_name: str):
        """Enable a specific rule"""
        if rule_name in self.rules:
            self.rules[rule_name].enabled = True
            logger.info(f"Enabled fraud rule: {rule_name}")

    def disable_rule(self, rule_name: str):
        """Disable a specific rule"""
        if rule_name in self.rules:
            self.rules[rule_name].enabled = False
            logger.info(f"Disabled fraud rule: {rule_name}")

    async def execute_rules(
        self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None
    ) -> list[FraudAlert]:
        """Execute all enabled rules and return combined alerts"""
        # Ensure initialized
        if not hasattr(self, "_initialized") or not self._initialized:
            await self.initialize()
            self._initialized = True

        if context is None:
            context = {}

        all_alerts = []
        execution_start = datetime.now(UTC)

        logger.info(
            f"Executing {len([r for r in self.rules.values() if r.enabled])} rules on {len(transactions)} transactions"
        )

        for rule in self.rules.values():
            if not rule.enabled:
                continue

            try:
                rule_start = datetime.now(UTC)
                if asyncio.iscoroutinefunction(rule.execute):
                    rule_alerts = await rule.execute(transactions, context)
                else:
                    rule_alerts = rule.execute(transactions, context)
                rule_end = datetime.now(UTC)

                # Update rule execution stats
                rule.last_run = rule_end
                all_alerts.extend(rule_alerts)

                # Record execution history
                self.execution_history.append(
                    {
                        "rule_name": rule.name,
                        "executed_at": rule_start,
                        "duration_ms": (rule_end - rule_start).total_seconds() * 1000,
                        "alerts_generated": len(rule_alerts),
                        "transactions_processed": len(transactions),
                    }
                )

                logger.debug(
                    f"Rule {rule.name} generated {len(rule_alerts)} alerts in {(rule_end - rule_start).total_seconds():.3f}s"
                )

            except Exception as e:
                logger.error(f"Error executing rule {rule.name}: {e!s}")
                # Continue with other rules even if one fails
                continue

        execution_end = datetime.now(UTC)

        # Sort alerts by risk score (highest first)
        all_alerts.sort(key=lambda x: x.risk_score, reverse=True)

        # Log execution summary
        self.execution_history.append(
            {
                "rule_name": "ENGINE_TOTAL",
                "executed_at": execution_start,
                "duration_ms": (execution_end - execution_start).total_seconds() * 1000,
                "alerts_generated": len(all_alerts),
                "transactions_processed": len(transactions),
                "rules_executed": len([r for r in self.rules.values() if r.enabled]),
            }
        )

        logger.info(
            f"Rule engine execution completed in {(execution_end - execution_start).total_seconds():.3f}s, generated {len(all_alerts)} alerts"
        )

        return all_alerts

    def get_rule_status(self) -> dict[str, dict[str, Any]]:
        """Get status of all registered rules"""
        status = {}
        for name, rule in self.rules.items():
            status[name] = {
                "enabled": rule.enabled,
                "severity": rule.severity.value,
                "last_run": rule.last_run.isoformat() if rule.last_run else None,
                "config_schema": rule.get_config_schema(),
            }
        return status

    def get_execution_stats(self, limit: int = 100) -> list[dict[str, Any]]:
        """Get recent execution statistics"""
        return self.execution_history[-limit:] if self.execution_history else []


# Legacy built-in rules removed. Logic migrated to plugins.


# Global shared instance. Constructing it loads no plugins; plugin rules are
# loaded by RuleEngine.initialize(), which execute_rules() awaits on first use.
rule_engine = RuleEngine()