Spaces:
Paused
Paused
| # services/fraud/engine.py | |
| import asyncio | |
| import logging | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass, field | |
| from datetime import UTC, datetime | |
| from enum import Enum | |
| from typing import Any | |
| from core.plugin_system.registry import plugin_registry_service | |
| # Imports for deleted rules removed | |
| logger = logging.getLogger(__name__) | |
| class AlertSeverity(Enum): | |
| LOW = "low" | |
| MEDIUM = "medium" | |
| HIGH = "high" | |
| CRITICAL = "critical" | |
| class FraudAlert: | |
| """Base alert class for all fraud detection alerts""" | |
| alert_id: str | |
| rule_name: str | |
| severity: AlertSeverity | |
| confidence: float # 0.0 to 1.0 | |
| risk_score: float # 0.0 to 100.0 | |
| description: str | |
| detected_at: datetime | |
| case_id: str | None = None | |
| transaction_ids: list[str] = field(default_factory=list) | |
| entities: list[str] = field( | |
| default_factory=list | |
| ) # customer IDs, merchant names, etc. | |
| metadata: dict[str, Any] = field(default_factory=dict) | |
| recommendations: list[str] = field(default_factory=list) | |
| class FraudRule(ABC): | |
| """Abstract base class for fraud detection rules""" | |
| def __init__(self, name: str, severity: AlertSeverity, enabled: bool = True): | |
| self.name = name | |
| self.severity = severity | |
| self.enabled = enabled | |
| self.last_run = None | |
| async def execute( | |
| self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None | |
| ) -> list[FraudAlert]: | |
| """Execute the rule and return alerts""" | |
| def get_config_schema(self) -> dict[str, Any]: | |
| """Return configuration schema for this rule""" | |
| class PluginAdapterRule(FraudRule): | |
| """Adapter to make Plugins look like FraudRules""" | |
| def __init__(self, plugin_instance): | |
| metadata = plugin_instance.metadata | |
| super().__init__(metadata.name, AlertSeverity.HIGH) # Default severity | |
| self.plugin = plugin_instance | |
| self.namespace = metadata.namespace | |
| async def execute( | |
| self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None | |
| ) -> list[FraudAlert]: | |
| # Plugins usually take a specific input format | |
| # We need to bridge the gap. Most of our plugins expect {"transactions": [...]} | |
| input_data = {"transactions": transactions, "context": context} | |
| try: | |
| # Execute plugin (async native) | |
| result = await self.plugin.execute(input_data) | |
| alerts = [] | |
| if result and "alerts" in result: | |
| for plugin_alert in result["alerts"]: | |
| # Dynamic Severity Mapping from centralized settings | |
| from core.config import settings | |
| risk_score = plugin_alert.get("risk_score", 0.0) | |
| if risk_score >= settings.FRAUD_SCORE_CRITICAL: | |
| severity = AlertSeverity.CRITICAL | |
| elif risk_score >= settings.FRAUD_SCORE_HIGH: | |
| severity = AlertSeverity.HIGH | |
| elif risk_score >= settings.FRAUD_SCORE_MEDIUM: | |
| severity = AlertSeverity.MEDIUM | |
| else: | |
| severity = AlertSeverity.LOW | |
| # Convert plugin alert dict to FraudAlert object | |
| alert = FraudAlert( | |
| alert_id=f"PL_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}_{len(alerts)}", | |
| rule_name=self.name, | |
| severity=severity, | |
| confidence=plugin_alert.get("confidence", 0.8), | |
| risk_score=risk_score, | |
| description=plugin_alert.get("reason", "Plugin detected fraud"), | |
| detected_at=datetime.now(UTC), | |
| metadata=plugin_alert.get("details", {}), | |
| recommendations=["Review plugin findings"], | |
| ) | |
| alerts.append(alert) | |
| return alerts | |
| except Exception as e: | |
| logger.error(f"Error executing plugin {self.name}: {e}") | |
| return [] | |
| def get_config_schema(self) -> dict[str, Any]: | |
| return ( | |
| self.plugin.metadata.config_schema | |
| if hasattr(self.plugin.metadata, "config_schema") | |
| else {} | |
| ) | |
| class RuleEngine: | |
| """Main fraud detection rule engine""" | |
| def __init__(self): | |
| self.rules: dict[str, FraudRule] = {} | |
| self.rule_registry: dict[str, type[FraudRule]] = {} | |
| self.execution_history: list[dict[str, Any]] = [] | |
| self._register_builtin_rules() | |
| # Plugins loaded via async initialize() call | |
| def _register_builtin_rules(self): | |
| """Register built-in fraud detection rules (Legacy)""" | |
| # We keeping these for backward compatibility / shadow mode if needed, | |
| # but User asked to delete unused files. | |
| # We will remove them from here if implementation is now fully plugin-based. | |
| # But for safety, we might mistakenly successfully run plugins that are NOT yet fully working. | |
| # However, plan says "Move to plugins". | |
| # Let's rely on _load_plugin_rules primarily. | |
| async def initialize(self): | |
| """Async initialization to load plugin rules""" | |
| await self._load_plugin_rules() | |
| async def _load_plugin_rules(self): | |
| """Load rules from PluginRegistry""" | |
| # In a synchronous init, we can't await. | |
| # We might need a start() method or similar. | |
| # Or just run loop here for init. | |
| try: | |
| # Helper to fetch active detection plugins | |
| from core.database import SessionLocal | |
| from core.plugin_system.models import PluginRegistry as PluginRegistryModel | |
| db = SessionLocal() | |
| plugin_instances = [] | |
| try: | |
| # Fetch all active plugins | |
| db_plugins = ( | |
| db.query(PluginRegistryModel) | |
| .filter(PluginRegistryModel.status == "active") | |
| .all() | |
| ) | |
| for p in db_plugins: | |
| if "fraud_detection" in p.metadata_json.get("capabilities", []): | |
| # Load instance | |
| instance = await plugin_registry_service.get_plugin( | |
| p.plugin_id, db | |
| ) | |
| plugin_instances.append(instance) | |
| except Exception as e: | |
| logger.error(f"Failed to load plugins from DB: {e}") | |
| finally: | |
| db.close() | |
| for p in plugin_instances: | |
| adapter = PluginAdapterRule(p) | |
| self.register_rule(adapter) | |
| except Exception as e: | |
| logger.error(f"Failed to load plugin rules: {e}") | |
| def register_rule(self, rule: FraudRule): | |
| """Register a fraud detection rule""" | |
| self.rules[rule.name] = rule | |
| logger.info(f"Registered fraud rule: {rule.name}") | |
| def unregister_rule(self, rule_name: str): | |
| """Unregister a fraud detection rule""" | |
| if rule_name in self.rules: | |
| del self.rules[rule_name] | |
| logger.info(f"Unregistered fraud rule: {rule_name}") | |
| def enable_rule(self, rule_name: str): | |
| """Enable a specific rule""" | |
| if rule_name in self.rules: | |
| self.rules[rule_name].enabled = True | |
| logger.info(f"Enabled fraud rule: {rule_name}") | |
| def disable_rule(self, rule_name: str): | |
| """Disable a specific rule""" | |
| if rule_name in self.rules: | |
| self.rules[rule_name].enabled = False | |
| logger.info(f"Disabled fraud rule: {rule_name}") | |
| async def execute_rules( | |
| self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None | |
| ) -> list[FraudAlert]: | |
| """Execute all enabled rules and return combined alerts""" | |
| # Ensure initialized | |
| if not hasattr(self, "_initialized") or not self._initialized: | |
| await self.initialize() | |
| self._initialized = True | |
| if context is None: | |
| context = {} | |
| all_alerts = [] | |
| execution_start = datetime.now(UTC) | |
| logger.info( | |
| f"Executing {len([r for r in self.rules.values() if r.enabled])} rules on {len(transactions)} transactions" | |
| ) | |
| for rule in self.rules.values(): | |
| if not rule.enabled: | |
| continue | |
| try: | |
| rule_start = datetime.now(UTC) | |
| if asyncio.iscoroutinefunction(rule.execute): | |
| rule_alerts = await rule.execute(transactions, context) | |
| else: | |
| rule_alerts = rule.execute(transactions, context) | |
| rule_end = datetime.now(UTC) | |
| # Update rule execution stats | |
| rule.last_run = rule_end | |
| all_alerts.extend(rule_alerts) | |
| # Record execution history | |
| self.execution_history.append( | |
| { | |
| "rule_name": rule.name, | |
| "executed_at": rule_start, | |
| "duration_ms": (rule_end - rule_start).total_seconds() * 1000, | |
| "alerts_generated": len(rule_alerts), | |
| "transactions_processed": len(transactions), | |
| } | |
| ) | |
| logger.debug( | |
| f"Rule {rule.name} generated {len(rule_alerts)} alerts in {(rule_end - rule_start).total_seconds():.3f}s" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error executing rule {rule.name}: {e!s}") | |
| # Continue with other rules even if one fails | |
| continue | |
| execution_end = datetime.now(UTC) | |
| # Sort alerts by risk score (highest first) | |
| all_alerts.sort(key=lambda x: x.risk_score, reverse=True) | |
| # Log execution summary | |
| self.execution_history.append( | |
| { | |
| "rule_name": "ENGINE_TOTAL", | |
| "executed_at": execution_start, | |
| "duration_ms": (execution_end - execution_start).total_seconds() * 1000, | |
| "alerts_generated": len(all_alerts), | |
| "transactions_processed": len(transactions), | |
| "rules_executed": len([r for r in self.rules.values() if r.enabled]), | |
| } | |
| ) | |
| logger.info( | |
| f"Rule engine execution completed in {(execution_end - execution_start).total_seconds():.3f}s, generated {len(all_alerts)} alerts" | |
| ) | |
| return all_alerts | |
| def get_rule_status(self) -> dict[str, dict[str, Any]]: | |
| """Get status of all registered rules""" | |
| status = {} | |
| for name, rule in self.rules.items(): | |
| status[name] = { | |
| "enabled": rule.enabled, | |
| "severity": rule.severity.value, | |
| "last_run": rule.last_run.isoformat() if rule.last_run else None, | |
| "config_schema": rule.get_config_schema(), | |
| } | |
| return status | |
| def get_execution_stats(self, limit: int = 100) -> list[dict[str, Any]]: | |
| """Get recent execution statistics""" | |
| return self.execution_history[-limit:] if self.execution_history else [] | |
| # Legacy built-in rules removed. Logic migrated to plugins. | |
| # Global shared instance | |
| rule_engine = RuleEngine() | |