Spaces:
Sleeping
Sleeping
File size: 7,993 Bytes
9d20d0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
"""Fraud Engine - Core Decision Logic
This module orchestrates the fraud detection decision process.
It coordinates multiple agents and produces the final decision: investigate | allow
"""
import json
from typing import Dict, List, Any
from datetime import datetime
class FraudEngine:
    """Core fraud detection engine that orchestrates decision-making.

    The engine turns raw claim data into features, scores them with
    rule-based pattern and anomaly checks, and maps the combined score to a
    decision of ``"investigate"`` or ``"allow"``.
    """

    def __init__(self, decision_threshold: float = 0.65):
        """Create an engine.

        Args:
            decision_threshold: Minimum fraud score (0.0 to 1.0) at which a
                claim is sent for investigation.

        Raises:
            ValueError: If ``decision_threshold`` is outside ``[0.0, 1.0]``.
        """
        if not 0.0 <= decision_threshold <= 1.0:
            raise ValueError("decision_threshold must be between 0.0 and 1.0")
        self.version = "1.0.0"
        self.decision_threshold = decision_threshold

    @staticmethod
    def _get_or_default(mapping: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``mapping[key]``, or ``default`` if it is missing or None.

        ``dict.get(key, default)`` only covers a missing key; an explicit
        ``None`` value would otherwise reach ``len()`` or arithmetic and
        raise. Other falsy values (``0``, ``[]``, ``""``) are returned as-is.
        """
        value = mapping.get(key)
        return default if value is None else value

    def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a claim and return fraud decision.

        Args:
            claim_data: Structured claim information. Missing or ``None``
                fields fall back to the defaults used in
                ``_engineer_features``.

        Returns:
            Decision contract with the keys ``decision``, ``fraud_score``,
            ``risk_band``, ``evidence``, ``confidence``, ``audit_id`` and
            ``timestamp``.
        """
        # Step 1: Feature Engineering
        features = self._engineer_features(claim_data)

        # Step 2: Multi-Agent Analysis
        pattern_analysis = self._analyze_patterns(features)
        anomaly_analysis = self._detect_anomalies(features)
        risk_score = self._calculate_risk_score(pattern_analysis, anomaly_analysis)

        # Step 3: Decision Logic
        decision = self._make_decision(risk_score)

        # Step 4: Build Explainability
        explainability = self._build_explainability(
            pattern_analysis,
            anomaly_analysis,
            risk_score,
        )

        # Step 5: Governance & Audit
        audit_log = self._create_audit_log(claim_data, decision, explainability)

        return {
            "decision": decision,
            "fraud_score": risk_score["score"],
            "risk_band": risk_score["band"],
            "evidence": explainability["evidence"],
            "confidence": explainability["confidence"],
            "audit_id": audit_log["audit_id"],
            "timestamp": audit_log["timestamp"],
        }

    def _engineer_features(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract and engineer features from claim data.

        Each feature falls back to its default when the source key is
        missing or explicitly ``None``.
        """
        get = self._get_or_default
        return {
            "amount": get(claim_data, "amount", 0),
            "claim_type": get(claim_data, "type", "unknown"),
            "claimant_id": get(claim_data, "claimant_id", ""),
            "policy_age_days": get(claim_data, "days_since_policy_start", 365),
            "claim_history": get(claim_data, "claimant_history", {}),
            "documents": get(claim_data, "documents", []),
            "temporal_data": get(claim_data, "temporal_data", {}),
            "entity_links": get(claim_data, "linked_entities", []),
        }

    def _analyze_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze claim patterns for fraud indicators.

        Returns:
            Flags and scores for claim frequency, amount deviation from the
            claimant's average, and how early in the policy the claim was
            filed.
        """
        get = self._get_or_default
        history = get(features, "claim_history", {})

        # Frequency pattern
        claim_count = get(history, "claim_count", 0)

        # Amount pattern: relative distance from the claimant's average.
        # Note this ratio has no upper bound.
        amount = get(features, "amount", 0)
        avg_amount = get(history, "avg_amount", 5000)
        deviation = abs(amount - avg_amount) / avg_amount if avg_amount > 0 else 0

        # Temporal pattern
        policy_age = get(features, "policy_age_days", 365)
        early_claim = policy_age < 30

        return {
            "high_frequency": claim_count > 5,
            "frequency_score": min(claim_count / 10.0, 1.0),
            "amount_deviation": deviation,
            "unusual_amount": deviation > 0.5,
            "early_claim": early_claim,
            "temporal_score": 1.0 if early_claim else 0.0,
        }

    def _detect_anomalies(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in claim data.

        Returns:
            Flags and scores for missing documents, linked entities, and
            claimant behavior.
        """
        get = self._get_or_default

        # Document anomalies
        documents = get(features, "documents", [])
        missing_documents = len(documents) < 2

        # Entity linkage anomalies: any linked entity raises the flag.
        entity_links = get(features, "entity_links", [])

        # Behavioral anomalies
        history = get(features, "claim_history", {})
        claim_count = get(history, "claim_count", 0)

        return {
            "missing_documents": missing_documents,
            "document_score": 1.0 if missing_documents else 0.0,
            "suspicious_links": len(entity_links) > 0,
            "entity_score": min(len(entity_links) / 5.0, 1.0),
            "behavioral_score": 0.5 if claim_count > 3 else 0.0,
        }

    def _calculate_risk_score(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Calculate overall fraud risk score.

        Returns:
            Dict with the overall ``score`` (clamped to 0.0-1.0), its
            ``band`` (``"high"``, ``"medium"`` or ``"low"``) and the two
            component scores, all rounded to three decimals.
        """
        # Weighted scoring
        pattern_weight = 0.6
        anomaly_weight = 0.4

        pattern_score = (
            pattern_analysis.get("frequency_score", 0) * 0.4 +
            pattern_analysis.get("amount_deviation", 0) * 0.3 +
            pattern_analysis.get("temporal_score", 0) * 0.3
        )
        anomaly_score = (
            anomaly_analysis.get("document_score", 0) * 0.4 +
            anomaly_analysis.get("entity_score", 0) * 0.4 +
            anomaly_analysis.get("behavioral_score", 0) * 0.2
        )

        raw_score = (pattern_score * pattern_weight) + (anomaly_score * anomaly_weight)
        # amount_deviation is unbounded, so the weighted sum can exceed 1.
        # Clamping keeps the reported score on the 0-1 scale without changing
        # the band or decision, since every cut-off is at most 1.
        overall_score = max(0.0, min(raw_score, 1.0))

        # Determine risk band
        if overall_score >= 0.7:
            risk_band = "high"
        elif overall_score >= 0.4:
            risk_band = "medium"
        else:
            risk_band = "low"

        return {
            "score": round(overall_score, 3),
            "band": risk_band,
            "pattern_score": round(pattern_score, 3),
            "anomaly_score": round(anomaly_score, 3),
        }

    def _make_decision(self, risk_score: Dict[str, Any]) -> str:
        """Make final decision: investigate | allow."""
        score = risk_score["score"]
        return "investigate" if score >= self.decision_threshold else "allow"

    def _build_explainability(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
        risk_score: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build explainability payload.

        Returns:
            Dict with human-readable ``evidence`` strings for every raised
            flag, a ``confidence`` value between 0.5 and 1.0, and the two
            analysis dicts that were passed in.
        """
        # (source dict, flag name, message); order fixes the evidence order.
        evidence_rules = (
            (pattern_analysis, "high_frequency", "High claim frequency detected"),
            (pattern_analysis, "unusual_amount", "Unusual claim amount"),
            (pattern_analysis, "early_claim",
             "Claim filed shortly after policy inception"),
            (anomaly_analysis, "missing_documents", "Insufficient documentation"),
            (anomaly_analysis, "suspicious_links", "Linked to suspicious entities"),
        )
        evidence = [
            message
            for analysis, flag, message in evidence_rules
            if analysis.get(flag)
        ]

        # Confidence drops as the two component scores disagree; floor at 0.5.
        score_variance = abs(risk_score["pattern_score"] - risk_score["anomaly_score"])
        confidence = 1.0 - (score_variance * 0.5)

        return {
            "evidence": evidence,
            "confidence": round(max(confidence, 0.5), 3),
            "pattern_analysis": pattern_analysis,
            "anomaly_analysis": anomaly_analysis,
        }

    def _create_audit_log(
        self,
        claim_data: Dict[str, Any],
        decision: str,
        explainability: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create audit log entry.

        Returns:
            Dict with a 16-hex-character ``audit_id`` derived from the claim
            id and timestamp, the UTC ``timestamp`` in ISO 8601 format
            without an offset suffix, the ``claim_id``, the ``decision``,
            the number of evidence items, and the engine version.
        """
        import hashlib
        from datetime import timezone

        # datetime.utcnow() is deprecated since Python 3.12. Take an aware
        # UTC time and drop tzinfo so the emitted string keeps its format.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        claim_id = claim_data.get("claim_id", "unknown")
        audit_id = hashlib.sha256(
            f"{claim_id}_{timestamp}".encode()
        ).hexdigest()[:16]

        return {
            "audit_id": audit_id,
            "timestamp": timestamp,
            "claim_id": claim_id,
            "decision": decision,
            "evidence_count": len(explainability.get("evidence", [])),
            "model_version": self.version,
        }
|