from datasets import load_dataset
import gradio as gr
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array
from huggingface_hub import hf_hub_download
from PIL import Image
import os
import numpy as np
from modelbuilder import capsnet_custom_objects
from defs import class_to_disease_map
from pathlib import Path

# -------CONSTANTS-------#
TARGET_SIZE = (256, 256)  # target (width, height) for masked images

# Your dataset repo on HF
# DATASET_REPO = "valste/lung-disease-xrays"
EXAMPLES_DIR = Path("./tmp/external_xrays_299x299x3")
EXAMPLES_DIR.mkdir(parents=True, exist_ok=True)


# ------------------------------------------------------------
# Determine the running environment: local machine or huggingface spaces
# ------------------------------------------------------------
def running_in_spaces() -> bool:
    """Return True if app is running inside Hugging Face Spaces."""
    return (
        os.environ.get("SPACE_ID") is not None
        or os.environ.get("SYSTEM") == "spaces"
    )


is_spaces = running_in_spaces()
if is_spaces:
    # Plain string (original used an f-string with no placeholders).
    print("Running in Hugging Face Spaces environment.")
else:
    print(f"Running on local machine:{os.environ.get('COMPUTERNAME','Unknown')}")


class DemoException(Exception):
    """Base class for errors raised by this demo application."""
    pass


# ------------------------------------------------------------
# 1️⃣ Load the models from Hugging Face Hub:
#    capsnet for disease classification and GAN for lung segmentation/masking
#    as well as images dataset
# ------------------------------------------------------------
gan_model_path = None
capsnet_model_path = None
dataset = None
data_dir = None

if is_spaces:
    gan_model_path = hf_hub_download(
        repo_id="valste/lung-segmentation-gan", filename="model.keras"
    )
    capsnet_model_path = hf_hub_download(
        repo_id="valste/capsnet-4class-lung-disease-classifier",
        filename="model.keras",
    )
else:
    # Local machine: models are expected under ./models/<repo-name>/model.keras
    capsnet_model_path = os.path.join(
        ".", "models", "capsnet-4class-lung-disease-classifier", "model.keras"
    )
    gan_model_path = os.path.join(
        ".", "models", "lung-segmentation-gan", "model.keras"
    )

# ----Load models----#
# compile=False: inference only, no optimizer/loss state required.
model_gan = tf.keras.models.load_model(gan_model_path, compile=False)
model_capsnet = tf.keras.models.load_model(
    capsnet_model_path, custom_objects=capsnet_custom_objects, compile=False
)

# ------------------------------------------------------------
# 2️⃣ Load sample X-ray imagepaths dataset
# ------------------------------------------------------------
# Build examples directly from the folder (list-of-lists: Gradio examples format).
img_paths = [[str(p)] for p in sorted(EXAMPLES_DIR.glob("*.png"))]
img_names = [Path(p[0]).name for p in img_paths]

print("Examples found:", len(img_paths))
print("First example:", img_paths[0] if img_paths else "NONE")


# ------------------------------------------------------------
# 3️⃣ Define preprocessing and inference function
# ------------------------------------------------------------
def create_binary_mask(
    img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE
) -> np.ndarray:
    """Create a binary lung mask from a PIL Image.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray in any mode; converted to grayscale internally.
    seg_model : keras.Model
        Segmentation GAN expecting input of shape (1, H, W, 1) in [0, 1].
    target_size : tuple[int, int]
        (width, height) the image is resized to before segmentation.

    Returns
    -------
    np.ndarray
        uint8 mask with values in {0, 1}. Spatial shape follows the
        model's output (presumably equal to target_size — TODO confirm).
    """
    # --- 1) Grayscale base image, resized, scaled to [0, 1] ---
    img_gray = img.convert("L").resize(target_size, Image.BILINEAR)
    gray_array = np.asarray(img_gray, dtype=np.float32) / 255.0

    # --- 2) GAN input: add batch and channel dims -> (1, H, W, 1) ---
    gan_input = gray_array[np.newaxis, ..., np.newaxis]

    # --- 3) Run segmentation GAN and threshold at 0.5 ---
    # Single-step binarization: the original scaled to 0/255 and then
    # re-thresholded at 127, which yields the identical {0, 1} mask.
    prediction = seg_model.predict(gan_input)
    mask = (prediction[0, :, :, 0] > 0.5).astype(np.uint8)
    return mask


