#!/usr/bin/env python3
"""
Integration Example - BDR Agent Factory

This example demonstrates how to integrate multiple AI capabilities
to build a complete insurance claims processing workflow.

Workflow:
1. Text Classification - Categorize claim type
2. Fraud Detection - Assess fraud risk
3. Decision Making - Approve, review, escalate, or reject claim
4. Audit Trail - Track entire process
"""

import json
import time
from collections import Counter
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Dict, List, Tuple

# Import capability implementations. Each module is imported on its own so
# that one missing example does not also disable the capability that did load.
_capability_import_failed = False
try:
    from text_classification_example import TextClassificationCapability
except ImportError:
    TextClassificationCapability = None
    _capability_import_failed = True
try:
    from fraud_detection_example import FraudDetectionCapability
except ImportError:
    FraudDetectionCapability = None
    _capability_import_failed = True
if _capability_import_failed:
    print("Warning: Could not import capability examples. Make sure they are in the same directory.")


def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class ClaimProcessingResult:
    """Result of complete claim processing workflow"""
    claim_id: str
    classification: Dict[str, Any]
    fraud_detection: Dict[str, Any]
    final_decision: str
    decision_reason: str
    processing_time_ms: float
    audit_trail: List[Dict[str, Any]]
    metadata: Dict[str, Any]

    def to_dict(self):
        """Return the result as a plain (recursively converted) dictionary."""
        return asdict(self)


