#!/usr/bin/env python3
"""
Test Cases for BDR Agent Factory Examples

Comprehensive test suite for text classification, fraud detection,
and integration examples.
"""

import json
from datetime import datetime

import pytest

# Import example implementations; if they are missing, skip the entire
# module rather than erroring out, so the suite degrades gracefully in
# environments that do not ship the example code.
try:
    from text_classification_example import TextClassificationCapability, ClassificationResult
    from fraud_detection_example import FraudDetectionCapability, FraudDetectionResult
    from integration_example import ClaimsProcessingWorkflow, ClaimProcessingResult
    EXAMPLES_AVAILABLE = True
except ImportError:
    EXAMPLES_AVAILABLE = False
    pytest.skip("Example implementations not available", allow_module_level=True)


class TestTextClassification:
    """Test cases for text classification capability"""

    @pytest.fixture
    def classifier(self):
        """Create text classifier instance"""
        return TextClassificationCapability(enable_audit=True)

    def test_initialization(self, classifier):
        """Test classifier initialization"""
        assert classifier is not None
        assert classifier.CAPABILITY_ID == "cap_text_classification"
        assert classifier.VERSION == "2.1.0"
        assert classifier.enable_audit is True

    def test_property_damage_classification(self, classifier):
        """Test classification of property damage claim"""
        text = "Water damage to basement after heavy rain and flooding"
        result = classifier.classify(text=text, explain=True)

        assert isinstance(result, ClassificationResult)
        assert result.predicted_class == "property_damage"
        assert result.confidence > 0.7
        assert result.explanation is not None
        assert result.audit_id is not None

    def test_auto_accident_classification(self, classifier):
        """Test classification of auto accident claim"""
        text = "Rear-end collision on highway during rush hour traffic"
        result = classifier.classify(text=text, explain=True)

        assert result.predicted_class == "auto_accident"
        assert result.confidence > 0.7
        assert 'collision' in text.lower() or 'accident' in text.lower()

    def test_health_claim_classification(self, classifier):
        """Test classification of health claim"""
        text = "Patient underwent surgery at hospital for medical treatment"
        result = classifier.classify(text=text, explain=True)

        assert result.predicted_class == "health_claim"
        assert result.confidence > 0.7

    def test_liability_classification(self, classifier):
        """Test classification of liability claim"""
        text = "Customer slipped and fell in store, sustained injury"
        result = classifier.classify(text=text, explain=True)

        assert result.predicted_class == "liability"
        assert result.confidence > 0.7

    def test_empty_input_validation(self, classifier):
        """Test that empty input raises ValueError"""
        with pytest.raises(ValueError, match="non-empty string"):
            classifier.classify(text="")

    def test_long_input_validation(self, classifier):
        """Test that excessively long input raises ValueError"""
        long_text = "word " * 10000
        with pytest.raises(ValueError, match="maximum length"):
            classifier.classify(text=long_text)

    def test_malicious_input_validation(self, classifier):
        """Test that malicious input is rejected"""
        # NOTE(review): the original fixture value was an empty string (the
        # payload was presumably stripped by an HTML sanitizer). An empty
        # string would trigger the "non-empty string" validation instead of
        # the malicious-content check, so a representative script-injection
        # payload is used here — confirm against the classifier's blocklist.
        malicious_text = "<script>alert('xss')</script>"
        with pytest.raises(ValueError, match="malicious content"):
            classifier.classify(text=malicious_text)

    def test_confidence_threshold(self, classifier):
        """Test confidence threshold filtering"""
        text = "Ambiguous claim description"
        result = classifier.classify(
            text=text,
            confidence_threshold=0.99  # Very high threshold
        )
        # With high threshold, uncertain claims should be marked
        assert result is not None

    def test_explanation_generation(self, classifier):
        """Test that explanations are generated correctly"""
        text = "Water damage to basement after storm"
        result = classifier.classify(text=text, explain=True)

        assert result.explanation is not None
        assert 'method' in result.explanation
        assert 'local_explanation' in result.explanation
        assert 'key_features' in result.explanation['local_explanation']
        assert len(result.explanation['local_explanation']['key_features']) > 0

    def test_audit_trail_creation(self, classifier):
        """Test that audit trail is created"""
        text = "Test claim description"
        result = classifier.classify(
            text=text,
            audit_trail=True,
            user_id="test_user"
        )

        assert result.audit_id is not None

        # Retrieve audit record
        audit_record = classifier.get_audit_record(result.audit_id)
        assert audit_record is not None
        assert audit_record['user_id'] == 'test_user'
        assert audit_record['capability_id'] == 'cap_text_classification'

    def test_batch_classification(self, classifier):
        """Test batch classification"""
        texts = [
            "Water damage to property",
            "Car accident on highway",
            "Medical treatment at hospital"
        ]
        results = classifier.batch_classify(texts=texts, explain=False)

        assert len(results) == 3
        assert all(isinstance(r, ClassificationResult) for r in results)
        assert results[0].predicted_class == "property_damage"
        assert results[1].predicted_class == "auto_accident"
        assert results[2].predicted_class == "health_claim"

    def test_metadata_structure(self, classifier):
        """Test that metadata has correct structure"""
        text = "Test claim"
        result = classifier.classify(text=text)

        assert result.metadata is not None
        assert 'capability_id' in result.metadata
        assert 'version' in result.metadata
        assert 'processing_time_ms' in result.metadata
        assert 'timestamp' in result.metadata
        assert 'compliance_flags' in result.metadata

    def test_compliance_flags(self, classifier):
        """Test that compliance flags are set correctly"""
        text = "Test claim"
        result = classifier.classify(text=text, explain=True, audit_trail=True)

        flags = result.metadata['compliance_flags']
        assert flags['explainable'] is True
        assert flags['auditable'] is True
        assert flags['gdpr_compliant'] is True
        assert flags['ifrs17_compliant'] is True

    def test_result_serialization(self, classifier):
        """Test that results can be serialized to JSON"""
        text = "Test claim"
        result = classifier.classify(text=text)

        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)

        assert json_str is not None
        assert len(json_str) > 0


