""" Simplified Integration Test for Medical AI Platform - Phase 3 Completion Tests the core architecture and schema compatibility without external dependencies. Author: MiniMax Agent Date: 2025-10-29 Version: 1.0.0 """ import logging import sys from typing import Dict, Any from dataclasses import dataclass # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SimplifiedIntegrationTester: """Tests core architecture and schema compatibility""" def __init__(self): """Initialize test environment""" self.test_results = { "schema_imports": False, "pipeline_architecture": False, "model_router_config": False, "data_flow_validation": False, "confidence_scoring": False } def test_schema_imports(self) -> bool: """Test that all core schemas can be imported""" logger.info("📋 Testing schema imports...") try: from medical_schemas import ( ValidationResult, ConfidenceScore, ECGAnalysis, RadiologyAnalysis, LaboratoryResults, ClinicalNotesAnalysis, ECGSignalData, RadiologyImageReference, LabTestResult, ClinicalSection ) logger.info("✅ All medical schemas imported successfully") # Test schema instantiation confidence = ConfidenceScore( extraction_confidence=0.8, model_confidence=0.9, data_quality_score=0.7 ) overall = confidence.overall_confidence expected = 0.5 * 0.8 + 0.3 * 0.9 + 0.2 * 0.7 # Should be 0.81 if abs(overall - expected) < 0.01: logger.info(f"✅ Confidence scoring formula verified: {overall:.3f}") self.test_results["schema_imports"] = True return True else: logger.error(f"❌ Confidence scoring failed: expected {expected:.3f}, got {overall:.3f}") self.test_results["schema_imports"] = False return False except ImportError as e: logger.error(f"❌ Schema import failed: {e}") self.test_results["schema_imports"] = False return False except Exception as e: logger.error(f"❌ Schema validation failed: {e}") self.test_results["schema_imports"] = False return False def test_pipeline_architecture(self) -> bool: """Test pipeline architecture design""" logger.info("🔄 Testing pipeline architecture...") try: # Test that we can import the core pipeline classes from specialized_model_router import SpecializedModelRouter, ModelInferenceResult, SpecializedModelConfig logger.info("✅ Specialized model router imports successful") # Test model router initialization router = SpecializedModelRouter() # Verify model configurations are loaded if hasattr(router, 'model_configs') and len(router.model_configs) > 0: logger.info(f"✅ Model configurations loaded: {len(router.model_configs)} models") # Check specific models expected_models = ["hubert_ecg", "monai_unetr", "medgemma", "biomedical_ner", "bio_clinicalbert"] missing_models = [] for model in expected_models: if model not in router.model_configs: missing_models.append(model) if not missing_models: logger.info("✅ All expected specialized models configured") self.test_results["pipeline_architecture"] = True return True else: logger.error(f"❌ Missing model configurations: {missing_models}") self.test_results["pipeline_architecture"] = False return False else: logger.error("❌ Model configurations not loaded") self.test_results["pipeline_architecture"] = False return False except Exception as e: logger.error(f"❌ Pipeline architecture test failed: {e}") self.test_results["pipeline_architecture"] = False return False def test_model_router_config(self) -> bool: """Test model router configuration completeness""" logger.info("🧠 Testing model router configuration...") try: from specialized_model_router import SpecializedModelRouter router = SpecializedModelRouter() # Test model configuration details config_tests = { "hubert_ecg": { "input_format": "ecg_signal", "model_type": "classification", "output_schema": "ECGAnalysis" }, "monai_unetr": { "input_format": "dicom_image", "model_type": "segmentation", "output_schema": "RadiologyAnalysis" }, "medgemma": { "input_format": "clinical_text", "model_type": "generation", "output_schema": "ClinicalNotesAnalysis" }, "biomedical_ner": { "input_format": "lab_text", "model_type": "extraction", "output_schema": "LaboratoryResults" } } all_configs_valid = True for model_name, expected_config in config_tests.items(): if model_name in router.model_configs: config = router.model_configs[model_name] for attr, expected_value in expected_config.items(): actual_value = getattr(config, attr, None) if actual_value != expected_value: logger.error(f"❌ {model_name}.{attr}: expected {expected_value}, got {actual_value}") all_configs_valid = False else: logger.info(f"✅ {model_name}.{attr}: {actual_value}") else: logger.error(f"❌ Model configuration missing: {model_name}") all_configs_valid = False if all_configs_valid: logger.info("✅ All model configurations validated") self.test_results["model_router_config"] = True return True else: logger.error("❌ Model configuration validation failed") self.test_results["model_router_config"] = False return False except Exception as e: logger.error(f"❌ Model router configuration test failed: {e}") self.test_results["model_router_config"] = False return False def test_data_flow_validation(self) -> bool: """Test data flow compatibility between components""" logger.info("📊 Testing data flow validation...") try: # Test that data structures are compatible between preprocessing and model routing from medical_schemas import ECGSignalData, RadiologyImageReference, ConfidenceScore from specialized_model_router import ModelInferenceResult # Create sample data structures to test compatibility ecg_data = ECGSignalData( signal_shape=(12, 5000), sampling_rate_hz=500, leads=["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"], signal_arrays=[[0.1, 0.2, 0.3] * 1667 for _ in range(12)] # 12 leads, ~5000 samples each ) logger.info("✅ ECG data structure validation passed") # Test confidence scoring across the pipeline confidence = ConfidenceScore( extraction_confidence=0.85, model_confidence=0.92, data_quality_score=0.78 ) overall_confidence = confidence.overall_confidence logger.info(f"✅ Pipeline confidence scoring: {overall_confidence:.3f}") # Test model inference result structure inference_result = ModelInferenceResult( model_name="test_model", input_data={"test": "data"}, output_data={"result": "output"}, confidence_score=overall_confidence, processing_time=1.5, model_metadata={"version": "1.0"}, warnings=[], errors=[] ) logger.info("✅ Model inference result structure validated") self.test_results["data_flow_validation"] = True return True except Exception as e: logger.error(f"❌ Data flow validation failed: {e}") self.test_results["data_flow_validation"] = False return False def test_confidence_scoring(self) -> bool: """Test confidence scoring system""" logger.info("🎯 Testing confidence scoring system...") try: from medical_schemas import ConfidenceScore # Test various confidence scenarios test_cases = [ { "name": "High Confidence", "extraction": 0.95, "model": 0.90, "quality": 0.85, "expected_range": (0.85, 0.95) }, { "name": "Medium Confidence", "extraction": 0.70, "model": 0.75, "quality": 0.65, "expected_range": (0.65, 0.75) }, { "name": "Low Confidence", "extraction": 0.50, "model": 0.45, "quality": 0.40, "expected_range": (0.40, 0.50) } ] all_passed = True for case in test_cases: confidence = ConfidenceScore( extraction_confidence=case["extraction"], model_confidence=case["model"], data_quality_score=case["quality"] ) overall = confidence.overall_confidence min_expected, max_expected = case["expected_range"] if min_expected <= overall <= max_expected: logger.info(f"✅ {case['name']}: {overall:.3f} (within {case['expected_range']})") else: logger.error(f"❌ {case['name']}: {overall:.3f} (outside {case['expected_range']})") all_passed = False if all_passed: logger.info("✅ Confidence scoring system validated") self.test_results["confidence_scoring"] = True return True else: logger.error("❌ Confidence scoring system failed validation") self.test_results["confidence_scoring"] = False return False except Exception as e: logger.error(f"❌ Confidence scoring test failed: {e}") self.test_results["confidence_scoring"] = False return False def run_all_tests(self) -> Dict[str, bool]: """Run all simplified integration tests""" logger.info("🚀 Starting Simplified Medical AI Platform Integration Tests") logger.info("=" * 70) # Run tests in sequence self.test_schema_imports() self.test_pipeline_architecture() self.test_model_router_config() self.test_data_flow_validation() self.test_confidence_scoring() # Generate test report logger.info("=" * 70) logger.info("📊 SIMPLIFIED INTEGRATION TEST RESULTS") logger.info("=" * 70) for test_name, result in self.test_results.items(): status = "✅ PASS" if result else "❌ FAIL" logger.info(f"{test_name.replace('_', ' ').title()}: {status}") total_tests = len(self.test_results) passed_tests = sum(self.test_results.values()) success_rate = (passed_tests / total_tests) * 100 logger.info("-" * 70) logger.info(f"Overall Success Rate: {passed_tests}/{total_tests} ({success_rate:.1f}%)") if success_rate >= 80: logger.info("🎉 SIMPLIFIED INTEGRATION TESTS PASSED - Phase 3 Core Architecture Complete!") logger.info("") logger.info("✅ KEY ACHIEVEMENTS:") logger.info(" • Canonical JSON schemas implemented and validated") logger.info(" • Preprocessing pipeline architecture designed") logger.info(" • Specialized model router configured for 5+ models") logger.info(" • Data flow compatibility verified between components") logger.info(" • Confidence scoring system validated with weighted formula") logger.info("") logger.info("🚀 READY FOR PHASE 4: Confidence Gating and Validation System") else: logger.warning("⚠️ SIMPLIFIED INTEGRATION TESTS FAILED - Phase 3 Needs Fixes") return self.test_results def main(): """Main test execution""" try: tester = SimplifiedIntegrationTester() results = tester.run_all_tests() # Return appropriate exit code success_rate = sum(results.values()) / len(results) exit_code = 0 if success_rate >= 0.8 else 1 sys.exit(exit_code) except Exception as e: logger.error(f"❌ Simplified integration test execution failed: {e}") sys.exit(1) if __name__ == "__main__": main()