Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Fraud Detection Capability - Example Implementation | |
| This example demonstrates how to implement the fraud detection capability | |
| for insurance claims with risk scoring, anomaly detection, and compliance. | |
| Capability ID: cap_fraud_detection | |
| Version: 1.5.0 | |
| Compliance: AML, GDPR | |
| """ | |
| import os | |
| import json | |
| import hashlib | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from dataclasses import dataclass, asdict | |
| import random | |
@dataclass
class FraudDetectionResult:
    """Result of a fraud detection analysis.

    Bug fix: the ``@dataclass`` decorator was missing, so keyword
    construction (``FraudDetectionResult(fraud_score=...)``) raised
    ``TypeError`` and ``asdict(self)`` failed at runtime.
    """

    fraud_score: float                              # 0.0 (clean) to 1.0 (certain fraud)
    risk_level: str                                 # one of: low, medium, high, critical
    risk_factors: List[Dict[str, Any]]              # individual indicators that fired
    recommendation: str                             # one of: approve, review, reject, escalate
    explanation: Optional[Dict[str, Any]] = None    # human-readable score breakdown
    metadata: Optional[Dict[str, Any]] = None       # processing/compliance metadata
    audit_id: Optional[str] = None                  # audit record ID, if audit trail enabled

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict (e.g. for JSON export)."""
        return asdict(self)