class TestFraudDetection:
    """Test cases for fraud detection capability"""

    @pytest.fixture
    def fraud_detector(self):
        """Create fraud detector instance"""
        return FraudDetectionCapability(enable_audit=True)

    def test_initialization(self, fraud_detector):
        """Test fraud detector initialization"""
        assert fraud_detector is not None
        assert fraud_detector.CAPABILITY_ID == "cap_fraud_detection"
        assert fraud_detector.VERSION == "1.5.0"

    def test_low_risk_claim(self, fraud_detector):
        """Test detection of low-risk claim"""
        claim_data = {
            'claim_id': 'TEST-001',
            'claim_amount': 2000,
            'claim_type': 'auto_accident',
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 0,
                'years_as_customer': 3
            },
            'witnesses': 2
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        assert isinstance(result, FraudDetectionResult)
        assert result.risk_level == 'low'
        assert result.fraud_score < 0.3
        assert result.recommendation == 'approve'

    def test_high_risk_claim(self, fraud_detector):
        """Test detection of high-risk claim"""
        claim_data = {
            'claim_id': 'TEST-002',
            'claim_amount': 100000,  # High amount
            'claim_type': 'property_damage',
            'claim_date': '2026-01-03T23:00:00Z',  # Late night
            'policy_start_date': '2025-12-28T00:00:00Z',  # Recent policy
            'claimant_history': {
                'previous_claims': 5,  # Frequent claims
                'years_as_customer': 1
            },
            'witnesses': 0,  # No witnesses
            'incident_details': 'Fire'  # Very short description
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        assert result.risk_level in ['high', 'critical']
        assert result.fraud_score > 0.6
        assert result.recommendation in ['escalate', 'reject']
        assert len(result.risk_factors) > 0

    def test_medium_risk_claim(self, fraud_detector):
        """Test detection of medium-risk claim"""
        claim_data = {
            'claim_id': 'TEST-003',
            'claim_amount': 25000,
            'claim_type': 'health_claim',
            'claim_date': '2026-01-03T14:00:00Z',
            'policy_start_date': '2024-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 2,
                'years_as_customer': 2
            },
            'witnesses': 1
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        assert result.risk_level in ['low', 'medium']
        assert result.recommendation in ['approve', 'review']

    def test_missing_required_fields(self, fraud_detector):
        """Test that missing required fields raise ValueError"""
        incomplete_claim = {
            'claim_id': 'TEST-004'
            # Missing claim_amount and claim_type
        }
        with pytest.raises(ValueError, match="Missing required field"):
            fraud_detector.detect(claim_data=incomplete_claim)

    def test_invalid_claim_amount(self, fraud_detector):
        """Test that invalid claim amount raises ValueError"""
        invalid_claim = {
            'claim_id': 'TEST-005',
            'claim_amount': -1000,  # Negative amount
            'claim_type': 'auto_accident'
        }
        with pytest.raises(ValueError, match="cannot be negative"):
            fraud_detector.detect(claim_data=invalid_claim)

    def test_risk_factor_detection(self, fraud_detector):
        """Test that risk factors are detected correctly"""
        claim_data = {
            'claim_id': 'TEST-006',
            'claim_amount': 75000,  # Should trigger high_claim_amount
            'claim_type': 'property_damage',
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2025-12-20T00:00:00Z',  # Should trigger recent_policy
            'claimant_history': {
                'previous_claims': 4,  # Should trigger frequent_claims
                'years_as_customer': 1
            }
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        # Check that multiple risk factors were detected
        assert len(result.risk_factors) >= 2

        # Check for specific risk factors
        factor_types = [f['factor'] for f in result.risk_factors]
        assert 'high_claim_amount' in factor_types

    def test_explanation_generation(self, fraud_detector):
        """Test that explanations are generated"""
        claim_data = {
            'claim_id': 'TEST-007',
            'claim_amount': 10000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        assert result.explanation is not None
        assert 'human_readable_summary' in result.explanation
        assert 'contributing_factors' in result.explanation
        assert 'recommendations' in result.explanation

    def test_audit_trail_creation(self, fraud_detector):
        """Test that audit trail is created"""
        claim_data = {
            'claim_id': 'TEST-008',
            'claim_amount': 5000,
            'claim_type': 'property_damage'
        }
        result = fraud_detector.detect(
            claim_data=claim_data,
            audit_trail=True,
            user_id="test_adjuster"
        )

        assert result.audit_id is not None

        # Retrieve audit record
        audit_record = fraud_detector.get_audit_record(result.audit_id)
        assert audit_record is not None
        assert audit_record['user_id'] == 'test_adjuster'
        assert audit_record['claim_id'] == 'TEST-008'

    def test_compliance_flags(self, fraud_detector):
        """Test that compliance flags are set"""
        claim_data = {
            'claim_id': 'TEST-009',
            'claim_amount': 5000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)

        flags = result.metadata['compliance_flags']
        assert flags['aml_compliant'] is True
        assert flags['gdpr_compliant'] is True

    def test_result_serialization(self, fraud_detector):
        """Test that results can be serialized to JSON"""
        claim_data = {
            'claim_id': 'TEST-010',
            'claim_amount': 5000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data)

        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)

        assert json_str is not None
        assert len(json_str) > 0


class TestIntegration:
    """Test cases for integrated workflow"""

    @pytest.fixture
    def workflow(self):
        """Create workflow instance"""
        return ClaimsProcessingWorkflow()

    def test_initialization(self, workflow):
        """Test workflow initialization"""
        assert workflow is not None
        assert workflow.text_classifier is not None
        assert workflow.fraud_detector is not None

    def test_simple_claim_processing(self, workflow):
        """Test processing of simple claim"""
        claim_data = {
            'claim_id': 'INT-001',
            'description': 'Minor car accident in parking lot',
            'claim_amount': 3000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 0,
                'years_as_customer': 3
            },
            'witnesses': 2,
            'third_party_involved': True
        }
        result = workflow.process_claim(claim_data, user_id="test_user")

        assert isinstance(result, ClaimProcessingResult)
        assert result.claim_id == 'INT-001'
        assert result.final_decision in ['approve', 'review', 'escalate', 'reject']
        assert len(result.audit_trail) >= 2  # At least classification and fraud detection

    def test_high_risk_claim_processing(self, workflow):
        """Test processing of high-risk claim"""
        claim_data = {
            'claim_id': 'INT-002',
            'description': 'Total loss fire damage',
            'claim_amount': 150000,
            'claim_date': '2026-01-03T23:00:00Z',
            'policy_start_date': '2025-12-28T00:00:00Z',
            'claimant_history': {
                'previous_claims': 5,
                'years_as_customer': 1
            },
            'witnesses': 0,
            'third_party_involved': False
        }
        result = workflow.process_claim(claim_data, user_id="test_user")

        # High-risk claims should be escalated or rejected
        assert result.final_decision in ['escalate', 'reject']
        assert result.fraud_detection['risk_level'] in ['high', 'critical']

    def test_batch_processing(self, workflow):
        """Test batch processing of multiple claims"""
        claims = [
            {
                'claim_id': f'BATCH-{i:03d}',
                'description': 'Test claim description',
                'claim_amount': 5000,
                'claim_date': '2026-01-03T10:00:00Z',
                'policy_start_date': '2023-01-01T00:00:00Z',
                'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
            }
            for i in range(5)
        ]
        results = workflow.batch_process_claims(claims, user_id="batch_user")

        assert len(results) == 5
        assert all(isinstance(r, ClaimProcessingResult) for r in results)
        assert all(r.final_decision in ['approve', 'review', 'escalate', 'reject'] for r in results)

    def test_audit_trail_completeness(self, workflow):
        """Test that audit trail captures all steps"""
        claim_data = {
            'claim_id': 'AUDIT-001',
            'description': 'Test claim for audit trail',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data, user_id="audit_user")

        # Check audit trail has all expected steps
        steps = [entry['step'] for entry in result.audit_trail]
        assert 'classification' in steps
        assert 'fraud_detection' in steps
        assert 'final_decision' in steps

    def test_decision_logic_approve(self, workflow):
        """Test that low-risk claims are approved"""
        claim_data = {
            'claim_id': 'DECISION-001',
            'description': 'Minor fender bender',
            'claim_amount': 2000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2022-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 4},
            'witnesses': 2
        }
        result = workflow.process_claim(claim_data)

        # Low-risk, low-amount claims should be approved
        assert result.final_decision == 'approve'

    def test_decision_logic_review(self, workflow):
        """Test that medium-risk claims require review"""
        claim_data = {
            'claim_id': 'DECISION-002',
            'description': 'Medical treatment claim',
            'claim_amount': 60000,  # High amount triggers review
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 1, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)

        # High-amount claims should require review
        assert result.final_decision in ['review', 'escalate']

    def test_metadata_structure(self, workflow):
        """Test that metadata has correct structure"""
        claim_data = {
            'claim_id': 'META-001',
            'description': 'Test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)

        assert result.metadata is not None
        assert 'workflow_version' in result.metadata
        assert 'processing_time_ms' in result.metadata
        assert 'compliance_flags' in result.metadata

    def test_result_serialization(self, workflow):
        """Test that workflow results can be serialized"""
        claim_data = {
            'claim_id': 'SERIAL-001',
            'description': 'Test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)

        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)

        assert json_str is not None
        assert len(json_str) > 0


class TestPerformance:
    """Performance and benchmark tests"""

    def test_classification_latency(self):
        """Test that classification meets latency SLA"""
        classifier = TextClassificationCapability()
        text = "Test claim description for performance testing"
        result = classifier.classify(text=text)

        # Should complete in under 500ms (generous for mock implementation)
        assert result.metadata['processing_time_ms'] < 500

    def test_fraud_detection_latency(self):
        """Test that fraud detection meets latency SLA"""
        fraud_detector = FraudDetectionCapability()
        claim_data = {
            'claim_id': 'PERF-001',
            'claim_amount': 5000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data)

        # Should complete in under 300ms
        assert result.metadata['processing_time_ms'] < 300

    def test_workflow_latency(self):
        """Test that complete workflow meets latency SLA"""
        workflow = ClaimsProcessingWorkflow()
        claim_data = {
            'claim_id': 'PERF-002',
            'description': 'Performance test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)

        # Complete workflow should finish in under 1000ms
        assert result.processing_time_ms < 1000


if __name__ == "__main__":
    # Run tests with pytest
    pytest.main([__file__, "-v", "--tb=short"])