|
|
""" |
|
|
Self-contained Hugging Face wrapper for Sybil lung cancer risk prediction model. |
|
|
This version works directly from HF without requiring external Sybil package. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import sys |
|
|
import torch |
|
|
import numpy as np |
|
|
from typing import List, Dict, Optional |
|
|
from dataclasses import dataclass |
|
|
from transformers.modeling_outputs import BaseModelOutput |
|
|
from safetensors.torch import load_file |
|
|
|
|
|
|
|
|
current_dir = os.path.dirname(os.path.abspath(__file__)) |
|
|
if current_dir not in sys.path: |
|
|
sys.path.insert(0, current_dir) |
|
|
|
|
|
try: |
|
|
from .configuration_sybil import SybilConfig |
|
|
from .modeling_sybil import SybilForRiskPrediction |
|
|
from .image_processing_sybil import SybilImageProcessor |
|
|
except ImportError: |
|
|
from configuration_sybil import SybilConfig |
|
|
from modeling_sybil import SybilForRiskPrediction |
|
|
from image_processing_sybil import SybilImageProcessor |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SybilOutput(BaseModelOutput): |
|
|
""" |
|
|
Output class for Sybil model predictions. |
|
|
|
|
|
Args: |
|
|
risk_scores: Risk scores for each year (1-6 years by default) |
|
|
attentions: Optional attention maps if requested |
|
|
""" |
|
|
risk_scores: torch.FloatTensor = None |
|
|
attentions: Optional[Dict] = None |
|
|
|
|
|
|
|
|
class SybilHFWrapper: |
|
|
""" |
|
|
Hugging Face wrapper for Sybil ensemble model. |
|
|
Provides a simple interface for lung cancer risk prediction from CT scans. |
|
|
""" |
|
|
|
|
|
def __init__(self, config: SybilConfig = None, model_dir: str = None): |
|
|
""" |
|
|
Initialize the Sybil model ensemble. |
|
|
|
|
|
Args: |
|
|
config: Model configuration (will use default if not provided) |
|
|
model_dir: Directory containing model files (defaults to file location) |
|
|
""" |
|
|
self.config = config if config is not None else SybilConfig() |
|
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
|
|
|
|
|
if model_dir is not None: |
|
|
self.model_dir = model_dir |
|
|
else: |
|
|
|
|
|
self.model_dir = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
|
|
|
|
self.image_processor = SybilImageProcessor() |
|
|
|
|
|
|
|
|
self.calibrator = self._load_calibrator() |
|
|
|
|
|
|
|
|
self.models = self._load_ensemble_models() |
|
|
|
|
|
def _load_calibrator(self) -> Dict: |
|
|
"""Load ensemble calibrator data""" |
|
|
calibrator_path = os.path.join(self.model_dir, "checkpoints", "sybil_ensemble_simple_calibrator.json") |
|
|
|
|
|
if os.path.exists(calibrator_path): |
|
|
with open(calibrator_path, 'r') as f: |
|
|
return json.load(f) |
|
|
else: |
|
|
|
|
|
calibrator_path = os.path.join(self.model_dir, "calibrator_data.json") |
|
|
if os.path.exists(calibrator_path): |
|
|
with open(calibrator_path, 'r') as f: |
|
|
return json.load(f) |
|
|
return {} |
|
|
|
|
|
def _load_ensemble_models(self) -> List[torch.nn.Module]: |
|
|
""" |
|
|
Load all models in the ensemble from original checkpoints. |
|
|
|
|
|
Note: We load from .ckpt files instead of safetensors because the safetensors |
|
|
were created with the wrong CumulativeProbabilityLayer architecture. |
|
|
""" |
|
|
import glob as glob_module |
|
|
models = [] |
|
|
|
|
|
|
|
|
checkpoints_dir = os.path.join(self.model_dir, "checkpoints") |
|
|
checkpoint_files = sorted(glob_module.glob(os.path.join(checkpoints_dir, "*.ckpt"))) |
|
|
|
|
|
print(f"Found {len(checkpoint_files)} checkpoint files") |
|
|
|
|
|
|
|
|
for checkpoint_path in checkpoint_files: |
|
|
try: |
|
|
model = SybilForRiskPrediction(self.config) |
|
|
checkpoint = torch.load(checkpoint_path, map_location='cpu', weights_only=False) |
|
|
|
|
|
|
|
|
if 'state_dict' in checkpoint: |
|
|
state_dict = checkpoint['state_dict'] |
|
|
else: |
|
|
state_dict = checkpoint |
|
|
|
|
|
|
|
|
cleaned_state_dict = {} |
|
|
for k, v in state_dict.items(): |
|
|
if k.startswith('model.'): |
|
|
cleaned_state_dict[k[6:]] = v |
|
|
else: |
|
|
cleaned_state_dict[k] = v |
|
|
|
|
|
|
|
|
model.load_state_dict(cleaned_state_dict, strict=False) |
|
|
model.to(self.device) |
|
|
model.eval() |
|
|
models.append(model) |
|
|
print(f" Loaded model from {os.path.basename(checkpoint_path)}") |
|
|
except Exception as e: |
|
|
print(f" Warning: Could not load {os.path.basename(checkpoint_path)}: {e}") |
|
|
continue |
|
|
|
|
|
if not models: |
|
|
raise ValueError("No models could be loaded from the ensemble. Please ensure model files are present.") |
|
|
|
|
|
print(f"Loaded {len(models)} models in ensemble") |
|
|
return models |
|
|
|
|
|
def _apply_calibration(self, scores: np.ndarray) -> np.ndarray: |
|
|
""" |
|
|
Apply complete isotonic regression calibration matching the original Sybil implementation. |
|
|
|
|
|
This method applies the same calibration as the original SimpleClassifierGroup.predict_proba: |
|
|
1. For each year, apply each calibrator in the ensemble |
|
|
2. Each calibrator applies: linear transform -> clip -> isotonic regression (np.interp) |
|
|
3. Average predictions from all calibrators |
|
|
|
|
|
Args: |
|
|
scores: Raw risk scores from the model (shape: [batch_size, num_years]) |
|
|
|
|
|
Returns: |
|
|
Calibrated risk scores (shape: [batch_size, num_years]) |
|
|
""" |
|
|
if not self.calibrator: |
|
|
return scores |
|
|
|
|
|
calibrated_scores = [] |
|
|
|
|
|
for year in range(scores.shape[1]): |
|
|
year_key = f"Year{year + 1}" |
|
|
|
|
|
if year_key not in self.calibrator: |
|
|
|
|
|
calibrated_scores.append(scores[:, year]) |
|
|
continue |
|
|
|
|
|
cal_list = self.calibrator[year_key] |
|
|
|
|
|
if not isinstance(cal_list, list) or len(cal_list) == 0: |
|
|
|
|
|
calibrated_scores.append(scores[:, year]) |
|
|
continue |
|
|
|
|
|
|
|
|
year_predictions = [] |
|
|
|
|
|
for cal_data in cal_list: |
|
|
if not isinstance(cal_data, dict): |
|
|
continue |
|
|
|
|
|
|
|
|
if "coef" not in cal_data or "intercept" not in cal_data: |
|
|
continue |
|
|
|
|
|
coef = np.array(cal_data["coef"]) |
|
|
intercept = np.array(cal_data["intercept"]) |
|
|
|
|
|
|
|
|
if "x0" not in cal_data or "y0" not in cal_data: |
|
|
continue |
|
|
|
|
|
x0 = np.array(cal_data["x0"]) |
|
|
y0 = np.array(cal_data["y0"]) |
|
|
|
|
|
|
|
|
x_min = cal_data.get("x_min", -np.inf) |
|
|
x_max = cal_data.get("x_max", np.inf) |
|
|
|
|
|
|
|
|
|
|
|
probs = scores[:, year].reshape(-1, 1) |
|
|
T = probs @ coef + intercept |
|
|
T = T.flatten() |
|
|
|
|
|
|
|
|
T = np.clip(T, x_min, x_max) |
|
|
|
|
|
|
|
|
|
|
|
calibrated = np.interp(T, x0, y0) |
|
|
|
|
|
year_predictions.append(calibrated) |
|
|
|
|
|
if len(year_predictions) == 0: |
|
|
|
|
|
calibrated_scores.append(scores[:, year]) |
|
|
else: |
|
|
|
|
|
calibrated_scores.append(np.mean(year_predictions, axis=0)) |
|
|
|
|
|
return np.stack(calibrated_scores, axis=1) |
|
|
|
|
|
def preprocess_dicom(self, dicom_paths: List[str]) -> torch.Tensor: |
|
|
""" |
|
|
Preprocess DICOM files for model input. |
|
|
|
|
|
Args: |
|
|
dicom_paths: List of paths to DICOM files |
|
|
|
|
|
Returns: |
|
|
Preprocessed tensor ready for model input |
|
|
""" |
|
|
|
|
|
result = self.image_processor(dicom_paths, file_type="dicom", return_tensors="pt") |
|
|
pixel_values = result["pixel_values"] |
|
|
|
|
|
|
|
|
if pixel_values.ndim == 4: |
|
|
pixel_values = pixel_values.unsqueeze(0) |
|
|
|
|
|
return pixel_values.to(self.device) |
|
|
|
|
|
def predict(self, dicom_paths: List[str], return_attentions: bool = False) -> SybilOutput: |
|
|
""" |
|
|
Run prediction on a CT scan series. |
|
|
|
|
|
Args: |
|
|
dicom_paths: List of paths to DICOM files for a single CT series |
|
|
return_attentions: Whether to return attention maps |
|
|
|
|
|
Returns: |
|
|
SybilOutput with risk scores and optional attention maps |
|
|
""" |
|
|
|
|
|
pixel_values = self.preprocess_dicom(dicom_paths) |
|
|
|
|
|
|
|
|
all_predictions = [] |
|
|
all_attentions = [] |
|
|
|
|
|
with torch.no_grad(): |
|
|
for model in self.models: |
|
|
output = model( |
|
|
pixel_values=pixel_values, |
|
|
return_attentions=return_attentions |
|
|
) |
|
|
|
|
|
|
|
|
if hasattr(output, 'risk_scores'): |
|
|
predictions = output.risk_scores |
|
|
else: |
|
|
predictions = output[0] if isinstance(output, tuple) else output |
|
|
|
|
|
all_predictions.append(predictions.cpu().numpy()) |
|
|
|
|
|
if return_attentions and hasattr(output, 'image_attention'): |
|
|
all_attentions.append(output.image_attention) |
|
|
|
|
|
|
|
|
ensemble_pred = np.mean(all_predictions, axis=0) |
|
|
|
|
|
|
|
|
calibrated_pred = self._apply_calibration(ensemble_pred) |
|
|
|
|
|
|
|
|
risk_scores = torch.from_numpy(calibrated_pred).float() |
|
|
|
|
|
|
|
|
attentions = None |
|
|
if return_attentions and all_attentions: |
|
|
attentions = {"image_attention": torch.stack(all_attentions).mean(dim=0)} |
|
|
|
|
|
return SybilOutput(risk_scores=risk_scores, attentions=attentions) |
|
|
|
|
|
def __call__(self, dicom_paths: List[str] = None, dicom_series: List[List[str]] = None, **kwargs) -> SybilOutput: |
|
|
""" |
|
|
Convenience method for prediction. |
|
|
|
|
|
Args: |
|
|
dicom_paths: List of DICOM file paths for a single series |
|
|
dicom_series: List of lists of DICOM paths for batch processing |
|
|
**kwargs: Additional arguments passed to predict() |
|
|
|
|
|
Returns: |
|
|
SybilOutput with predictions |
|
|
""" |
|
|
if dicom_series is not None: |
|
|
|
|
|
all_outputs = [] |
|
|
for paths in dicom_series: |
|
|
output = self.predict(paths, **kwargs) |
|
|
all_outputs.append(output.risk_scores) |
|
|
|
|
|
risk_scores = torch.stack(all_outputs) |
|
|
return SybilOutput(risk_scores=risk_scores) |
|
|
elif dicom_paths is not None: |
|
|
return self.predict(dicom_paths, **kwargs) |
|
|
else: |
|
|
raise ValueError("Either dicom_paths or dicom_series must be provided") |
|
|
|
|
|
@classmethod |
|
|
def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs): |
|
|
""" |
|
|
Load model from Hugging Face hub or local path. |
|
|
|
|
|
Args: |
|
|
pretrained_model_name_or_path: HF model ID or local path |
|
|
**kwargs: Additional configuration arguments |
|
|
|
|
|
Returns: |
|
|
SybilHFWrapper instance |
|
|
""" |
|
|
|
|
|
config = kwargs.pop("config", None) |
|
|
if config is None: |
|
|
try: |
|
|
config = SybilConfig.from_pretrained(pretrained_model_name_or_path) |
|
|
except: |
|
|
config = SybilConfig() |
|
|
|
|
|
return cls(config=config) |