from __future__ import annotations import logging import math import re from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path from typing import Any import joblib import numpy as np import pandas as pd import torch from torch import nn from training.feature_pipeline import transform_pair_features as transform_pair_features_legacy from training.feature_pipeline_multisource import transform_pair_features as transform_pair_features_multisource from training.ensemble import EnsemblePredictor from training.calibration import SafetyCalibrationConfig, risk_adjusted_prediction from chemistry.smiles_recovery import resolve_drug_name_to_smiles try: from chemistry.drug_synonyms import load_drugbank_name_to_smiles_index, lookup_drugbank_smiles, normalize_drug_name except Exception: # pragma: no cover - optional DrugBank index load_drugbank_name_to_smiles_index = None # type: ignore lookup_drugbank_smiles = None # type: ignore normalize_drug_name = None # type: ignore logger = logging.getLogger('medcare_ddi.predictor') BASE_DIR = Path(__file__).resolve().parents[2] DATA_PATH = BASE_DIR / 'data' / 'processed' / 'ddinter_combined.parquet' MODEL_DIR = BASE_DIR / 'models' MODEL_PATH = MODEL_DIR / 'ddi_mlp_best.pt' PRODUCTION_MODEL_PATH = MODEL_DIR / 'ddi_mlp_production.pt' # Healthcare-grade production model FEATURE_PIPELINE_PATH = MODEL_DIR / 'feature_pipeline.pkl' FEATURE_PIPELINE_MULTISOURCE_PATH = MODEL_DIR / 'feature_pipeline_multisource.pkl' CALIBRATION_PATH = MODEL_DIR / 'calibration_artifacts.pkl' PRODUCTION_CALIBRATION_PATH = MODEL_DIR / 'calibration_artifacts_production.pkl' # Production calibration ENSEMBLE_DIR = MODEL_DIR / 'ensemble' DRUGBANK_TO_DDINTER = { 'DB01048': 'Abacavir', 'DB01097': 'Leflunomide', 'DB00331': 'Metformin', 'DB00682': 'Warfarin', 'DB00945': 'Acetylsalicylic acid', 'DB01050': 'Ibuprofen', 'DB00338': 'Omeprazole', 'DB01076': 'Atorvastatin', 'DB00722': 'Lisinopril', 'DB00381': 'Amlodipine', 'DB00758': 'Clopidogrel', 'DB00641': 'Simvastatin', 'DB00537': 'Ciprofloxacin', 'DB00196': 'Fluconazole', 'DB01045': 'Rifampicin', } LABEL_NAMES = ['unknown', 'minor', 'moderate', 'major'] LABEL_TO_INDEX = {label: index for index, label in enumerate(LABEL_NAMES)} INDEX_TO_LABEL = {index: label for label, index in LABEL_TO_INDEX.items()} SEVERITY_ADVICE = { 'major': 'Avoid the combination when possible. If there is no alternative, use specialist oversight and close monitoring.', 'moderate': 'Use with caution. Consider monitoring, dose adjustment, and review of safer alternatives.', 'minor': 'The combination is generally acceptable with routine clinical monitoring.', 'unknown': 'The local DDInter table does not provide a clear severity signal for this pair.', } def normalize_name(value: str) -> str: return ' '.join(value.strip().lower().split()) def resolve_input_name(value: str) -> str: if not value: raise ValueError('Drug input is required') cleaned_value = value.strip() if re.fullmatch(r'DB\d{5}', cleaned_value, flags=re.IGNORECASE): return DRUGBANK_TO_DDINTER.get(cleaned_value.upper(), cleaned_value.upper()) return cleaned_value def _load_frontend_alias_map() -> dict[str, str]: """Load a lightweight alias map from the frontend `src/lib/drugAliases.js`. This provides a fallback mapping for common marketing names and ATC codes defined in the React UI when the DrugBank index is unavailable or missing those aliases. """ alias_map: dict[str, str] = {} try: js_path = BASE_DIR / 'src' / 'lib' / 'drugAliases.js' if not js_path.exists(): return alias_map text = js_path.read_text(encoding='utf-8') # Find objects inside DRUG_DATABASE array entries = re.findall(r"\{([^}]+)\}", text[text.find('DRUG_DATABASE'):]) for entry in entries: try: name_m = re.search(r"name\s*:\s*[\"']([^\"']+)[\"']", entry) atc_m = re.search(r"atc\s*:\s*[\"']([^\"']+)[\"']", entry) markets_m = re.search(r"marketingNames\s*:\s*\[([^\]]+)\]", entry, flags=re.S) if not name_m: continue canonical = name_m.group(1).strip() if atc_m: key = normalize_name(atc_m.group(1)) alias_map[key] = canonical if markets_m: markets = re.findall(r"[\"']([^\"']+)[\"']", markets_m.group(1)) for mkt in markets: alias_map[normalize_name(mkt)] = canonical # also map ingredient name alias_map[normalize_name(canonical)] = canonical except Exception: continue except Exception: return {} return alias_map def canonical_pair_key(drug_a: str, drug_b: str) -> tuple[str, str]: return tuple(sorted((normalize_name(drug_a), normalize_name(drug_b)))) def severity_rank(value: str) -> int: return {'unknown': 0, 'minor': 1, 'moderate': 2, 'major': 3}.get(value.lower(), 0) def lookup_confidence(severity: str, support_count: int) -> float: base = { 'major': 0.94, 'moderate': 0.84, 'minor': 0.72, 'unknown': 0.58, }.get(severity, 0.60) support_bonus = min(0.06, round(math.log2(max(support_count, 1)) * 0.01, 3)) return round(min(0.99, base + support_bonus), 3) def confidence_band(confidence: float) -> str: if confidence < 0.55: return 'low' if confidence < 0.75: return 'medium' return 'high' def load_ddinter_lookup() -> tuple[dict[tuple[str, str], list[dict[str, str]]], dict[str, Counter]]: from preprocessing.artifact_manager import manager try: df = manager.load_artifact('ddinter_combined') except Exception as e: raise FileNotFoundError(f'Failed to load ddinter_combined artifact: {e}') pair_index: dict[tuple[str, str], list[dict[str, str]]] = defaultdict(list) drug_profiles: dict[str, Counter] = defaultdict(Counter) for _, row in df.iterrows(): # Ensure row represents dictionary properly (handle pandas types) row_dict = {k: str(v) for k, v in row.to_dict().items() if v is not None and not pd.isna(v)} try: a_can = str(row_dict.get('canonical_drug_a') or row_dict.get('Drug_A', '')).strip().upper() b_can = str(row_dict.get('canonical_drug_b') or row_dict.get('Drug_B', '')).strip().upper() except: a_can = str(row_dict.get('Drug_A', '')).strip().upper() b_can = str(row_dict.get('Drug_B', '')).strip().upper() if not a_can or not b_can: continue pair_key = tuple(sorted([a_can, b_can])) pair_index[pair_key].append(row_dict) try: level = str(row_dict.get('Level') or row_dict.get('level', 'Unknown')).strip() except: level = 'Unknown' if level in ('Major', 'Moderate', 'Minor'): drug_profiles[a_can][level] += 1 drug_profiles[b_can][level] += 1 return pair_index, drug_profiles class DDIEmbeddingMLP(nn.Module): def __init__( self, vocab_size: int, embedding_dim: int, hidden_dim: int, num_classes: int, dropout: float = 0.2, ) -> None: super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0) self.network = nn.Sequential( nn.Linear(embedding_dim * 4, hidden_dim), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim, hidden_dim // 2), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim // 2, num_classes), ) def forward(self, drug_a_ids: torch.Tensor, drug_b_ids: torch.Tensor) -> torch.Tensor: embedding_a = self.embedding(drug_a_ids) embedding_b = self.embedding(drug_b_ids) features = torch.cat( [embedding_a, embedding_b, torch.abs(embedding_a - embedding_b), embedding_a * embedding_b], dim=-1, ) return self.network(features) class FeatureMLP(nn.Module): def __init__(self, input_dim: int, hidden_dim: int, num_classes: int, dropout: float = 0.2) -> None: super().__init__() self.network = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim, max(8, hidden_dim // 2)), nn.ReLU(), nn.Dropout(dropout), nn.Linear(max(8, hidden_dim // 2), num_classes), ) def forward(self, features: torch.Tensor) -> torch.Tensor: return self.network(features) @dataclass(slots=True) class InferenceResult: source: str confidence: float severity: str explanation: str warning: str | None model_version: str mechanism: str affected_systems: str smiles_a: str smiles_b: str clinical_advice: str additional_notes: str drug_a_name: str drug_b_name: str drug_a_id: str drug_b_id: str source_dataset: str exact_match: bool evidence_count: int | None = None confidence_band: str | None = None probabilities: dict[str, float] | None = None def to_dict(self) -> dict[str, Any]: return { 'source': self.source, 'confidence': self.confidence, 'severity': self.severity, 'explanation': self.explanation, 'warning': self.warning, 'model_version': self.model_version, 'mechanism': self.mechanism, 'affected_systems': self.affected_systems, 'smiles_a': self.smiles_a, 'smiles_b': self.smiles_b, 'clinical_advice': self.clinical_advice, 'additional_notes': self.additional_notes, 'drug_a_name': self.drug_a_name, 'drug_b_name': self.drug_b_name, 'drug_a_id': self.drug_a_id, 'drug_b_id': self.drug_b_id, 'source_dataset': self.source_dataset, 'exact_match': self.exact_match, 'evidence_count': self.evidence_count, 'confidence_band': self.confidence_band, 'probabilities': self.probabilities, } class HybridDDIPredictor: def __init__(self, checkpoint: dict[str, Any], calibration: dict[str, Any] | None = None) -> None: self.checkpoint = checkpoint self.model_version = str(checkpoint.get('model_version', 'ddi-mlp-v1')) self.embedding_dim = int(checkpoint.get('embedding_dim', 64)) self.hidden_dim = int(checkpoint.get('hidden_dim', 128)) self.model_type = str(checkpoint.get('model_type', 'ddi_embedding_mlp')) self.input_dim = checkpoint.get('input_dim') self.temperature = float(checkpoint.get('temperature', 1.0)) self.vocab = checkpoint.get('drug_vocab', {}) self.label_names = list(checkpoint.get('label_names', LABEL_NAMES)) self.label_to_index = dict(checkpoint.get('label_to_index', LABEL_TO_INDEX)) self.index_to_label = {int(index): label for index, label in checkpoint.get('index_to_label', {}).items()} if not self.index_to_label: self.index_to_label = {index: label for label, index in self.label_to_index.items()} # Load calibration artifacts (temperature scaling, threshold tuning) self.calibration = calibration or {} if self.calibration: self.temperature = float(self.calibration.get('temperature', self.temperature)) self.major_threshold = float(self.calibration.get('major_threshold', 0.5)) logger.info(f'Loaded calibration: temperature={self.temperature:.4f}, major_threshold={self.major_threshold:.4f}') else: self.major_threshold = 0.5 self.safety_calibration_cfg = SafetyCalibrationConfig( severe_class_index=self.label_to_index.get('major', 3), low_confidence_threshold=float(self.calibration.get('low_confidence_threshold', 0.55)), medium_confidence_threshold=float(self.calibration.get('medium_confidence_threshold', 0.75)), severe_alert_threshold=float(self.calibration.get('severe_alert_threshold', 0.45)), entropy_high_threshold=float(self.calibration.get('entropy_high_threshold', 1.0)), top2_margin_low_threshold=float(self.calibration.get('top2_margin_low_threshold', 0.15)), ) self.pair_index, self.drug_profiles = load_ddinter_lookup() self.ensemble: EnsemblePredictor | None = None self.feature_pipeline = None if self.model_type in {'feature_mlp', 'feature_mlp_multisource'}: feature_pipeline_path = FEATURE_PIPELINE_MULTISOURCE_PATH if self.model_type == 'feature_mlp_multisource' else FEATURE_PIPELINE_PATH if not feature_pipeline_path.exists(): raise FileNotFoundError(f'Feature pipeline artifact not found at {feature_pipeline_path}') self.feature_pipeline = joblib.load(feature_pipeline_path) artifact_dim = int(self.feature_pipeline.get('metadata', {}).get('vector_dim', self.feature_pipeline.get('metadata', {}).get('feature_dim', 0))) if self.input_dim is not None and int(self.input_dim) != artifact_dim: raise ValueError( f'Checkpoint input_dim={self.input_dim} does not match feature pipeline dim={artifact_dim}. ' 'Refusing to infer with a mismatched schema.' ) if self.input_dim is None: self.input_dim = artifact_dim self.model = FeatureMLP( input_dim=int(self.input_dim or self.feature_pipeline['metadata']['vector_dim']), hidden_dim=self.hidden_dim, num_classes=len(self.label_names), dropout=float(checkpoint.get('dropout', 0.2)), ) else: self.model = DDIEmbeddingMLP( vocab_size=len(self.vocab) + 1, embedding_dim=self.embedding_dim, hidden_dim=self.hidden_dim, num_classes=len(self.label_names), ) self.model.load_state_dict(checkpoint['model_state_dict']) self.model.eval() # Optional ensemble backend for feature-based models. if self.model_type in {'feature_mlp', 'feature_mlp_multisource'} and ENSEMBLE_DIR.exists(): try: self.ensemble = EnsemblePredictor(ENSEMBLE_DIR) logger.info('Loaded optional ensemble artifacts from %s', ENSEMBLE_DIR) except Exception as exc: logger.warning('Failed to load ensemble artifacts: %s', exc) @classmethod def from_default_paths(cls, use_production: bool = True) -> 'HybridDDIPredictor': """Load predictor from default model paths. Args: use_production: If True, try loading production model first; fall back to standard model Returns: HybridDDIPredictor instance """ # Prefer production model if it exists model_path = PRODUCTION_MODEL_PATH if (use_production and PRODUCTION_MODEL_PATH.exists()) else MODEL_PATH if not model_path.exists(): raise FileNotFoundError( f'Model checkpoint not found at {model_path}. ' f'Run src/training/train_healthcare_production.py or src/inference/train_model.py to create it.' ) logger.info(f'Loading checkpoint from {model_path}') checkpoint = torch.load(model_path, map_location='cpu') # Try to load calibration artifacts (optional) calibration = None calibration_path = PRODUCTION_CALIBRATION_PATH if (use_production and PRODUCTION_CALIBRATION_PATH.exists()) else CALIBRATION_PATH if calibration_path.exists(): logger.info(f'Loading calibration artifacts from {calibration_path}') try: calibration = joblib.load(calibration_path) except Exception as e: logger.warning(f'Failed to load calibration artifacts: {e}') return cls(checkpoint, calibration=calibration) def health(self) -> dict[str, Any]: return { 'status': 'healthy', 'mode': 'hybrid', 'model_loaded': True, 'pipeline_loaded': True, 'calibration_loaded': bool(self.calibration), 'ensemble_loaded': self.ensemble is not None, 'model_version': self.model_version, 'model_type': self.model_type, 'pairs_loaded': len(self.pair_index), 'records_loaded': sum(len(records) for records in self.pair_index.values()), 'vocab_size': len(self.vocab), 'label_names': self.label_names, 'temperature': self.temperature, 'major_threshold': self.major_threshold, 'feature_schema': { 'input_dim': self.input_dim, 'temperature': self.temperature, 'feature_pipeline_path': str(FEATURE_PIPELINE_MULTISOURCE_PATH if self.model_type == 'feature_mlp_multisource' else FEATURE_PIPELINE_PATH) if self.model_type in {'feature_mlp', 'feature_mlp_multisource'} else None, 'available': self.feature_pipeline is not None, 'group_slices': self.feature_pipeline.get('group_slices', {}) if self.feature_pipeline else {}, }, } def _resolve_drug_name(self, drug_value: str) -> str: """Resolve input which may be a DrugBank ID, active ingredient, marketing name, or ATC code. Strategy: 1. Basic cleaning and DB ID mapping via `resolve_input_name`. 2. If available, consult DrugBank alias index (`load_drugbank_name_to_smiles_index`) for direct alias -> canonical name, ATC code matches, and canonical lookup. 3. Use `lookup_drugbank_smiles` as a fuzzy fallback when available. 4. Fall back to the frontend alias map defined in `src/lib/drugAliases.js`. 5. Return the cleaned value if no mapping found. """ # 1) Basic resolution (handles DBxxxxx -> mapped name) try: resolved = resolve_input_name(drug_value) except Exception: resolved = str(drug_value or '').strip() # 2) Consult DrugBank index if available try: index = load_drugbank_name_to_smiles_index() if load_drugbank_name_to_smiles_index else None except Exception: index = None if index: # Precomputed alias map: alias -> canonical name name_to_record = index.get('name_to_record_key', {}) or {} canonical_map = index.get('canonical_name_to_record', {}) or {} # Normalize input for direct alias lookup norm_key = None try: norm_key = normalize_drug_name(drug_value) if normalize_drug_name else normalize_name(str(drug_value or '')) except Exception: norm_key = normalize_name(str(drug_value or '')) if norm_key in name_to_record: return name_to_record.get(norm_key) or resolved # ATC code pattern (e.g., C09AA05) up = str(drug_value or '').strip().upper() if re.fullmatch(r'[A-Z]\d{2}[A-Z]{2}\d{2}', up): for record in canonical_map.values(): atcs = record.get('atc_codes') or [] for code in atcs: if code and up == str(code).upper(): return record.get('canonical_name') or resolved # Try expanded variants (brands, synonyms) try: variants = drug_name_variants(drug_value) if 'drug_name_variants' in globals() or 'drug_name_variants' in locals() else [] except Exception: variants = [] for var in variants: if var in name_to_record: return name_to_record.get(var) or resolved # 3) Fuzzy lookup via `lookup_drugbank_smiles` if available if lookup_drugbank_smiles: try: lookup = lookup_drugbank_smiles(drug_value) if lookup and lookup.get('matched') and lookup.get('matched_name'): return lookup.get('matched_name') except Exception: pass # 4) Frontend alias map fallback try: frontend_aliases = _load_frontend_alias_map() key = normalize_name(str(drug_value or '')) if key in frontend_aliases: return frontend_aliases[key] except Exception: pass # 5) Final fallback return resolved def _find_vocab_id(self, drug_name: str) -> int: if self.model_type == 'feature_mlp': return 0 normalized_name = normalize_name(drug_name) return int(self.vocab.get(normalized_name, 0)) def _resolve_smiles(self, drug_name: str) -> str: try: resolved = resolve_drug_name_to_smiles(drug_name) except Exception as exc: logger.warning('smiles_resolution_failed drug=%s error=%s', drug_name, exc) resolved = None return resolved or 'N/A' def _normalize_clinical_severity(self, probabilities: dict[str, float], fallback_severity: str) -> tuple[str, float]: clinical_labels = ('minor', 'moderate', 'major') clinical_probs = {label: float(probabilities.get(label, 0.0) or 0.0) for label in clinical_labels} total = sum(clinical_probs.values()) if total > 0: normalized_probs = {label: value / total for label, value in clinical_probs.items()} chosen_label = max(normalized_probs, key=normalized_probs.get) return chosen_label, round(float(normalized_probs[chosen_label]), 3) fallback = fallback_severity if fallback_severity in clinical_labels else 'minor' return fallback, round(float(clinical_probs.get(fallback, 0.0)), 3) def _lookup_exact(self, drug_a_name: str, drug_b_name: str, drug_a_id: str, drug_b_id: str) -> InferenceResult | None: key = canonical_pair_key(drug_a_name, drug_b_name) records = self.pair_index.get(key, []) if not records: return None top_record = max(records, key=lambda record: severity_rank(record['severity'])) top_severity = top_record['severity'] confidence = lookup_confidence(top_severity, len(records)) warning = None if confidence < 0.75: warning = 'Exact DDInter evidence exists, but the support is limited and confidence is below the high-confidence threshold.' explanation = ( f'Exact DDInter evidence match found for {drug_a_name} and {drug_b_name}. ' f'{len(records)} supporting record(s) were retrieved and the highest observed severity is {top_severity}.' ) return InferenceResult( source='ddinter_lookup', confidence=confidence, severity=top_severity, explanation=explanation, warning=warning, model_version='ddinter-evidence-v1', mechanism='Evidence-backed lookup from the processed DDInter table', affected_systems='Not available in the local DDInter table', smiles_a=self._resolve_smiles(drug_a_name), smiles_b=self._resolve_smiles(drug_b_name), clinical_advice=SEVERITY_ADVICE[top_severity], additional_notes=explanation if warning is None else f'{explanation} {warning}', drug_a_name=drug_a_name, drug_b_name=drug_b_name, drug_a_id=drug_a_id, drug_b_id=drug_b_id, source_dataset='ddinter_combined.parquet', exact_match=True, evidence_count=len(records), confidence_band=confidence_band(confidence), probabilities=None, ) def _predict_with_model(self, drug_a_name: str, drug_b_name: str, drug_a_id: str, drug_b_id: str) -> InferenceResult: if self.model_type == 'feature_mlp': if self.feature_pipeline is None: raise RuntimeError('Feature pipeline artifacts are not loaded') features = transform_pair_features_legacy(drug_a_name, drug_b_name, self.feature_pipeline) input_features = torch.tensor([features], dtype=torch.float32) with torch.no_grad(): logits = self.model(input_features) / max(self.temperature, 1e-6) probabilities = torch.softmax(logits, dim=-1).squeeze(0) elif self.model_type == 'feature_mlp_multisource': if self.feature_pipeline is None: raise RuntimeError('Feature pipeline artifacts are not loaded') features = transform_pair_features_multisource(drug_a_name, drug_b_name, self.feature_pipeline) input_features = torch.tensor([features], dtype=torch.float32) with torch.no_grad(): logits = self.model(input_features) / max(self.temperature, 1e-6) probabilities = torch.softmax(logits, dim=-1).squeeze(0) # Ensemble override when available. if self.ensemble is not None: try: probs = self.ensemble.predict_proba(np.array([features], dtype=np.float32))[0] probabilities = torch.tensor(probs, dtype=torch.float32) except Exception as exc: logger.warning('Ensemble inference failed; falling back to base model: %s', exc) else: drug_a_vocab_id = self._find_vocab_id(drug_a_name) drug_b_vocab_id = self._find_vocab_id(drug_b_name) input_a = torch.tensor([drug_a_vocab_id], dtype=torch.long) input_b = torch.tensor([drug_b_vocab_id], dtype=torch.long) with torch.no_grad(): logits = self.model(input_a, input_b) probabilities = torch.softmax(logits, dim=-1).squeeze(0) probability_values = probabilities.tolist() probability_array = np.array(probability_values, dtype=np.float32) safety = risk_adjusted_prediction(probability_array, cfg=self.safety_calibration_cfg) best_index = int(safety['pred_index']) predicted_severity = self.index_to_label.get(best_index, self.label_names[best_index]) confidence = round(float(safety['max_probability']), 3) confidence_band_value = str(safety['confidence_band']) probabilities_map = { self.index_to_label.get(index, self.label_names[index]): round(float(value), 3) for index, value in enumerate(probability_values) } clinical_severity, clinical_confidence = self._normalize_clinical_severity(probabilities_map, predicted_severity) if clinical_severity != predicted_severity: predicted_severity = clinical_severity confidence = clinical_confidence warning = None if confidence_band_value == 'low': warning = 'Low-confidence deep learning prediction. Please review this result carefully and consider expert validation.' elif confidence_band_value == 'medium': warning = 'Moderate-confidence prediction. Treat this result as advisory rather than definitive.' if bool(safety.get('escalated_for_safety', False)): escalated_warning = 'Prediction was conservatively escalated to major due to uncertainty and elevated severe-class probability.' warning = escalated_warning if warning is None else f'{warning} {escalated_warning}' explanation = ( f'Deep learning model predicted {predicted_severity} interaction severity with {confidence:.3f} confidence. ' f'The prediction used learned embeddings for {drug_a_name} and {drug_b_name} from the trained MEDCARE-DDI MLP. ' f'Uncertainty={safety.get("uncertainty", "normal")}, top2_margin={float(safety.get("top2_margin", 0.0)):.3f}, ' f'severe_probability={float(safety.get("severe_probability", 0.0)):.3f}.' ) return InferenceResult( source='deep_learning_prediction', confidence=confidence, severity=predicted_severity, explanation=explanation, warning=warning, model_version=self.model_version, mechanism='Deep learning inference from the trained MEDCARE-DDI PyTorch model', affected_systems='Learned from DDInter interaction patterns', smiles_a=self._resolve_smiles(drug_a_name), smiles_b=self._resolve_smiles(drug_b_name), clinical_advice=SEVERITY_ADVICE.get(predicted_severity, SEVERITY_ADVICE['unknown']), additional_notes=explanation if warning is None else f'{explanation} {warning}', drug_a_name=drug_a_name, drug_b_name=drug_b_name, drug_a_id=drug_a_id, drug_b_id=drug_b_id, source_dataset='ddi_mlp_best.pt', exact_match=False, evidence_count=None, confidence_band=confidence_band_value, probabilities=probabilities_map, ) def predict(self, drug_a_value: str, drug_b_value: str) -> dict[str, Any]: drug_a_name = self._resolve_drug_name(drug_a_value) drug_b_name = self._resolve_drug_name(drug_b_value) drug_a_id = drug_a_value.strip().upper() drug_b_id = drug_b_value.strip().upper() if not drug_a_name or not drug_b_name: raise ValueError('Both drugs must be provided') exact_result = self._lookup_exact(drug_a_name, drug_b_name, drug_a_id, drug_b_id) if exact_result is not None: # When DDInter has an exact pair but only "unknown" severity, use the model # to provide a more actionable estimate while preserving evidence context. if exact_result.severity == 'unknown': model_result = self._predict_with_model(drug_a_name, drug_b_name, drug_a_id, drug_b_id) evidence_note = ( f'Exact DDInter match exists ({exact_result.evidence_count or 0} record(s)) ' 'but reported severity is unknown; using ML fallback for a more informative estimate.' ) model_result.warning = ( f'{evidence_note} {model_result.warning}' if model_result.warning else evidence_note ) model_result.additional_notes = ( f'{model_result.additional_notes} {evidence_note}'.strip() ) logger.info( 'lookup_unknown_fallback drug_a=%s drug_b=%s evidence_count=%s model_severity=%s model_confidence=%.3f', drug_a_name, drug_b_name, exact_result.evidence_count, model_result.severity, model_result.confidence, ) return model_result.to_dict() logger.info( 'lookup_hit drug_a=%s drug_b=%s severity=%s confidence=%.3f', drug_a_name, drug_b_name, exact_result.severity, exact_result.confidence, ) return exact_result.to_dict() model_result = self._predict_with_model(drug_a_name, drug_b_name, drug_a_id, drug_b_id) logger.info( 'model_fallback drug_a=%s drug_b=%s severity=%s confidence=%.3f confidence_band=%s', drug_a_name, drug_b_name, model_result.severity, model_result.confidence, model_result.confidence_band, ) return model_result.to_dict()