Spaces:
Sleeping
Sleeping
| """Fraud Engine - Core Decision Logic | |
| This module orchestrates the fraud detection decision process. | |
| It coordinates multiple agents and produces the final decision: investigate | allow | |
| """ | |
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List
class FraudEngine:
    """Core fraud detection engine that orchestrates decision-making.

    The engine runs a fixed pipeline over one structured claim:
    feature engineering -> pattern analysis + anomaly detection ->
    weighted risk scoring -> threshold decision (investigate | allow)
    -> explainability payload -> audit log entry.
    """

    def __init__(self) -> None:
        # Engine/model version, stamped into every audit entry for traceability.
        self.version = "1.0.0"
        # Scores >= this threshold produce an "investigate" decision.
        self.decision_threshold = 0.65

    def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a claim and return fraud decision.

        Args:
            claim_data: Structured claim information. Missing keys fall
                back to neutral defaults in ``_engineer_features``.

        Returns:
            Decision contract with action, score, risk band, evidence,
            confidence, and audit metadata.
        """
        # Step 1: Feature Engineering
        features = self._engineer_features(claim_data)
        # Step 2: Multi-Agent Analysis
        pattern_analysis = self._analyze_patterns(features)
        anomaly_analysis = self._detect_anomalies(features)
        risk_score = self._calculate_risk_score(pattern_analysis, anomaly_analysis)
        # Step 3: Decision Logic
        decision = self._make_decision(risk_score)
        # Step 4: Build Explainability
        explainability = self._build_explainability(
            pattern_analysis,
            anomaly_analysis,
            risk_score,
        )
        # Step 5: Governance & Audit
        audit_log = self._create_audit_log(claim_data, decision, explainability)
        return {
            "decision": decision,
            "fraud_score": risk_score["score"],
            "risk_band": risk_score["band"],
            "evidence": explainability["evidence"],
            "confidence": explainability["confidence"],
            "audit_id": audit_log["audit_id"],
            "timestamp": audit_log["timestamp"],
        }

    def _engineer_features(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract and engineer features from claim data.

        Every lookup has a neutral default so downstream analysis never
        raises on a sparse claim payload.
        """
        return {
            "amount": claim_data.get("amount", 0),
            "claim_type": claim_data.get("type", "unknown"),
            "claimant_id": claim_data.get("claimant_id", ""),
            "policy_age_days": claim_data.get("days_since_policy_start", 365),
            "claim_history": claim_data.get("claimant_history", {}),
            "documents": claim_data.get("documents", []),
            "temporal_data": claim_data.get("temporal_data", {}),
            "entity_links": claim_data.get("linked_entities", []),
        }

    def _analyze_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze claim patterns for fraud indicators.

        Returns boolean flags (used for evidence text) plus numeric
        sub-scores (used for risk scoring).
        """
        patterns: Dict[str, Any] = {}

        # Frequency pattern: more than 5 prior claims is flagged; score
        # saturates at 10 claims.
        claim_count = features.get("claim_history", {}).get("claim_count", 0)
        patterns["high_frequency"] = claim_count > 5
        patterns["frequency_score"] = min(claim_count / 10.0, 1.0)

        # Amount pattern: relative deviation from the claimant's average.
        # NOTE: this raw ratio is unbounded; it is capped when scored.
        amount = features.get("amount", 0)
        avg_amount = features.get("claim_history", {}).get("avg_amount", 5000)
        deviation = abs(amount - avg_amount) / avg_amount if avg_amount > 0 else 0
        patterns["amount_deviation"] = deviation
        patterns["unusual_amount"] = deviation > 0.5

        # Temporal pattern: claims within 30 days of policy inception are
        # treated as maximally suspicious on this axis.
        policy_age = features.get("policy_age_days", 365)
        patterns["early_claim"] = policy_age < 30
        patterns["temporal_score"] = 1.0 if policy_age < 30 else 0.0
        return patterns

    def _detect_anomalies(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in claim data (documents, links, behavior)."""
        anomalies: Dict[str, Any] = {}

        # Document anomalies: fewer than 2 supporting documents.
        documents = features.get("documents", [])
        anomalies["missing_documents"] = len(documents) < 2
        anomalies["document_score"] = 1.0 if len(documents) < 2 else 0.0

        # Entity linkage anomalies: any linked entity is flagged; score
        # saturates at 5 links.
        entity_links = features.get("entity_links", [])
        anomalies["suspicious_links"] = len(entity_links) > 0
        anomalies["entity_score"] = min(len(entity_links) / 5.0, 1.0)

        # Behavioral anomalies: repeat claimants (> 3 claims) get a
        # fixed mid-level score.
        claim_history = features.get("claim_history", {})
        anomalies["behavioral_score"] = 0.5 if claim_history.get("claim_count", 0) > 3 else 0.0
        return anomalies

    def _calculate_risk_score(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Calculate overall fraud risk score in [0, 1] plus a risk band."""
        # Weighted blend: patterns dominate (0.6) over anomalies (0.4).
        pattern_weight = 0.6
        anomaly_weight = 0.4

        # FIX: amount_deviation is an unbounded ratio; cap its contribution
        # at 1.0 so pattern_score (and the reported fraud_score) cannot
        # exceed 1.0, matching every other clamped component.
        pattern_score = (
            pattern_analysis.get("frequency_score", 0) * 0.4 +
            min(pattern_analysis.get("amount_deviation", 0), 1.0) * 0.3 +
            pattern_analysis.get("temporal_score", 0) * 0.3
        )
        anomaly_score = (
            anomaly_analysis.get("document_score", 0) * 0.4 +
            anomaly_analysis.get("entity_score", 0) * 0.4 +
            anomaly_analysis.get("behavioral_score", 0) * 0.2
        )
        overall_score = (pattern_score * pattern_weight) + (anomaly_score * anomaly_weight)

        # Determine risk band from the unrounded score.
        if overall_score >= 0.7:
            risk_band = "high"
        elif overall_score >= 0.4:
            risk_band = "medium"
        else:
            risk_band = "low"

        return {
            "score": round(overall_score, 3),
            "band": risk_band,
            "pattern_score": round(pattern_score, 3),
            "anomaly_score": round(anomaly_score, 3),
        }

    def _make_decision(self, risk_score: Dict[str, Any]) -> str:
        """Make final decision: investigate | allow."""
        score = risk_score["score"]
        return "investigate" if score >= self.decision_threshold else "allow"

    def _build_explainability(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
        risk_score: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build explainability payload (human-readable evidence + confidence)."""
        evidence: List[str] = []

        # Pattern evidence
        if pattern_analysis.get("high_frequency"):
            evidence.append("High claim frequency detected")
        if pattern_analysis.get("unusual_amount"):
            evidence.append("Unusual claim amount")
        if pattern_analysis.get("early_claim"):
            evidence.append("Claim filed shortly after policy inception")

        # Anomaly evidence
        if anomaly_analysis.get("missing_documents"):
            evidence.append("Insufficient documentation")
        if anomaly_analysis.get("suspicious_links"):
            evidence.append("Linked to suspicious entities")

        # Confidence: agreement between the two rounded sub-scores; a
        # large spread lowers confidence, floored at 0.5.
        score_variance = abs(risk_score["pattern_score"] - risk_score["anomaly_score"])
        confidence = 1.0 - (score_variance * 0.5)

        return {
            "evidence": evidence,
            "confidence": round(max(confidence, 0.5), 3),
            "pattern_analysis": pattern_analysis,
            "anomaly_analysis": anomaly_analysis,
        }

    def _create_audit_log(
        self,
        claim_data: Dict[str, Any],
        decision: str,
        explainability: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Create audit log entry.

        The audit id is a truncated SHA-256 of claim id + timestamp, so
        repeated processing of the same claim yields distinct entries.
        """
        # FIX: datetime.utcnow() is deprecated (3.12) and naive; use an
        # explicit UTC-aware timestamp (ISO-8601 with +00:00 offset).
        timestamp = datetime.now(timezone.utc).isoformat()
        audit_id = hashlib.sha256(
            f"{claim_data.get('claim_id', 'unknown')}_{timestamp}".encode()
        ).hexdigest()[:16]
        return {
            "audit_id": audit_id,
            "timestamp": timestamp,
            "claim_id": claim_data.get("claim_id", "unknown"),
            "decision": decision,
            "evidence_count": len(explainability.get("evidence", [])),
            "model_version": self.version,
        }