# Spaces: Sleeping
| from datasets import load_dataset | |
| import gradio as gr | |
| import tensorflow as tf | |
| from tensorflow.keras.preprocessing.image import img_to_array | |
| from huggingface_hub import hf_hub_download | |
| from PIL import Image | |
| import os | |
| import numpy as np | |
| from modelbuilder import capsnet_custom_objects | |
| from defs import class_to_disease_map | |
| from pathlib import Path | |
# -------CONSTANTS-------#
TARGET_SIZE = (256, 256)  # target size for masked images (square, so PIL's (W, H) order is moot here)
# Your dataset repo on HF
# DATASET_REPO = "valste/lung-disease-xrays"
# Folder holding bundled example X-rays (299x299x3 per the folder name — TODO confirm)
EXAMPLES_DIR = Path("./tmp/external_xrays_299x299x3")
EXAMPLES_DIR.mkdir(parents=True, exist_ok=True)  # ensure the folder exists before globbing below
| # ------------------------------------------------------------ | |
| # Determine the running environment: local machine or huggingface spaces | |
| # ------------------------------------------------------------ | |
def running_in_spaces() -> bool:
    """Detect whether the app is executing inside Hugging Face Spaces.

    Returns True when the SPACE_ID environment variable is set, or when
    SYSTEM is exactly "spaces".
    """
    env = os.environ
    if env.get("SPACE_ID") is not None:
        return True
    return env.get("SYSTEM") == "spaces"
# Detect the environment once at import time and report it.
is_spaces = running_in_spaces()
if is_spaces:
    # Fixed: removed extraneous f-prefix (no placeholders in this string).
    print("Running in Hugging Face Spaces environment.")
else:
    # COMPUTERNAME exists on Windows hosts; elsewhere falls back to 'Unknown'.
    print(f"Running on local machine:{os.environ.get('COMPUTERNAME','Unknown')}")
class DemoException(Exception):
    """Base exception type for errors specific to this demo app."""
| # ------------------------------------------------------------ | |
| # 1️⃣ Load the models from Hugging Face Hub: | |
| # capsnet for disease classification and GAN for lung segmentation/masking | |
| # as well as images dataset | |
| # ------------------------------------------------------------ | |
# Resolve model file paths per environment. The dataset/data_dir variables are
# initialized here but never assigned below — presumably leftovers from an
# earlier version that also loaded the dataset (see commented DATASET_REPO).
gan_model_path = None
capsnet_model_path = None
dataset = None
data_dir = None
if is_spaces:
    # On Spaces: download weights from the Hugging Face Hub (cached locally by hf_hub_download).
    gan_model_path = hf_hub_download(
        repo_id="valste/lung-segmentation-gan", filename="model.keras"
    )
    capsnet_model_path = hf_hub_download(
        repo_id="valste/capsnet-4class-lung-disease-classifier", filename="model.keras"
    )
else:
    # local machine
    # Expects the models to be checked out under ./models/<repo-name>/model.keras
    capsnet_model_path = os.path.join(
        ".", "models", "capsnet-4class-lung-disease-classifier", "model.keras"
    )
    gan_model_path = os.path.join(".", "models", "lung-segmentation-gan", "model.keras")
#----Load models----#
# compile=False: inference only, no optimizer/loss state needed.
model_gan = tf.keras.models.load_model(gan_model_path, compile=False)
# CapsNet needs its custom layers/objects registered to deserialize.
model_capsnet = tf.keras.models.load_model(capsnet_model_path, custom_objects=capsnet_custom_objects, compile=False)
| # ------------------------------------------------------------ | |
| # 2️⃣ Load sample X-ray imagepaths dataset | |
| # ------------------------------------------------------------ | |
# Build examples directly from the folder
# Each entry is a single-element list [path] — presumably the row format a
# Gradio examples widget expects; verify against the UI definition.
img_paths = [[str(p)] for p in sorted(EXAMPLES_DIR.glob("*.png"))]
img_names = [Path(p[0]).name for p in img_paths]  # bare filenames for display
print("Examples found:", len(img_paths))
print("First example:", img_paths[0] if img_paths else "NONE")
| # ------------------------------------------------------------ | |
| # 3️⃣ Define preprocessing and inference function | |
| # ------------------------------------------------------------ | |
def create_binary_mask(img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE) -> np.ndarray:
    """Segment the lungs in an X-ray and return a strictly binary mask.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray; any mode/size (converted to grayscale and resized here).
    seg_model :
        Keras segmentation model; assumed to take input of shape
        (1, H, W, 1) scaled to [0, 1] — TODO confirm against model card.
    target_size : tuple[int, int]
        Size passed to PIL resize (square by default, so axis order is moot).

    Returns
    -------
    np.ndarray
        uint8 mask of shape target_size with values in {0, 1}.
    """
    # --- 1) Grayscale base image, resized to the model input size ---
    img_gray = img.convert("L").resize(target_size, Image.BILINEAR)
    # float32 in [0, 1]
    gray_array = np.asarray(img_gray, dtype=np.float32) / 255.0
    # --- 2) GAN input: add batch and channel dims -> (1, H, W, 1) ---
    gan_input = gray_array[np.newaxis, ..., np.newaxis]
    # --- 3) Run segmentation model and threshold at 0.5 ---
    prediction = seg_model.predict(gan_input)
    # Take first batch element / first channel. Fixed: the original scaled to
    # 0/255 and then re-thresholded at 127 — redundant; threshold once to {0, 1}.
    mask = (prediction[0, :, :, 0] > 0.5).astype(np.uint8)
    return mask
