# fixed for local runs (commit f235f14)
from datasets import load_dataset
import gradio as gr
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array
from huggingface_hub import hf_hub_download
from PIL import Image
import os
import numpy as np
from modelbuilder import capsnet_custom_objects
from defs import class_to_disease_map
from pathlib import Path
# -------CONSTANTS-------#
TARGET_SIZE = (256, 256) # target size for masked images fed to the models (width, height)
# Your dataset repo on HF
# DATASET_REPO = "valste/lung-disease-xrays"
# Local folder holding sample X-ray PNGs shown as Gradio examples.
EXAMPLES_DIR = Path("./tmp/external_xrays_299x299x3")
# Ensure the folder exists so the glob below never fails on a fresh checkout.
EXAMPLES_DIR.mkdir(parents=True, exist_ok=True)
# ------------------------------------------------------------
# Determine the running environment: local machine or huggingface spaces
# ------------------------------------------------------------
def running_in_spaces() -> bool:
    """Detect whether this app is executing inside a Hugging Face Space.

    Spaces expose the ``SPACE_ID`` environment variable (and may set
    ``SYSTEM=spaces``); either marker counts as "running in Spaces".
    """
    env = os.environ
    has_space_id = env.get("SPACE_ID") is not None
    return has_space_id or env.get("SYSTEM") == "spaces"
# Cache the environment flag once at import time; the model-loading code
# below branches on it.
is_spaces = running_in_spaces()
if is_spaces:
    # Fix: was an f-string with no placeholders (needless f prefix).
    print("Running in Hugging Face Spaces environment.")
else:
    # COMPUTERNAME is Windows-only; falls back to 'Unknown' elsewhere.
    print(f"Running on local machine:{os.environ.get('COMPUTERNAME','Unknown')}")
class DemoException(Exception):
    """Application-specific error type for this demo."""
# ------------------------------------------------------------
# 1️⃣ Load the models from Hugging Face Hub:
# capsnet for disease classification and GAN for lung segmentation/masking
# as well as images dataset
# ------------------------------------------------------------
# Resolved below depending on environment; dataset/data_dir are placeholders
# and are not populated in this section.
gan_model_path = None
capsnet_model_path = None
dataset = None
data_dir = None
if is_spaces:
    # On Spaces, fetch the weights from the Hub (hf_hub_download caches them).
    gan_model_path = hf_hub_download(
        repo_id="valste/lung-segmentation-gan", filename="model.keras"
    )
    capsnet_model_path = hf_hub_download(
        repo_id="valste/capsnet-4class-lung-disease-classifier", filename="model.keras"
    )
else:
    # local machine
    # Expect weights to be checked out under ./models/<repo-name>/model.keras
    capsnet_model_path = os.path.join(
        ".", "models", "capsnet-4class-lung-disease-classifier", "model.keras"
    )
    gan_model_path = os.path.join(".", "models", "lung-segmentation-gan", "model.keras")
#----Load models----#
# compile=False: inference only — no optimizer/loss config needed.
model_gan = tf.keras.models.load_model(gan_model_path, compile=False)
# CapsNet needs its custom layers/objects (from modelbuilder) to deserialize.
model_capsnet = tf.keras.models.load_model(capsnet_model_path, custom_objects=capsnet_custom_objects, compile=False)
# ------------------------------------------------------------
# 2️⃣ Load sample X-ray imagepaths dataset
# ------------------------------------------------------------
# Build examples directly from the folder
# Each example is a one-element list — the format gr.Examples expects.
img_paths = [[str(p)] for p in sorted(EXAMPLES_DIR.glob("*.png"))]
# Bare file names (e.g. for display labels).
img_names = [Path(p[0]).name for p in img_paths]
print("Examples found:", len(img_paths))
print("First example:", img_paths[0] if img_paths else "NONE")
# ------------------------------------------------------------
# 3️⃣ Define preprocessing and inference function
# ------------------------------------------------------------
def create_binary_mask(img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE) -> np.ndarray:
    """Segment the lungs in an X-ray and return a binary mask.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray in any mode; converted to grayscale internally.
    seg_model : keras model
        Segmentation GAN; assumed to accept input of shape (1, H, W, 1)
        scaled to [0, 1] — TODO confirm against the model card.
    target_size : tuple[int, int]
        (width, height) the image is resized to before segmentation.

    Returns
    -------
    np.ndarray
        uint8 array of shape (H, W) with values in {0, 1}.
    """
    # --- 1) Grayscale base image, scaled to [0, 1] ---
    img_gray = img.convert("L").resize(target_size, Image.BILINEAR)
    gray_array = np.array(img_gray, dtype=np.float32) / 255.0
    # --- 2) Add batch and channel dims -> (1, H, W, 1) ---
    gan_input = gray_array[np.newaxis, ..., np.newaxis]
    # --- 3) Run segmentation GAN and threshold at 0.5 ---
    prediction = seg_model.predict(gan_input)
    # Fix: single threshold replaces the previous redundant double pass
    # (>0.5 -> 0/255, then >127 -> 0/1); the result is identical.
    mask = (prediction[0, :, :, 0] > 0.5).astype(np.uint8)
    return mask
