Spaces:
Running
Running
#!/usr/bin/env python3
"""
Test Cases for BDR Agent Factory Examples

Comprehensive test suite for text classification, fraud detection,
and integration examples.
"""
import json
from datetime import datetime

import pytest

# Import the example implementations under test. If they are not present
# on the path, skip this entire module rather than erroring at collection.
try:
    from text_classification_example import TextClassificationCapability, ClassificationResult
    from fraud_detection_example import FraudDetectionCapability, FraudDetectionResult
    from integration_example import ClaimsProcessingWorkflow, ClaimProcessingResult
    EXAMPLES_AVAILABLE = True
except ImportError:
    EXAMPLES_AVAILABLE = False
    pytest.skip("Example implementations not available", allow_module_level=True)
class TestTextClassification:
    """Test cases for the text classification capability."""

    @pytest.fixture
    def classifier(self):
        """Create a text classifier instance with auditing enabled.

        NOTE(review): the original method lacked the ``@pytest.fixture``
        decorator, so pytest could not resolve the ``classifier`` argument
        used by every test below ("fixture 'classifier' not found").
        """
        return TextClassificationCapability(enable_audit=True)

    def test_initialization(self, classifier):
        """Test classifier initialization."""
        assert classifier is not None
        assert classifier.CAPABILITY_ID == "cap_text_classification"
        assert classifier.VERSION == "2.1.0"
        assert classifier.enable_audit is True

    def test_property_damage_classification(self, classifier):
        """Test classification of property damage claim."""
        text = "Water damage to basement after heavy rain and flooding"
        result = classifier.classify(text=text, explain=True)
        assert isinstance(result, ClassificationResult)
        assert result.predicted_class == "property_damage"
        assert result.confidence > 0.7
        assert result.explanation is not None
        assert result.audit_id is not None

    def test_auto_accident_classification(self, classifier):
        """Test classification of auto accident claim."""
        text = "Rear-end collision on highway during rush hour traffic"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "auto_accident"
        assert result.confidence > 0.7
        assert 'collision' in text.lower() or 'accident' in text.lower()

    def test_health_claim_classification(self, classifier):
        """Test classification of health claim."""
        text = "Patient underwent surgery at hospital for medical treatment"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "health_claim"
        assert result.confidence > 0.7

    def test_liability_classification(self, classifier):
        """Test classification of liability claim."""
        text = "Customer slipped and fell in store, sustained injury"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "liability"
        assert result.confidence > 0.7

    def test_empty_input_validation(self, classifier):
        """Test that empty input raises ValueError."""
        with pytest.raises(ValueError, match="non-empty string"):
            classifier.classify(text="")

    def test_long_input_validation(self, classifier):
        """Test that excessively long input raises ValueError."""
        long_text = "word " * 10000
        with pytest.raises(ValueError, match="maximum length"):
            classifier.classify(text=long_text)

    def test_malicious_input_validation(self, classifier):
        """Test that malicious input is rejected."""
        malicious_text = "<script>alert('xss')</script>"
        with pytest.raises(ValueError, match="malicious content"):
            classifier.classify(text=malicious_text)

    def test_confidence_threshold(self, classifier):
        """Test confidence threshold filtering."""
        text = "Ambiguous claim description"
        result = classifier.classify(
            text=text,
            confidence_threshold=0.99  # Very high threshold
        )
        # With high threshold, uncertain claims should be marked
        assert result is not None

    def test_explanation_generation(self, classifier):
        """Test that explanations are generated correctly."""
        text = "Water damage to basement after storm"
        result = classifier.classify(text=text, explain=True)
        assert result.explanation is not None
        assert 'method' in result.explanation
        assert 'local_explanation' in result.explanation
        assert 'key_features' in result.explanation['local_explanation']
        assert len(result.explanation['local_explanation']['key_features']) > 0

    def test_audit_trail_creation(self, classifier):
        """Test that audit trail is created."""
        text = "Test claim description"
        result = classifier.classify(
            text=text,
            audit_trail=True,
            user_id="test_user"
        )
        assert result.audit_id is not None
        # Retrieve audit record
        audit_record = classifier.get_audit_record(result.audit_id)
        assert audit_record is not None
        assert audit_record['user_id'] == 'test_user'
        assert audit_record['capability_id'] == 'cap_text_classification'

    def test_batch_classification(self, classifier):
        """Test batch classification."""
        texts = [
            "Water damage to property",
            "Car accident on highway",
            "Medical treatment at hospital"
        ]
        results = classifier.batch_classify(texts=texts, explain=False)
        assert len(results) == 3
        assert all(isinstance(r, ClassificationResult) for r in results)
        assert results[0].predicted_class == "property_damage"
        assert results[1].predicted_class == "auto_accident"
        assert results[2].predicted_class == "health_claim"

    def test_metadata_structure(self, classifier):
        """Test that metadata has correct structure."""
        text = "Test claim"
        result = classifier.classify(text=text)
        assert result.metadata is not None
        assert 'capability_id' in result.metadata
        assert 'version' in result.metadata
        assert 'processing_time_ms' in result.metadata
        assert 'timestamp' in result.metadata
        assert 'compliance_flags' in result.metadata

    def test_compliance_flags(self, classifier):
        """Test that compliance flags are set correctly."""
        text = "Test claim"
        result = classifier.classify(text=text, explain=True, audit_trail=True)
        flags = result.metadata['compliance_flags']
        assert flags['explainable'] is True
        assert flags['auditable'] is True
        assert flags['gdpr_compliant'] is True
        assert flags['ifrs17_compliant'] is True

    def test_result_serialization(self, classifier):
        """Test that results can be serialized to JSON."""
        text = "Test claim"
        result = classifier.classify(text=text)
        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)
        assert json_str is not None
        assert len(json_str) > 0
