Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Integration Example - BDR Agent Factory | |
| This example demonstrates how to integrate multiple AI capabilities | |
| to build a complete insurance claims processing workflow. | |
| Workflow: | |
| 1. Text Classification - Categorize claim type | |
| 2. Fraud Detection - Assess fraud risk | |
| 3. Decision Making - Approve, review, or reject claim | |
| 4. Audit Trail - Track entire process | |
| """ | |
| import json | |
| from datetime import datetime | |
| from typing import Dict, Any, List | |
| from dataclasses import dataclass, asdict | |
| # Import capability implementations | |
| try: | |
| from text_classification_example import TextClassificationCapability | |
| from fraud_detection_example import FraudDetectionCapability | |
| except ImportError: | |
| print("Warning: Could not import capability examples. Make sure they are in the same directory.") | |
| TextClassificationCapability = None | |
| FraudDetectionCapability = None | |
@dataclass
class ClaimProcessingResult:
    """Result of complete claim processing workflow.

    Declared as a dataclass so that instances can be built from keyword
    arguments and converted with ``dataclasses.asdict`` (both of which
    require the ``@dataclass`` decorator).
    """

    # Identifier of the processed claim.
    claim_id: str
    # Summary of the classification step (predicted class, confidence, audit id).
    classification: Dict[str, Any]
    # Summary of the fraud step (score, risk level, recommendation, audit id).
    fraud_detection: Dict[str, Any]
    # Outcome chosen by the workflow, e.g. 'approve' or 'review'.
    final_decision: str
    # Human-readable justification for ``final_decision``.
    decision_reason: str
    # Elapsed time of the whole workflow run, in milliseconds.
    processing_time_ms: float
    # One entry per executed workflow step, in execution order.
    audit_trail: List[Dict[str, Any]]
    # Additional run information (workflow version, user id, flags, ...).
    metadata: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dictionary (nested values included)."""
        return asdict(self)
