BDR-Agent-Factory / examples /integration_example.py
Bader Alabddan
Add comprehensive documentation and implementation framework
3ef5d3c
#!/usr/bin/env python3
"""
Integration Example - BDR Agent Factory
This example demonstrates how to integrate multiple AI capabilities
to build a complete insurance claims processing workflow.
Workflow:
1. Text Classification - Categorize claim type
2. Fraud Detection - Assess fraud risk
3. Decision Making - Approve, review, or reject claim
4. Audit Trail - Track entire process
"""
import json
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Dict, Any, List
# Import capability implementations
try:
from text_classification_example import TextClassificationCapability
from fraud_detection_example import FraudDetectionCapability
except ImportError:
print("Warning: Could not import capability examples. Make sure they are in the same directory.")
TextClassificationCapability = None
FraudDetectionCapability = None
@dataclass
class ClaimProcessingResult:
    """Outcome of one end-to-end run of the claim processing workflow."""

    # Identifier of the claim that was processed.
    claim_id: str
    # Summary of the classification step (predicted class, confidence, audit id).
    classification: Dict[str, Any]
    # Summary of the fraud step (score, risk level, recommendation, audit id).
    fraud_detection: Dict[str, Any]
    # Final decision string produced by the workflow's decision step.
    final_decision: str
    # Human-readable justification for the final decision.
    decision_reason: str
    # Total processing time in milliseconds.
    processing_time_ms: float
    # One entry per recorded workflow step, in execution order.
    audit_trail: List[Dict[str, Any]]
    # Workflow-level metadata attached to the run.
    metadata: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return this result as a plain dictionary built by ``dataclasses.asdict``."""
        as_mapping = asdict(self)
        return as_mapping
class ClaimsProcessingWorkflow:
    """
    Complete claims processing workflow integrating multiple AI capabilities.

    A claim is classified, scored for fraud risk, and then run through
    rule-based decision logic. Every step that actually runs contributes an
    entry to the audit trail. Either capability may be unavailable (its
    module-level name is ``None`` after a failed import); in that case a
    neutral fallback result is used for that step.
    """

    # Classification confidence strictly below this value forces manual review.
    MIN_CLASSIFICATION_CONFIDENCE = 0.7
    # Claim amounts strictly above this value force manual review.
    HIGH_CLAIM_AMOUNT = 50000

    def __init__(self):
        """Initialize workflow with required capabilities.

        A capability whose class is unavailable is stored as ``None`` and a
        warning is printed; the workflow remains usable with fallbacks.
        """
        print("Initializing Claims Processing Workflow...")
        print()

        # Initialize capabilities
        if TextClassificationCapability:
            self.text_classifier = TextClassificationCapability(enable_audit=True)
        else:
            self.text_classifier = None
            print("Warning: Text classification not available")

        if FraudDetectionCapability:
            self.fraud_detector = FraudDetectionCapability(enable_audit=True)
        else:
            self.fraud_detector = None
            print("Warning: Fraud detection not available")

        print("Workflow initialized successfully!")
        print()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the offset-less format previously written to the audit trail.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _classify_claim(self, claim_data: Dict[str, Any], user_id: str):
        """Run the classification step; return ``(result, audit_entry_or_None)``."""
        description = claim_data.get('description')
        # Require a non-empty description: classifying '' or None is meaningless.
        if self.text_classifier and description:
            result = self.text_classifier.classify(
                text=description,
                explain=True,
                audit_trail=True,
                user_id=user_id
            )
            print(f" ✓ Classification: {result.predicted_class}")
            print(f" ✓ Confidence: {result.confidence:.1%}")
            entry = {
                'step': 'classification',
                'timestamp': self._utc_timestamp(),
                'capability_id': 'cap_text_classification',
                'result': result.predicted_class,
                'confidence': result.confidence,
                'audit_id': result.audit_id
            }
            return result, entry

        print(" ⚠ Classification skipped (not available or no description)")
        # Neutral stand-in: trust a caller-supplied claim type, at a confidence
        # below the review threshold so the claim is not auto-approved.
        fallback = SimpleNamespace(
            predicted_class=claim_data.get('claim_type', 'unknown'),
            confidence=0.5,
            all_scores={},
            explanation=None,
            metadata={},
            audit_id=None
        )
        return fallback, None

    def _assess_fraud(
        self,
        claim_id: str,
        claim_type: str,
        claim_data: Dict[str, Any],
        user_id: str
    ):
        """Run the fraud detection step; return ``(result, audit_entry_or_None)``."""
        if self.fraud_detector:
            # Prepare claim data for fraud detection
            fraud_claim_data = {
                'claim_id': claim_id,
                'claim_type': claim_type,
                # ``or 0`` also covers an explicit None amount.
                'claim_amount': claim_data.get('claim_amount') or 0,
                'claim_date': claim_data.get('claim_date', self._utc_timestamp()),
                'policy_start_date': claim_data.get('policy_start_date'),
                'claimant_history': claim_data.get('claimant_history', {}),
                'incident_details': claim_data.get('description', ''),
                'witnesses': claim_data.get('witnesses', 0),
                'third_party_involved': claim_data.get('third_party_involved', False)
            }
            result = self.fraud_detector.detect(
                claim_data=fraud_claim_data,
                explain=True,
                audit_trail=True,
                user_id=user_id
            )
            print(f" ✓ Fraud Score: {result.fraud_score:.1%}")
            print(f" ✓ Risk Level: {result.risk_level.upper()}")
            print(f" ✓ Recommendation: {result.recommendation.upper()}")
            entry = {
                'step': 'fraud_detection',
                'timestamp': self._utc_timestamp(),
                'capability_id': 'cap_fraud_detection',
                'fraud_score': result.fraud_score,
                'risk_level': result.risk_level,
                'recommendation': result.recommendation,
                'audit_id': result.audit_id
            }
            return result, entry

        print(" ⚠ Fraud detection skipped (not available)")
        fallback = SimpleNamespace(
            fraud_score=0.0,
            risk_level='low',
            risk_factors=[],
            recommendation='approve',
            explanation=None,
            metadata={},
            audit_id=None
        )
        return fallback, None

    def process_claim(
        self,
        claim_data: Dict[str, Any],
        user_id: str = "system"
    ) -> "ClaimProcessingResult":
        """
        Process insurance claim through complete workflow.

        Args:
            claim_data: Claim information including description and details.
                A missing ``claim_id`` is reported as ``'UNKNOWN'``.
            user_id: User processing the claim.

        Returns:
            ClaimProcessingResult with complete processing information.
        """
        # Monotonic clock: elapsed time must not be affected by wall-clock changes.
        started = time.perf_counter()
        audit_trail = []
        claim_id = claim_data.get('claim_id', 'UNKNOWN')
        print(f"Processing Claim: {claim_id}")
        print("=" * 70)

        # Step 1: Classify claim type
        print("Step 1: Classifying claim type...")
        classification_result, entry = self._classify_claim(claim_data, user_id)
        if entry is not None:
            audit_trail.append(entry)
        print()

        # Step 2: Detect fraud
        print("Step 2: Analyzing fraud risk...")
        fraud_result, entry = self._assess_fraud(
            claim_id,
            classification_result.predicted_class,
            claim_data,
            user_id
        )
        if entry is not None:
            audit_trail.append(entry)
        print()

        # Step 3: Make final decision
        print("Step 3: Making final decision...")
        final_decision, decision_reason = self._make_decision(
            classification_result,
            fraud_result,
            claim_data
        )
        print(f" ✓ Final Decision: {final_decision.upper()}")
        print(f" ✓ Reason: {decision_reason}")
        audit_trail.append({
            'step': 'final_decision',
            'timestamp': self._utc_timestamp(),
            'decision': final_decision,
            'reason': decision_reason
        })
        print()

        # Calculate total processing time
        processing_time_ms = (time.perf_counter() - started) * 1000

        # Create metadata
        metadata = {
            'workflow_version': '1.0.0',
            'processing_time_ms': processing_time_ms,
            'timestamp': self._utc_timestamp(),
            'user_id': user_id,
            # Only capabilities that actually ran for this claim are listed
            # (no None placeholders, no skipped steps).
            'capabilities_used': [
                step['capability_id']
                for step in audit_trail
                if 'capability_id' in step
            ],
            'compliance_flags': {
                'gdpr_compliant': True,
                'ifrs17_compliant': True,
                'aml_compliant': True,
                'fully_auditable': True
            }
        }

        # Create result
        result = ClaimProcessingResult(
            claim_id=claim_id,
            classification={
                'predicted_class': classification_result.predicted_class,
                'confidence': classification_result.confidence,
                'audit_id': classification_result.audit_id
            },
            fraud_detection={
                'fraud_score': fraud_result.fraud_score,
                'risk_level': fraud_result.risk_level,
                'recommendation': fraud_result.recommendation,
                'audit_id': fraud_result.audit_id
            },
            final_decision=final_decision,
            decision_reason=decision_reason,
            processing_time_ms=processing_time_ms,
            audit_trail=audit_trail,
            metadata=metadata
        )
        print(f"Total Processing Time: {processing_time_ms:.2f}ms")
        print("=" * 70)
        print()
        return result

    def _make_decision(
        self,
        classification_result,
        fraud_result,
        claim_data: Dict[str, Any]
    ) -> tuple:
        """
        Make final decision based on classification and fraud detection.

        Rules are evaluated in order and the first match wins: fraud risk
        level, then classification confidence, then claim amount.

        Returns:
            Tuple of (decision, reason)
        """
        # Critical fraud risk -> Reject
        if fraud_result.risk_level == 'critical':
            return 'reject', 'Critical fraud risk detected'

        # High fraud risk -> Escalate
        if fraud_result.risk_level == 'high':
            return 'escalate', 'High fraud risk requires investigation'

        # Medium fraud risk -> Review
        if fraud_result.risk_level == 'medium':
            return 'review', 'Medium fraud risk requires manual review'

        # Low classification confidence -> Review
        if classification_result.confidence < self.MIN_CLASSIFICATION_CONFIDENCE:
            return 'review', 'Low classification confidence'

        # High claim amount -> Review
        # ``or 0`` treats an explicit None amount like a missing one instead of
        # raising TypeError on the comparison.
        claim_amount = claim_data.get('claim_amount') or 0
        if claim_amount > self.HIGH_CLAIM_AMOUNT:
            return 'review', 'High claim amount requires review'

        # All checks passed -> Approve
        return 'approve', 'All automated checks passed'

    def batch_process_claims(
        self,
        claims: List[Dict[str, Any]],
        user_id: str = "system"
    ) -> "List[ClaimProcessingResult]":
        """
        Process multiple claims in batch and print summary statistics.

        Args:
            claims: List of claim data dictionaries (may be empty).
            user_id: User processing the claims.

        Returns:
            List of ClaimProcessingResult objects, in input order.
        """
        total = len(claims)
        print(f"Batch Processing {total} Claims")
        print("=" * 70)
        print()

        results = []
        for i, claim in enumerate(claims, 1):
            print(f"[{i}/{total}] ", end="")
            results.append(self.process_claim(claim, user_id))

        # Summary statistics
        decisions = [r.final_decision for r in results]
        # An empty batch has no meaningful average; report 0 instead of
        # dividing by zero.
        if results:
            average_ms = sum(r.processing_time_ms for r in results) / len(results)
        else:
            average_ms = 0.0

        print("Batch Processing Summary")
        print("=" * 70)
        print(f"Total Claims Processed: {len(results)}")
        print(f"Approved: {decisions.count('approve')}")
        print(f"Review Required: {decisions.count('review')}")
        print(f"Escalated: {decisions.count('escalate')}")
        print(f"Rejected: {decisions.count('reject')}")
        print(f"Average Processing Time: {average_ms:.2f}ms")
        print("=" * 70)
        print()
        return results
def main():
    """Run the demo: three single claims, a batch, a JSON export and an audit-trail dump."""
    rule = "=" * 70
    hash_rule = "#" * 70

    def show_banner(title):
        # Section header framed by '#' rules, with a blank line before and after.
        print("\n" + hash_rule)
        print(f"# {title}")
        print(hash_rule + "\n")

    def show_summary(outcome):
        # Three-line recap of a single processed claim.
        print("Result Summary:")
        print(f" Claim Type: {outcome.classification['predicted_class']}")
        print(f" Fraud Risk: {outcome.fraud_detection['risk_level']}")
        print(f" Decision: {outcome.final_decision.upper()}")
        print()

    print(rule)
    print("BDR Agent Factory - Integration Example")
    print("Complete Claims Processing Workflow")
    print(rule)
    print()

    # Initialize workflow
    workflow = ClaimsProcessingWorkflow()

    # Each scenario: (banner title, claim payload, user id processing it).
    scenarios = [
        # Example 1: Simple auto accident claim (should approve)
        (
            "Example 1: Simple Auto Accident Claim",
            {
                'claim_id': 'CLM-2026-001',
                'description': 'Minor rear-end collision in parking lot. Other driver admitted fault. No injuries.',
                'claim_amount': 2500,
                'claim_date': '2026-01-03T10:30:00Z',
                'policy_start_date': '2023-01-01T00:00:00Z',
                'claimant_history': {
                    'previous_claims': 0,
                    'years_as_customer': 3
                },
                'witnesses': 2,
                'third_party_involved': True
            },
            "adjuster_001",
        ),
        # Example 2: Suspicious high-value claim (should escalate)
        (
            "Example 2: Suspicious High-Value Claim",
            {
                'claim_id': 'CLM-2026-002',
                'description': 'Total loss fire damage to property',
                'claim_amount': 150000,
                'claim_date': '2026-01-03T23:45:00Z',
                'policy_start_date': '2025-12-28T00:00:00Z',  # Very recent policy
                'claimant_history': {
                    'previous_claims': 4,
                    'years_as_customer': 1
                },
                'witnesses': 0,
                'third_party_involved': False
            },
            "adjuster_002",
        ),
        # Example 3: Medium-risk claim (should review)
        (
            "Example 3: Medium-Risk Health Claim",
            {
                'claim_id': 'CLM-2026-003',
                'description': 'Medical treatment for back injury. Multiple doctor visits and physical therapy.',
                'claim_amount': 18000,
                'claim_date': '2026-01-03T14:00:00Z',
                'policy_start_date': '2024-06-01T00:00:00Z',
                'claimant_history': {
                    'previous_claims': 2,
                    'years_as_customer': 2
                },
                'witnesses': 1,
                'third_party_involved': True
            },
            "adjuster_003",
        ),
    ]

    outcomes = []
    for title, payload, adjuster in scenarios:
        show_banner(title)
        outcome = workflow.process_claim(payload, user_id=adjuster)
        show_summary(outcome)
        outcomes.append(outcome)
    first_outcome, second_outcome = outcomes[0], outcomes[1]

    # Example 4: Batch processing
    show_banner("Example 4: Batch Processing")
    batch_claims = [
        {
            'claim_id': 'CLM-2026-101',
            'description': 'Water damage to basement after storm',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T09:00:00Z',
            'policy_start_date': '2022-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 4},
            'witnesses': 0,
            'third_party_involved': False
        },
        {
            'claim_id': 'CLM-2026-102',
            'description': 'Car accident on highway, airbags deployed',
            'claim_amount': 12000,
            'claim_date': '2026-01-03T15:30:00Z',
            'policy_start_date': '2021-06-01T00:00:00Z',
            'claimant_history': {'previous_claims': 1, 'years_as_customer': 5},
            'witnesses': 3,
            'third_party_involved': True
        },
        {
            'claim_id': 'CLM-2026-103',
            'description': 'Slip and fall at retail store',
            'claim_amount': 8000,
            'claim_date': '2026-01-03T11:00:00Z',
            'policy_start_date': '2023-03-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3},
            'witnesses': 2,
            'third_party_involved': True
        }
    ]
    workflow.batch_process_claims(batch_claims, user_id="batch_processor")

    # Example 5: Export results as JSON
    show_banner("Example 5: JSON Export")
    print("Exporting result to JSON...")
    exported = json.dumps(first_outcome.to_dict(), indent=2)
    # Only a preview is shown: the first 800 characters plus a closing marker.
    print(exported[:800] + "...\n}")
    print()

    # Example 6: Audit trail analysis
    show_banner("Example 6: Audit Trail Analysis")
    print(f"Claim {second_outcome.claim_id} - Audit Trail:")
    print("-" * 70)
    for position, entry in enumerate(second_outcome.audit_trail, 1):
        print(f"{position}. {entry['step'].upper()}")
        print(f" Timestamp: {entry['timestamp']}")
        # The final-decision entry carries no capability or audit id.
        for key, label in (('capability_id', 'Capability'), ('audit_id', 'Audit ID')):
            if key in entry:
                print(f" {label}: {entry[key]}")
        print()

    print(rule)
    print("Integration examples completed successfully!")
    print(rule)
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()