class ClaimsProcessingWorkflow:
    """
    Complete claims processing workflow integrating multiple AI capabilities
    """

    # Classification confidence below this value sends the claim to review.
    MIN_CLASSIFICATION_CONFIDENCE = 0.7
    # Claim amounts strictly above this value send the claim to review.
    HIGH_CLAIM_AMOUNT_THRESHOLD = 50000
    # Fraud risk levels that short-circuit the decision, mapped to
    # (decision, reason). Levels not listed here fall through to other checks.
    FRAUD_RISK_DECISIONS = {
        'critical': ('reject', 'Critical fraud risk detected'),
        'high': ('escalate', 'High fraud risk requires investigation'),
        'medium': ('review', 'Medium fraud risk requires manual review'),
    }

    def __init__(self):
        """Initialize workflow with required capabilities"""
        print("Initializing Claims Processing Workflow...")
        print()

        # Initialize capabilities; each one is optional and replaced by a
        # fallback result in process_claim() when it is unavailable.
        if TextClassificationCapability:
            self.text_classifier = TextClassificationCapability(enable_audit=True)
        else:
            self.text_classifier = None
            print("Warning: Text classification not available")

        if FraudDetectionCapability:
            self.fraud_detector = FraudDetectionCapability(enable_audit=True)
        else:
            self.fraud_detector = None
            print("Warning: Fraud detection not available")

        print("Workflow initialized successfully!")
        print()

    def process_claim(
        self,
        claim_data: Dict[str, Any],
        user_id: str = "system"
    ) -> ClaimProcessingResult:
        """
        Process insurance claim through complete workflow

        Args:
            claim_data: Claim information including description and details
            user_id: User processing the claim

        Returns:
            ClaimProcessingResult with complete processing information
        """
        # perf_counter is monotonic, so the elapsed time cannot go negative
        # if the system clock is adjusted while the claim is being processed.
        started = time.perf_counter()
        audit_trail: List[Dict[str, Any]] = []
        claim_id = claim_data.get('claim_id', 'UNKNOWN')

        print(f"Processing Claim: {claim_id}")
        print("=" * 70)

        # Step 1: Classify claim type
        print("Step 1: Classifying claim type...")
        classification_result = self._classify(claim_data, user_id, audit_trail)
        print()

        # Step 2: Detect fraud
        print("Step 2: Analyzing fraud risk...")
        fraud_result = self._detect_fraud(
            claim_id, claim_data, classification_result, user_id, audit_trail
        )
        print()

        # Step 3: Make final decision
        print("Step 3: Making final decision...")
        final_decision, decision_reason = self._make_decision(
            classification_result,
            fraud_result,
            claim_data
        )
        print(f" ✓ Final Decision: {final_decision.upper()}")
        print(f" ✓ Reason: {decision_reason}")
        audit_trail.append({
            'step': 'final_decision',
            'timestamp': _utc_now_iso(),
            'decision': final_decision,
            'reason': decision_reason
        })
        print()

        # Calculate total processing time
        processing_time_ms = (time.perf_counter() - started) * 1000

        # Create metadata
        metadata = {
            'workflow_version': '1.0.0',
            'processing_time_ms': processing_time_ms,
            'timestamp': _utc_now_iso(),
            'user_id': user_id,
            # Only capabilities that actually ran are listed; skipped steps
            # leave no audit entry and therefore no (or None) entry here.
            'capabilities_used': [
                entry['capability_id']
                for entry in audit_trail
                if 'capability_id' in entry
            ],
            # NOTE: these flags are static values in this example; nothing in
            # this workflow computes or verifies them.
            'compliance_flags': {
                'gdpr_compliant': True,
                'ifrs17_compliant': True,
                'aml_compliant': True,
                'fully_auditable': True
            }
        }

        # Create result
        result = ClaimProcessingResult(
            claim_id=claim_id,
            classification={
                'predicted_class': classification_result.predicted_class,
                'confidence': classification_result.confidence,
                'audit_id': classification_result.audit_id
            },
            fraud_detection={
                'fraud_score': fraud_result.fraud_score,
                'risk_level': fraud_result.risk_level,
                'recommendation': fraud_result.recommendation,
                'audit_id': fraud_result.audit_id
            },
            final_decision=final_decision,
            decision_reason=decision_reason,
            processing_time_ms=processing_time_ms,
            audit_trail=audit_trail,
            metadata=metadata
        )

        print(f"Total Processing Time: {processing_time_ms:.2f}ms")
        print("=" * 70)
        print()

        return result

    def _classify(
        self,
        claim_data: Dict[str, Any],
        user_id: str,
        audit_trail: List[Dict[str, Any]]
    ):
        """Run text classification, or return a fallback result if skipped.

        Appends a 'classification' entry to audit_trail only when the
        classifier actually ran.
        """
        if self.text_classifier and 'description' in claim_data:
            result = self.text_classifier.classify(
                text=claim_data['description'],
                explain=True,
                audit_trail=True,
                user_id=user_id
            )
            print(f" ✓ Classification: {result.predicted_class}")
            print(f" ✓ Confidence: {result.confidence:.1%}")
            audit_trail.append({
                'step': 'classification',
                'timestamp': _utc_now_iso(),
                'capability_id': 'cap_text_classification',
                'result': result.predicted_class,
                'confidence': result.confidence,
                'audit_id': result.audit_id
            })
            return result

        print(" ⚠ Classification skipped (not available or no description)")
        # Fallback: trust the caller-supplied claim type with a neutral
        # confidence (which is below the review threshold by default).
        return SimpleNamespace(
            predicted_class=claim_data.get('claim_type', 'unknown'),
            confidence=0.5,
            all_scores={},
            explanation=None,
            metadata={},
            audit_id=None
        )

    def _detect_fraud(
        self,
        claim_id: str,
        claim_data: Dict[str, Any],
        classification_result,
        user_id: str,
        audit_trail: List[Dict[str, Any]]
    ):
        """Run fraud detection, or return a low-risk fallback if unavailable.

        Appends a 'fraud_detection' entry to audit_trail only when the
        detector actually ran.
        """
        if not self.fraud_detector:
            print(" ⚠ Fraud detection skipped (not available)")
            return SimpleNamespace(
                fraud_score=0.0,
                risk_level='low',
                risk_factors=[],
                recommendation='approve',
                explanation=None,
                metadata={},
                audit_id=None
            )

        # Prepare claim data for fraud detection
        fraud_claim_data = {
            'claim_id': claim_id,
            'claim_type': classification_result.predicted_class,
            'claim_amount': claim_data.get('claim_amount', 0),
            'claim_date': claim_data.get('claim_date', _utc_now_iso()),
            'policy_start_date': claim_data.get('policy_start_date'),
            'claimant_history': claim_data.get('claimant_history', {}),
            'incident_details': claim_data.get('description', ''),
            'witnesses': claim_data.get('witnesses', 0),
            'third_party_involved': claim_data.get('third_party_involved', False)
        }
        result = self.fraud_detector.detect(
            claim_data=fraud_claim_data,
            explain=True,
            audit_trail=True,
            user_id=user_id
        )
        print(f" ✓ Fraud Score: {result.fraud_score:.1%}")
        print(f" ✓ Risk Level: {result.risk_level.upper()}")
        print(f" ✓ Recommendation: {result.recommendation.upper()}")
        audit_trail.append({
            'step': 'fraud_detection',
            'timestamp': _utc_now_iso(),
            'capability_id': 'cap_fraud_detection',
            'fraud_score': result.fraud_score,
            'risk_level': result.risk_level,
            'recommendation': result.recommendation,
            'audit_id': result.audit_id
        })
        return result

    def _make_decision(
        self,
        classification_result,
        fraud_result,
        claim_data: Dict[str, Any]
    ) -> Tuple[str, str]:
        """
        Make final decision based on classification and fraud detection

        Checks are applied in priority order: fraud risk level first, then
        classification confidence, then claim amount.

        Returns:
            Tuple of (decision, reason)
        """
        # Critical -> reject, high -> escalate, medium -> review
        risk_decision = self.FRAUD_RISK_DECISIONS.get(fraud_result.risk_level)
        if risk_decision is not None:
            return risk_decision

        # Low classification confidence -> Review
        if classification_result.confidence < self.MIN_CLASSIFICATION_CONFIDENCE:
            return 'review', 'Low classification confidence'

        # High claim amount -> Review. A missing or explicit None amount is
        # treated as 0 so the comparison cannot raise TypeError.
        claim_amount = claim_data.get('claim_amount') or 0
        if claim_amount > self.HIGH_CLAIM_AMOUNT_THRESHOLD:
            return 'review', 'High claim amount requires review'

        # All checks passed -> Approve
        return 'approve', 'All automated checks passed'

    def batch_process_claims(
        self,
        claims: List[Dict[str, Any]],
        user_id: str = "system"
    ) -> List[ClaimProcessingResult]:
        """
        Process multiple claims in batch

        Args:
            claims: List of claim data dictionaries (may be empty)
            user_id: User processing the claims

        Returns:
            List of ClaimProcessingResult objects
        """
        total = len(claims)
        print(f"Batch Processing {total} Claims")
        print("=" * 70)
        print()

        results = []
        for position, claim in enumerate(claims, 1):
            print(f"[{position}/{total}] ", end="")
            results.append(self.process_claim(claim, user_id))

        # Summary statistics
        decisions = Counter(r.final_decision for r in results)
        # Guard the average against an empty batch (no division by zero).
        if results:
            average_ms = sum(r.processing_time_ms for r in results) / len(results)
        else:
            average_ms = 0.0

        print("Batch Processing Summary")
        print("=" * 70)
        print(f"Total Claims Processed: {len(results)}")
        print(f"Approved: {decisions['approve']}")
        print(f"Review Required: {decisions['review']}")
        print(f"Escalated: {decisions['escalate']}")
        print(f"Rejected: {decisions['reject']}")
        print(f"Average Processing Time: {average_ms:.2f}ms")
        print("=" * 70)
        print()

        return results