class TestFraudDetection:
    """Test cases for the fraud detection capability."""

    @pytest.fixture
    def fraud_detector(self):
        """Create a fraud detector instance with auditing enabled.

        NOTE(review): the original method lacked the ``@pytest.fixture``
        decorator, so pytest could not resolve the ``fraud_detector``
        argument used by every test below.
        """
        return FraudDetectionCapability(enable_audit=True)

    def test_initialization(self, fraud_detector):
        """Test fraud detector initialization."""
        assert fraud_detector is not None
        assert fraud_detector.CAPABILITY_ID == "cap_fraud_detection"
        assert fraud_detector.VERSION == "1.5.0"

    def test_low_risk_claim(self, fraud_detector):
        """Test detection of low-risk claim."""
        claim_data = {
            'claim_id': 'TEST-001',
            'claim_amount': 2000,
            'claim_type': 'auto_accident',
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 0,
                'years_as_customer': 3
            },
            'witnesses': 2
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        assert isinstance(result, FraudDetectionResult)
        assert result.risk_level == 'low'
        assert result.fraud_score < 0.3
        assert result.recommendation == 'approve'

    def test_high_risk_claim(self, fraud_detector):
        """Test detection of high-risk claim."""
        claim_data = {
            'claim_id': 'TEST-002',
            'claim_amount': 100000,  # High amount
            'claim_type': 'property_damage',
            'claim_date': '2026-01-03T23:00:00Z',  # Late night
            'policy_start_date': '2025-12-28T00:00:00Z',  # Recent policy
            'claimant_history': {
                'previous_claims': 5,  # Frequent claims
                'years_as_customer': 1
            },
            'witnesses': 0,  # No witnesses
            'incident_details': 'Fire'  # Very short description
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        assert result.risk_level in ['high', 'critical']
        assert result.fraud_score > 0.6
        assert result.recommendation in ['escalate', 'reject']
        assert len(result.risk_factors) > 0

    def test_medium_risk_claim(self, fraud_detector):
        """Test detection of medium-risk claim."""
        claim_data = {
            'claim_id': 'TEST-003',
            'claim_amount': 25000,
            'claim_type': 'health_claim',
            'claim_date': '2026-01-03T14:00:00Z',
            'policy_start_date': '2024-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 2,
                'years_as_customer': 2
            },
            'witnesses': 1
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        assert result.risk_level in ['low', 'medium']
        assert result.recommendation in ['approve', 'review']

    def test_missing_required_fields(self, fraud_detector):
        """Test that missing required fields raise ValueError."""
        incomplete_claim = {
            'claim_id': 'TEST-004'
            # Missing claim_amount and claim_type
        }
        with pytest.raises(ValueError, match="Missing required field"):
            fraud_detector.detect(claim_data=incomplete_claim)

    def test_invalid_claim_amount(self, fraud_detector):
        """Test that invalid claim amount raises ValueError."""
        invalid_claim = {
            'claim_id': 'TEST-005',
            'claim_amount': -1000,  # Negative amount
            'claim_type': 'auto_accident'
        }
        with pytest.raises(ValueError, match="cannot be negative"):
            fraud_detector.detect(claim_data=invalid_claim)

    def test_risk_factor_detection(self, fraud_detector):
        """Test that risk factors are detected correctly."""
        claim_data = {
            'claim_id': 'TEST-006',
            'claim_amount': 75000,  # Should trigger high_claim_amount
            'claim_type': 'property_damage',
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2025-12-20T00:00:00Z',  # Should trigger recent_policy
            'claimant_history': {
                'previous_claims': 4,  # Should trigger frequent_claims
                'years_as_customer': 1
            }
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        # Check that multiple risk factors were detected
        assert len(result.risk_factors) >= 2
        # Check for specific risk factors
        factor_types = [f['factor'] for f in result.risk_factors]
        assert 'high_claim_amount' in factor_types

    def test_explanation_generation(self, fraud_detector):
        """Test that explanations are generated."""
        claim_data = {
            'claim_id': 'TEST-007',
            'claim_amount': 10000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        assert result.explanation is not None
        assert 'human_readable_summary' in result.explanation
        assert 'contributing_factors' in result.explanation
        assert 'recommendations' in result.explanation

    def test_audit_trail_creation(self, fraud_detector):
        """Test that audit trail is created."""
        claim_data = {
            'claim_id': 'TEST-008',
            'claim_amount': 5000,
            'claim_type': 'property_damage'
        }
        result = fraud_detector.detect(
            claim_data=claim_data,
            audit_trail=True,
            user_id="test_adjuster"
        )
        assert result.audit_id is not None
        # Retrieve audit record
        audit_record = fraud_detector.get_audit_record(result.audit_id)
        assert audit_record is not None
        assert audit_record['user_id'] == 'test_adjuster'
        assert audit_record['claim_id'] == 'TEST-008'

    def test_compliance_flags(self, fraud_detector):
        """Test that compliance flags are set."""
        claim_data = {
            'claim_id': 'TEST-009',
            'claim_amount': 5000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data, explain=True)
        flags = result.metadata['compliance_flags']
        assert flags['aml_compliant'] is True
        assert flags['gdpr_compliant'] is True

    def test_result_serialization(self, fraud_detector):
        """Test that results can be serialized to JSON."""
        claim_data = {
            'claim_id': 'TEST-010',
            'claim_amount': 5000,
            'claim_type': 'auto_accident'
        }
        result = fraud_detector.detect(claim_data=claim_data)
        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)
        assert json_str is not None
        assert len(json_str) > 0
class TestIntegration:
    """Test cases for the integrated claims-processing workflow."""

    @pytest.fixture
    def workflow(self):
        """Create a workflow instance.

        NOTE(review): the original method lacked the ``@pytest.fixture``
        decorator, so pytest could not resolve the ``workflow`` argument
        used by every test below.
        """
        return ClaimsProcessingWorkflow()

    def test_initialization(self, workflow):
        """Test workflow initialization."""
        assert workflow is not None
        assert workflow.text_classifier is not None
        assert workflow.fraud_detector is not None

    def test_simple_claim_processing(self, workflow):
        """Test processing of simple claim."""
        claim_data = {
            'claim_id': 'INT-001',
            'description': 'Minor car accident in parking lot',
            'claim_amount': 3000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {
                'previous_claims': 0,
                'years_as_customer': 3
            },
            'witnesses': 2,
            'third_party_involved': True
        }
        result = workflow.process_claim(claim_data, user_id="test_user")
        assert isinstance(result, ClaimProcessingResult)
        assert result.claim_id == 'INT-001'
        assert result.final_decision in ['approve', 'review', 'escalate', 'reject']
        assert len(result.audit_trail) >= 2  # At least classification and fraud detection

    def test_high_risk_claim_processing(self, workflow):
        """Test processing of high-risk claim."""
        claim_data = {
            'claim_id': 'INT-002',
            'description': 'Total loss fire damage',
            'claim_amount': 150000,
            'claim_date': '2026-01-03T23:00:00Z',
            'policy_start_date': '2025-12-28T00:00:00Z',
            'claimant_history': {
                'previous_claims': 5,
                'years_as_customer': 1
            },
            'witnesses': 0,
            'third_party_involved': False
        }
        result = workflow.process_claim(claim_data, user_id="test_user")
        # High-risk claims should be escalated or rejected
        assert result.final_decision in ['escalate', 'reject']
        assert result.fraud_detection['risk_level'] in ['high', 'critical']

    def test_batch_processing(self, workflow):
        """Test batch processing of multiple claims."""
        claims = [
            {
                'claim_id': f'BATCH-{i:03d}',
                'description': 'Test claim description',
                'claim_amount': 5000,
                'claim_date': '2026-01-03T10:00:00Z',
                'policy_start_date': '2023-01-01T00:00:00Z',
                'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
            }
            for i in range(5)
        ]
        results = workflow.batch_process_claims(claims, user_id="batch_user")
        assert len(results) == 5
        assert all(isinstance(r, ClaimProcessingResult) for r in results)
        assert all(r.final_decision in ['approve', 'review', 'escalate', 'reject'] for r in results)

    def test_audit_trail_completeness(self, workflow):
        """Test that audit trail captures all steps."""
        claim_data = {
            'claim_id': 'AUDIT-001',
            'description': 'Test claim for audit trail',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data, user_id="audit_user")
        # Check audit trail has all expected steps
        steps = [entry['step'] for entry in result.audit_trail]
        assert 'classification' in steps
        assert 'fraud_detection' in steps
        assert 'final_decision' in steps

    def test_decision_logic_approve(self, workflow):
        """Test that low-risk claims are approved."""
        claim_data = {
            'claim_id': 'DECISION-001',
            'description': 'Minor fender bender',
            'claim_amount': 2000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2022-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 4},
            'witnesses': 2
        }
        result = workflow.process_claim(claim_data)
        # Low-risk, low-amount claims should be approved
        assert result.final_decision == 'approve'

    def test_decision_logic_review(self, workflow):
        """Test that medium-risk claims require review."""
        claim_data = {
            'claim_id': 'DECISION-002',
            'description': 'Medical treatment claim',
            'claim_amount': 60000,  # High amount triggers review
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 1, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)
        # High-amount claims should require review
        assert result.final_decision in ['review', 'escalate']

    def test_metadata_structure(self, workflow):
        """Test that metadata has correct structure."""
        claim_data = {
            'claim_id': 'META-001',
            'description': 'Test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)
        assert result.metadata is not None
        assert 'workflow_version' in result.metadata
        assert 'processing_time_ms' in result.metadata
        assert 'compliance_flags' in result.metadata

    def test_result_serialization(self, workflow):
        """Test that workflow results can be serialized."""
        claim_data = {
            'claim_id': 'SERIAL-001',
            'description': 'Test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3}
        }
        result = workflow.process_claim(claim_data)
        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)
        assert json_str is not None
        assert len(json_str) > 0
class TestPerformance:
    """Performance and benchmark tests.

    These instantiate the capabilities directly (no fixtures) and check
    the latency figures reported in each result's metadata.
    """

    def test_classification_latency(self):
        """Test that classification meets latency SLA."""
        clf = TextClassificationCapability()
        outcome = clf.classify(text="Test claim description for performance testing")
        # Should complete in under 500ms (generous for mock implementation)
        assert outcome.metadata['processing_time_ms'] < 500

    def test_fraud_detection_latency(self):
        """Test that fraud detection meets latency SLA."""
        detector = FraudDetectionCapability()
        sample_claim = {
            'claim_id': 'PERF-001',
            'claim_amount': 5000,
            'claim_type': 'auto_accident',
        }
        outcome = detector.detect(claim_data=sample_claim)
        # Should complete in under 300ms
        assert outcome.metadata['processing_time_ms'] < 300

    def test_workflow_latency(self):
        """Test that complete workflow meets latency SLA."""
        pipeline = ClaimsProcessingWorkflow()
        sample_claim = {
            'claim_id': 'PERF-002',
            'description': 'Performance test claim',
            'claim_amount': 5000,
            'claim_date': '2026-01-03T10:00:00Z',
            'policy_start_date': '2023-01-01T00:00:00Z',
            'claimant_history': {'previous_claims': 0, 'years_as_customer': 3},
        }
        outcome = pipeline.process_claim(sample_claim)
        # Complete workflow should finish in under 1000ms
        assert outcome.processing_time_ms < 1000
if __name__ == "__main__":
    # Allow running this module directly: delegate to pytest with
    # verbose output and short tracebacks.
    pytest.main([__file__, "-v", "--tb=short"])