class FraudDetectionCapability:
    """
    Fraud Detection Capability Implementation

    Analyzes insurance claims for potential fraud using a set of
    weighted rule-based indicators and produces a fraud score, a risk
    level, a recommendation, an optional explanation, and an optional
    audit trail record.
    """

    # Capability metadata
    CAPABILITY_ID = "cap_fraud_detection"
    VERSION = "1.5.0"
    MODEL_VERSION = "1.5.0-xgboost-20260103"

    # Minimum fraud score for each risk level (see _determine_risk_level).
    RISK_THRESHOLDS = {
        'low': 0.3,
        'medium': 0.6,
        'high': 0.8,
        'critical': 0.95
    }

    # Fraud indicators: per-indicator scoring weight plus, where
    # applicable, the threshold that triggers the indicator.
    FRAUD_INDICATORS = {
        'high_claim_amount': {'weight': 0.15, 'threshold': 50000},
        'frequent_claims': {'weight': 0.20, 'threshold': 3},
        'recent_policy': {'weight': 0.10, 'threshold_days': 30},
        'unusual_timing': {'weight': 0.12},
        'inconsistent_details': {'weight': 0.18},
        'suspicious_patterns': {'weight': 0.15},
        'third_party_involvement': {'weight': 0.10}
    }

    def __init__(self, enable_audit: bool = True):
        """
        Initialize fraud detection capability.

        Args:
            enable_audit: Enable audit trail logging (records are kept
                in-memory on this instance).
        """
        self.enable_audit = enable_audit
        self.audit_records: List[Dict[str, Any]] = []
        print(f"Initialized {self.CAPABILITY_ID} v{self.VERSION}")

    def detect(
        self,
        claim_data: Dict[str, Any],
        explain: bool = True,
        audit_trail: bool = True,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> "FraudDetectionResult":
        """
        Detect potential fraud in an insurance claim.

        Args:
            claim_data: Claim information dictionary. Must contain
                'claim_id', 'claim_amount' and 'claim_type'.
            explain: Generate an explanation for the fraud score.
            audit_trail: Create an audit trail record (only if the
                instance was constructed with enable_audit=True).
            request_id: Optional request identifier; derived from the
                claim if not provided.
            user_id: Optional user identifier for the audit record.

        Returns:
            FraudDetectionResult with fraud score and risk assessment.

        Raises:
            ValueError: If claim data is invalid.
        """
        # Validate input before any work is done.
        self._validate_claim_data(claim_data)

        # Generate request ID if not provided.
        if request_id is None:
            request_id = self._generate_request_id(claim_data)

        # NOTE: utcnow() is naive-UTC; all timestamps in this module use it
        # consistently, so arithmetic between them is safe.
        start_time = datetime.utcnow()

        # Analyze claim for fraud indicators.
        risk_factors = self._analyze_risk_factors(claim_data)

        # Calculate fraud score.
        fraud_score = self._calculate_fraud_score(risk_factors)

        # Determine risk level.
        risk_level = self._determine_risk_level(fraud_score)

        # Generate recommendation.
        recommendation = self._generate_recommendation(fraud_score, risk_level, risk_factors)

        # Generate explanation if requested.
        explanation = None
        if explain:
            explanation = self._generate_explanation(claim_data, fraud_score, risk_factors)

        # Calculate processing time.
        processing_time_ms = (datetime.utcnow() - start_time).total_seconds() * 1000

        # Create metadata.
        metadata = {
            "capability_id": self.CAPABILITY_ID,
            "version": self.VERSION,
            "model_version": self.MODEL_VERSION,
            "processing_time_ms": processing_time_ms,
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "compliance_flags": {
                "explainable": explain,
                "auditable": audit_trail,
                "aml_compliant": True,
                "gdpr_compliant": True
            }
        }

        # Create audit trail if requested and enabled on this instance.
        audit_id = None
        if audit_trail and self.enable_audit:
            audit_id = self._create_audit_trail(
                request_id=request_id,
                user_id=user_id,
                claim_data=claim_data,
                fraud_score=fraud_score,
                risk_level=risk_level,
                recommendation=recommendation,
                metadata=metadata
            )

        return FraudDetectionResult(
            fraud_score=fraud_score,
            risk_level=risk_level,
            risk_factors=risk_factors,
            recommendation=recommendation,
            explanation=explanation,
            metadata=metadata,
            audit_id=audit_id
        )

    def _validate_claim_data(self, claim_data: Dict[str, Any]) -> None:
        """Validate claim data.

        Raises:
            ValueError: If a required field is missing or claim_amount
                is not a non-negative number.
        """
        required_fields = ['claim_id', 'claim_amount', 'claim_type']
        for field in required_fields:
            if field not in claim_data:
                raise ValueError(f"Missing required field: {field}")

        # Validate claim amount. bool is a subclass of int, but True/False
        # is never a valid monetary amount, so reject it explicitly.
        amount = claim_data['claim_amount']
        if isinstance(amount, bool) or not isinstance(amount, (int, float)):
            raise ValueError("claim_amount must be a number")
        if amount < 0:
            raise ValueError("claim_amount cannot be negative")

    def _analyze_risk_factors(self, claim_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyze claim for fraud risk factors.

        Returns a list of factor dicts, each with 'factor', 'description',
        'severity', 'weight' and 'score' (0-1) keys.
        """
        risk_factors = []

        # 1. High claim amount
        claim_amount = claim_data.get('claim_amount', 0)
        if claim_amount > self.FRAUD_INDICATORS['high_claim_amount']['threshold']:
            risk_factors.append({
                'factor': 'high_claim_amount',
                'description': f'Claim amount ${claim_amount:,.2f} exceeds threshold',
                'severity': 'medium',
                'weight': self.FRAUD_INDICATORS['high_claim_amount']['weight'],
                'score': min(claim_amount / 100000, 1.0)  # Normalize to 0-1
            })

        # 2. Frequent claims
        claimant_history = claim_data.get('claimant_history', {})
        previous_claims = claimant_history.get('previous_claims', 0)
        if previous_claims >= self.FRAUD_INDICATORS['frequent_claims']['threshold']:
            risk_factors.append({
                'factor': 'frequent_claims',
                'description': f'Claimant has {previous_claims} previous claims',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['frequent_claims']['weight'],
                'score': min(previous_claims / 10, 1.0)
            })

        # 3. Recent policy — a claim shortly after policy inception is a
        # classic fraud signal.
        policy_start_date = claim_data.get('policy_start_date')
        if policy_start_date:
            threshold_days = self.FRAUD_INDICATORS['recent_policy']['threshold_days']
            try:
                policy_date = datetime.fromisoformat(policy_start_date.replace('Z', '+00:00'))
                days_since_policy = (datetime.utcnow() - policy_date.replace(tzinfo=None)).days
                if days_since_policy < threshold_days:
                    risk_factors.append({
                        'factor': 'recent_policy',
                        'description': f'Policy started only {days_since_policy} days ago',
                        'severity': 'medium',
                        'weight': self.FRAUD_INDICATORS['recent_policy']['weight'],
                        # Scale by the configured window (was hard-coded 30)
                        # and clamp to [0, 1] for future-dated policies.
                        'score': max(0.0, min(1.0, 1.0 - (days_since_policy / threshold_days)))
                    })
            except (AttributeError, ValueError):
                # Unparseable or non-string date: skip this indicator only.
                pass

        # 4. Unusual timing — weekend or late-night filing.
        claim_date = claim_data.get('claim_date')
        if claim_date:
            try:
                claim_dt = datetime.fromisoformat(claim_date.replace('Z', '+00:00'))
                if claim_dt.weekday() >= 5 or claim_dt.hour < 6 or claim_dt.hour > 22:
                    risk_factors.append({
                        'factor': 'unusual_timing',
                        'description': 'Claim filed during unusual hours',
                        'severity': 'low',
                        'weight': self.FRAUD_INDICATORS['unusual_timing']['weight'],
                        'score': 0.5
                    })
            except (AttributeError, ValueError):
                # Unparseable or non-string date: skip this indicator only.
                pass

        # 5. Inconsistent details — very short or very long descriptions.
        incident_details = claim_data.get('incident_details', '')
        if incident_details:
            if len(incident_details) < 20 or len(incident_details) > 5000:
                risk_factors.append({
                    'factor': 'inconsistent_details',
                    'description': 'Incident description length is unusual',
                    'severity': 'medium',
                    'weight': self.FRAUD_INDICATORS['inconsistent_details']['weight'],
                    'score': 0.6
                })

        # 6. Suspicious patterns (mock — in production, use an ML model).
        if claim_data.get('witnesses', 0) == 0 and claim_amount > 10000:
            risk_factors.append({
                'factor': 'suspicious_patterns',
                'description': 'High-value claim with no witnesses',
                'severity': 'high',
                'weight': self.FRAUD_INDICATORS['suspicious_patterns']['weight'],
                'score': 0.8
            })

        # 7. Third-party involvement
        if claim_data.get('third_party_involved', False):
            risk_factors.append({
                'factor': 'third_party_involvement',
                'description': 'Third party involved in claim',
                'severity': 'low',
                'weight': self.FRAUD_INDICATORS['third_party_involvement']['weight'],
                'score': 0.4
            })

        return risk_factors

    def _calculate_fraud_score(self, risk_factors: List[Dict[str, Any]]) -> float:
        """Calculate the overall fraud score (0-1) from risk factors."""
        if not risk_factors:
            return 0.0
        # Weighted sum of risk factor scores, capped at 1.0.
        total_score = sum(factor['weight'] * factor['score'] for factor in risk_factors)
        return round(min(total_score, 1.0), 3)

    def _determine_risk_level(self, fraud_score: float) -> str:
        """Map a fraud score onto a risk level via RISK_THRESHOLDS."""
        if fraud_score >= self.RISK_THRESHOLDS['critical']:
            return 'critical'
        elif fraud_score >= self.RISK_THRESHOLDS['high']:
            return 'high'
        elif fraud_score >= self.RISK_THRESHOLDS['medium']:
            return 'medium'
        else:
            return 'low'

    def _generate_recommendation(self, fraud_score: float, risk_level: str, risk_factors: List[Dict[str, Any]]) -> str:
        """Generate a processing recommendation from the risk level."""
        if risk_level == 'critical':
            return 'reject'
        elif risk_level == 'high':
            return 'escalate'
        elif risk_level == 'medium':
            return 'review'
        else:
            return 'approve'

    def _generate_explanation(self, claim_data: Dict[str, Any], fraud_score: float, risk_factors: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate a human-readable explanation for the detection result."""
        # Sort risk factors by severity first, then by score.
        severity_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
        sorted_factors = sorted(
            risk_factors,
            key=lambda x: (severity_order.get(x['severity'], 0), x['score']),
            reverse=True
        )

        # Generate human-readable summary (bands mirror RISK_THRESHOLDS).
        if fraud_score < 0.3:
            summary = f"This claim shows a low fraud risk ({fraud_score:.1%}). "
            if risk_factors:
                summary += f"Minor concerns identified: {len(risk_factors)} risk factor(s) detected."
            else:
                summary += "No significant fraud indicators detected."
        elif fraud_score < 0.6:
            summary = f"This claim shows a medium fraud risk ({fraud_score:.1%}). "
            summary += f"Manual review recommended due to {len(risk_factors)} risk factor(s)."
        elif fraud_score < 0.8:
            summary = f"This claim shows a high fraud risk ({fraud_score:.1%}). "
            summary += f"Escalation recommended due to {len([f for f in risk_factors if f['severity'] in ['high', 'critical']])} serious risk factor(s)."
        else:
            summary = f"This claim shows a critical fraud risk ({fraud_score:.1%}). "
            summary += "Immediate investigation required."

        return {
            'method': 'Rule-Based + ML',
            'fraud_score': fraud_score,
            'risk_factors_detected': len(risk_factors),
            'top_risk_factors': sorted_factors[:5],
            'contributing_factors': [
                {
                    'factor': factor['factor'],
                    'description': factor['description'],
                    'severity': factor['severity'],
                    'contribution': f"{factor['weight'] * factor['score']:.2%}"
                }
                for factor in sorted_factors
            ],
            'human_readable_summary': summary,
            'recommendations': self._generate_detailed_recommendations(risk_factors)
        }

    def _generate_detailed_recommendations(self, risk_factors: List[Dict[str, Any]]) -> List[str]:
        """Generate detailed follow-up actions based on risk factors."""
        recommendations = []
        for factor in risk_factors:
            if factor['factor'] == 'high_claim_amount':
                recommendations.append("Verify claim amount with supporting documentation")
            elif factor['factor'] == 'frequent_claims':
                recommendations.append("Review claimant's claim history for patterns")
            elif factor['factor'] == 'recent_policy':
                recommendations.append("Verify policy details and coverage start date")
            elif factor['factor'] == 'suspicious_patterns':
                recommendations.append("Conduct detailed investigation of incident circumstances")
            elif factor['factor'] == 'inconsistent_details':
                recommendations.append("Request additional documentation and clarification")
        if not recommendations:
            recommendations.append("Standard claim processing procedures apply")
        return recommendations

    def _generate_request_id(self, claim_data: Dict[str, Any]) -> str:
        """Generate a unique request ID from timestamp + claim ID."""
        timestamp = datetime.utcnow().isoformat()
        claim_id = claim_data.get('claim_id', 'unknown')
        content = f"{timestamp}:{claim_id}"
        hash_value = hashlib.sha256(content.encode()).hexdigest()[:16]
        return f"req_{hash_value}"

    def _create_audit_trail(
        self,
        request_id: str,
        user_id: Optional[str],
        claim_data: Dict[str, Any],
        fraud_score: float,
        risk_level: str,
        recommendation: str,
        metadata: Dict[str, Any]
    ) -> str:
        """Create and store an audit trail record; returns its audit ID."""
        # Audit ID is derived deterministically from the request ID.
        audit_id = f"audit_{hashlib.sha256(request_id.encode()).hexdigest()[:16]}"

        # Input/output hashes allow tamper-evidence without storing PII twice.
        audit_record = {
            'audit_id': audit_id,
            'timestamp': datetime.utcnow().isoformat(),
            'capability_id': self.CAPABILITY_ID,
            'version': self.VERSION,
            'request_id': request_id,
            'user_id': user_id or 'system',
            'claim_id': claim_data.get('claim_id'),
            'input_hash': hashlib.sha256(json.dumps(claim_data, sort_keys=True).encode()).hexdigest(),
            'output': {
                'fraud_score': fraud_score,
                'risk_level': risk_level,
                'recommendation': recommendation
            },
            'output_hash': hashlib.sha256(f"{fraud_score}:{risk_level}:{recommendation}".encode()).hexdigest(),
            'metadata': metadata,
            'compliance_flags': metadata['compliance_flags'],
            'retention_until': self._calculate_retention_date()
        }

        # Store audit record in-memory on this instance.
        self.audit_records.append(audit_record)
        return audit_id

    def _calculate_retention_date(self) -> str:
        """Calculate the data retention date (7 years for AML)."""
        retention_date = datetime.utcnow() + timedelta(days=2555)  # ~7 years
        return retention_date.isoformat()

    def get_audit_record(self, audit_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a stored audit record by ID, or None if not found."""
        for record in self.audit_records:
            if record['audit_id'] == audit_id:
                return record
        return None
def main():
    """Example usage of the fraud detection capability.

    Runs five demo scenarios end-to-end: a low-risk claim, a high-risk
    claim, a medium-risk claim, audit-trail retrieval, and JSON export.
    Output is printed to stdout only; nothing is returned.
    """
    print("=" * 70)
    print("Fraud Detection Capability - Example Usage")
    print("=" * 70)
    print()
    # Initialize capability (audit enabled so Example 4 can retrieve records)
    fraud_detector = FraudDetectionCapability(enable_audit=True)
    print()
    # Example 1: Low-risk claim — long-standing policy, no prior claims,
    # witnesses present; expected to score low and be approved.
    print("Example 1: Low-Risk Claim")
    print("-" * 70)
    claim_1 = {
        'claim_id': 'CLM-2026-001',
        'claim_type': 'auto_accident',
        'claim_amount': 3500,
        'claim_date': '2026-01-03T10:30:00Z',
        'policy_start_date': '2024-06-15T00:00:00Z',
        'claimant_history': {
            'previous_claims': 0,
            'years_as_customer': 5
        },
        'incident_details': 'Minor fender bender in parking lot. Other driver admitted fault. Police report filed.',
        'witnesses': 2,
        'third_party_involved': True
    }
    print(f"Claim ID: {claim_1['claim_id']}")
    print(f"Amount: ${claim_1['claim_amount']:,.2f}")
    print()
    result_1 = fraud_detector.detect(
        claim_data=claim_1,
        explain=True,
        audit_trail=True,
        user_id="adjuster_123"
    )
    print(f"Fraud Score: {result_1.fraud_score:.1%}")
    print(f"Risk Level: {result_1.risk_level.upper()}")
    print(f"Recommendation: {result_1.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_1.risk_factors)}")
    print()
    print("Explanation:")
    print(result_1.explanation['human_readable_summary'])
    print()
    print()
    # Example 2: High-risk claim — deliberately trips several indicators
    # (amount, timing, recent policy, claim frequency, short description,
    # no witnesses).
    print("Example 2: High-Risk Claim")
    print("-" * 70)
    claim_2 = {
        'claim_id': 'CLM-2026-002',
        'claim_type': 'property_damage',
        'claim_amount': 75000,
        'claim_date': '2026-01-03T23:45:00Z',  # Late night
        'policy_start_date': '2025-12-20T00:00:00Z',  # Recent policy
        'claimant_history': {
            'previous_claims': 5,  # Frequent claims
            'years_as_customer': 1
        },
        'incident_details': 'Fire damage',  # Very short description
        'witnesses': 0,  # No witnesses
        'third_party_involved': False
    }
    print(f"Claim ID: {claim_2['claim_id']}")
    print(f"Amount: ${claim_2['claim_amount']:,.2f}")
    print()
    result_2 = fraud_detector.detect(
        claim_data=claim_2,
        explain=True,
        audit_trail=True,
        user_id="adjuster_456"
    )
    print(f"Fraud Score: {result_2.fraud_score:.1%}")
    print(f"Risk Level: {result_2.risk_level.upper()}")
    print(f"Recommendation: {result_2.recommendation.upper()}")
    print(f"Risk Factors Detected: {len(result_2.risk_factors)}")
    print()
    print("Top Risk Factors:")
    # Show only the first three factors, in detection order.
    for i, factor in enumerate(result_2.risk_factors[:3], 1):
        print(f"  {i}. {factor['description']} (Severity: {factor['severity']})")
    print()
    print("Recommendations:")
    for i, rec in enumerate(result_2.explanation['recommendations'], 1):
        print(f"  {i}. {rec}")
    print()
    print()
    # Example 3: Medium-risk claim — no explicit user_id, so the audit
    # record will attribute the request to 'system'.
    print("Example 3: Medium-Risk Claim")
    print("-" * 70)
    claim_3 = {
        'claim_id': 'CLM-2026-003',
        'claim_type': 'health_claim',
        'claim_amount': 25000,
        'claim_date': '2026-01-03T14:00:00Z',
        'policy_start_date': '2023-01-01T00:00:00Z',
        'claimant_history': {
            'previous_claims': 2,
            'years_as_customer': 3
        },
        'incident_details': 'Medical treatment for back injury sustained at work. Multiple doctor visits and physical therapy sessions over 3 months.',
        'witnesses': 1,
        'third_party_involved': True
    }
    result_3 = fraud_detector.detect(
        claim_data=claim_3,
        explain=True,
        audit_trail=True
    )
    print(f"Claim ID: {claim_3['claim_id']}")
    print(f"Fraud Score: {result_3.fraud_score:.1%}")
    print(f"Risk Level: {result_3.risk_level.upper()}")
    print(f"Recommendation: {result_3.recommendation.upper()}")
    print()
    print()
    # Example 4: Audit trail retrieval — look up the record created for
    # the high-risk claim in Example 2.
    print("Example 4: Audit Trail Retrieval")
    print("-" * 70)
    audit_record = fraud_detector.get_audit_record(result_2.audit_id)
    if audit_record:
        print(f"Audit ID: {audit_record['audit_id']}")
        print(f"Claim ID: {audit_record['claim_id']}")
        print(f"Timestamp: {audit_record['timestamp']}")
        print(f"User ID: {audit_record['user_id']}")
        print(f"Fraud Score: {audit_record['output']['fraud_score']:.1%}")
        print(f"Risk Level: {audit_record['output']['risk_level']}")
        print(f"Recommendation: {audit_record['output']['recommendation']}")
        print(f"AML Compliant: {audit_record['compliance_flags']['aml_compliant']}")
        # [:10] keeps just the YYYY-MM-DD portion of the ISO timestamp.
        print(f"Retention Until: {audit_record['retention_until'][:10]}")
    print()
    print()
    # Example 5: JSON export — serialize the full result; output is
    # truncated to the first 600 characters for display.
    print("Example 5: JSON Export")
    print("-" * 70)
    result_json = json.dumps(result_2.to_dict(), indent=2)
    print(result_json[:600] + "...")
    print()
    print("=" * 70)
    print("Examples completed successfully!")
    print("=" * 70)


if __name__ == "__main__":
    main()