import numpy as np import cv2 import pydicom import logging from typing import Dict, Any, List, Tuple, Union from PIL import Image logger = logging.getLogger("ElephMind-QC") class QualityControlEngine: """ Advanced Quality Control Engine (Gatekeeper). Implements the 9-Point QC Checklist. Metrics: 1. Structural (DICOM) 2. Intensity (Contrast) 3. Blur (Laplacian) 4. Noise (SNR) 5. Saturation (Clipping) 6. Spatial (Aspect Ratio) Decision: QC Score = Weighted Sum Threshold >= 0.75 -> PASS """ def __init__(self): # Weights defined by user self.weights = { "structure": 0.30, # Weight 3 (Normalized approx) "blur": 0.20, # Weight 2 "contrast": 0.20, # Weight 2 "noise": 0.10, # Weight 1 "saturation": 0.10, "spatial": 0.10 } # Thresholds self.thresholds = { "blur_var": 100.0, # Laplacian Variance < 100 -> Blurry "contrast_std": 10.0, # Std Dev < 10 -> Low Contrast "entropy": 4.0, # Entropy < 4.0 -> Low Info "snr_min": 2.0, # Signal-to-Noise Ratio < 2.0 -> Noisy "saturation_max": 0.05, # >5% pixels at min/max -> Saturated "aspect_min": 0.5, # Too thin "aspect_max": 2.0 # Too wide } def evaluate_dicom(self, dataset: pydicom.dataset.FileDataset) -> Dict[str, Any]: """ Gate 1: Structural DICOM Check. """ reasons = [] passed = True try: # 1. Pixel Data Presence if not hasattr(dataset, "PixelData") or dataset.PixelData is None: return {"passed": False, "score": 0.0, "reasons": ["CRITICAL: Missing PixelData"]} # 2. Dimensions rows = getattr(dataset, "Rows", 0) cols = getattr(dataset, "Columns", 0) if rows <= 0 or cols <= 0: return {"passed": False, "score": 0.0, "reasons": ["CRITICAL: Invalid Dimensions (Rows/Cols <= 0)"]} # 3. Transfer Syntax (Compression check - basic) # If we can read pixel_array, it's usually mostly fine, preventing crash is handled in processor. # Here we just check logical validity. pass except Exception as e: return {"passed": False, "score": 0.0, "reasons": [f"CRITICAL: DICOM Corrupt ({str(e)})"]} return {"passed": True, "score": 1.0, "reasons": []} def compute_metrics(self, image: np.ndarray) -> Dict[str, float]: """ Compute raw metrics for the image (H, W) or (H, W, C). Image input should be uint8 0-255 or float. """ metrics = {} # Ensure Grayscale for calculation if len(image.shape) == 3: gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) else: gray = image # 1. Blur (Variance of Laplacian) metrics['blur_var'] = cv2.Laplacian(gray, cv2.CV_64F).var() # 2. Intensity / Contrast metrics['std_dev'] = np.std(gray) # Entropy hist, _ = np.histogram(gray, bins=256, range=(0, 256)) prob = hist / (np.sum(hist) + 1e-8) prob = prob[prob > 0] metrics['entropy'] = -np.sum(prob * np.log2(prob)) # 3. Noise (Simple SNR estimate) # Signal = Mean, Noise = Std(High Pass) # Simple High Pass: Image - Blurred blurred = cv2.GaussianBlur(gray, (5, 5), 0) noise_img = gray.astype(float) - blurred.astype(float) noise_std = np.std(noise_img) + 1e-8 signal_mean = np.mean(gray) metrics['snr'] = signal_mean / noise_std # 4. Saturation # % pixels at 0 or 255 n_pixels = gray.size n_sat = np.sum(gray <= 5) + np.sum(gray >= 250) metrics['saturation_pct'] = n_sat / n_pixels # 5. Spatial h, w = gray.shape metrics['aspect_ratio'] = w / h return metrics def run_quality_check(self, image_input: Union[Image.Image, np.ndarray, pydicom.dataset.FileDataset]) -> Dict[str, Any]: """ Main Entry Point. Returns: { "passed": bool, "quality_score": float (0-1), "reasons": List[str], "metrics": Dict } """ reasons = [] scores = {} # --- PHASE 1: DICOM STRUCTURE (If DICOM) --- dicom_score = 1.0 if isinstance(image_input, pydicom.dataset.FileDataset): res_struct = self.evaluate_dicom(image_input) if not res_struct['passed']: return { "passed": False, "quality_score": 0.0, "reasons": res_struct['reasons'], "metrics": {} } # Convert to numpy for image analysis using standard processor logic (simplified here or assume pre-converted) # ideally the caller passes the converted image. # If input is DICOM, we assume we can't analyze image metrics easily here without converting. # To simplify integration: Check DICOM Structure, then rely on caller to pass Image object for Visual QC. # For this implementation, we assume input is PIL Image or Numpy Array for Visual QC. pass # Prepare Image if isinstance(image_input, Image.Image): img_np = np.array(image_input) elif isinstance(image_input, np.ndarray): img_np = image_input else: # If strictly DICOM passed without conversion capability, we only did struct check return {"passed": True, "quality_score": 1.0, "reasons": [], "metrics": {}} # --- PHASE 2: VISUAL METRICS --- m = self.compute_metrics(img_np) # 1. Blur Check # Sigmoid-like soft score or Hard Threshold? User implies Hard Rules composed into Score. # "Structure: weight 3, Blur: weight 2..." # Let's assign 0 or 1 per category based on threshold, then weight. # Blur if m['blur_var'] < self.thresholds['blur_var']: scores['blur'] = 0.0 reasons.append("Image Floue (Netteté insuffisante)") else: scores['blur'] = 1.0 # Contrast / Intensity if m['std_dev'] < self.thresholds['contrast_std'] or m['entropy'] < self.thresholds['entropy']: scores['contrast'] = 0.0 reasons.append("Contraste Insuffisant (Image plate/sombre)") else: scores['contrast'] = 1.0 # Noise if m['snr'] < self.thresholds['snr_min']: scores['noise'] = 0.0 reasons.append("Bruit Excessif (SNR faible)") else: scores['noise'] = 1.0 # Saturation if m['saturation_pct'] > self.thresholds['saturation_max']: scores['saturation'] = 0.0 reasons.append("Saturation Excessive (>5% clipping)") else: scores['saturation'] = 1.0 # Spatial if not (self.thresholds['aspect_min'] <= m['aspect_ratio'] <= self.thresholds['aspect_max']): scores['spatial'] = 0.0 reasons.append(f"Format Anatomique Invalide (Ratio {m['aspect_ratio']:.2f})") else: scores['spatial'] = 1.0 # Structural (Implicitly 1 if we got here with an image) scores['structure'] = 1.0 # --- PHASE 3: GLOBAL SCORE --- # QC_score = Sum(w * s) final_score = ( self.weights['structure'] * scores.get('structure', 1.0) + self.weights['blur'] * scores.get('blur', 1.0) + self.weights['contrast'] * scores.get('contrast', 1.0) + self.weights['noise'] * scores.get('noise', 1.0) + self.weights['saturation'] * scores.get('saturation', 1.0) + self.weights['spatial'] * scores.get('spatial', 1.0) ) # Normalize weights sum just in case total_weight = sum(self.weights.values()) final_score = final_score / total_weight # DECISION is_passed = final_score >= 0.75 status = "PASSED" if is_passed else "REJECTED" logger.info(f"QC Evaluation: {status} (Score: {final_score:.2f}) - Reasons: {reasons}") return { "passed": is_passed, "quality_score": round(final_score, 2), "reasons": reasons, "metrics": m }