"""Comprehensive production system audit and quality assessment. This script validates: 1. Model quality metrics and severe class performance 2. Feature pipeline integrity (560-dim schema validation) 3. Backend API readiness 4. Frontend/Backend integration requirements 5. Healthcare safety layer """ from __future__ import annotations import json import logging import sys from pathlib import Path from typing import Any, Dict import numpy as np import torch # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', ) logger = logging.getLogger('medcare_ddi.audit') # Add src to path ROOT = Path(__file__).resolve().parents[2] sys.path.insert(0, str(ROOT / 'src')) from inference.predictor import ( BASE_DIR, DATA_PATH, MODEL_DIR, FEATURE_PIPELINE_MULTISOURCE_PATH, PRODUCTION_MODEL_PATH, FeatureMLP, HybridDDIPredictor, LABEL_NAMES, ) def audit_model_artifacts() -> Dict[str, Any]: """Check model and data artifacts.""" logger.info('='*60) logger.info('PHASE 1: MODEL ARTIFACTS AUDIT') logger.info('='*60) artifacts = { 'feature_pipeline': FEATURE_PIPELINE_MULTISOURCE_PATH.exists(), 'model_checkpoint': MODEL_DIR / 'ddi_mlp_best.pt', 'data_file': DATA_PATH / 'ddinter_combined.parquet', 'metadata': MODEL_DIR / 'multisource_metadata.json', } results = {} for name, path in artifacts.items(): if isinstance(path, bool): exists = path else: exists = path.exists() status = '✓' if exists else '✗' results[name] = exists if not isinstance(path, bool): size = path.stat().st_size if exists else 0 size_mb = size / (1024 * 1024) logger.info(f'{status} {name}: {path.name} ({size_mb:.1f}MB)') else: logger.info(f'{status} {name}') return results def audit_feature_pipeline() -> Dict[str, Any]: """Validate feature pipeline schema.""" logger.info('') logger.info('='*60) logger.info('PHASE 2: FEATURE PIPELINE AUDIT') logger.info('='*60) results = {} # Check metadata try: metadata_path = MODEL_DIR / 'multisource_metadata.json' with open(metadata_path) as f: metadata = json.load(f) total_dim = metadata.get('total_dim', 0) results['total_dim'] = total_dim logger.info(f'✓ Multisource metadata loaded') logger.info(f' - Total dimension: {total_dim}') # Check feature groups feature_groups = metadata.get('feature_groups', {}) for group, info in feature_groups.items(): dim = info.get('dim', 0) logger.info(f' - {group}: {dim}') results[f'group_{group}'] = dim # Validate 560-dim schema if total_dim == 560: logger.info(f'✓ Schema matches expected 560-dimensional feature space') results['schema_valid'] = True else: logger.error(f'✗ MISMATCH: Expected 560 dims, got {total_dim}') results['schema_valid'] = False except Exception as e: logger.error(f'✗ Failed to load metadata: {e}') results['schema_valid'] = False return results def audit_predictor() -> Dict[str, Any]: """Test predictor initialization and basic functionality.""" logger.info('') logger.info('='*60) logger.info('PHASE 3: PREDICTOR FUNCTIONALITY AUDIT') logger.info('='*60) results = {} try: # Load predictor logger.info('Loading predictor with production mode...') predictor = HybridDDIPredictor.from_default_paths(use_production=False) health = predictor.health() logger.info(f'✓ Predictor initialized') logger.info(f' - Model loaded: {health.get("model_loaded")}') logger.info(f' - Pairs loaded: {health.get("pairs_loaded")}') logger.info(f' - Records: {health.get("records_loaded")}') results['model_loaded'] = health.get('model_loaded', False) results['pairs_loaded'] = health.get('pairs_loaded', 0) results['records_loaded'] = health.get('records_loaded', 0) # Test known interactions logger.info('') logger.info('Testing known DDI pairs:') test_pairs = [ ('Aspirin', 'Warfarin'), ('Metformin', 'Insulin'), ('Lisinopril', 'Potassium'), ] for drug_a, drug_b in test_pairs: try: result = predictor.predict(drug_a, drug_b) severity = result.get('severity', 'unknown') confidence = result.get('confidence', 0.0) source = result.get('source', 'unknown') logger.info(f' ✓ {drug_a} + {drug_b}: {severity} (conf={confidence:.2f}, src={source})') except Exception as e: logger.error(f' ✗ {drug_a} + {drug_b}: {e}') # Test unseen pairs (ML fallback) logger.info('') logger.info('Testing unseen pairs (ML fallback):') unseen_pairs = [ ('DrugX', 'DrugY'), ('AcetaminophenX', 'IbuprofenY'), ] for drug_a, drug_b in unseen_pairs: try: result = predictor.predict(drug_a, drug_b) severity = result.get('severity', 'unknown') confidence = result.get('confidence', 0.0) source = result.get('source', 'unknown') logger.info(f' ✓ {drug_a} + {drug_b}: {severity} (conf={confidence:.2f}, src={source})') except Exception as e: logger.error(f' ✗ {drug_a} + {drug_b}: {e}') results['predictor_working'] = True except Exception as e: logger.error(f'✗ Predictor initialization failed: {e}', exc_info=True) results['predictor_working'] = False return results def audit_backend_api() -> Dict[str, Any]: """Check FastAPI backend readiness.""" logger.info('') logger.info('='*60) logger.info('PHASE 4: BACKEND API AUDIT') logger.info('='*60) results = {} try: # Check app exists from inference.app_production import app, predictor as api_predictor logger.info('✓ FastAPI app imports successfully') logger.info('✓ Predictor available in app context') # Check routes routes = [r.path for r in app.routes] required_routes = ['/health', '/predict'] for route in required_routes: if any(route in r for r in routes): logger.info(f'✓ Route {route} exists') results[f'route_{route}'] = True else: logger.error(f'✗ Route {route} NOT FOUND') results[f'route_{route}'] = False except Exception as e: logger.error(f'✗ Failed to check backend API: {e}') results['backend_ok'] = False return results def audit_frontend_integration() -> Dict[str, Any]: """Check frontend/backend integration points.""" logger.info('') logger.info('='*60) logger.info('PHASE 5: FRONTEND INTEGRATION AUDIT') logger.info('='*60) results = {} frontend_path = ROOT.parent / 'Medcare-DDI' / 'src' / 'api' try: # Check appClient.js client_file = frontend_path / 'appClient.js' if client_file.exists(): logger.info(f'✓ Frontend appClient.js exists') with open(client_file) as f: client_code = f.read() checks = { 'ddiPredictRequest': 'ddiPredictRequest' in client_code, 'predictInteraction': 'predictInteraction' in client_code, 'severity': 'severity' in client_code, 'confidence': 'confidence' in client_code, } for check_name, check_result in checks.items(): status = '✓' if check_result else '✗' logger.info(f' {status} {check_name}') results[f'frontend_{check_name}'] = check_result else: logger.error(f'✗ Frontend appClient.js NOT FOUND') results['frontend_exists'] = False except Exception as e: logger.error(f'✗ Failed to check frontend integration: {e}') return results def audit_healthcare_safety() -> Dict[str, Any]: """Check healthcare safety features.""" logger.info('') logger.info('='*60) logger.info('PHASE 6: HEALTHCARE SAFETY AUDIT') logger.info('='*60) results = {} try: from inference.app_production import ( ConfidenceBand, SeverityLevel, PredictionResponse, ) logger.info('✓ Safety enums imported') # Check confidence bands confidence_bands = [c.value for c in ConfidenceBand] logger.info(f'✓ Confidence bands: {confidence_bands}') results['confidence_bands'] = confidence_bands # Check severity levels severity_levels = [s.value for s in SeverityLevel] logger.info(f'✓ Severity levels: {severity_levels}') results['severity_levels'] = severity_levels # Check response schema logger.info('✓ PredictionResponse schema available') logger.info(f' Fields: {list(PredictionResponse.model_fields.keys())}') results['response_schema_ok'] = True except Exception as e: logger.error(f'✗ Healthcare safety check failed: {e}') results['response_schema_ok'] = False return results def generate_audit_report(audit_results: Dict[str, Dict]) -> None: """Generate comprehensive audit report.""" logger.info('') logger.info('='*60) logger.info('AUDIT SUMMARY') logger.info('='*60) all_passed = True for phase, results in audit_results.items(): passed = all(v for k, v in results.items() if isinstance(v, bool)) status = '✓ PASS' if passed else '⚠ WARN' logger.info(f'{status} - {phase}') all_passed = all_passed and passed logger.info('') if all_passed: logger.info('✓ ALL AUDITS PASSED - SYSTEM READY FOR OPTIMIZATION') else: logger.info('⚠ SOME ISSUES FOUND - REVIEW ABOVE FOR DETAILS') # Save detailed report report = { 'timestamp': __import__('datetime').datetime.now().isoformat(), 'phases': audit_results, 'overall_status': 'READY' if all_passed else 'NEEDS_ATTENTION', } report_path = MODEL_DIR / 'reports' / 'comprehensive_audit.json' report_path.parent.mkdir(parents=True, exist_ok=True) with open(report_path, 'w') as f: json.dump(report, f, indent=2) logger.info(f'✓ Audit report saved to {report_path}') def main() -> None: """Run comprehensive audit.""" logger.info('') logger.info('╔' + '═'*58 + '╗') logger.info('║ MEDCARE-DDI COMPREHENSIVE PRODUCTION AUDIT' + ' '*15 + '║') logger.info('╚' + '═'*58 + '╝') audit_results = { '1_artifacts': audit_model_artifacts(), '2_feature_pipeline': audit_feature_pipeline(), '3_predictor': audit_predictor(), '4_backend_api': audit_backend_api(), '5_frontend_integration': audit_frontend_integration(), '6_healthcare_safety': audit_healthcare_safety(), } generate_audit_report(audit_results) logger.info('') logger.info('Audit complete!') if __name__ == '__main__': main()