#!/usr/bin/env python3
"""
CT scan preprocessor for the inference pipeline.

This is the earlier working detection flow restored with stricter defaults
to reduce over-detection.
"""

from pathlib import Path

import numpy as np
from scipy.ndimage import zoom

try:
    import SimpleITK as sitk
except ImportError:  # Only needed for non-numpy formats; see load_ct_scan().
    sitk = None

# HU window used for intensity normalization.
HU_MIN = -1000
HU_MAX = 400

# Edge lengths (in voxels) of the cubic patches produced by this module.
NODULE_PATCH_SIZE = 64
CONTEXT_PATCH_SIZE = 96
CONTEXT_TARGET_SIZE = 48
CLASSIFIER_PATCH_SIZE = 32


def load_ct_scan(scan_path):
    """Load a CT scan from .mhd, .nii, .nii.gz, .npz, or .npy.

    Numpy files are read directly; for ``.npz`` archives the ``'patch'`` key
    is preferred, then ``'image'``, then the first stored array. Everything
    else is handed to SimpleITK.

    Args:
        scan_path: Path (``str`` or ``Path``) of the scan file.

    Returns:
        Tuple ``(ct_array, origin, spacing, direction)``. For numpy inputs the
        array is cast to float32 and the geometry is a placeholder
        (``zeros(3)``, ``ones(3)``, ``eye(3)``). For SimpleITK inputs the
        array comes from ``GetArrayFromImage`` and the geometry from the image
        header, with ``direction`` reshaped to 3x3.

    Raises:
        ValueError: If the file cannot be read.
        ImportError: If a non-numpy format is requested but SimpleITK is not
            installed.
    """
    scan_path = str(scan_path)
    ext = Path(scan_path).suffix.lower()

    if ext in ('.npz', '.npy'):
        try:
            if ext == '.npz':
                # NpzFile keeps the archive open until closed, so read the
                # array inside a context manager to avoid leaking the handle.
                with np.load(scan_path) as data:
                    if 'patch' in data:
                        key = 'patch'
                    elif 'image' in data:
                        key = 'image'
                    else:
                        key = data.files[0]
                    ct_array = data[key]
            else:
                ct_array = np.load(scan_path)
            return ct_array.astype(np.float32), np.zeros(3), np.ones(3), np.eye(3)
        except Exception as exc:
            raise ValueError(f"Failed to load numpy file {scan_path}: {exc}") from exc

    if sitk is None:
        raise ImportError(
            f"SimpleITK is required to read {scan_path}; "
            "install SimpleITK or provide a .npz/.npy file."
        )

    try:
        image = sitk.ReadImage(scan_path)
    except RuntimeError as exc:
        emsg = str(exc)
        if ext == '.mhd' and 'No such file or directory' in emsg:
            raise ValueError(
                "MHD file error: The associated .raw or .zraw file is missing. "
                "For web uploads, use .nii.gz or .npz if possible."
            ) from exc
        raise ValueError(f"SimpleITK failed to read {scan_path}: {emsg}") from exc

    ct_array = sitk.GetArrayFromImage(image)
    origin = np.array(image.GetOrigin())
    spacing = np.array(image.GetSpacing())
    direction = np.array(image.GetDirection()).reshape(3, 3)
    return ct_array, origin, spacing, direction


def normalize_hu(ct_array, hu_min=HU_MIN, hu_max=HU_MAX):
    """Clip HU values and normalize to [0, 1].

    Args:
        ct_array: Array of intensity values.
        hu_min: Lower clip bound, mapped to 0.
        hu_max: Upper clip bound, mapped to 1.

    Returns:
        float32 array of the same shape as ``ct_array``.
    """
    clipped = np.clip(ct_array, hu_min, hu_max)
    normalized = (clipped - hu_min) / (hu_max - hu_min)
    return normalized.astype(np.float32)


def normalize_hu_signed(ct_array, hu_min=HU_MIN, hu_max=HU_MAX):
    """Clip HU values and normalize to [-1, 1].

    Args:
        ct_array: Array of intensity values.
        hu_min: Lower clip bound, mapped to -1.
        hu_max: Upper clip bound, mapped to 1.

    Returns:
        float32 array of the same shape as ``ct_array``.
    """
    normalized = normalize_hu(ct_array, hu_min, hu_max)
    return (normalized * 2 - 1).astype(np.float32)