def create_masked_img(
    img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE
) -> tuple[tf.Tensor, np.ndarray, np.ndarray]:
    """Create the masked-image tensor used as CapsNet input.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray.
    seg_model : keras.Model
        Lung segmentation model forwarded to ``create_binary_mask``.
    target_size : tuple[int, int]
        (width, height) the image is resized to.

    Returns
    -------
    tuple
        ``(x, masked, mask)`` where ``x`` is a float32 tensor of shape
        (1, H, W, C) for the CapsNet, ``masked`` the uint8 masked image
        (H, W, C), and ``mask`` the binary (H, W) lung mask. C follows
        the mode of ``img`` (1 for grayscale, 3 for RGB) since the
        channel-repeat step below is disabled.
    """
    img = img.resize(target_size, Image.BILINEAR)
    mask = create_binary_mask(
        img, seg_model=seg_model, target_size=target_size
    )  # (H, W), values in {0, 1}
    print("mask:", mask.shape, mask.min(), mask.max(), mask.mean())

    img_arr = img_to_array(img).astype(np.float32)  # (H, W, C)
    print("mask->img_arr:", img_arr.shape, img_arr.min(), img_arr.max(), img_arr.mean())

    # --- 4) Apply mask (mask broadcasts over the channel axis) ---
    masked = img_arr * mask[..., None]  # still float, (H, W, C)
    print("masked 1:", masked.shape, masked.min(), masked.max(), masked.mean())
    masked = masked.astype(np.uint8)  # back to 0–255 image
    print("masked 2:", masked.shape, masked.min(), masked.max(), masked.mean())

    # --- 5) Prepare input for CapsNet: expand to 3 RGB channels ---
    # masked = np.repeat(masked, 3, axis=-1)  # (H,W,3)
    # print("masked 3 ", masked.shape, masked.min(), masked.max(), masked.mean())

    x = tf.convert_to_tensor(
        np.expand_dims(masked, axis=0),  # add batch dimension -> (1, H, W, C)
        dtype=tf.float32,  # CapsNet expects float32, not uint8
    )
    print("capsnet input x:", x.shape, x.dtype)
    return x, masked, mask


def to_probabilities(predictions, model_type: str = "capsnet"):
    """Convert model outputs to class probabilities.

    Parameters
    ----------
    predictions : array-like
        Raw model outputs. For CapsNet this is expected to be of shape
        (n, n_classes, 1), where the last dim is a singleton and the
        values need normalization (their sum along the class axis is
        not 1).
    model_type : str
        "capsnet"  -> normalize along the class axis and squeeze last dim.
        otherwise  -> return predictions unchanged as np.ndarray.

    Returns
    -------
    np.ndarray
        Probabilities of shape (n, n_classes) for capsnet, or
        ``np.asarray(predictions)`` for other models.

    Raises
    ------
    ValueError
        If ``predictions`` has neither 2 nor 3 dims for capsnet.
    """
    predictions = np.asarray(predictions)

    if model_type != "capsnet":
        # Assume other models (e.g. softmax CNN) already return probs.
        return predictions

    # --- CapsNet branch ---
    # Ensure a 3D tensor with the singleton last dimension.
    if predictions.ndim == 2:  # (n, n_classes) -> (n, n_classes, 1)
        predictions = predictions[..., np.newaxis]
    elif predictions.ndim != 3:
        raise ValueError(
            f"Expected dis_confs to have 2 or 3 dims for capsnet, got shape {predictions.shape}"
        )

    # Normalize along the class axis (axis=1); guard against all-zero rows.
    sums = np.sum(predictions, axis=1, keepdims=True)
    sums = np.where(sums == 0, 1e-12, sums)
    predictions = predictions / sums

    # Remove the singleton dim: (n, n_classes, 1) -> (n, n_classes)
    return predictions.squeeze(-1)


def map_probabilities_to_classes(probabilities, class_map):
    """Map a probability vector to a dict ``{class_name: probability}``.

    Parameters
    ----------
    probabilities : array-like
        Shape (n_classes,) – probabilities after softmax or normalization.
    class_map : dict
        Mapping from class index -> class name.

    Returns
    -------
    dict
        {class_name: float(probability)} sorted by descending probability.

    Raises
    ------
    ValueError
        If the number of probabilities differs from ``len(class_map)``.
    """
    probs = np.asarray(probabilities).flatten()
    if len(probs) != len(class_map):
        raise ValueError(
            f"Probability length {len(probs)} does not match class_map size {len(class_map)}"
        )
    mapped = {class_map[i]: float(probs[i]) for i in range(len(probs))}
    # Sort by highest probability first.
    return dict(sorted(mapped.items(), key=lambda x: x[1], reverse=True))


def predict(img_path: str) -> tuple[str, np.ndarray, dict]:
    """Run the full pipeline: segmentation -> masking -> classification.

    Parameters
    ----------
    img_path : str
        Path to the X-ray image file.

    Returns
    -------
    tuple
        (basename of img_path, masked uint8 image array,
        {disease_name: probability} sorted descending).
        Note: the third element is a dict, not an ndarray — the original
        annotation was incorrect.
    """
    # Context manager releases the underlying file handle: Image.open is
    # lazy and would otherwise keep the file open after this call.
    with Image.open(img_path) as img:
        x, masked, _mask = create_masked_img(img)

    preds = model_capsnet.predict(x, verbose=1)  # (1, 4, 1) = (batch, class, 1)
    probs = to_probabilities(preds, model_type="capsnet")[0]  # shape (4,)
    prob_dict = map_probabilities_to_classes(probs, class_to_disease_map)
    return os.path.basename(img_path), masked, prob_dict