| | """Fraud Engine - Core Decision Logic |
| | |
| | This module orchestrates the fraud detection decision process. |
| | It coordinates multiple agents and produces the final decision: investigate | allow |
| | """ |
| |
|
| | import json |
| | from typing import Dict, List, Any |
| | from datetime import datetime |
| | from text_processor import InsuranceTextProcessor |
| |
|
| |
|
| | class FraudEngine: |
| | """Core fraud detection engine that orchestrates decision-making.""" |
| | |
| | def __init__(self): |
| | self.version = "1.0.0" |
| | self.decision_threshold = 0.65 |
| | self.text_processor = InsuranceTextProcessor() |
| | |
| | def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]: |
| | """Process a claim and return fraud decision. |
| | |
| | Args: |
| | claim_data: Structured claim information |
| | |
| | Returns: |
| | Decision contract with action, evidence, and explainability |
| | """ |
| | |
| | features = self._engineer_features(claim_data) |
| |
|
| | |
| | text_analysis = None |
| | if 'claim_description' in claim_data: |
| | text_analysis = self.text_processor.analyze_claim_text(claim_data['claim_description']) |
| | |
| | |
| | pattern_analysis = self._analyze_patterns(features) |
| | anomaly_analysis = self._detect_anomalies(features) |
| | |
| | rule_score = self._calculate_risk_score(pattern_analysis, anomaly_analysis) |
| | if text_analysis: |
| | text_score = text_analysis['fraud_score'] |
| | risk_score = 0.3 * text_score + 0.7 * rule_score |
| | else: |
| | risk_score = rule_score |
| | |
| | decision = self._make_decision(risk_score) |
| | |
| | |
| | explainability = self._build_explainability( |
| | pattern_analysis, |
| | anomaly_analysis, |
| | risk_score |
| | ) |
| | |
| | |
| | audit_log = self._create_audit_log(claim_data, decision, explainability) |
| | |
| | return { |
| | "decision": decision, |
| | "fraud_score": risk_score["score"], |
| | "risk_band": risk_score["band"], |
| | "evidence": explainability["evidence"], |
| | "confidence": explainability["confidence"], |
| | "audit_id": audit_log["audit_id"], |
| | "timestamp": audit_log["timestamp"], |
| | "text_analysis": text_analysis, |
| | "method": "Hybrid (Text + Rules)" if text_analysis else "Rule-based only" |
| | } |
| | |
| | def _engineer_features(self, claim_data: Dict[str, Any]) -> Dict[str, Any]: |
| | """Extract and engineer features from claim data.""" |
| | return { |
| | "amount": claim_data.get("amount", 0), |
| | "claim_type": claim_data.get("type", "unknown"), |
| | "claimant_id": claim_data.get("claimant_id", ""), |
| | "policy_age_days": claim_data.get("days_since_policy_start", 365), |
| | "claim_history": claim_data.get("claimant_history", {}), |
| | "documents": claim_data.get("documents", []), |
| | "temporal_data": claim_data.get("temporal_data", {}), |
| | "entity_links": claim_data.get("linked_entities", []) |
| | } |
| | |
| | def _analyze_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]: |
| | """Analyze claim patterns for fraud indicators.""" |
| | patterns = {} |
| | |
| | |
| | claim_count = features.get("claim_history", {}).get("claim_count", 0) |
| | patterns["high_frequency"] = claim_count > 5 |
| | patterns["frequency_score"] = min(claim_count / 10.0, 1.0) |
| | |
| | |
| | amount = features.get("amount", 0) |
| | avg_amount = features.get("claim_history", {}).get("avg_amount", 5000) |
| | deviation = abs(amount - avg_amount) / avg_amount if avg_amount > 0 else 0 |
| | patterns["amount_deviation"] = deviation |
| | patterns["unusual_amount"] = deviation > 0.5 |
| | |
| | |
| | policy_age = features.get("policy_age_days", 365) |
| | patterns["early_claim"] = policy_age < 30 |
| | patterns["temporal_score"] = 1.0 if policy_age < 30 else 0.0 |
| | |
| | return patterns |
| | |
| | def _detect_anomalies(self, features: Dict[str, Any]) -> Dict[str, Any]: |
| | """Detect anomalies in claim data.""" |
| | anomalies = {} |
| | |
| | |
| | documents = features.get("documents", []) |
| | anomalies["missing_documents"] = len(documents) < 2 |
| | anomalies["document_score"] = 1.0 if len(documents) < 2 else 0.0 |
| | |
| | |
| | entity_links = features.get("entity_links", []) |
| | anomalies["suspicious_links"] = len(entity_links) > 0 |
| | anomalies["entity_score"] = min(len(entity_links) / 5.0, 1.0) |
| | |
| | |
| | claim_history = features.get("claim_history", {}) |
| | anomalies["behavioral_score"] = 0.5 if claim_history.get("claim_count", 0) > 3 else 0.0 |
| | |
| | return anomalies |
| | |
| | def _calculate_risk_score( |
| | self, |
| | pattern_analysis: Dict[str, Any], |
| | anomaly_analysis: Dict[str, Any] |
| | ) -> Dict[str, Any]: |
| | """Calculate overall fraud risk score.""" |
| | |
| | pattern_weight = 0.6 |
| | anomaly_weight = 0.4 |
| | |
| | pattern_score = ( |
| | pattern_analysis.get("frequency_score", 0) * 0.4 + |
| | pattern_analysis.get("amount_deviation", 0) * 0.3 + |
| | pattern_analysis.get("temporal_score", 0) * 0.3 |
| | ) |
| | |
| | anomaly_score = ( |
| | anomaly_analysis.get("document_score", 0) * 0.4 + |
| | anomaly_analysis.get("entity_score", 0) * 0.4 + |
| | anomaly_analysis.get("behavioral_score", 0) * 0.2 |
| | ) |
| | |
| | overall_score = (pattern_score * pattern_weight) + (anomaly_score * anomaly_weight) |
| | |
| | |
| | if overall_score >= 0.7: |
| | risk_band = "high" |
| | elif overall_score >= 0.4: |
| | risk_band = "medium" |
| | else: |
| | risk_band = "low" |
| | |
| | return { |
| | "score": round(overall_score, 3), |
| | "band": risk_band, |
| | "pattern_score": round(pattern_score, 3), |
| | "anomaly_score": round(anomaly_score, 3) |
| | } |
| | |
| | def _make_decision(self, risk_score: Dict[str, Any]) -> str: |
| | """Make final decision: investigate | allow.""" |
| | score = risk_score["score"] |
| | return "investigate" if score >= self.decision_threshold else "allow" |
| | |
| | def _build_explainability( |
| | self, |
| | pattern_analysis: Dict[str, Any], |
| | anomaly_analysis: Dict[str, Any], |
| | risk_score: Dict[str, Any] |
| | ) -> Dict[str, Any]: |
| | """Build explainability payload.""" |
| | evidence = [] |
| | |
| | |
| | if pattern_analysis.get("high_frequency"): |
| | evidence.append("High claim frequency detected") |
| | if pattern_analysis.get("unusual_amount"): |
| | evidence.append("Unusual claim amount") |
| | if pattern_analysis.get("early_claim"): |
| | evidence.append("Claim filed shortly after policy inception") |
| | |
| | |
| | if anomaly_analysis.get("missing_documents"): |
| | evidence.append("Insufficient documentation") |
| | if anomaly_analysis.get("suspicious_links"): |
| | evidence.append("Linked to suspicious entities") |
| | |
| | |
| | score_variance = abs(risk_score["pattern_score"] - risk_score["anomaly_score"]) |
| | confidence = 1.0 - (score_variance * 0.5) |
| | |
| | return { |
| | "evidence": evidence, |
| | "confidence": round(max(confidence, 0.5), 3), |
| | "pattern_analysis": pattern_analysis, |
| | "anomaly_analysis": anomaly_analysis |
| | } |
| | |
| | def _create_audit_log( |
| | self, |
| | claim_data: Dict[str, Any], |
| | decision: str, |
| | explainability: Dict[str, Any] |
| | ) -> Dict[str, Any]: |
| | """Create audit log entry.""" |
| | import hashlib |
| | |
| | timestamp = datetime.utcnow().isoformat() |
| | audit_id = hashlib.sha256( |
| | f"{claim_data.get('claim_id', 'unknown')}_{timestamp}".encode() |
| | ).hexdigest()[:16] |
| | |
| | return { |
| | "audit_id": audit_id, |
| | "timestamp": timestamp, |
| | "claim_id": claim_data.get("claim_id", "unknown"), |
| | "decision": decision, |
| | "evidence_count": len(explainability.get("evidence", [])), |
| | "model_version": self.version |
| | } |
| |
|