def create_lung_mask(ct_scan, threshold_lung=-320):
    """Create a conservative lung mask for candidate search.

    Steps: threshold, one erosion + one dilation, connected-component
    labelling, keep the (at most) two largest plausible components, fill
    holes, dilate, blank a central band along the last axis, and finally
    restrict the mask to a z-range.

    Args:
        ct_scan: 3D volume indexed ``[z, y, x]`` (values are compared against
            ``threshold_lung``).
        threshold_lung: Voxels strictly below this value are mask candidates.

    Returns:
        Boolean array with the same shape as ``ct_scan``.
    """
    from scipy.ndimage import binary_dilation, binary_erosion, binary_fill_holes
    from scipy.ndimage import generate_binary_structure
    from skimage.measure import label, regionprops

    print("Creating lung mask...")

    mask = ct_scan < threshold_lung
    mask = binary_erosion(mask, iterations=1)
    mask = binary_dilation(mask, iterations=1)

    labeled = label(mask)
    regions = regionprops(labeled)
    if not regions:
        print("ERROR: No lung regions found!")
        return np.zeros_like(ct_scan, dtype=bool)

    valid_regions = []
    total_vol = mask.size
    for region in regions:
        # Drop tiny specks and components covering more than 30% of the volume.
        if region.area < 1000 or region.area > total_vol * 0.3:
            continue
        # Drop components whose centroid lies in the first/last 10% of slices.
        cz, _, _ = region.centroid
        if cz < mask.shape[0] * 0.1 or cz > mask.shape[0] * 0.9:
            continue
        valid_regions.append(region)

    valid_regions.sort(key=lambda region: region.area, reverse=True)

    lung_mask = np.zeros_like(mask, dtype=bool)
    for region in valid_regions[:2]:
        lung_mask[labeled == region.label] = True

    lung_mask = binary_fill_holes(lung_mask)
    struct = generate_binary_structure(3, 1)
    lung_mask = binary_dilation(lung_mask, structure=struct, iterations=2)

    print(" Removing mediastinum...")
    # Blank a band centred on the last axis, 25% of its length wide.
    center_x = lung_mask.shape[2] // 2
    mediastinum_width = int(lung_mask.shape[2] * 0.25 / 2)
    lung_mask[:, :, center_x - mediastinum_width:center_x + mediastinum_width] = False

    print(" Restricting to lung z-range...")
    lung_slices = np.any(lung_mask, axis=(1, 2))
    lung_z_indices = np.where(lung_slices)[0]
    if len(lung_z_indices) > 0:
        lung_z_min = int(lung_z_indices[0])
        lung_z_max = int(lung_z_indices[-1])
        z_extent = lung_z_max - lung_z_min

        if z_extent > lung_mask.shape[0] * 0.80:
            # Mask spans more than 80% of the slices: fall back to a fixed
            # 15%-75% window instead of trusting the segmentation.
            print(f" WARNING: Lung mask too large ({z_extent} slices), using defaults")
            lung_z_min = int(lung_mask.shape[0] * 0.15)
            lung_z_max = int(lung_mask.shape[0] * 0.75)
        else:
            margin = int(z_extent * 0.10)
            lung_z_min = max(0, lung_z_min - margin)
            lung_z_max = min(lung_mask.shape[0] - 1, lung_z_max + margin)

        lung_mask[:lung_z_min, :, :] = False
        lung_mask[lung_z_max + 1:, :, :] = False
        print(f" Lung z-range: [{lung_z_min}:{lung_z_max}]")

    print(f"✓ Final lung mask: {np.sum(lung_mask):,} voxels")
    return lung_mask


