# services/fraud/engine.py import asyncio import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import UTC, datetime from enum import Enum from typing import Any from core.plugin_system.registry import plugin_registry_service # Imports for deleted rules removed logger = logging.getLogger(__name__) class AlertSeverity(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class FraudAlert: """Base alert class for all fraud detection alerts""" alert_id: str rule_name: str severity: AlertSeverity confidence: float # 0.0 to 1.0 risk_score: float # 0.0 to 100.0 description: str detected_at: datetime case_id: str | None = None transaction_ids: list[str] = field(default_factory=list) entities: list[str] = field( default_factory=list ) # customer IDs, merchant names, etc. metadata: dict[str, Any] = field(default_factory=dict) recommendations: list[str] = field(default_factory=list) class FraudRule(ABC): """Abstract base class for fraud detection rules""" def __init__(self, name: str, severity: AlertSeverity, enabled: bool = True): self.name = name self.severity = severity self.enabled = enabled self.last_run = None @abstractmethod async def execute( self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None ) -> list[FraudAlert]: """Execute the rule and return alerts""" @abstractmethod def get_config_schema(self) -> dict[str, Any]: """Return configuration schema for this rule""" class PluginAdapterRule(FraudRule): """Adapter to make Plugins look like FraudRules""" def __init__(self, plugin_instance): metadata = plugin_instance.metadata super().__init__(metadata.name, AlertSeverity.HIGH) # Default severity self.plugin = plugin_instance self.namespace = metadata.namespace async def execute( self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None ) -> list[FraudAlert]: # Plugins usually take a specific input format # We need to bridge the gap. Most of our plugins expect {"transactions": [...]} input_data = {"transactions": transactions, "context": context} try: # Execute plugin (async native) result = await self.plugin.execute(input_data) alerts = [] if result and "alerts" in result: for plugin_alert in result["alerts"]: # Dynamic Severity Mapping from centralized settings from core.config import settings risk_score = plugin_alert.get("risk_score", 0.0) if risk_score >= settings.FRAUD_SCORE_CRITICAL: severity = AlertSeverity.CRITICAL elif risk_score >= settings.FRAUD_SCORE_HIGH: severity = AlertSeverity.HIGH elif risk_score >= settings.FRAUD_SCORE_MEDIUM: severity = AlertSeverity.MEDIUM else: severity = AlertSeverity.LOW # Convert plugin alert dict to FraudAlert object alert = FraudAlert( alert_id=f"PL_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}_{len(alerts)}", rule_name=self.name, severity=severity, confidence=plugin_alert.get("confidence", 0.8), risk_score=risk_score, description=plugin_alert.get("reason", "Plugin detected fraud"), detected_at=datetime.now(UTC), metadata=plugin_alert.get("details", {}), recommendations=["Review plugin findings"], ) alerts.append(alert) return alerts except Exception as e: logger.error(f"Error executing plugin {self.name}: {e}") return [] def get_config_schema(self) -> dict[str, Any]: return ( self.plugin.metadata.config_schema if hasattr(self.plugin.metadata, "config_schema") else {} ) class RuleEngine: """Main fraud detection rule engine""" def __init__(self): self.rules: dict[str, FraudRule] = {} self.rule_registry: dict[str, type[FraudRule]] = {} self.execution_history: list[dict[str, Any]] = [] self._register_builtin_rules() # Plugins loaded via async initialize() call def _register_builtin_rules(self): """Register built-in fraud detection rules (Legacy)""" # We keeping these for backward compatibility / shadow mode if needed, # but User asked to delete unused files. # We will remove them from here if implementation is now fully plugin-based. # But for safety, we might mistakenly successfully run plugins that are NOT yet fully working. # However, plan says "Move to plugins". # Let's rely on _load_plugin_rules primarily. async def initialize(self): """Async initialization to load plugin rules""" await self._load_plugin_rules() async def _load_plugin_rules(self): """Load rules from PluginRegistry""" # In a synchronous init, we can't await. # We might need a start() method or similar. # Or just run loop here for init. try: # Helper to fetch active detection plugins from core.database import SessionLocal from core.plugin_system.models import PluginRegistry as PluginRegistryModel db = SessionLocal() plugin_instances = [] try: # Fetch all active plugins db_plugins = ( db.query(PluginRegistryModel) .filter(PluginRegistryModel.status == "active") .all() ) for p in db_plugins: if "fraud_detection" in p.metadata_json.get("capabilities", []): # Load instance instance = await plugin_registry_service.get_plugin( p.plugin_id, db ) plugin_instances.append(instance) except Exception as e: logger.error(f"Failed to load plugins from DB: {e}") finally: db.close() for p in plugin_instances: adapter = PluginAdapterRule(p) self.register_rule(adapter) except Exception as e: logger.error(f"Failed to load plugin rules: {e}") def register_rule(self, rule: FraudRule): """Register a fraud detection rule""" self.rules[rule.name] = rule logger.info(f"Registered fraud rule: {rule.name}") def unregister_rule(self, rule_name: str): """Unregister a fraud detection rule""" if rule_name in self.rules: del self.rules[rule_name] logger.info(f"Unregistered fraud rule: {rule_name}") def enable_rule(self, rule_name: str): """Enable a specific rule""" if rule_name in self.rules: self.rules[rule_name].enabled = True logger.info(f"Enabled fraud rule: {rule_name}") def disable_rule(self, rule_name: str): """Disable a specific rule""" if rule_name in self.rules: self.rules[rule_name].enabled = False logger.info(f"Disabled fraud rule: {rule_name}") async def execute_rules( self, transactions: list[dict[str, Any]], context: dict[str, Any] | None = None ) -> list[FraudAlert]: """Execute all enabled rules and return combined alerts""" # Ensure initialized if not hasattr(self, "_initialized") or not self._initialized: await self.initialize() self._initialized = True if context is None: context = {} all_alerts = [] execution_start = datetime.now(UTC) logger.info( f"Executing {len([r for r in self.rules.values() if r.enabled])} rules on {len(transactions)} transactions" ) for rule in self.rules.values(): if not rule.enabled: continue try: rule_start = datetime.now(UTC) if asyncio.iscoroutinefunction(rule.execute): rule_alerts = await rule.execute(transactions, context) else: rule_alerts = rule.execute(transactions, context) rule_end = datetime.now(UTC) # Update rule execution stats rule.last_run = rule_end all_alerts.extend(rule_alerts) # Record execution history self.execution_history.append( { "rule_name": rule.name, "executed_at": rule_start, "duration_ms": (rule_end - rule_start).total_seconds() * 1000, "alerts_generated": len(rule_alerts), "transactions_processed": len(transactions), } ) logger.debug( f"Rule {rule.name} generated {len(rule_alerts)} alerts in {(rule_end - rule_start).total_seconds():.3f}s" ) except Exception as e: logger.error(f"Error executing rule {rule.name}: {e!s}") # Continue with other rules even if one fails continue execution_end = datetime.now(UTC) # Sort alerts by risk score (highest first) all_alerts.sort(key=lambda x: x.risk_score, reverse=True) # Log execution summary self.execution_history.append( { "rule_name": "ENGINE_TOTAL", "executed_at": execution_start, "duration_ms": (execution_end - execution_start).total_seconds() * 1000, "alerts_generated": len(all_alerts), "transactions_processed": len(transactions), "rules_executed": len([r for r in self.rules.values() if r.enabled]), } ) logger.info( f"Rule engine execution completed in {(execution_end - execution_start).total_seconds():.3f}s, generated {len(all_alerts)} alerts" ) return all_alerts def get_rule_status(self) -> dict[str, dict[str, Any]]: """Get status of all registered rules""" status = {} for name, rule in self.rules.items(): status[name] = { "enabled": rule.enabled, "severity": rule.severity.value, "last_run": rule.last_run.isoformat() if rule.last_run else None, "config_schema": rule.get_config_schema(), } return status def get_execution_stats(self, limit: int = 100) -> list[dict[str, Any]]: """Get recent execution statistics""" return self.execution_history[-limit:] if self.execution_history else [] # Legacy built-in rules removed. Logic migrated to plugins. # Global shared instance rule_engine = RuleEngine()