class ClaimsProcessingWorkflow:
    """
    Complete claims processing workflow integrating multiple AI capabilities.

    A claim is classified, scored for fraud risk and then routed to one of
    the decisions 'approve', 'review', 'escalate' or 'reject'. Every executed
    step is recorded in an audit trail that is returned with the result.
    """

    # Classification confidence below this value sends a claim to manual review.
    MIN_CLASSIFICATION_CONFIDENCE = 0.7
    # Claim amounts strictly above this value are sent to manual review.
    HIGH_CLAIM_AMOUNT = 50000

    def __init__(self):
        """Initialize workflow with required capabilities.

        A capability whose class could not be imported (module-level name is
        ``None``) is stored as ``None`` and its step is skipped later on.
        """
        print("Initializing Claims Processing Workflow...")
        print()

        # Initialize capabilities
        if TextClassificationCapability:
            self.text_classifier = TextClassificationCapability(enable_audit=True)
        else:
            self.text_classifier = None
            print("Warning: Text classification not available")

        if FraudDetectionCapability:
            self.fraud_detector = FraudDetectionCapability(enable_audit=True)
        else:
            self.fraud_detector = None
            print("Warning: Fraud detection not available")

        print("Workflow initialized successfully!")
        print()

    def process_claim(
        self,
        claim_data: Dict[str, Any],
        user_id: str = "system"
    ) -> "ClaimProcessingResult":
        """
        Process insurance claim through complete workflow.

        Args:
            claim_data: Claim information including description and details.
                Missing keys fall back to defaults ('UNKNOWN' claim id,
                amount 0, no witnesses, ...).
            user_id: User processing the claim

        Returns:
            ClaimProcessingResult with complete processing information
        """
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it
        # is kept so the emitted ISO timestamps stay naive (no UTC offset).
        start_time = datetime.utcnow()
        audit_trail = []
        claim_id = claim_data.get('claim_id', 'UNKNOWN')

        print(f"Processing Claim: {claim_id}")
        print("=" * 70)

        # Step 1: Classify claim type
        print("Step 1: Classifying claim type...")
        classification_result = None
        if self.text_classifier and 'description' in claim_data:
            classification_result = self.text_classifier.classify(
                text=claim_data['description'],
                explain=True,
                audit_trail=True,
                user_id=user_id
            )
            print(f" ✓ Classification: {classification_result.predicted_class}")
            print(f" ✓ Confidence: {classification_result.confidence:.1%}")
            audit_trail.append({
                'step': 'classification',
                'timestamp': datetime.utcnow().isoformat(),
                'capability_id': 'cap_text_classification',
                'result': classification_result.predicted_class,
                'confidence': classification_result.confidence,
                'audit_id': classification_result.audit_id
            })
        else:
            print(" ⚠ Classification skipped (not available or no description)")
            # Stand-in exposing the same attributes as a real result. The
            # 0.5 confidence is below the review threshold, so an
            # unclassified claim is never auto-approved.
            classification_result = type('obj', (object,), {
                'predicted_class': claim_data.get('claim_type', 'unknown'),
                'confidence': 0.5,
                'all_scores': {},
                'explanation': None,
                'metadata': {},
                'audit_id': None
            })
        print()

        # Step 2: Detect fraud
        print("Step 2: Analyzing fraud risk...")
        fraud_result = None
        if self.fraud_detector:
            # Prepare claim data for fraud detection
            fraud_claim_data = {
                'claim_id': claim_id,
                'claim_type': classification_result.predicted_class,
                'claim_amount': claim_data.get('claim_amount', 0),
                'claim_date': claim_data.get('claim_date', datetime.utcnow().isoformat()),
                'policy_start_date': claim_data.get('policy_start_date'),
                'claimant_history': claim_data.get('claimant_history', {}),
                'incident_details': claim_data.get('description', ''),
                'witnesses': claim_data.get('witnesses', 0),
                'third_party_involved': claim_data.get('third_party_involved', False)
            }
            fraud_result = self.fraud_detector.detect(
                claim_data=fraud_claim_data,
                explain=True,
                audit_trail=True,
                user_id=user_id
            )
            print(f" ✓ Fraud Score: {fraud_result.fraud_score:.1%}")
            print(f" ✓ Risk Level: {fraud_result.risk_level.upper()}")
            print(f" ✓ Recommendation: {fraud_result.recommendation.upper()}")
            audit_trail.append({
                'step': 'fraud_detection',
                'timestamp': datetime.utcnow().isoformat(),
                'capability_id': 'cap_fraud_detection',
                'fraud_score': fraud_result.fraud_score,
                'risk_level': fraud_result.risk_level,
                'recommendation': fraud_result.recommendation,
                'audit_id': fraud_result.audit_id
            })
        else:
            print(" ⚠ Fraud detection skipped (not available)")
            # Stand-in exposing the same attributes as a real result.
            # NOTE(review): this defaults to 'low' risk / 'approve' when no
            # fraud screening was performed - confirm that is intended.
            fraud_result = type('obj', (object,), {
                'fraud_score': 0.0,
                'risk_level': 'low',
                'risk_factors': [],
                'recommendation': 'approve',
                'explanation': None,
                'metadata': {},
                'audit_id': None
            })
        print()

        # Step 3: Make final decision
        print("Step 3: Making final decision...")
        final_decision, decision_reason = self._make_decision(
            classification_result,
            fraud_result,
            claim_data
        )
        print(f" ✓ Final Decision: {final_decision.upper()}")
        print(f" ✓ Reason: {decision_reason}")
        audit_trail.append({
            'step': 'final_decision',
            'timestamp': datetime.utcnow().isoformat(),
            'decision': final_decision,
            'reason': decision_reason
        })
        print()

        # Calculate total processing time
        processing_time_ms = (datetime.utcnow() - start_time).total_seconds() * 1000

        # Create metadata
        metadata = {
            'workflow_version': '1.0.0',
            'processing_time_ms': processing_time_ms,
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            # Reflects which capabilities are configured; an unavailable
            # capability is reported as None in its slot.
            'capabilities_used': [
                'cap_text_classification' if self.text_classifier else None,
                'cap_fraud_detection' if self.fraud_detector else None
            ],
            # NOTE(review): these flags are hard-coded, not computed.
            'compliance_flags': {
                'gdpr_compliant': True,
                'ifrs17_compliant': True,
                'aml_compliant': True,
                'fully_auditable': True
            }
        }

        # Create result
        result = ClaimProcessingResult(
            claim_id=claim_id,
            classification={
                'predicted_class': classification_result.predicted_class,
                'confidence': classification_result.confidence,
                'audit_id': classification_result.audit_id
            },
            fraud_detection={
                'fraud_score': fraud_result.fraud_score,
                'risk_level': fraud_result.risk_level,
                'recommendation': fraud_result.recommendation,
                'audit_id': fraud_result.audit_id
            },
            final_decision=final_decision,
            decision_reason=decision_reason,
            processing_time_ms=processing_time_ms,
            audit_trail=audit_trail,
            metadata=metadata
        )

        print(f"Total Processing Time: {processing_time_ms:.2f}ms")
        print("=" * 70)
        print()
        return result

    def _make_decision(
        self,
        classification_result,
        fraud_result,
        claim_data: Dict[str, Any]
    ) -> tuple:
        """
        Make final decision based on classification and fraud detection.

        Rules are evaluated in order and the first match wins: fraud risk
        level first, then classification confidence, then claim amount.

        Args:
            classification_result: Object exposing a ``confidence`` attribute.
            fraud_result: Object exposing a ``risk_level`` attribute.
            claim_data: Original claim; only 'claim_amount' is read. A
                missing or ``None`` amount is treated as 0.

        Returns:
            Tuple of (decision, reason)
        """
        # Critical fraud risk -> Reject
        if fraud_result.risk_level == 'critical':
            return 'reject', 'Critical fraud risk detected'

        # High fraud risk -> Escalate
        if fraud_result.risk_level == 'high':
            return 'escalate', 'High fraud risk requires investigation'

        # Medium fraud risk -> Review
        if fraud_result.risk_level == 'medium':
            return 'review', 'Medium fraud risk requires manual review'

        # Low classification confidence -> Review
        if classification_result.confidence < self.MIN_CLASSIFICATION_CONFIDENCE:
            return 'review', 'Low classification confidence'

        # High claim amount -> Review
        # `or 0` also covers an explicit None value, which would otherwise
        # raise TypeError in the comparison below.
        claim_amount = claim_data.get('claim_amount') or 0
        if claim_amount > self.HIGH_CLAIM_AMOUNT:
            return 'review', 'High claim amount requires review'

        # All checks passed -> Approve
        return 'approve', 'All automated checks passed'

    def batch_process_claims(
        self,
        claims: List[Dict[str, Any]],
        user_id: str = "system"
    ) -> List["ClaimProcessingResult"]:
        """
        Process multiple claims in batch and print summary statistics.

        Args:
            claims: List of claim data dictionaries (may be empty)
            user_id: User processing the claims

        Returns:
            List of ClaimProcessingResult objects, in input order
        """
        print(f"Batch Processing {len(claims)} Claims")
        print("=" * 70)
        print()

        results = []
        for i, claim in enumerate(claims, 1):
            print(f"[{i}/{len(claims)}] ", end="")
            result = self.process_claim(claim, user_id)
            results.append(result)

        # An empty batch has no meaningful average; report 0 instead of
        # dividing by zero.
        if results:
            average_ms = sum(r.processing_time_ms for r in results) / len(results)
        else:
            average_ms = 0.0

        # Summary statistics
        print("Batch Processing Summary")
        print("=" * 70)
        print(f"Total Claims Processed: {len(results)}")
        print(f"Approved: {sum(1 for r in results if r.final_decision == 'approve')}")
        print(f"Review Required: {sum(1 for r in results if r.final_decision == 'review')}")
        print(f"Escalated: {sum(1 for r in results if r.final_decision == 'escalate')}")
        print(f"Rejected: {sum(1 for r in results if r.final_decision == 'reject')}")
        print(f"Average Processing Time: {average_ms:.2f}ms")
        print("=" * 70)
        print()
        return results