def find_candidates_blob(ct_normalized, lung_mask, min_sigma=1.5, max_sigma=7,
                         num_sigma=10, threshold=0.25, max_candidates=50):
    """Find candidate nodules using 3D Difference of Gaussians inside lungs.

    Args:
        ct_normalized: 3D volume indexed ``[z, y, x]``; the intensity cut-off
            below (0.18) is applied to its raw values.
        lung_mask: Boolean array of the same shape; blobs whose centre is not
            inside the mask are rejected.
        min_sigma: Passed to ``blob_dog``.
        max_sigma: Passed to ``blob_dog``.
        num_sigma: Accepted for signature compatibility; unused.
        threshold: ``blob_dog`` threshold. Values below 0.08 are replaced
            by 0.12.
        max_candidates: Maximum number of candidates returned.

    Returns:
        List of dicts with keys ``'location'`` (``(z, y, x)`` ints),
        ``'radius'`` and ``'intensity'`` (floats), sorted by descending
        intensity and truncated to ``max_candidates``. Empty if the mask is
        empty.
    """
    from skimage.feature import blob_dog

    del num_sigma

    print(
        f"[DEBUG] find_candidates_blob(min_sigma={min_sigma}, "
        f"max_sigma={max_sigma}, threshold={threshold}, max_candidates={max_candidates})"
    )

    # Guard against overly permissive thresholds that explode false positives.
    if threshold < 0.08:
        print(f"Warning: blob_dog threshold {threshold:.3f} is too low; forcing 0.12")
        threshold = 0.12

    lung_indices = np.where(lung_mask)
    if len(lung_indices[0]) == 0:
        return []

    # Bounding box of the mask, grown by `buffer` voxels on every side. The
    # maxima are inclusive indices, so the exclusive slice end needs a +1 to
    # give the same margin on the upper side as on the lower side.
    buffer = 5
    z_min = max(0, lung_indices[0].min() - buffer)
    z_max = min(ct_normalized.shape[0], lung_indices[0].max() + buffer + 1)
    y_min = max(0, lung_indices[1].min() - buffer)
    y_max = min(ct_normalized.shape[1], lung_indices[1].max() + buffer + 1)
    x_min = max(0, lung_indices[2].min() - buffer)
    x_max = min(ct_normalized.shape[2], lung_indices[2].max() + buffer + 1)

    ct_crop = ct_normalized[z_min:z_max, y_min:y_max, x_min:x_max]
    print(f"Detecting blobs in 3D DoG ({ct_crop.shape})...")

    blobs = blob_dog(
        ct_crop,
        min_sigma=min_sigma,
        max_sigma=max_sigma,
        threshold=threshold,
        overlap=0.5,
    )
    print(f" Found {len(blobs)} raw candidates")

    candidates = []
    rejected_outside = 0
    rejected_intensity = 0
    rejected_size = 0

    for blob in blobs:
        z_c, y_c, x_c, sigma = blob
        # Translate crop coordinates back into full-volume coordinates.
        z = int(z_c + z_min)
        y = int(y_c + y_min)
        x = int(x_c + x_min)
        radius = float(sigma * np.sqrt(3))

        if (
            z >= ct_normalized.shape[0]
            or y >= ct_normalized.shape[1]
            or x >= ct_normalized.shape[2]
            or not lung_mask[z, y, x]
        ):
            rejected_outside += 1
            continue

        # Mean intensity of a 7x7x7 neighbourhood (clipped at the borders).
        patch_size = 3
        z1, z2 = max(0, z - patch_size), min(ct_normalized.shape[0], z + patch_size + 1)
        y1, y2 = max(0, y - patch_size), min(ct_normalized.shape[1], y + patch_size + 1)
        x1, x2 = max(0, x - patch_size), min(ct_normalized.shape[2], x + patch_size + 1)
        patch_intensity = ct_normalized[z1:z2, y1:y2, x1:x2].mean()

        if patch_intensity < 0.18:
            rejected_intensity += 1
            continue

        if radius < 1.0 or radius > 15:
            rejected_size += 1
            continue

        candidates.append({
            'location': (z, y, x),
            'radius': radius,
            'intensity': float(patch_intensity),
        })

    print(f"✓ {len(candidates)} valid candidates")
    print(f" Rejected: {rejected_outside} outside, {rejected_intensity} low, {rejected_size} size")

    candidates.sort(key=lambda cand: cand['intensity'], reverse=True)
    return candidates[:max_candidates]


def extract_patch(ct_array, center_zyx, patch_size, pad_value=0):
    """Extract a cubic patch centered at the given voxel coordinate.

    The sampled window is ``[c - half, c + half)`` per axis with
    ``half = patch_size // 2``; voxels outside the volume are filled with
    ``pad_value``. For odd ``patch_size`` the window is one voxel short and
    the last index along each axis is left at ``pad_value``.

    Only the overlapping region is copied, so the cost is proportional to the
    patch rather than the whole volume, and the result never aliases
    ``ct_array``.

    Args:
        ct_array: 3D array.
        center_zyx: Integer ``(z, y, x)`` centre coordinate.
        patch_size: Edge length of the output cube.
        pad_value: Fill value for out-of-volume voxels.

    Returns:
        New array of shape ``(patch_size,) * 3`` with ``ct_array``'s dtype.
    """
    half = patch_size // 2
    z_dim, y_dim, x_dim = ct_array.shape  # Also enforces a 3D input.

    result = np.full((patch_size, patch_size, patch_size), pad_value, dtype=ct_array.dtype)

    src_slices = []
    dst_slices = []
    for center, dim in zip(center_zyx, (z_dim, y_dim, x_dim)):
        start = center - half
        stop = center + half
        lo = max(start, 0)
        hi = min(stop, dim)
        if hi <= lo:
            # Window misses the volume entirely on this axis: all padding.
            return result
        src_slices.append(slice(lo, hi))
        dst_slices.append(slice(lo - start, hi - start))

    result[tuple(dst_slices)] = ct_array[tuple(src_slices)]
    return result


