# FraudSimulator-AI / fraud_engine.py
# Author: Bader Alabddan
# Commit 9d20d0b: Add master prompt compliance: models/, data/, docs/, fraud_engine.py
"""Fraud Engine - Core Decision Logic
This module orchestrates the fraud detection decision process.
It coordinates multiple agents and produces the final decision: investigate | allow
"""
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List
class FraudEngine:
    """Core fraud detection engine that orchestrates decision-making.

    Pipeline: feature engineering -> pattern analysis + anomaly detection
    -> weighted risk scoring -> threshold decision -> explainability
    payload -> audit log. The final decision is "investigate" | "allow".
    """

    def __init__(self):
        # Recorded in every audit entry so each decision is traceable to a release.
        self.version = "1.0.0"
        # Overall risk scores at or above this value trigger "investigate".
        self.decision_threshold = 0.65

    def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a claim and return the fraud decision contract.

        Args:
            claim_data: Structured claim information. Recognized keys:
                claim_id, amount, type, claimant_id,
                days_since_policy_start, claimant_history, documents,
                temporal_data, linked_entities. Missing keys fall back to
                neutral defaults (see _engineer_features).

        Returns:
            Decision contract with keys: "decision" ("investigate" |
            "allow"), "fraud_score", "risk_band", "evidence",
            "confidence", "audit_id", "timestamp".
        """
        # Step 1: Feature Engineering
        features = self._engineer_features(claim_data)

        # Step 2: Multi-Agent Analysis
        pattern_analysis = self._analyze_patterns(features)
        anomaly_analysis = self._detect_anomalies(features)
        risk_score = self._calculate_risk_score(pattern_analysis, anomaly_analysis)

        # Step 3: Decision Logic
        decision = self._make_decision(risk_score)

        # Step 4: Build Explainability
        explainability = self._build_explainability(
            pattern_analysis,
            anomaly_analysis,
            risk_score
        )

        # Step 5: Governance & Audit
        audit_log = self._create_audit_log(claim_data, decision, explainability)

        return {
            "decision": decision,
            "fraud_score": risk_score["score"],
            "risk_band": risk_score["band"],
            "evidence": explainability["evidence"],
            "confidence": explainability["confidence"],
            "audit_id": audit_log["audit_id"],
            "timestamp": audit_log["timestamp"],
        }

    def _engineer_features(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract and engineer features from claim data, with neutral defaults."""
        return {
            "amount": claim_data.get("amount", 0),
            "claim_type": claim_data.get("type", "unknown"),
            "claimant_id": claim_data.get("claimant_id", ""),
            # Default of one year means "not a suspiciously new policy".
            "policy_age_days": claim_data.get("days_since_policy_start", 365),
            "claim_history": claim_data.get("claimant_history", {}),
            "documents": claim_data.get("documents", []),
            "temporal_data": claim_data.get("temporal_data", {}),
            "entity_links": claim_data.get("linked_entities", []),
        }

    def _analyze_patterns(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze claim patterns for fraud indicators.

        Returns boolean flags (used as evidence) plus numeric scores
        (used for risk scoring).
        """
        patterns = {}

        # Frequency pattern: more than 5 prior claims is flagged; the
        # numeric score saturates at 10 claims.
        claim_count = features.get("claim_history", {}).get("claim_count", 0)
        patterns["high_frequency"] = claim_count > 5
        patterns["frequency_score"] = min(claim_count / 10.0, 1.0)

        # Amount pattern: relative deviation from the claimant's average.
        # NOTE: deviation itself is unbounded; it is clamped where it
        # feeds into the risk score.
        amount = features.get("amount", 0)
        avg_amount = features.get("claim_history", {}).get("avg_amount", 5000)
        deviation = abs(amount - avg_amount) / avg_amount if avg_amount > 0 else 0
        patterns["amount_deviation"] = deviation
        patterns["unusual_amount"] = deviation > 0.5

        # Temporal pattern: a claim within 30 days of policy start is suspicious.
        policy_age = features.get("policy_age_days", 365)
        patterns["early_claim"] = policy_age < 30
        patterns["temporal_score"] = 1.0 if policy_age < 30 else 0.0

        return patterns

    def _detect_anomalies(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in claim data (documents, entity links, behavior)."""
        anomalies = {}

        # Document anomalies: fewer than 2 supporting documents.
        documents = features.get("documents", [])
        anomalies["missing_documents"] = len(documents) < 2
        anomalies["document_score"] = 1.0 if len(documents) < 2 else 0.0

        # Entity linkage anomalies: any linked entity is flagged; the
        # numeric score saturates at 5 links.
        entity_links = features.get("entity_links", [])
        anomalies["suspicious_links"] = len(entity_links) > 0
        anomalies["entity_score"] = min(len(entity_links) / 5.0, 1.0)

        # Behavioral anomalies: more than 3 prior claims adds a fixed penalty.
        claim_history = features.get("claim_history", {})
        anomalies["behavioral_score"] = 0.5 if claim_history.get("claim_count", 0) > 3 else 0.0

        return anomalies

    def _calculate_risk_score(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Calculate the overall fraud risk score (bounded in [0, 1]).

        Returns:
            Dict with "score" (overall), "band" ("low" | "medium" |
            "high"), "pattern_score" and "anomaly_score" (rounded
            sub-scores).
        """
        # Weighted scoring: patterns dominate slightly over anomalies.
        pattern_weight = 0.6
        anomaly_weight = 0.4

        pattern_score = (
            pattern_analysis.get("frequency_score", 0) * 0.4 +
            # FIX: amount_deviation is unbounded (e.g. 10x the average
            # yields 9.0). Uncapped, a single extreme amount pushes the
            # overall score far outside the [0, 1] range the risk bands
            # and decision threshold assume — clamp it to 1.0 here.
            min(pattern_analysis.get("amount_deviation", 0), 1.0) * 0.3 +
            pattern_analysis.get("temporal_score", 0) * 0.3
        )
        anomaly_score = (
            anomaly_analysis.get("document_score", 0) * 0.4 +
            anomaly_analysis.get("entity_score", 0) * 0.4 +
            anomaly_analysis.get("behavioral_score", 0) * 0.2
        )
        overall_score = (pattern_score * pattern_weight) + (anomaly_score * anomaly_weight)

        # Determine risk band from fixed thresholds.
        if overall_score >= 0.7:
            risk_band = "high"
        elif overall_score >= 0.4:
            risk_band = "medium"
        else:
            risk_band = "low"

        return {
            "score": round(overall_score, 3),
            "band": risk_band,
            "pattern_score": round(pattern_score, 3),
            "anomaly_score": round(anomaly_score, 3)
        }

    def _make_decision(self, risk_score: Dict[str, Any]) -> str:
        """Make the final decision: "investigate" | "allow"."""
        score = risk_score["score"]
        return "investigate" if score >= self.decision_threshold else "allow"

    def _build_explainability(
        self,
        pattern_analysis: Dict[str, Any],
        anomaly_analysis: Dict[str, Any],
        risk_score: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the explainability payload: evidence strings + confidence.

        Confidence is highest when pattern and anomaly sub-scores agree,
        and is floored at 0.5.
        """
        evidence = []

        # Pattern evidence
        if pattern_analysis.get("high_frequency"):
            evidence.append("High claim frequency detected")
        if pattern_analysis.get("unusual_amount"):
            evidence.append("Unusual claim amount")
        if pattern_analysis.get("early_claim"):
            evidence.append("Claim filed shortly after policy inception")

        # Anomaly evidence
        if anomaly_analysis.get("missing_documents"):
            evidence.append("Insufficient documentation")
        if anomaly_analysis.get("suspicious_links"):
            evidence.append("Linked to suspicious entities")

        # Confidence: disagreement between the two sub-scores lowers it.
        score_variance = abs(risk_score["pattern_score"] - risk_score["anomaly_score"])
        confidence = 1.0 - (score_variance * 0.5)

        return {
            "evidence": evidence,
            "confidence": round(max(confidence, 0.5), 3),
            "pattern_analysis": pattern_analysis,
            "anomaly_analysis": anomaly_analysis
        }

    def _create_audit_log(
        self,
        claim_data: Dict[str, Any],
        decision: str,
        explainability: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create an audit log entry with a content-derived id.

        The audit id is the first 16 hex chars of a SHA-256 over
        claim_id + timestamp, so repeated runs of the same claim get
        distinct ids.
        """
        # FIX: timezone-aware UTC — datetime.utcnow() is deprecated
        # (Python 3.12+) and produces a naive timestamp.
        timestamp = datetime.now(timezone.utc).isoformat()
        audit_id = hashlib.sha256(
            f"{claim_data.get('claim_id', 'unknown')}_{timestamp}".encode()
        ).hexdigest()[:16]

        return {
            "audit_id": audit_id,
            "timestamp": timestamp,
            "claim_id": claim_data.get("claim_id", "unknown"),
            "decision": decision,
            "evidence_count": len(explainability.get("evidence", [])),
            "model_version": self.version
        }