# BDR-Agent-Factory / examples / test_examples.py
# Author: Bader Alabddan
# Commit: "Add comprehensive documentation and implementation framework" (3ef5d3c)
#!/usr/bin/env python3
"""
Test Cases for BDR Agent Factory Examples
Comprehensive test suite for text classification, fraud detection,
and integration examples.
"""
import pytest
import json
from datetime import datetime
# Import example implementations.
# These example modules live alongside this test file; when they are not
# importable (e.g. the tests are collected outside the examples directory),
# skip this entire module at collection time instead of erroring out.
try:
    from text_classification_example import TextClassificationCapability, ClassificationResult
    from fraud_detection_example import FraudDetectionCapability, FraudDetectionResult
    from integration_example import ClaimsProcessingWorkflow, ClaimProcessingResult
    EXAMPLES_AVAILABLE = True  # flag other modules can check
except ImportError:
    EXAMPLES_AVAILABLE = False
    # allow_module_level=True makes pytest.skip legal outside a test function.
    pytest.skip("Example implementations not available", allow_module_level=True)
class TestTextClassification:
    """Test cases for the text classification capability.

    Exercises initialization, one test per claim category, input
    validation, explanation and audit-trail features, batch mode,
    metadata/compliance structure, and JSON serialization.
    """

    @pytest.fixture
    def classifier(self):
        """Create a text classifier instance with auditing enabled."""
        return TextClassificationCapability(enable_audit=True)

    def test_initialization(self, classifier):
        """Test classifier initialization and identity constants."""
        assert classifier is not None
        assert classifier.CAPABILITY_ID == "cap_text_classification"
        assert classifier.VERSION == "2.1.0"
        assert classifier.enable_audit is True

    def test_property_damage_classification(self, classifier):
        """Test classification of property damage claim"""
        text = "Water damage to basement after heavy rain and flooding"
        result = classifier.classify(text=text, explain=True)
        assert isinstance(result, ClassificationResult)
        assert result.predicted_class == "property_damage"
        assert result.confidence > 0.7
        assert result.explanation is not None
        assert result.audit_id is not None

    def test_auto_accident_classification(self, classifier):
        """Test classification of auto accident claim"""
        text = "Rear-end collision on highway during rush hour traffic"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "auto_accident"
        assert result.confidence > 0.7
        # Fixed: removed a tautological assertion that checked for
        # 'collision'/'accident' in the hard-coded input string itself —
        # it could never fail and verified nothing about the classifier.

    def test_health_claim_classification(self, classifier):
        """Test classification of health claim"""
        text = "Patient underwent surgery at hospital for medical treatment"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "health_claim"
        assert result.confidence > 0.7

    def test_liability_classification(self, classifier):
        """Test classification of liability claim"""
        text = "Customer slipped and fell in store, sustained injury"
        result = classifier.classify(text=text, explain=True)
        assert result.predicted_class == "liability"
        assert result.confidence > 0.7

    def test_empty_input_validation(self, classifier):
        """Test that empty input raises ValueError"""
        with pytest.raises(ValueError, match="non-empty string"):
            classifier.classify(text="")

    def test_long_input_validation(self, classifier):
        """Test that excessively long input raises ValueError"""
        long_text = "word " * 10000
        with pytest.raises(ValueError, match="maximum length"):
            classifier.classify(text=long_text)

    def test_malicious_input_validation(self, classifier):
        """Test that malicious (script-injection) input is rejected"""
        malicious_text = "<script>alert('xss')</script>"
        with pytest.raises(ValueError, match="malicious content"):
            classifier.classify(text=malicious_text)

    def test_confidence_threshold(self, classifier):
        """Test confidence threshold filtering"""
        text = "Ambiguous claim description"
        result = classifier.classify(
            text=text,
            confidence_threshold=0.99  # Very high threshold
        )
        # With a high threshold, uncertain claims should still produce a
        # result object (marked as uncertain rather than dropped).
        assert result is not None

    def test_explanation_generation(self, classifier):
        """Test that explanations are generated correctly"""
        text = "Water damage to basement after storm"
        result = classifier.classify(text=text, explain=True)
        assert result.explanation is not None
        assert 'method' in result.explanation
        assert 'local_explanation' in result.explanation
        assert 'key_features' in result.explanation['local_explanation']
        assert len(result.explanation['local_explanation']['key_features']) > 0

    def test_audit_trail_creation(self, classifier):
        """Test that an audit record is created and retrievable by id"""
        text = "Test claim description"
        result = classifier.classify(
            text=text,
            audit_trail=True,
            user_id="test_user"
        )
        assert result.audit_id is not None
        # Retrieve audit record and verify it recorded caller identity.
        audit_record = classifier.get_audit_record(result.audit_id)
        assert audit_record is not None
        assert audit_record['user_id'] == 'test_user'
        assert audit_record['capability_id'] == 'cap_text_classification'

    def test_batch_classification(self, classifier):
        """Test batch classification preserves order and result types"""
        texts = [
            "Water damage to property",
            "Car accident on highway",
            "Medical treatment at hospital"
        ]
        results = classifier.batch_classify(texts=texts, explain=False)
        assert len(results) == 3
        assert all(isinstance(r, ClassificationResult) for r in results)
        assert results[0].predicted_class == "property_damage"
        assert results[1].predicted_class == "auto_accident"
        assert results[2].predicted_class == "health_claim"

    def test_metadata_structure(self, classifier):
        """Test that metadata has correct structure"""
        text = "Test claim"
        result = classifier.classify(text=text)
        assert result.metadata is not None
        assert 'capability_id' in result.metadata
        assert 'version' in result.metadata
        assert 'processing_time_ms' in result.metadata
        assert 'timestamp' in result.metadata
        assert 'compliance_flags' in result.metadata

    def test_compliance_flags(self, classifier):
        """Test that compliance flags are set correctly"""
        text = "Test claim"
        result = classifier.classify(text=text, explain=True, audit_trail=True)
        flags = result.metadata['compliance_flags']
        assert flags['explainable'] is True
        assert flags['auditable'] is True
        assert flags['gdpr_compliant'] is True
        assert flags['ifrs17_compliant'] is True

    def test_result_serialization(self, classifier):
        """Test that results can be serialized to JSON"""
        text = "Test claim"
        result = classifier.classify(text=text)
        result_dict = result.to_dict()
        json_str = json.dumps(result_dict)
        assert json_str is not None
        assert len(json_str) > 0