def downsample_patch(patch, target_size):
    """Downsample a 3D patch using trilinear interpolation.

    Args:
        patch: Array to resize, or ``None``.
        target_size: Desired edge length; the zoom factor is derived from the
            first axis and applied to every axis.

    Returns:
        float32 array, or ``None`` if ``patch`` is ``None``.
    """
    if patch is None:
        return None
    factor = target_size / patch.shape[0]
    return zoom(patch, factor, order=1).astype(np.float32)


def preprocess_for_detection(scan_path, use_blob_candidates=True):
    """Full preprocessing pipeline for inference.

    Loads the scan, resamples it to 1.0 spacing on every axis when the stored
    spacing differs, normalizes intensities, builds the lung mask, detects
    blob candidates and extracts a nodule patch and a downsampled context
    patch for each one.

    Args:
        scan_path: Path of the scan file (see ``load_ct_scan``).
        use_blob_candidates: When false, candidate detection is skipped and
            the candidate list is empty.

    Returns:
        Tuple ``(candidates, ct_01, metadata, lung_mask)`` where
        ``candidates`` is a list of dicts (``'nodule_patch'``,
        ``'context_patch'``, ``'location'``, ``'radius'``, ``'intensity'``),
        ``ct_01`` is the [0, 1]-normalized volume, ``metadata`` is a dict of
        JSON-friendly geometry info and ``lung_mask`` is the boolean mask.
    """
    ct_raw, origin, spacing, direction = load_ct_scan(scan_path)

    target_spacing = np.array([1.0, 1.0, 1.0])
    # Spacing is reversed so that it lines up with the array's axis order.
    current_spacing = spacing[::-1]
    resize_factor = current_spacing / target_spacing

    if not np.allclose(resize_factor, 1.0, atol=1e-2):
        ct_raw = zoom(ct_raw, resize_factor, order=1).astype(np.float32)
        spacing = target_spacing[::-1]

    ct_01 = normalize_hu(ct_raw)
    ct_signed = normalize_hu_signed(ct_raw)
    lung_mask = create_lung_mask(ct_raw)

    candidates_raw = []
    if use_blob_candidates:
        candidates_raw = find_candidates_blob(ct_01, lung_mask)
    print(f"[DEBUG] preprocess_for_detection raw_candidates={len(candidates_raw)}")

    candidates = []
    rejected_patch_errors = 0

    for candidate in candidates_raw:
        z, y, x = candidate['location']

        # Patches come from the [-1, 1] volume, so -1.0 is the padding value.
        nodule_patch = extract_patch(ct_signed, (z, y, x), NODULE_PATCH_SIZE, pad_value=-1.0)
        context_patch_96 = extract_patch(ct_signed, (z, y, x), CONTEXT_PATCH_SIZE, pad_value=-1.0)

        # extract_patch always returns an array today; the guard is kept so
        # the rejection counter in the log line below stays meaningful if
        # that ever changes.
        if nodule_patch is None or context_patch_96 is None:
            rejected_patch_errors += 1
            continue

        context_patch = downsample_patch(context_patch_96, CONTEXT_TARGET_SIZE)
        candidates.append({
            'nodule_patch': nodule_patch,
            'context_patch': context_patch,
            'location': (z, y, x),
            'radius': candidate['radius'],
            'intensity': candidate['intensity'],
        })

    print(
        f"[DEBUG] preprocess_for_detection patches_ready={len(candidates)} "
        f"rejected_patch_errors={rejected_patch_errors}"
    )

    metadata = {
        'origin': origin.tolist(),
        'spacing': spacing.tolist(),
        'shape': list(ct_raw.shape),
        'scan_path': str(scan_path),
        'direction': direction.tolist(),
    }
    return candidates, ct_01, metadata, lung_mask


def extract_classification_patch(ct_normalized, location, size=CLASSIFIER_PATCH_SIZE):
    """Extract a 32^3 patch for the malignancy classifier.

    If ``ct_normalized`` is already a ``NODULE_PATCH_SIZE`` cube, its central
    ``size`` cube is returned and ``location`` is ignored. Otherwise the patch
    is cut around ``location`` with zero padding.

    Args:
        ct_normalized: Either a nodule patch or a full 3D volume.
        location: ``(z, y, x)`` centre, used only for full volumes.
        size: Edge length of the returned cube.

    Returns:
        New array of shape ``(size,) * 3``.
    """
    if ct_normalized.shape == (NODULE_PATCH_SIZE,) * 3:
        offset = (NODULE_PATCH_SIZE - size) // 2
        return ct_normalized[
            offset:offset + size,
            offset:offset + size,
            offset:offset + size,
        ].copy()

    return extract_patch(ct_normalized, location, size, pad_value=0)