def create_masked_img(img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE) -> tuple[tf.Tensor, np.ndarray, np.ndarray]:
    """Mask out non-lung regions of an X-ray and build the CapsNet input tensor.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray; resized to target_size here.
    seg_model :
        Keras lung-segmentation model forwarded to create_binary_mask.
    target_size : tuple[int, int]
        Working resolution for both mask and masked image.

    Returns
    -------
    tuple
        (x, masked, mask) where x is a float32 tf.Tensor of shape
        (1, H, W, C) for the CapsNet, masked is the lung-masked uint8
        image of shape (H, W, C), and mask is the binary uint8 (H, W)
        lung mask. C is 1 for grayscale sources, 3 for RGB — the model
        input channel count is not checked here (TODO confirm).
    """
    img = img.resize(target_size, Image.BILINEAR)
    # Binary {0, 1} lung mask at the working resolution.
    mask = create_binary_mask(img, seg_model=seg_model, target_size=target_size)
    # 0-255 float array of the (resized) input image.
    img_arr = img_to_array(img).astype(np.float32)
    # Zero out everything outside the lungs; mask broadcasts over channels.
    masked = (img_arr * mask[..., None]).astype(np.uint8)
    # Add batch dimension and cast to float32 (the model expects floats, not uint8).
    x = tf.convert_to_tensor(np.expand_dims(masked, axis=0), dtype=tf.float32)
    # Fixed: removed leftover debug print statements and dead commented-out
    # channel-replication code from the original implementation.
    return x, masked, mask
def to_probabilities(predictions, model_type: str = "capsnet"):
    """
    Convert raw model outputs to class probabilities.

    Parameters
    ----------
    predictions : array-like
        Raw model outputs. For CapsNet this is expected to have shape
        (n, n_classes, 1), where the last dim is a singleton and the values
        do not sum to 1 along the class axis, so they are normalized here.
        A 2-D (n, n_classes) input is also accepted.
        (Fixed: docstring previously described a nonexistent ``dis_confs``
        parameter.)
    model_type : str
        "capsnet"  -> normalize along the class axis and squeeze the last dim.
        anything else -> return np.asarray(predictions) unchanged (assumed to
        already be probabilities, e.g. softmax output).

    Returns
    -------
    np.ndarray
        Probabilities of shape (n, n_classes) for capsnet,
        or np.asarray(predictions) for other models.

    Raises
    ------
    ValueError
        If model_type is "capsnet" and predictions is neither 2-D nor 3-D.
    """
    predictions = np.asarray(predictions)
    if model_type != "capsnet":
        # Assume other models (e.g. softmax CNN) already return probs.
        return predictions
    # --- CapsNet branch ---
    # Ensure a 3-D tensor with a singleton last dimension.
    if predictions.ndim == 2:
        # (n, n_classes) -> (n, n_classes, 1)
        predictions = predictions[..., np.newaxis]
    elif predictions.ndim != 3:
        raise ValueError(f"Expected dis_confs to have 2 or 3 dims for capsnet, got shape {predictions.shape}")
    # Normalize along the class axis (axis=1).
    sums = np.sum(predictions, axis=1, keepdims=True)
    # Avoid division by zero just in case.
    eps = 1e-12
    sums = np.where(sums == 0, eps, sums)
    predictions = predictions / sums
    # Remove the last singleton dim: (n, n_classes, 1) -> (n, n_classes)
    return predictions.squeeze(-1)
def map_probabilities_to_classes(probabilities, class_map):
    """
    Map a probability vector to a dict {class_name: probability}.

    Parameters
    ----------
    probabilities : array-like
        Shape (n_classes,) — probabilities after softmax or normalization.
    class_map : dict
        Mapping from class index → class name.

    Returns
    -------
    dict
        {class_name: float(probability)} ordered by descending probability.

    Raises
    ------
    ValueError
        If the number of probabilities does not match the class map size.
    """
    probs = np.asarray(probabilities).flatten()
    if len(probs) != len(class_map):
        raise ValueError(f"Probability length {len(probs)} does not match class_map size {len(class_map)}")
    # Pair each class name with its probability, then sort descending once.
    pairs = ((class_map[idx], float(p)) for idx, p in enumerate(probs))
    return dict(sorted(pairs, key=lambda kv: kv[1], reverse=True))
def predict(img_path: str) -> tuple[str, np.ndarray, dict]:
    """Run the full inference pipeline on one X-ray file.

    Parameters
    ----------
    img_path : str
        Path to the input image file.

    Returns
    -------
    tuple
        (filename, masked, prob_dict): the basename of img_path, the
        lung-masked uint8 image array, and a {disease_name: probability}
        dict sorted by descending probability.
        (Fixed: annotation previously claimed the third element was an
        np.ndarray; the function has always returned a dict.)
    """
    # Image.open is lazy and keeps the file handle open; the context manager
    # closes it once the pixel data has been consumed (fixes a handle leak).
    with Image.open(img_path) as img:
        x, masked, mask = create_masked_img(img)
    preds = model_capsnet.predict(x, verbose=1)  # (1, 4, 1): (batch, class, singleton)
    probs = to_probabilities(preds, model_type="capsnet")[0]  # (4,)
    prob_dict = map_probabilities_to_classes(probs, class_to_disease_map)
    return os.path.basename(img_path), masked, prob_dict