def main():
    """Run the demo: three single claims, a batch, a JSON export and an audit-trail dump."""
    rule = "=" * 70
    hash_rule = "#" * 70

    def announce(heading):
        # Banner that introduces each numbered example.
        print("\n" + hash_rule)
        print(heading)
        print(hash_rule + "\n")

    def summarize(outcome):
        # Short recap printed after each individually processed claim.
        print("Result Summary:")
        print(f" Claim Type: {outcome.classification['predicted_class']}")
        print(f" Fraud Risk: {outcome.fraud_detection['risk_level']}")
        print(f" Decision: {outcome.final_decision.upper()}")
        print()

    print(rule)
    print("BDR Agent Factory - Integration Example")
    print("Complete Claims Processing Workflow")
    print(rule)
    print()

    # Initialize workflow
    workflow = ClaimsProcessingWorkflow()

    # Examples 1-3: (banner heading, acting user, claim payload)
    scenarios = [
        # Simple auto accident claim (should approve)
        (
            "# Example 1: Simple Auto Accident Claim",
            "adjuster_001",
            {
                "claim_id": "CLM-2026-001",
                "description": "Minor rear-end collision in parking lot. Other driver admitted fault. No injuries.",
                "claim_amount": 2500,
                "claim_date": "2026-01-03T10:30:00Z",
                "policy_start_date": "2023-01-01T00:00:00Z",
                "claimant_history": {
                    "previous_claims": 0,
                    "years_as_customer": 3,
                },
                "witnesses": 2,
                "third_party_involved": True,
            },
        ),
        # Suspicious high-value claim (should escalate)
        (
            "# Example 2: Suspicious High-Value Claim",
            "adjuster_002",
            {
                "claim_id": "CLM-2026-002",
                "description": "Total loss fire damage to property",
                "claim_amount": 150000,
                "claim_date": "2026-01-03T23:45:00Z",
                "policy_start_date": "2025-12-28T00:00:00Z",  # Very recent policy
                "claimant_history": {
                    "previous_claims": 4,
                    "years_as_customer": 1,
                },
                "witnesses": 0,
                "third_party_involved": False,
            },
        ),
        # Medium-risk claim (should review)
        (
            "# Example 3: Medium-Risk Health Claim",
            "adjuster_003",
            {
                "claim_id": "CLM-2026-003",
                "description": "Medical treatment for back injury. Multiple doctor visits and physical therapy.",
                "claim_amount": 18000,
                "claim_date": "2026-01-03T14:00:00Z",
                "policy_start_date": "2024-06-01T00:00:00Z",
                "claimant_history": {
                    "previous_claims": 2,
                    "years_as_customer": 2,
                },
                "witnesses": 1,
                "third_party_involved": True,
            },
        ),
    ]

    outcomes = []
    for heading, adjuster, claim in scenarios:
        announce(heading)
        outcome = workflow.process_claim(claim, user_id=adjuster)
        summarize(outcome)
        outcomes.append(outcome)
    first_outcome, second_outcome = outcomes[0], outcomes[1]

    # Example 4: Batch processing
    announce("# Example 4: Batch Processing")
    batch_claims = [
        {
            "claim_id": "CLM-2026-101",
            "description": "Water damage to basement after storm",
            "claim_amount": 5000,
            "claim_date": "2026-01-03T09:00:00Z",
            "policy_start_date": "2022-01-01T00:00:00Z",
            "claimant_history": {"previous_claims": 0, "years_as_customer": 4},
            "witnesses": 0,
            "third_party_involved": False,
        },
        {
            "claim_id": "CLM-2026-102",
            "description": "Car accident on highway, airbags deployed",
            "claim_amount": 12000,
            "claim_date": "2026-01-03T15:30:00Z",
            "policy_start_date": "2021-06-01T00:00:00Z",
            "claimant_history": {"previous_claims": 1, "years_as_customer": 5},
            "witnesses": 3,
            "third_party_involved": True,
        },
        {
            "claim_id": "CLM-2026-103",
            "description": "Slip and fall at retail store",
            "claim_amount": 8000,
            "claim_date": "2026-01-03T11:00:00Z",
            "policy_start_date": "2023-03-01T00:00:00Z",
            "claimant_history": {"previous_claims": 0, "years_as_customer": 3},
            "witnesses": 2,
            "third_party_involved": True,
        },
    ]
    workflow.batch_process_claims(batch_claims, user_id="batch_processor")

    # Example 5: Export results as JSON
    announce("# Example 5: JSON Export")
    print("Exporting result to JSON...")
    exported = json.dumps(first_outcome.to_dict(), indent=2)
    # Only the first 800 characters are shown, followed by a closing marker.
    print(exported[:800] + "...\n}")
    print()

    # Example 6: Audit trail analysis
    announce("# Example 6: Audit Trail Analysis")
    print(f"Claim {second_outcome.claim_id} - Audit Trail:")
    print("-" * 70)
    for position, entry in enumerate(second_outcome.audit_trail, 1):
        print(f"{position}. {entry['step'].upper()}")
        print(f" Timestamp: {entry['timestamp']}")
        # Capability and audit ids are only present on capability-backed steps.
        for key, label in (("capability_id", "Capability"), ("audit_id", "Audit ID")):
            if key in entry:
                print(f" {label}: {entry[key]}")
        print()

    print(rule)
    print("Integration examples completed successfully!")
    print(rule)
# Run the demo only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()