# BDR-Agent-Factory / examples / fraud_detection_example.py
# Author: Bader Alabddan
# Commit: "Add comprehensive documentation and implementation framework" (3ef5d3c)
#!/usr/bin/env python3
"""
Fraud Detection Capability - Example Implementation
This example demonstrates how to implement the fraud detection capability
for insurance claims with risk scoring, anomaly detection, and compliance.
Capability ID: cap_fraud_detection
Version: 1.5.0
Compliance: AML, GDPR
"""
import hashlib
import json
import os
import random
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class FraudDetectionResult:
    """Outcome of a fraud analysis for a single insurance claim."""

    fraud_score: float                      # normalized score in [0.0, 1.0]
    risk_level: str                         # one of: low, medium, high, critical
    risk_factors: List[Dict[str, Any]]      # triggered indicators with weights and scores
    recommendation: str                     # one of: approve, review, reject, escalate
    explanation: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    audit_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this result (including nested fields) to a plain dict."""
        return asdict(self)
class FraudDetectionCapability:
    """
    Fraud Detection Capability Implementation

    Analyzes insurance claims for potential fraud using a set of weighted
    rule-based indicators, producing a fraud score in [0, 1], a risk level,
    a handling recommendation, an optional explanation, and an optional
    in-memory audit record.
    """

    # Capability metadata
    CAPABILITY_ID = "cap_fraud_detection"
    VERSION = "1.5.0"
    MODEL_VERSION = "1.5.0-xgboost-20260103"

    # Risk-level boundaries, interpreted as inclusive lower bounds by
    # _determine_risk_level (scores below the 'medium' bound are 'low').
    RISK_THRESHOLDS = {
        'low': 0.3,
        'medium': 0.6,
        'high': 0.8,
        'critical': 0.95
    }

    # Fraud indicators with scoring weights and trigger thresholds.
    # Weights sum to 1.0, so the weighted score stays within [0, 1].
    FRAUD_INDICATORS = {
        'high_claim_amount': {'weight': 0.15, 'threshold': 50000},
        'frequent_claims': {'weight': 0.20, 'threshold': 3},
        'recent_policy': {'weight': 0.10, 'threshold_days': 30},
        'unusual_timing': {'weight': 0.12},
        'inconsistent_details': {'weight': 0.18},
        'suspicious_patterns': {'weight': 0.15},
        'third_party_involvement': {'weight': 0.10}
    }

    # Per-indicator follow-up actions used by _generate_detailed_recommendations.
    _FACTOR_ADVICE = {
        'high_claim_amount': "Verify claim amount with supporting documentation",
        'frequent_claims': "Review claimant's claim history for patterns",
        'recent_policy': "Verify policy details and coverage start date",
        'suspicious_patterns': "Conduct detailed investigation of incident circumstances",
        'inconsistent_details': "Request additional documentation and clarification",
    }

    def __init__(self, enable_audit: bool = True):
        """
        Initialize fraud detection capability.

        Args:
            enable_audit: When True, detect() may append audit-trail
                records to self.audit_records (in-memory only).
        """
        self.enable_audit = enable_audit
        self.audit_records: List[Dict[str, Any]] = []
        print(f"Initialized {self.CAPABILITY_ID} v{self.VERSION}")

    def detect(
        self,
        claim_data: Dict[str, Any],
        explain: bool = True,
        audit_trail: bool = True,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> 'FraudDetectionResult':
        """
        Detect potential fraud in an insurance claim.

        Args:
            claim_data: Claim information dictionary; must contain
                'claim_id', 'claim_amount' and 'claim_type'.
            explain: Generate an explanation for the fraud score.
            audit_trail: Create an audit trail record (stored only when
                the instance was built with enable_audit=True).
            request_id: Optional request identifier; generated when None.
            user_id: Optional user identifier recorded in the audit trail.

        Returns:
            FraudDetectionResult with fraud score and risk assessment.

        Raises:
            ValueError: If claim data is missing required fields or has
                an invalid claim_amount.
        """
        self._validate_claim_data(claim_data)

        if request_id is None:
            request_id = self._generate_request_id(claim_data)

        # Timezone-aware UTC clock (datetime.utcnow() is deprecated).
        start_time = datetime.now(timezone.utc)

        # Core pipeline: indicators -> score -> level -> recommendation.
        risk_factors = self._analyze_risk_factors(claim_data)
        fraud_score = self._calculate_fraud_score(risk_factors)
        risk_level = self._determine_risk_level(fraud_score)
        recommendation = self._generate_recommendation(fraud_score, risk_level, risk_factors)

        explanation = None
        if explain:
            explanation = self._generate_explanation(claim_data, fraud_score, risk_factors)

        processing_time_ms = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000

        metadata = {
            "capability_id": self.CAPABILITY_ID,
            "version": self.VERSION,
            "model_version": self.MODEL_VERSION,
            "processing_time_ms": processing_time_ms,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "request_id": request_id,
            "compliance_flags": {
                "explainable": explain,
                "auditable": audit_trail,
                "aml_compliant": True,
                "gdpr_compliant": True
            }
        }

        audit_id = None
        if audit_trail and self.enable_audit:
            audit_id = self._create_audit_trail(
                request_id=request_id,
                user_id=user_id,
                claim_data=claim_data,
                fraud_score=fraud_score,
                risk_level=risk_level,
                recommendation=recommendation,
                metadata=metadata
            )

        return FraudDetectionResult(
            fraud_score=fraud_score,
            risk_level=risk_level,
            risk_factors=risk_factors,
            recommendation=recommendation,
            explanation=explanation,
            metadata=metadata,
            audit_id=audit_id
        )

    def _validate_claim_data(self, claim_data: Dict[str, Any]) -> None:
        """Validate claim data, raising ValueError on the first problem."""
        required_fields = ['claim_id', 'claim_amount', 'claim_type']
        for field in required_fields:
            if field not in claim_data:
                raise ValueError(f"Missing required field: {field}")
        amount = claim_data['claim_amount']
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            raise ValueError("claim_amount must be a number")
        if amount < 0:
            raise ValueError("claim_amount cannot be negative")

    @staticmethod
    def _parse_iso_datetime(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp (accepting a trailing 'Z') into an
        aware UTC datetime; return None when the value is unparseable."""
        try:
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        except (ValueError, AttributeError, TypeError):
            # Narrow replacement for the previous bare except: covers bad
            # strings and non-string values without masking other errors.
            return None
        if parsed.tzinfo is None:
            # Assume naive timestamps are UTC so arithmetic stays aware.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _analyze_risk_factors(self, claim_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Analyze a claim against every configured fraud indicator.

        Returns:
            A list of triggered risk factors; each carries the indicator
            name, a human-readable description, a severity label, the
            indicator's configured weight, and a 0-1 score.
        """
        risk_factors: List[Dict[str, Any]] = []

        # 1. High claim amount
        claim_amount = claim_data.get('claim_amount', 0)
        if claim_amount > self.FRAUD_INDICATORS['high_claim_amount']['threshold']:
            risk_factors.append({
                'factor': 'high_claim_amount',
                'description': f'Claim amount ${claim_amount:,.2f} exceeds threshold',
                'severity': 'medium',
                'weight': self.FRAUD_INDICATORS['high_claim_amount']['weight'],
                'score': min(claim_amount / 100000, 1.0)  # normalize to 0-1
            })

        # 2. Frequent claims by the same claimant
        claimant_history = claim_data.get('claimant_history', {})
        previous_claims = claimant_history.get('previous_claims', 0)
        if previous_claims >= self.FRAUD_INDICATORS['frequent_claims']['threshold']:
            risk_factors.append({
                'factor': 'frequent_claims',
                'description': f'Claimant has {previous_claims} previous claims',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['frequent_claims']['weight'],
                'score': min(previous_claims / 10, 1.0)
            })

        # 3. Recently opened policy
        policy_date = self._parse_iso_datetime(claim_data.get('policy_start_date') or '')
        if policy_date is not None:
            threshold_days = self.FRAUD_INDICATORS['recent_policy']['threshold_days']
            days_since_policy = (datetime.now(timezone.utc) - policy_date).days
            if days_since_policy < threshold_days:
                risk_factors.append({
                    'factor': 'recent_policy',
                    'description': f'Policy started only {days_since_policy} days ago',
                    'severity': 'medium',
                    'weight': self.FRAUD_INDICATORS['recent_policy']['weight'],
                    # Newer policies score higher. Uses the configured
                    # threshold (previously hard-coded as 30) and clamps to
                    # 1.0 in case the policy date lies in the future.
                    'score': min(1.0 - (days_since_policy / threshold_days), 1.0)
                })

        # 4. Unusual filing time: weekend, or before 06:00 / after 22:00
        claim_dt = self._parse_iso_datetime(claim_data.get('claim_date') or '')
        if claim_dt is not None:
            if claim_dt.weekday() >= 5 or claim_dt.hour < 6 or claim_dt.hour > 22:
                risk_factors.append({
                    'factor': 'unusual_timing',
                    'description': 'Claim filed during unusual hours',
                    'severity': 'low',
                    'weight': self.FRAUD_INDICATORS['unusual_timing']['weight'],
                    'score': 0.5
                })

        # 5. Inconsistent details: very short or very long description
        incident_details = claim_data.get('incident_details', '')
        if incident_details:
            if len(incident_details) < 20 or len(incident_details) > 5000:
                risk_factors.append({
                    'factor': 'inconsistent_details',
                    'description': 'Incident description length is unusual',
                    'severity': 'medium',
                    'weight': self.FRAUD_INDICATORS['inconsistent_details']['weight'],
                    'score': 0.6
                })

        # 6. Suspicious patterns (mock heuristic - production would use an ML model)
        if claim_data.get('witnesses', 0) == 0 and claim_amount > 10000:
            risk_factors.append({
                'factor': 'suspicious_patterns',
                'description': 'High-value claim with no witnesses',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['suspicious_patterns']['weight'],
                'score': 0.8
            })

        # 7. Third-party involvement
        if claim_data.get('third_party_involved', False):
            risk_factors.append({
                'factor': 'third_party_involvement',
                'description': 'Third party involved in claim',
                'severity': 'low',
                'weight': self.FRAUD_INDICATORS['third_party_involvement']['weight'],
                'score': 0.4
            })

        return risk_factors

    def _calculate_fraud_score(self, risk_factors: List[Dict[str, Any]]) -> float:
        """Combine triggered risk factors into a single fraud score.

        The score is the weighted sum of factor scores. Because indicator
        weights sum to 1.0 and each factor score is capped at 1.0, the
        result naturally falls in [0, 1]; min() is a safety clamp.
        """
        if not risk_factors:
            return 0.0
        total_score = sum(factor['weight'] * factor['score'] for factor in risk_factors)
        return round(min(total_score, 1.0), 3)

    def _determine_risk_level(self, fraud_score: float) -> str:
        """Map a fraud score to a risk level.

        RISK_THRESHOLDS entries act as inclusive lower bounds; anything
        below the 'medium' bound is 'low' (the 'low' entry itself is not
        consulted here).
        """
        if fraud_score >= self.RISK_THRESHOLDS['critical']:
            return 'critical'
        elif fraud_score >= self.RISK_THRESHOLDS['high']:
            return 'high'
        elif fraud_score >= self.RISK_THRESHOLDS['medium']:
            return 'medium'
        else:
            return 'low'

    def _generate_recommendation(self, fraud_score: float, risk_level: str, risk_factors: List[Dict[str, Any]]) -> str:
        """Map the risk level to a claims-handling action.

        fraud_score and risk_factors are kept for interface stability;
        the decision currently depends on risk_level alone.
        """
        if risk_level == 'critical':
            return 'reject'
        elif risk_level == 'high':
            return 'escalate'
        elif risk_level == 'medium':
            return 'review'
        else:
            return 'approve'

    def _generate_explanation(self, claim_data: Dict[str, Any], fraud_score: float, risk_factors: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate an explanation for the fraud detection result.

        The narrative summary uses the same banding as
        _determine_risk_level, so it can never contradict the reported
        risk_level (previously this method used hard-coded cutoffs of
        0.3/0.6/0.8 and could disagree with it).
        """
        # Sort risk factors by severity, then by score, most serious first.
        severity_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
        sorted_factors = sorted(
            risk_factors,
            key=lambda x: (severity_order.get(x['severity'], 0), x['score']),
            reverse=True
        )

        risk_level = self._determine_risk_level(fraud_score)
        if risk_level == 'low':
            summary = f"This claim shows a low fraud risk ({fraud_score:.1%}). "
            if risk_factors:
                summary += f"Minor concerns identified: {len(risk_factors)} risk factor(s) detected."
            else:
                summary += "No significant fraud indicators detected."
        elif risk_level == 'medium':
            summary = f"This claim shows a medium fraud risk ({fraud_score:.1%}). "
            summary += f"Manual review recommended due to {len(risk_factors)} risk factor(s)."
        elif risk_level == 'high':
            serious_count = len([f for f in risk_factors if f['severity'] in ['high', 'critical']])
            summary = f"This claim shows a high fraud risk ({fraud_score:.1%}). "
            summary += f"Escalation recommended due to {serious_count} serious risk factor(s)."
        else:
            summary = f"This claim shows a critical fraud risk ({fraud_score:.1%}). "
            summary += "Immediate investigation required."

        explanation = {
            'method': 'Rule-Based + ML',
            'fraud_score': fraud_score,
            'risk_factors_detected': len(risk_factors),
            'top_risk_factors': sorted_factors[:5],
            'contributing_factors': [
                {
                    'factor': factor['factor'],
                    'description': factor['description'],
                    'severity': factor['severity'],
                    # Each factor's share of the weighted score.
                    'contribution': f"{factor['weight'] * factor['score']:.2%}"
                }
                for factor in sorted_factors
            ],
            'human_readable_summary': summary,
            'recommendations': self._generate_detailed_recommendations(risk_factors)
        }
        return explanation

    def _generate_detailed_recommendations(self, risk_factors: List[Dict[str, Any]]) -> List[str]:
        """Generate follow-up actions for the triggered risk factors."""
        recommendations = [
            self._FACTOR_ADVICE[factor['factor']]
            for factor in risk_factors
            if factor['factor'] in self._FACTOR_ADVICE
        ]
        if not recommendations:
            recommendations.append("Standard claim processing procedures apply")
        return recommendations

    def _generate_request_id(self, claim_data: Dict[str, Any]) -> str:
        """Derive a unique request ID from the current time and claim ID."""
        timestamp = datetime.now(timezone.utc).isoformat()
        claim_id = claim_data.get('claim_id', 'unknown')
        hash_value = hashlib.sha256(f"{timestamp}:{claim_id}".encode()).hexdigest()[:16]
        return f"req_{hash_value}"

    def _create_audit_trail(
        self,
        request_id: str,
        user_id: Optional[str],
        claim_data: Dict[str, Any],
        fraud_score: float,
        risk_level: str,
        recommendation: str,
        metadata: Dict[str, Any]
    ) -> str:
        """Create and store an audit trail record; return its audit ID.

        Input and output hashes allow after-the-fact tamper checks on
        the recorded decision.
        """
        audit_id = f"audit_{hashlib.sha256(request_id.encode()).hexdigest()[:16]}"
        audit_record = {
            'audit_id': audit_id,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'capability_id': self.CAPABILITY_ID,
            'version': self.VERSION,
            'request_id': request_id,
            'user_id': user_id or 'system',
            'claim_id': claim_data.get('claim_id'),
            'input_hash': hashlib.sha256(json.dumps(claim_data, sort_keys=True).encode()).hexdigest(),
            'output': {
                'fraud_score': fraud_score,
                'risk_level': risk_level,
                'recommendation': recommendation
            },
            'output_hash': hashlib.sha256(f"{fraud_score}:{risk_level}:{recommendation}".encode()).hexdigest(),
            'metadata': metadata,
            'compliance_flags': metadata['compliance_flags'],
            'retention_until': self._calculate_retention_date()
        }
        self.audit_records.append(audit_record)
        return audit_id

    def _calculate_retention_date(self) -> str:
        """Calculate the data retention date (7 years for AML)."""
        retention_date = datetime.now(timezone.utc) + timedelta(days=2555)  # ~7 years
        return retention_date.isoformat()

    def get_audit_record(self, audit_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve an audit record by ID, or None when not found."""
        return next(
            (record for record in self.audit_records if record['audit_id'] == audit_id),
            None
        )
def main() -> None:
    """Example usage of fraud detection capability.

    Runs five demo scenarios: a low-risk claim, a high-risk claim, a
    medium-risk claim, audit-trail retrieval, and JSON export.
    """
    print("=" * 70)
    print("Fraud Detection Capability - Example Usage")
    print("=" * 70)
    print()
    # Initialize with auditing enabled so Example 4 can retrieve the
    # audit record written for Example 2's claim.
    fraud_detector = FraudDetectionCapability(enable_audit=True)
    print()
    # Example 1: Low-risk claim — long-standing customer, no prior
    # claims, witnesses present, moderate amount.
    print("Example 1: Low-Risk Claim")
    print("-" * 70)
    # NOTE(review): 2026-01-03 falls on a Saturday, so this claim also
    # triggers the 'unusual_timing' indicator; the score stays low anyway.
    claim_1 = {
        'claim_id': 'CLM-2026-001',
        'claim_type': 'auto_accident',
        'claim_amount': 3500,
        'claim_date': '2026-01-03T10:30:00Z',
        'policy_start_date': '2024-06-15T00:00:00Z',
        'claimant_history': {
            'previous_claims': 0,
            'years_as_customer': 5
        },
        'incident_details': 'Minor fender bender in parking lot. Other driver admitted fault. Police report filed.',
        'witnesses': 2,
        'third_party_involved': True
    }
    print(f"Claim ID: {claim_1['claim_id']}")
    print(f"Amount: ${claim_1['claim_amount']:,.2f}")
    print()
    result_1 = fraud_detector.detect(
        claim_data=claim_1,
        explain=True,
        audit_trail=True,
        user_id="adjuster_123"
    )
    print(f"Fraud Score: {result_1.fraud_score:.1%}")
    print(f"Risk Level: {result_1.risk_level.upper()}")
    print(f"Recommendation: {result_1.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_1.risk_factors)}")
    print()
    print("Explanation:")
    print(result_1.explanation['human_readable_summary'])
    print()
    print()
    # Example 2: High-risk claim — stacks several indicators at once.
    # NOTE(review): the 'recent_policy' indicator depends on the wall-clock
    # date at run time (policy start 2025-12-20), so the printed score
    # drifts as time passes.
    print("Example 2: High-Risk Claim")
    print("-" * 70)
    claim_2 = {
        'claim_id': 'CLM-2026-002',
        'claim_type': 'property_damage',
        'claim_amount': 75000,
        'claim_date': '2026-01-03T23:45:00Z', # Late night
        'policy_start_date': '2025-12-20T00:00:00Z', # Recent policy
        'claimant_history': {
            'previous_claims': 5, # Frequent claims
            'years_as_customer': 1
        },
        'incident_details': 'Fire damage', # Very short description
        'witnesses': 0, # No witnesses
        'third_party_involved': False
    }
    print(f"Claim ID: {claim_2['claim_id']}")
    print(f"Amount: ${claim_2['claim_amount']:,.2f}")
    print()
    result_2 = fraud_detector.detect(
        claim_data=claim_2,
        explain=True,
        audit_trail=True,
        user_id="adjuster_456"
    )
    print(f"Fraud Score: {result_2.fraud_score:.1%}")
    print(f"Risk Level: {result_2.risk_level.upper()}")
    print(f"Recommendation: {result_2.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_2.risk_factors)}")
    print()
    print("Top Risk Factors:")
    # Factors appear in detection order here; the severity-sorted view
    # lives in result_2.explanation['top_risk_factors'].
    for i, factor in enumerate(result_2.risk_factors[:3], 1):
        print(f" {i}. {factor['description']} (Severity: {factor['severity']})")
    print()
    print("Recommendations:")
    for i, rec in enumerate(result_2.explanation['recommendations'], 1):
        print(f" {i}. {rec}")
    print()
    print()
    # Example 3: Medium-risk claim
    print("Example 3: Medium-Risk Claim")
    print("-" * 70)
    # NOTE(review): with these inputs only 'unusual_timing' (2026-01-03 is
    # a Saturday) and 'third_party_involvement' trigger — amount, history,
    # policy age, details length and witnesses all pass — so the computed
    # risk level comes out 'low' despite the example's title. Confirm
    # whether stronger inputs were intended here.
    claim_3 = {
        'claim_id': 'CLM-2026-003',
        'claim_type': 'health_claim',
        'claim_amount': 25000,
        'claim_date': '2026-01-03T14:00:00Z',
        'policy_start_date': '2023-01-01T00:00:00Z',
        'claimant_history': {
            'previous_claims': 2,
            'years_as_customer': 3
        },
        'incident_details': 'Medical treatment for back injury sustained at work. Multiple doctor visits and physical therapy sessions over 3 months.',
        'witnesses': 1,
        'third_party_involved': True
    }
    # user_id omitted: the audit record falls back to 'system'.
    result_3 = fraud_detector.detect(
        claim_data=claim_3,
        explain=True,
        audit_trail=True
    )
    print(f"Claim ID: {claim_3['claim_id']}")
    print(f"Fraud Score: {result_3.fraud_score:.1%}")
    print(f"Risk Level: {result_3.risk_level.upper()}")
    print(f"Recommendation: {result_3.recommendation.upper()}")
    print()
    print()
    # Example 4: Audit trail retrieval — look up the record stored for
    # claim 2 by the audit_id returned on its result.
    print("Example 4: Audit Trail Retrieval")
    print("-" * 70)
    audit_record = fraud_detector.get_audit_record(result_2.audit_id)
    if audit_record:
        print(f"Audit ID: {audit_record['audit_id']}")
        print(f"Claim ID: {audit_record['claim_id']}")
        print(f"Timestamp: {audit_record['timestamp']}")
        print(f"User ID: {audit_record['user_id']}")
        print(f"Fraud Score: {audit_record['output']['fraud_score']:.1%}")
        print(f"Risk Level: {audit_record['output']['risk_level']}")
        print(f"Recommendation: {audit_record['output']['recommendation']}")
        print(f"AML Compliant: {audit_record['compliance_flags']['aml_compliant']}")
        # [:10] keeps only the YYYY-MM-DD portion of the ISO timestamp.
        print(f"Retention Until: {audit_record['retention_until'][:10]}")
    print()
    print()
    # Example 5: JSON export of a full result via the dataclass helper.
    print("Example 5: JSON Export")
    print("-" * 70)
    result_json = json.dumps(result_2.to_dict(), indent=2)
    # Truncated for display; the full document serializes cleanly.
    print(result_json[:600] + "...")
    print()
    print("=" * 70)
    print("Examples completed successfully!")
    print("=" * 70)
if __name__ == "__main__":
    main()