def create_masked_img(img: Image.Image, seg_model=model_gan, target_size=TARGET_SIZE) -> tuple[tf.Tensor, np.ndarray, np.ndarray]:
    """Mask an X-ray with its lung segmentation and prepare CapsNet input.

    Parameters
    ----------
    img : PIL.Image.Image
        Input X-ray.
    seg_model : keras model
        Segmentation model forwarded to ``create_binary_mask``.
    target_size : tuple[int, int]
        (width, height) to resize to before masking.

    Returns
    -------
    x : tf.Tensor
        float32 tensor with a leading batch dimension, fed to the CapsNet.
    masked : np.ndarray
        uint8 masked image (channel count follows the input image's mode).
    mask : np.ndarray
        uint8 binary lung mask of shape (H, W), values in {0, 1}.
    """
    # Fix: removed leftover debug print() calls and dead commented-out
    # RGB-expansion code; behavior of the returned values is unchanged.
    img = img.resize(target_size, Image.BILINEAR)
    mask = create_binary_mask(img, seg_model=seg_model, target_size=target_size)  # (H, W) in {0, 1}
    img_arr = img_to_array(img).astype(np.float32)
    # Zero out everything outside the lungs; the mask broadcasts over channels.
    masked = (img_arr * mask[..., None]).astype(np.uint8)
    # Add batch dimension; the CapsNet expects float32 input.
    x = tf.convert_to_tensor(np.expand_dims(masked, axis=0), dtype=tf.float32)
    return x, masked, mask
def to_probabilities(predictions, model_type: str = "capsnet"):
    """
    Convert raw model outputs to class probabilities.

    Parameters
    ----------
    predictions : array-like
        Raw model outputs. For CapsNet this is expected to have shape
        (n, n_classes, 1) — capsule lengths do not sum to 1 along the class
        axis, so they are normalized here. A 2D (n, n_classes) array is
        also accepted.
    model_type : str
        "capsnet" → normalize along the class axis and squeeze the last dim.
        Anything else → return the input unchanged as np.ndarray (assumed
        to already be probabilities, e.g. softmax output).

    Returns
    -------
    np.ndarray
        Probabilities of shape (n, n_classes) for capsnet,
        or np.asarray(predictions) for other models.

    Raises
    ------
    ValueError
        If `predictions` is neither 2D nor 3D on the capsnet branch.
    """
    # Fix: docstring and error message previously referred to a nonexistent
    # parameter name `dis_confs`; the actual parameter is `predictions`.
    predictions = np.asarray(predictions)
    if model_type != "capsnet":
        # Assume other models (e.g. softmax CNN) already return probs.
        return predictions
    # --- CapsNet branch ---
    # Ensure a 3D tensor with the singleton last dimension
    if predictions.ndim == 2:
        # (n, n_classes) -> (n, n_classes, 1)
        predictions = predictions[..., np.newaxis]
    elif predictions.ndim != 3:
        raise ValueError(f"Expected predictions to have 2 or 3 dims for capsnet, got shape {predictions.shape}")
    # Normalize along the class axis (axis=1)
    sums = np.sum(predictions, axis=1, keepdims=True)
    # Avoid division by zero just in case
    eps = 1e-12
    sums = np.where(sums == 0, eps, sums)
    predictions = predictions / sums
    # Remove the last singleton dim: (n, n_classes, 1) -> (n, n_classes)
    return predictions.squeeze(-1)
def map_probabilities_to_classes(probabilities, class_map):
    """
    Map a probability vector to a dict {class_name: probability}.

    Parameters
    ----------
    probabilities : array-like
        Shape (n_classes,) – probabilities after softmax or normalization.
    class_map : dict
        Mapping from class index → class name.

    Returns
    -------
    dict
        {class_name: float(probability)} ordered by descending probability.

    Raises
    ------
    ValueError
        If the number of probabilities differs from the class-map size.
    """
    flat = np.asarray(probabilities).flatten()
    if len(flat) != len(class_map):
        raise ValueError(f"Probability length {len(flat)} does not match class_map size {len(class_map)}")
    pairs = [(class_map[idx], float(flat[idx])) for idx in range(len(flat))]
    pairs.sort(key=lambda item: item[1], reverse=True)
    return dict(pairs)
def predict(img_path: str) -> tuple[str, np.ndarray, dict]:
    """Run the full inference pipeline on one X-ray file.

    Parameters
    ----------
    img_path : str
        Path to the input X-ray image.

    Returns
    -------
    tuple
        (file base name, masked uint8 image array,
        {disease_name: probability} dict sorted by descending probability).
    """
    # Fix: return annotation previously claimed np.ndarray for the third
    # element, but map_probabilities_to_classes returns a dict.
    image = Image.open(img_path)
    x, masked, _mask = create_masked_img(image)
    # CapsNet output: (1, 4, 1) — (batch, class confidences, singleton dim)
    raw = model_capsnet.predict(x, verbose=1)
    probs = to_probabilities(raw, model_type="capsnet")[0]  # shape (4,)
    prob_dict = map_probabilities_to_classes(probs, class_to_disease_map)
    return os.path.basename(img_path), masked, prob_dict