def _print_banner(title: str) -> None:
    """Print a '#'-framed section header for one example."""
    print("\n" + "#" * 70)
    print(f"# {title}")
    print("#" * 70 + "\n")


def _print_result_summary(result: ClaimProcessingResult) -> None:
    """Print the claim type, fraud risk level and decision of a result."""
    print("Result Summary:")
    print(f" Claim Type: {result.classification['predicted_class']}")
    print(f" Fraud Risk: {result.fraud_detection['risk_level']}")
    print(f" Decision: {result.final_decision.upper()}")
    print()


def main():
    """Example usage of integrated claims processing workflow"""
    print("=" * 70)
    print("BDR Agent Factory - Integration Example")
    print("Complete Claims Processing Workflow")
    print("=" * 70)
    print()

    # Initialize workflow
    workflow = ClaimsProcessingWorkflow()

    # Example 1: Simple auto accident claim (should approve)
    _print_banner("Example 1: Simple Auto Accident Claim")
    claim_1 = {
        'claim_id': 'CLM-2026-001',
        'description': 'Minor rear-end collision in parking lot. Other driver admitted fault. No injuries.',
        'claim_amount': 2500,
        'claim_date': '2026-01-03T10:30:00Z',
        'policy_start_date': '2023-01-01T00:00:00Z',
        'claimant_history': {
            'previous_claims': 0,
            'years_as_customer': 3
        },
        'witnesses': 2,
        'third_party_involved': True
    }
    result_1 = workflow.process_claim(claim_1, user_id="adjuster_001")
    _print_result_summary(result_1)

    # Example 2: Suspicious high-value claim (should escalate)
    _print_banner("Example 2: Suspicious High-Value Claim")
    claim_2 = {
        'claim_id': 'CLM-2026-002',
        'description': 'Total loss fire damage to property',
        'claim_amount': 150000,
        'claim_date': '2026-01-03T23:45:00Z',
        'policy_start_date': '2025-12-28T00:00:00Z',  # Very recent policy
        'claimant_history': {
            'previous_claims': 4,
            'years_as_customer': 1
        },
        'witnesses': 0,
        'third_party_involved': False
    }
    result_2 = workflow.process_claim(claim_2, user_id="adjuster_002")
    _print_result_summary(result_2)

    # Example 3: Medium-risk claim (should review)
    _print_banner("Example 3: Medium-Risk Health Claim")
    claim_3 = {
        'claim_id': 'CLM-2026-003',
        'description': 'Medical treatment for back injury. Multiple doctor visits and physical therapy.',
        'claim_amount': 18000,
        'claim_date': '2026-01-03T14:00:00Z',
        'policy_start_date': '2024-06-01T00:00:00Z',
        'claimant_history': {
            'previous_claims': 2,
            'years_as_customer': 2
        },
        'witnesses': 1,
        'third_party_involved': True
    }
    result_3 = workflow.process_claim(claim_3, user_id="adjuster_003")
    _print_result_summary(result_3)

    # Example 4: Batch processing
    _print_banner("Example 4: Batch Processing")
    batch_claims = [
        {
            'claim_id': 'CLM-2026-101',
            'description': 'Water damage to basement after storm',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T09:00:00Z',
            'policy_start_date': '2022-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 4},
            'witnesses': 0,
            'third_party_involved': False
        },
        {
            'claim_id': 'CLM-2026-102',
            'description': 'Car accident on highway, airbags deployed',
            'claim_amount': 12000,
            'claim_date': '2026-01-03T15:30:00Z',
            'policy_start_date': '2021-06-01T00:00:00Z',
            'claimant_history': {'previous_claims': 1, 'years_as_customer': 5},
            'witnesses': 3,
            'third_party_involved': True
        },
        {
            'claim_id': 'CLM-2026-103',
            'description': 'Slip and fall at retail store',
            'claim_amount': 8000,
            'claim_date': '2026-01-03T11:00:00Z',
            'policy_start_date': '2023-03-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3},
            'witnesses': 2,
            'third_party_involved': True
        }
    ]
    workflow.batch_process_claims(batch_claims, user_id="batch_processor")

    # Example 5: Export results as JSON
    _print_banner("Example 5: JSON Export")
    print("Exporting result to JSON...")
    result_json = json.dumps(result_1.to_dict(), indent=2)
    # Only add the truncation marker when the output was actually truncated;
    # a short document is printed whole instead of gaining a stray brace.
    if len(result_json) > 800:
        print(result_json[:800] + "...\n}")
    else:
        print(result_json)
    print()

    # Example 6: Audit trail analysis
    _print_banner("Example 6: Audit Trail Analysis")
    print(f"Claim {result_2.claim_id} - Audit Trail:")
    print("-" * 70)
    for i, step in enumerate(result_2.audit_trail, 1):
        print(f"{i}. {step['step'].upper()}")
        print(f" Timestamp: {step['timestamp']}")
        if 'capability_id' in step:
            print(f" Capability: {step['capability_id']}")
        if 'audit_id' in step:
            print(f" Audit ID: {step['audit_id']}")
        print()

    print("=" * 70)
    print("Integration examples completed successfully!")
    print("=" * 70)


if __name__ == "__main__":
    main()