#!/usr/bin/env python3
"""
Fraud Detection Capability - Example Implementation

This example demonstrates how to implement the fraud detection capability
for insurance claims with risk scoring, anomaly detection, and compliance.

Capability ID: cap_fraud_detection
Version: 1.5.0
Compliance: AML, GDPR
"""

import os
import json
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
import random


def _utcnow() -> datetime:
    """Return the current UTC time as a *naive* datetime.

    Drop-in replacement for the deprecated ``datetime.utcnow()`` so the
    module's existing naive-datetime arithmetic keeps working unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


@dataclass
class FraudDetectionResult:
    """Result of fraud detection analysis."""
    fraud_score: float  # 0.0 to 1.0
    risk_level: str  # low, medium, high, critical
    risk_factors: List[Dict[str, Any]]
    recommendation: str  # approve, review, reject, escalate
    explanation: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    audit_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain (JSON-serializable) dictionary."""
        return asdict(self)


class FraudDetectionCapability:
    """
    Fraud Detection Capability Implementation

    Analyzes insurance claims for potential fraud using multiple detection
    techniques and risk scoring.
    """

    # Capability metadata
    CAPABILITY_ID = "cap_fraud_detection"
    VERSION = "1.5.0"
    MODEL_VERSION = "1.5.0-xgboost-20260103"

    # Risk thresholds: minimum fraud score at which a claim enters each band
    # (scores below 'medium' are reported as 'low'; the 'low' entry is kept
    # for configuration completeness but is not a decision boundary).
    RISK_THRESHOLDS = {
        'low': 0.3,
        'medium': 0.6,
        'high': 0.8,
        'critical': 0.95
    }

    # Fraud indicators and weights (weights sum to 1.0)
    FRAUD_INDICATORS = {
        'high_claim_amount': {'weight': 0.15, 'threshold': 50000},
        'frequent_claims': {'weight': 0.20, 'threshold': 3},
        'recent_policy': {'weight': 0.10, 'threshold_days': 30},
        'unusual_timing': {'weight': 0.12},
        'inconsistent_details': {'weight': 0.18},
        'suspicious_patterns': {'weight': 0.15},
        'third_party_involvement': {'weight': 0.10}
    }

    def __init__(self, enable_audit: bool = True):
        """
        Initialize fraud detection capability.

        Args:
            enable_audit: Enable audit trail logging (records are kept
                in-memory on this instance).
        """
        self.enable_audit = enable_audit
        self.audit_records: List[Dict[str, Any]] = []
        print(f"Initialized {self.CAPABILITY_ID} v{self.VERSION}")

    def detect(
        self,
        claim_data: Dict[str, Any],
        explain: bool = True,
        audit_trail: bool = True,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> FraudDetectionResult:
        """
        Detect potential fraud in insurance claim.

        Args:
            claim_data: Claim information dictionary
            explain: Generate explanation for fraud score
            audit_trail: Create audit trail record
            request_id: Optional request identifier
            user_id: Optional user identifier

        Returns:
            FraudDetectionResult with fraud score and risk assessment

        Raises:
            ValueError: If claim data is invalid
        """
        # Validate input
        self._validate_claim_data(claim_data)

        # Generate request ID if not provided
        if request_id is None:
            request_id = self._generate_request_id(claim_data)

        # Perform fraud detection
        start_time = _utcnow()

        # Analyze claim for fraud indicators
        risk_factors = self._analyze_risk_factors(claim_data)

        # Calculate fraud score
        fraud_score = self._calculate_fraud_score(risk_factors)

        # Determine risk level
        risk_level = self._determine_risk_level(fraud_score)

        # Generate recommendation
        recommendation = self._generate_recommendation(fraud_score, risk_level, risk_factors)

        # Generate explanation if requested
        explanation = None
        if explain:
            explanation = self._generate_explanation(claim_data, fraud_score, risk_factors)

        # Calculate processing time
        processing_time_ms = (_utcnow() - start_time).total_seconds() * 1000

        # Create metadata
        metadata = {
            "capability_id": self.CAPABILITY_ID,
            "version": self.VERSION,
            "model_version": self.MODEL_VERSION,
            "processing_time_ms": processing_time_ms,
            "timestamp": _utcnow().isoformat(),
            "request_id": request_id,
            "compliance_flags": {
                "explainable": explain,
                "auditable": audit_trail,
                "aml_compliant": True,
                "gdpr_compliant": True
            }
        }

        # Create audit trail if requested
        audit_id = None
        if audit_trail and self.enable_audit:
            audit_id = self._create_audit_trail(
                request_id=request_id,
                user_id=user_id,
                claim_data=claim_data,
                fraud_score=fraud_score,
                risk_level=risk_level,
                recommendation=recommendation,
                metadata=metadata
            )

        # Create result
        result = FraudDetectionResult(
            fraud_score=fraud_score,
            risk_level=risk_level,
            risk_factors=risk_factors,
            recommendation=recommendation,
            explanation=explanation,
            metadata=metadata,
            audit_id=audit_id
        )

        return result

    def _validate_claim_data(self, claim_data: Dict[str, Any]) -> None:
        """Validate claim data, raising ValueError on missing/invalid fields."""
        required_fields = ['claim_id', 'claim_amount', 'claim_type']
        for field in required_fields:
            if field not in claim_data:
                raise ValueError(f"Missing required field: {field}")

        # Validate claim amount.
        # FIX: explicitly reject bool — bool is an int subclass, so
        # isinstance(True, (int, float)) would otherwise pass.
        amount = claim_data['claim_amount']
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            raise ValueError("claim_amount must be a number")

        if amount < 0:
            raise ValueError("claim_amount cannot be negative")

    def _analyze_risk_factors(self, claim_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyze claim for fraud risk factors.

        Returns a list of factor dicts with keys: factor, description,
        severity, weight, score (each score normalized to 0-1).
        """
        risk_factors = []

        # 1. High claim amount
        claim_amount = claim_data.get('claim_amount', 0)
        if claim_amount > self.FRAUD_INDICATORS['high_claim_amount']['threshold']:
            risk_factors.append({
                'factor': 'high_claim_amount',
                'description': f'Claim amount ${claim_amount:,.2f} exceeds threshold',
                'severity': 'medium',
                'weight': self.FRAUD_INDICATORS['high_claim_amount']['weight'],
                'score': min(claim_amount / 100000, 1.0)  # Normalize to 0-1
            })

        # 2. Frequent claims
        claimant_history = claim_data.get('claimant_history', {})
        previous_claims = claimant_history.get('previous_claims', 0)
        if previous_claims >= self.FRAUD_INDICATORS['frequent_claims']['threshold']:
            risk_factors.append({
                'factor': 'frequent_claims',
                'description': f'Claimant has {previous_claims} previous claims',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['frequent_claims']['weight'],
                'score': min(previous_claims / 10, 1.0)
            })

        # 3. Recent policy
        policy_start_date = claim_data.get('policy_start_date')
        if policy_start_date:
            try:
                policy_date = datetime.fromisoformat(policy_start_date.replace('Z', '+00:00'))
                days_since_policy = (_utcnow() - policy_date.replace(tzinfo=None)).days
                threshold_days = self.FRAUD_INDICATORS['recent_policy']['threshold_days']
                if days_since_policy < threshold_days:
                    risk_factors.append({
                        'factor': 'recent_policy',
                        'description': f'Policy started only {days_since_policy} days ago',
                        'severity': 'medium',
                        'weight': self.FRAUD_INDICATORS['recent_policy']['weight'],
                        # FIX: use the configured threshold instead of a
                        # hard-coded 30, and clamp so a future-dated policy
                        # cannot yield a score above 1.0.
                        'score': max(0.0, min(1.0, 1.0 - (days_since_policy / threshold_days)))
                    })
            except (ValueError, TypeError, AttributeError):
                # Best-effort: an unparseable date just skips this indicator
                # (FIX: narrowed from a bare except).
                pass

        # 4. Unusual timing
        claim_date = claim_data.get('claim_date')
        if claim_date:
            try:
                claim_dt = datetime.fromisoformat(claim_date.replace('Z', '+00:00'))
                # Check if claim was filed on weekend or late at night
                if claim_dt.weekday() >= 5 or claim_dt.hour < 6 or claim_dt.hour > 22:
                    risk_factors.append({
                        'factor': 'unusual_timing',
                        'description': 'Claim filed during unusual hours',
                        'severity': 'low',
                        'weight': self.FRAUD_INDICATORS['unusual_timing']['weight'],
                        'score': 0.5
                    })
            except (ValueError, TypeError, AttributeError):
                # Best-effort: an unparseable date just skips this indicator
                # (FIX: narrowed from a bare except).
                pass

        # 5. Inconsistent details
        incident_details = claim_data.get('incident_details', '')
        if incident_details:
            # Simple check for very short or very long descriptions
            if len(incident_details) < 20 or len(incident_details) > 5000:
                risk_factors.append({
                    'factor': 'inconsistent_details',
                    'description': 'Incident description length is unusual',
                    'severity': 'medium',
                    'weight': self.FRAUD_INDICATORS['inconsistent_details']['weight'],
                    'score': 0.6
                })

        # 6. Suspicious patterns (mock - in production, use ML model)
        if claim_data.get('witnesses', 0) == 0 and claim_amount > 10000:
            risk_factors.append({
                'factor': 'suspicious_patterns',
                'description': 'High-value claim with no witnesses',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['suspicious_patterns']['weight'],
                'score': 0.8
            })

        # 7. Third-party involvement
        if claim_data.get('third_party_involved', False):
            risk_factors.append({
                'factor': 'third_party_involvement',
                'description': 'Third party involved in claim',
                'severity': 'low',
                'weight': self.FRAUD_INDICATORS['third_party_involvement']['weight'],
                'score': 0.4
            })

        return risk_factors

    def _calculate_fraud_score(self, risk_factors: List[Dict[str, Any]]) -> float:
        """Calculate overall fraud score (0-1) as a weighted sum of factors."""
        if not risk_factors:
            return 0.0

        # Weighted sum of risk factor scores
        total_score = sum(factor['weight'] * factor['score'] for factor in risk_factors)

        # Normalize to 0-1 range
        fraud_score = min(total_score, 1.0)

        return round(fraud_score, 3)

    def _determine_risk_level(self, fraud_score: float) -> str:
        """Determine risk level band based on fraud score."""
        if fraud_score >= self.RISK_THRESHOLDS['critical']:
            return 'critical'
        elif fraud_score >= self.RISK_THRESHOLDS['high']:
            return 'high'
        elif fraud_score >= self.RISK_THRESHOLDS['medium']:
            return 'medium'
        else:
            return 'low'

    def _generate_recommendation(self, fraud_score: float, risk_level: str,
                                 risk_factors: List[Dict[str, Any]]) -> str:
        """Generate recommendation based on fraud analysis.

        fraud_score and risk_factors are accepted for interface stability;
        the decision is currently driven by risk_level alone.
        """
        if risk_level == 'critical':
            return 'reject'
        elif risk_level == 'high':
            return 'escalate'
        elif risk_level == 'medium':
            return 'review'
        else:
            return 'approve'

    def _generate_explanation(self, claim_data: Dict[str, Any], fraud_score: float,
                              risk_factors: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate explanation for fraud detection result."""
        # Sort risk factors by severity and score
        severity_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
        sorted_factors = sorted(
            risk_factors,
            key=lambda x: (severity_order.get(x['severity'], 0), x['score']),
            reverse=True
        )

        # Generate human-readable summary.
        # FIX: derive the band from _determine_risk_level so the narrative
        # always agrees with the reported risk_level (the original used
        # different score boundaries here, so e.g. a score of 0.5 was
        # labelled "low" risk_level but described as "medium" risk).
        risk_level = self._determine_risk_level(fraud_score)
        if risk_level == 'low':
            summary = f"This claim shows a low fraud risk ({fraud_score:.1%}). "
            if risk_factors:
                summary += f"Minor concerns identified: {len(risk_factors)} risk factor(s) detected."
            else:
                summary += "No significant fraud indicators detected."
        elif risk_level == 'medium':
            summary = f"This claim shows a medium fraud risk ({fraud_score:.1%}). "
            summary += f"Manual review recommended due to {len(risk_factors)} risk factor(s)."
        elif risk_level == 'high':
            summary = f"This claim shows a high fraud risk ({fraud_score:.1%}). "
            summary += f"Escalation recommended due to {len([f for f in risk_factors if f['severity'] in ['high', 'critical']])} serious risk factor(s)."
        else:
            summary = f"This claim shows a critical fraud risk ({fraud_score:.1%}). "
            summary += "Immediate investigation required."

        explanation = {
            'method': 'Rule-Based + ML',
            'fraud_score': fraud_score,
            'risk_factors_detected': len(risk_factors),
            'top_risk_factors': sorted_factors[:5],
            'contributing_factors': [
                {
                    'factor': factor['factor'],
                    'description': factor['description'],
                    'severity': factor['severity'],
                    'contribution': f"{factor['weight'] * factor['score']:.2%}"
                }
                for factor in sorted_factors
            ],
            'human_readable_summary': summary,
            'recommendations': self._generate_detailed_recommendations(risk_factors)
        }

        return explanation

    def _generate_detailed_recommendations(self, risk_factors: List[Dict[str, Any]]) -> List[str]:
        """Generate detailed follow-up recommendations for each risk factor."""
        recommendations = []

        for factor in risk_factors:
            if factor['factor'] == 'high_claim_amount':
                recommendations.append("Verify claim amount with supporting documentation")
            elif factor['factor'] == 'frequent_claims':
                recommendations.append("Review claimant's claim history for patterns")
            elif factor['factor'] == 'recent_policy':
                recommendations.append("Verify policy details and coverage start date")
            elif factor['factor'] == 'suspicious_patterns':
                recommendations.append("Conduct detailed investigation of incident circumstances")
            elif factor['factor'] == 'inconsistent_details':
                recommendations.append("Request additional documentation and clarification")

        if not recommendations:
            recommendations.append("Standard claim processing procedures apply")

        return recommendations

    def _generate_request_id(self, claim_data: Dict[str, Any]) -> str:
        """Generate a unique request ID from timestamp + claim_id hash."""
        timestamp = _utcnow().isoformat()
        claim_id = claim_data.get('claim_id', 'unknown')
        content = f"{timestamp}:{claim_id}"
        hash_value = hashlib.sha256(content.encode()).hexdigest()[:16]
        return f"req_{hash_value}"

    def _create_audit_trail(
        self,
        request_id: str,
        user_id: Optional[str],
        claim_data: Dict[str, Any],
        fraud_score: float,
        risk_level: str,
        recommendation: str,
        metadata: Dict[str, Any]
    ) -> str:
        """Create an in-memory audit trail record and return its audit ID."""
        # Generate audit ID
        audit_id = f"audit_{hashlib.sha256(request_id.encode()).hexdigest()[:16]}"

        # Create audit record; input/output hashes allow tamper detection.
        audit_record = {
            'audit_id': audit_id,
            'timestamp': _utcnow().isoformat(),
            'capability_id': self.CAPABILITY_ID,
            'version': self.VERSION,
            'request_id': request_id,
            'user_id': user_id or 'system',
            'claim_id': claim_data.get('claim_id'),
            'input_hash': hashlib.sha256(json.dumps(claim_data, sort_keys=True).encode()).hexdigest(),
            'output': {
                'fraud_score': fraud_score,
                'risk_level': risk_level,
                'recommendation': recommendation
            },
            'output_hash': hashlib.sha256(f"{fraud_score}:{risk_level}:{recommendation}".encode()).hexdigest(),
            'metadata': metadata,
            'compliance_flags': metadata['compliance_flags'],
            'retention_until': self._calculate_retention_date()
        }

        # Store audit record
        self.audit_records.append(audit_record)

        return audit_id

    def _calculate_retention_date(self) -> str:
        """Calculate data retention date (7 years for AML)."""
        retention_date = _utcnow() + timedelta(days=2555)  # ~7 years
        return retention_date.isoformat()

    def get_audit_record(self, audit_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve audit record by ID, or None if not found."""
        for record in self.audit_records:
            if record['audit_id'] == audit_id:
                return record
        return None


def main():
    """Example usage of fraud detection capability."""
    print("=" * 70)
    print("Fraud Detection Capability - Example Usage")
    print("=" * 70)
    print()

    # Initialize capability
    fraud_detector = FraudDetectionCapability(enable_audit=True)
    print()

    # Example 1: Low-risk claim
    print("Example 1: Low-Risk Claim")
    print("-" * 70)

    claim_1 = {
        'claim_id': 'CLM-2026-001',
        'claim_type': 'auto_accident',
        'claim_amount': 3500,
        'claim_date': '2026-01-03T10:30:00Z',
        'policy_start_date': '2024-06-15T00:00:00Z',
        'claimant_history': {
            'previous_claims': 0,
            'years_as_customer': 5
        },
        'incident_details': 'Minor fender bender in parking lot. Other driver admitted fault. Police report filed.',
        'witnesses': 2,
        'third_party_involved': True
    }

    print(f"Claim ID: {claim_1['claim_id']}")
    print(f"Amount: ${claim_1['claim_amount']:,.2f}")
    print()

    result_1 = fraud_detector.detect(
        claim_data=claim_1,
        explain=True,
        audit_trail=True,
        user_id="adjuster_123"
    )

    print(f"Fraud Score: {result_1.fraud_score:.1%}")
    print(f"Risk Level: {result_1.risk_level.upper()}")
    print(f"Recommendation: {result_1.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_1.risk_factors)}")
    print()
    print("Explanation:")
    print(result_1.explanation['human_readable_summary'])
    print()
    print()

    # Example 2: High-risk claim
    print("Example 2: High-Risk Claim")
    print("-" * 70)

    claim_2 = {
        'claim_id': 'CLM-2026-002',
        'claim_type': 'property_damage',
        'claim_amount': 75000,
        'claim_date': '2026-01-03T23:45:00Z',  # Late night
        'policy_start_date': '2025-12-20T00:00:00Z',  # Recent policy
        'claimant_history': {
            'previous_claims': 5,  # Frequent claims
            'years_as_customer': 1
        },
        'incident_details': 'Fire damage',  # Very short description
        'witnesses': 0,  # No witnesses
        'third_party_involved': False
    }

    print(f"Claim ID: {claim_2['claim_id']}")
    print(f"Amount: ${claim_2['claim_amount']:,.2f}")
    print()

    result_2 = fraud_detector.detect(
        claim_data=claim_2,
        explain=True,
        audit_trail=True,
        user_id="adjuster_456"
    )

    print(f"Fraud Score: {result_2.fraud_score:.1%}")
    print(f"Risk Level: {result_2.risk_level.upper()}")
    print(f"Recommendation: {result_2.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_2.risk_factors)}")
    print()
    print("Top Risk Factors:")
    for i, factor in enumerate(result_2.risk_factors[:3], 1):
        print(f"  {i}. {factor['description']} (Severity: {factor['severity']})")
    print()
    print("Recommendations:")
    for i, rec in enumerate(result_2.explanation['recommendations'], 1):
        print(f"  {i}. {rec}")
    print()
    print()

    # Example 3: Medium-risk claim
    print("Example 3: Medium-Risk Claim")
    print("-" * 70)

    claim_3 = {
        'claim_id': 'CLM-2026-003',
        'claim_type': 'health_claim',
        'claim_amount': 25000,
        'claim_date': '2026-01-03T14:00:00Z',
        'policy_start_date': '2023-01-01T00:00:00Z',
        'claimant_history': {
            'previous_claims': 2,
            'years_as_customer': 3
        },
        'incident_details': 'Medical treatment for back injury sustained at work. Multiple doctor visits and physical therapy sessions over 3 months.',
        'witnesses': 1,
        'third_party_involved': True
    }

    result_3 = fraud_detector.detect(
        claim_data=claim_3,
        explain=True,
        audit_trail=True
    )

    print(f"Claim ID: {claim_3['claim_id']}")
    print(f"Fraud Score: {result_3.fraud_score:.1%}")
    print(f"Risk Level: {result_3.risk_level.upper()}")
    print(f"Recommendation: {result_3.recommendation.upper()}")
    print()
    print()

    # Example 4: Audit trail retrieval
    print("Example 4: Audit Trail Retrieval")
    print("-" * 70)

    audit_record = fraud_detector.get_audit_record(result_2.audit_id)
    if audit_record:
        print(f"Audit ID: {audit_record['audit_id']}")
        print(f"Claim ID: {audit_record['claim_id']}")
        print(f"Timestamp: {audit_record['timestamp']}")
        print(f"User ID: {audit_record['user_id']}")
        print(f"Fraud Score: {audit_record['output']['fraud_score']:.1%}")
        print(f"Risk Level: {audit_record['output']['risk_level']}")
        print(f"Recommendation: {audit_record['output']['recommendation']}")
        print(f"AML Compliant: {audit_record['compliance_flags']['aml_compliant']}")
        print(f"Retention Until: {audit_record['retention_until'][:10]}")
    print()
    print()

    # Example 5: JSON export
    print("Example 5: JSON Export")
    print("-" * 70)

    result_json = json.dumps(result_2.to_dict(), indent=2)
    print(result_json[:600] + "...")
    print()

    print("=" * 70)
    print("Examples completed successfully!")
    print("=" * 70)


if __name__ == "__main__":
    main()