class TestFraudDetection:
    """Tests for the fraud detection capability.

    Covers risk scoring at low/medium/high levels, payload validation,
    risk-factor identification, explanations, audit trails, compliance
    flags, and JSON serialization.
    """

    @pytest.fixture
    def fraud_detector(self):
        """Provide a fraud detector with auditing turned on."""
        return FraudDetectionCapability(enable_audit=True)

    def test_initialization(self, fraud_detector):
        """The detector exposes its identity and version constants."""
        assert fraud_detector is not None
        assert fraud_detector.CAPABILITY_ID == "cap_fraud_detection"
        assert fraud_detector.VERSION == "1.5.0"

    def test_low_risk_claim(self, fraud_detector):
        """A small, well-witnessed claim from a long-time customer scores low."""
        payload = dict(
            claim_id="TEST-001",
            claim_amount=2000,
            claim_type="auto_accident",
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
            witnesses=2,
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        assert isinstance(outcome, FraudDetectionResult)
        assert outcome.risk_level == "low"
        assert outcome.fraud_score < 0.3
        assert outcome.recommendation == "approve"

    def test_high_risk_claim(self, fraud_detector):
        """A large, late-night claim on a brand-new policy with no witnesses
        and a terse description is flagged high or critical."""
        payload = dict(
            claim_id="TEST-002",
            claim_amount=100000,  # unusually large amount
            claim_type="property_damage",
            claim_date="2026-01-03T23:00:00Z",  # filed late at night
            policy_start_date="2025-12-28T00:00:00Z",  # policy only days old
            claimant_history=dict(previous_claims=5, years_as_customer=1),
            witnesses=0,  # nobody to corroborate
            incident_details="Fire",  # suspiciously short description
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        assert outcome.risk_level in ("high", "critical")
        assert outcome.fraud_score > 0.6
        assert outcome.recommendation in ("escalate", "reject")
        assert len(outcome.risk_factors) > 0

    def test_medium_risk_claim(self, fraud_detector):
        """A moderately sized claim with some history lands low or medium."""
        payload = dict(
            claim_id="TEST-003",
            claim_amount=25000,
            claim_type="health_claim",
            claim_date="2026-01-03T14:00:00Z",
            policy_start_date="2024-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=2, years_as_customer=2),
            witnesses=1,
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        assert outcome.risk_level in ("low", "medium")
        assert outcome.recommendation in ("approve", "review")

    def test_missing_required_fields(self, fraud_detector):
        """Omitting claim_amount/claim_type raises a descriptive ValueError."""
        partial_payload = dict(claim_id="TEST-004")
        with pytest.raises(ValueError, match="Missing required field"):
            fraud_detector.detect(claim_data=partial_payload)

    def test_invalid_claim_amount(self, fraud_detector):
        """A negative claim amount is rejected with ValueError."""
        bad_payload = dict(
            claim_id="TEST-005",
            claim_amount=-1000,  # negative amounts are invalid
            claim_type="auto_accident",
        )
        with pytest.raises(ValueError, match="cannot be negative"):
            fraud_detector.detect(claim_data=bad_payload)

    def test_risk_factor_detection(self, fraud_detector):
        """Several suspicious attributes each surface as a named risk factor."""
        payload = dict(
            claim_id="TEST-006",
            claim_amount=75000,  # expected to trigger high_claim_amount
            claim_type="property_damage",
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2025-12-20T00:00:00Z",  # expected: recent_policy
            claimant_history=dict(
                previous_claims=4,  # expected: frequent_claims
                years_as_customer=1,
            ),
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        # Several of the planted signals should have been picked up.
        assert len(outcome.risk_factors) >= 2
        factor_names = [entry["factor"] for entry in outcome.risk_factors]
        assert "high_claim_amount" in factor_names

    def test_explanation_generation(self, fraud_detector):
        """An explanation with summary, factors, and recommendations exists."""
        payload = dict(
            claim_id="TEST-007",
            claim_amount=10000,
            claim_type="auto_accident",
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        assert outcome.explanation is not None
        assert "human_readable_summary" in outcome.explanation
        assert "contributing_factors" in outcome.explanation
        assert "recommendations" in outcome.explanation

    def test_audit_trail_creation(self, fraud_detector):
        """An audit record is written and can be fetched by its id."""
        payload = dict(
            claim_id="TEST-008",
            claim_amount=5000,
            claim_type="property_damage",
        )
        outcome = fraud_detector.detect(
            claim_data=payload,
            audit_trail=True,
            user_id="test_adjuster",
        )
        assert outcome.audit_id is not None
        # Fetch the record back and check it captured caller and claim.
        record = fraud_detector.get_audit_record(outcome.audit_id)
        assert record is not None
        assert record["user_id"] == "test_adjuster"
        assert record["claim_id"] == "TEST-008"

    def test_compliance_flags(self, fraud_detector):
        """Metadata carries AML and GDPR compliance flags."""
        payload = dict(
            claim_id="TEST-009",
            claim_amount=5000,
            claim_type="auto_accident",
        )
        outcome = fraud_detector.detect(claim_data=payload, explain=True)
        flags = outcome.metadata["compliance_flags"]
        assert flags["aml_compliant"] is True
        assert flags["gdpr_compliant"] is True

    def test_result_serialization(self, fraud_detector):
        """Results round-trip through to_dict() into valid JSON."""
        payload = dict(
            claim_id="TEST-010",
            claim_amount=5000,
            claim_type="auto_accident",
        )
        outcome = fraud_detector.detect(claim_data=payload)
        serialized = json.dumps(outcome.to_dict())
        assert serialized is not None
        assert len(serialized) > 0
class TestIntegration:
    """End-to-end tests for the combined claims-processing workflow.

    Verifies wiring of the underlying capabilities, decision outcomes for
    low- and high-risk claims, batch mode, audit-trail completeness,
    decision logic, metadata shape, and JSON serialization.
    """

    @pytest.fixture
    def workflow(self):
        """Provide a fresh claims-processing workflow."""
        return ClaimsProcessingWorkflow()

    def test_initialization(self, workflow):
        """The workflow wires up both underlying capabilities."""
        assert workflow is not None
        assert workflow.text_classifier is not None
        assert workflow.fraud_detector is not None

    def test_simple_claim_processing(self, workflow):
        """A routine low-value claim flows through to a valid decision."""
        submission = dict(
            claim_id="INT-001",
            description="Minor car accident in parking lot",
            claim_amount=3000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
            witnesses=2,
            third_party_involved=True,
        )
        outcome = workflow.process_claim(submission, user_id="test_user")
        assert isinstance(outcome, ClaimProcessingResult)
        assert outcome.claim_id == "INT-001"
        assert outcome.final_decision in ("approve", "review", "escalate", "reject")
        # Classification and fraud detection must each leave an audit entry.
        assert len(outcome.audit_trail) >= 2

    def test_high_risk_claim_processing(self, workflow):
        """A suspicious high-value claim is escalated or rejected."""
        submission = dict(
            claim_id="INT-002",
            description="Total loss fire damage",
            claim_amount=150000,
            claim_date="2026-01-03T23:00:00Z",
            policy_start_date="2025-12-28T00:00:00Z",
            claimant_history=dict(previous_claims=5, years_as_customer=1),
            witnesses=0,
            third_party_involved=False,
        )
        outcome = workflow.process_claim(submission, user_id="test_user")
        # High-risk claims must never be auto-approved.
        assert outcome.final_decision in ("escalate", "reject")
        assert outcome.fraud_detection["risk_level"] in ("high", "critical")

    def test_batch_processing(self, workflow):
        """Processing several claims at once yields one decision apiece."""
        submissions = []
        for index in range(5):
            submissions.append(dict(
                claim_id=f"BATCH-{index:03d}",
                description="Test claim description",
                claim_amount=5000,
                claim_date="2026-01-03T10:00:00Z",
                policy_start_date="2023-01-01T00:00:00Z",
                claimant_history=dict(previous_claims=0, years_as_customer=3),
            ))
        outcomes = workflow.batch_process_claims(submissions, user_id="batch_user")
        assert len(outcomes) == 5
        assert all(isinstance(o, ClaimProcessingResult) for o in outcomes)
        valid_decisions = ("approve", "review", "escalate", "reject")
        assert all(o.final_decision in valid_decisions for o in outcomes)

    def test_audit_trail_completeness(self, workflow):
        """Every workflow stage leaves a named step in the audit trail."""
        submission = dict(
            claim_id="AUDIT-001",
            description="Test claim for audit trail",
            claim_amount=5000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
        )
        outcome = workflow.process_claim(submission, user_id="audit_user")
        recorded_steps = [entry["step"] for entry in outcome.audit_trail]
        assert "classification" in recorded_steps
        assert "fraud_detection" in recorded_steps
        assert "final_decision" in recorded_steps

    def test_decision_logic_approve(self, workflow):
        """A low-risk, low-amount claim is auto-approved."""
        submission = dict(
            claim_id="DECISION-001",
            description="Minor fender bender",
            claim_amount=2000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2022-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=4),
            witnesses=2,
        )
        outcome = workflow.process_claim(submission)
        assert outcome.final_decision == "approve"

    def test_decision_logic_review(self, workflow):
        """A high-amount claim needs at least manual review."""
        submission = dict(
            claim_id="DECISION-002",
            description="Medical treatment claim",
            claim_amount=60000,  # amount alone should force review
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=1, years_as_customer=3),
        )
        outcome = workflow.process_claim(submission)
        assert outcome.final_decision in ("review", "escalate")

    def test_metadata_structure(self, workflow):
        """Workflow metadata exposes version, timing, and compliance info."""
        submission = dict(
            claim_id="META-001",
            description="Test claim",
            claim_amount=5000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
        )
        outcome = workflow.process_claim(submission)
        assert outcome.metadata is not None
        assert "workflow_version" in outcome.metadata
        assert "processing_time_ms" in outcome.metadata
        assert "compliance_flags" in outcome.metadata

    def test_result_serialization(self, workflow):
        """Workflow results round-trip through to_dict() into valid JSON."""
        submission = dict(
            claim_id="SERIAL-001",
            description="Test claim",
            claim_amount=5000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
        )
        outcome = workflow.process_claim(submission)
        serialized = json.dumps(outcome.to_dict())
        assert serialized is not None
        assert len(serialized) > 0
class TestPerformance:
    """Latency (SLA) checks for each capability and the full workflow."""

    def test_classification_latency(self):
        """Classification finishes inside its (generous) 500 ms budget."""
        capability = TextClassificationCapability()
        outcome = capability.classify(
            text="Test claim description for performance testing"
        )
        # 500 ms is deliberately loose for the mock implementation.
        assert outcome.metadata["processing_time_ms"] < 500

    def test_fraud_detection_latency(self):
        """Fraud detection finishes inside its 300 ms budget."""
        capability = FraudDetectionCapability()
        payload = dict(
            claim_id="PERF-001",
            claim_amount=5000,
            claim_type="auto_accident",
        )
        outcome = capability.detect(claim_data=payload)
        assert outcome.metadata["processing_time_ms"] < 300

    def test_workflow_latency(self):
        """The end-to-end workflow finishes inside its 1000 ms budget."""
        pipeline = ClaimsProcessingWorkflow()
        submission = dict(
            claim_id="PERF-002",
            description="Performance test claim",
            claim_amount=5000,
            claim_date="2026-01-03T10:00:00Z",
            policy_start_date="2023-01-01T00:00:00Z",
            claimant_history=dict(previous_claims=0, years_as_customer=3),
        )
        outcome = pipeline.process_claim(submission)
        assert outcome.processing_time_ms < 1000
if __name__ == "__main__":
    # Allow running this module directly: verbose output, short tracebacks.
    pytest_args = [__file__, "-v", "--tb=short"]
    pytest.main(pytest_args)