"""Fraud Engine - Core Decision Logic.

This module orchestrates the fraud detection decision process.
It coordinates multiple analysis steps and produces the final decision:
investigate | allow
"""

import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


class FraudEngine:
    """Core fraud detection engine that orchestrates decision-making.

    The pipeline is: feature engineering -> pattern analysis + anomaly
    detection -> weighted risk score -> decision -> explainability -> audit.
    All component scores and the overall score are kept on a 0..1 scale.
    """

    def __init__(self, decision_threshold: float = 0.65):
        """Create an engine.

        Args:
            decision_threshold: Overall score at or above which a claim is
                routed to "investigate". Defaults to 0.65.
        """
        self.version = "1.0.0"
        self.decision_threshold = decision_threshold

    @staticmethod
    def _value_or_default(mapping: Any, key: str, default: Any) -> Any:
        """Return ``mapping[key]``, or ``default`` if it is missing or None.

        ``dict.get(key, default)`` only covers a missing key; payloads that
        carry an explicit ``None`` would otherwise leak ``None`` into the
        arithmetic and comparisons further down the pipeline.
        """
        if not mapping:
            return default
        value = mapping.get(key)
        return default if value is None else value

    @staticmethod
    def _clamp_unit(value: float) -> float:
        """Clamp ``value`` into the closed interval [0.0, 1.0]."""
        return max(0.0, min(value, 1.0))

    def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a claim and return the fraud decision.

        Args:
            claim_data: Structured claim information.

        Returns:
            Decision contract with keys ``decision``, ``fraud_score``,
            ``risk_band``, ``evidence``, ``confidence``, ``audit_id`` and
            ``timestamp``.
        """
        # Step 1: Feature Engineering
        features = self._engineer_features(claim_data)

        # Step 2: Multi-Agent Analysis
        pattern_analysis = self._analyze_patterns(features)
        anomaly_analysis = self._detect_anomalies(features)
        risk_score = self._calculate_risk_score(pattern_analysis, anomaly_analysis)

        # Step 3: Decision Logic
        decision = self._make_decision(risk_score)

        # Step 4: Build Explainability
        explainability = self._build_explainability(
            pattern_analysis, anomaly_analysis, risk_score
        )

        # Step 5: Governance & Audit
        audit_log = self._create_audit_log(claim_data, decision, explainability)

        return {
            "decision": decision,
            "fraud_score": risk_score["score"],
            "risk_band": risk_score["band"],
            "evidence": explainability["evidence"],
            "confidence": explainability["confidence"],
            "audit_id": audit_log["audit_id"],
            "timestamp": audit_log["timestamp"],
        }

    def _engineer_features(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract features from claim data, applying defaults.

        A key that is absent or explicitly ``None`` falls back to its default.

        Args:
            claim_data: Raw claim payload.

        Returns:
            Feature dict with keys ``amount``, ``claim_type``, ``claimant_id``,
            ``policy_age_days``, ``claim_history``, ``documents``,
            ``temporal_data`` and ``entity_links``.
        """
        get = self._value_or_default
        return {
            "amount": get(claim_data, "amount", 0),
            "claim_type": get(claim_data, "type", "unknown"),
            "claimant_id": get(claim_data, "claimant_id", ""),
            "policy_age_days": get(claim_data, "days_since_policy_start", 365),
            "claim_history": get(claim_data, "claimant_history", {}),
            "documents": get(claim_data, "documents", []),
            "temporal_data": get(claim_data, "temporal_data", {}),
            "entity_links": get(claim_data, "linked_entities", []),
        }

    def _analyze_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze claim patterns for fraud indicators.

        Args:
            features: Output of ``_engineer_features``.

        Returns:
            Dict with boolean flags (``high_frequency``, ``unusual_amount``,
            ``early_claim``) and numeric values (``frequency_score``,
            ``amount_deviation``, ``temporal_score``). ``amount_deviation`` is
            the raw relative deviation and may exceed 1.0.
        """
        get = self._value_or_default
        history = get(features, "claim_history", {})
        patterns: Dict[str, Any] = {}

        # Frequency pattern: saturates at 10 prior claims.
        claim_count = get(history, "claim_count", 0)
        patterns["high_frequency"] = claim_count > 5
        patterns["frequency_score"] = min(claim_count / 10.0, 1.0)

        # Amount pattern: relative distance from the claimant's average.
        amount = get(features, "amount", 0)
        avg_amount = get(history, "avg_amount", 5000)
        # A non-positive average cannot be used as a baseline.
        deviation = abs(amount - avg_amount) / avg_amount if avg_amount > 0 else 0
        patterns["amount_deviation"] = deviation
        patterns["unusual_amount"] = deviation > 0.5

        # Temporal pattern: claims within 30 days of policy start.
        policy_age = get(features, "policy_age_days", 365)
        is_early = policy_age < 30
        patterns["early_claim"] = is_early
        patterns["temporal_score"] = 1.0 if is_early else 0.0

        return patterns

    def _detect_anomalies(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in claim data.

        Args:
            features: Output of ``_engineer_features``.

        Returns:
            Dict with boolean flags (``missing_documents``,
            ``suspicious_links``) and numeric scores (``document_score``,
            ``entity_score``, ``behavioral_score``).
        """
        get = self._value_or_default
        anomalies: Dict[str, Any] = {}

        # Document anomalies: fewer than two documents is flagged.
        documents = get(features, "documents", [])
        too_few_documents = len(documents) < 2
        anomalies["missing_documents"] = too_few_documents
        anomalies["document_score"] = 1.0 if too_few_documents else 0.0

        # Entity linkage anomalies: saturates at five linked entities.
        entity_links = get(features, "entity_links", [])
        anomalies["suspicious_links"] = len(entity_links) > 0
        anomalies["entity_score"] = min(len(entity_links) / 5.0, 1.0)

        # Behavioral anomalies
        history = get(features, "claim_history", {})
        claim_count = get(history, "claim_count", 0)
        anomalies["behavioral_score"] = 0.5 if claim_count > 3 else 0.0

        return anomalies

    def _calculate_risk_score(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Calculate the overall fraud risk score on a 0..1 scale.

        Args:
            pattern_analysis: Output of ``_analyze_patterns``.
            anomaly_analysis: Output of ``_detect_anomalies``.

        Returns:
            Dict with ``score``, ``pattern_score`` and ``anomaly_score``
            (each rounded to 3 decimals) and ``band``
            ("high" >= 0.7, "medium" >= 0.4, otherwise "low").
        """
        # Weighted scoring
        pattern_weight = 0.6
        anomaly_weight = 0.4

        # The raw amount deviation is unbounded (a claim 200x the average
        # yields ~199). Clamp its contribution so that a single signal cannot
        # push the weighted score past the 0..1 scale the bands, threshold
        # and confidence calculation all assume.
        amount_component = self._clamp_unit(
            pattern_analysis.get("amount_deviation", 0)
        )

        pattern_score = (
            pattern_analysis.get("frequency_score", 0) * 0.4
            + amount_component * 0.3
            + pattern_analysis.get("temporal_score", 0) * 0.3
        )

        anomaly_score = (
            anomaly_analysis.get("document_score", 0) * 0.4
            + anomaly_analysis.get("entity_score", 0) * 0.4
            + anomaly_analysis.get("behavioral_score", 0) * 0.2
        )

        overall_score = self._clamp_unit(
            (pattern_score * pattern_weight) + (anomaly_score * anomaly_weight)
        )

        # Determine risk band
        if overall_score >= 0.7:
            risk_band = "high"
        elif overall_score >= 0.4:
            risk_band = "medium"
        else:
            risk_band = "low"

        return {
            "score": round(overall_score, 3),
            "band": risk_band,
            "pattern_score": round(pattern_score, 3),
            "anomaly_score": round(anomaly_score, 3),
        }

    def _make_decision(self, risk_score: Dict[str, Any]) -> str:
        """Make the final decision: investigate | allow.

        Args:
            risk_score: Output of ``_calculate_risk_score``.

        Returns:
            "investigate" when the score reaches ``self.decision_threshold``,
            otherwise "allow".
        """
        score = risk_score["score"]
        return "investigate" if score >= self.decision_threshold else "allow"

    def _build_explainability(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
        risk_score: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build the explainability payload.

        Args:
            pattern_analysis: Output of ``_analyze_patterns``.
            anomaly_analysis: Output of ``_detect_anomalies``.
            risk_score: Output of ``_calculate_risk_score``.

        Returns:
            Dict with ``evidence`` (human-readable reasons, in a fixed order),
            ``confidence`` (0.5..1.0, rounded to 3 decimals) and the two
            analysis dicts passed through unchanged.
        """
        # (flag source, flag name, message) in the order evidence is reported.
        evidence_rules = (
            (pattern_analysis, "high_frequency", "High claim frequency detected"),
            (pattern_analysis, "unusual_amount", "Unusual claim amount"),
            (
                pattern_analysis,
                "early_claim",
                "Claim filed shortly after policy inception",
            ),
            (anomaly_analysis, "missing_documents", "Insufficient documentation"),
            (anomaly_analysis, "suspicious_links", "Linked to suspicious entities"),
        )
        evidence = [
            message for source, flag, message in evidence_rules if source.get(flag)
        ]

        # Confidence drops as the two sub-scores disagree, floored at 0.5.
        score_variance = abs(risk_score["pattern_score"] - risk_score["anomaly_score"])
        confidence = 1.0 - (score_variance * 0.5)

        return {
            "evidence": evidence,
            "confidence": round(max(confidence, 0.5), 3),
            "pattern_analysis": pattern_analysis,
            "anomaly_analysis": anomaly_analysis,
        }

    def _create_audit_log(
        self,
        claim_data: Dict[str, Any],
        decision: str,
        explainability: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Create an audit log entry.

        Args:
            claim_data: Raw claim payload; ``claim_id`` is read from it.
            decision: The decision string being recorded.
            explainability: Output of ``_build_explainability``.

        Returns:
            Dict with ``audit_id`` (first 16 hex chars of a SHA-256 over
            claim id and timestamp), ``timestamp`` (UTC, ISO 8601, no offset
            suffix), ``claim_id``, ``decision``, ``evidence_count`` and
            ``model_version``.
        """
        claim_id = claim_data.get("claim_id", "unknown")
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
        # tzinfo so the emitted string keeps its original offset-free format.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        audit_id = hashlib.sha256(f"{claim_id}_{timestamp}".encode()).hexdigest()[:16]

        return {
            "audit_id": audit_id,
            "timestamp": timestamp,
            "claim_id": claim_id,
            "decision": decision,
            "evidence_count": len(explainability.get("evidence", [])),
            "model_version": self.version,
        }