elephmind-api / quality_control.py
zousko-stark's picture
Upload folder using huggingface_hub
7585389 verified
raw
history blame
9.02 kB
import numpy as np
import cv2
import pydicom
import logging
from typing import Dict, Any, List, Tuple, Union
from PIL import Image
logger = logging.getLogger("ElephMind-QC")
class QualityControlEngine:
"""
Advanced Quality Control Engine (Gatekeeper).
Implements the 9-Point QC Checklist.
Metrics:
1. Structural (DICOM)
2. Intensity (Contrast)
3. Blur (Laplacian)
4. Noise (SNR)
5. Saturation (Clipping)
6. Spatial (Aspect Ratio)
Decision:
QC Score = Weighted Sum
Threshold >= 0.75 -> PASS
"""
def __init__(self):
# Weights defined by user
self.weights = {
"structure": 0.30, # Weight 3 (Normalized approx)
"blur": 0.20, # Weight 2
"contrast": 0.20, # Weight 2
"noise": 0.10, # Weight 1
"saturation": 0.10,
"spatial": 0.10
}
# Thresholds
self.thresholds = {
"blur_var": 100.0, # Laplacian Variance < 100 -> Blurry
"contrast_std": 10.0, # Std Dev < 10 -> Low Contrast
"entropy": 4.0, # Entropy < 4.0 -> Low Info
"snr_min": 2.0, # Signal-to-Noise Ratio < 2.0 -> Noisy
"saturation_max": 0.05, # >5% pixels at min/max -> Saturated
"aspect_min": 0.5, # Too thin
"aspect_max": 2.0 # Too wide
}
def evaluate_dicom(self, dataset: pydicom.dataset.FileDataset) -> Dict[str, Any]:
"""
Gate 1: Structural DICOM Check.
"""
reasons = []
passed = True
try:
# 1. Pixel Data Presence
if not hasattr(dataset, "PixelData") or dataset.PixelData is None:
return {"passed": False, "score": 0.0, "reasons": ["CRITICAL: Missing PixelData"]}
# 2. Dimensions
rows = getattr(dataset, "Rows", 0)
cols = getattr(dataset, "Columns", 0)
if rows <= 0 or cols <= 0:
return {"passed": False, "score": 0.0, "reasons": ["CRITICAL: Invalid Dimensions (Rows/Cols <= 0)"]}
# 3. Transfer Syntax (Compression check - basic)
# If we can read pixel_array, it's usually mostly fine, preventing crash is handled in processor.
# Here we just check logical validity.
pass
except Exception as e:
return {"passed": False, "score": 0.0, "reasons": [f"CRITICAL: DICOM Corrupt ({str(e)})"]}
return {"passed": True, "score": 1.0, "reasons": []}
def compute_metrics(self, image: np.ndarray) -> Dict[str, float]:
"""
Compute raw metrics for the image (H, W) or (H, W, C).
Image input should be uint8 0-255 or float.
"""
metrics = {}
# Ensure Grayscale for calculation
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
else:
gray = image
# 1. Blur (Variance of Laplacian)
metrics['blur_var'] = cv2.Laplacian(gray, cv2.CV_64F).var()
# 2. Intensity / Contrast
metrics['std_dev'] = np.std(gray)
# Entropy
hist, _ = np.histogram(gray, bins=256, range=(0, 256))
prob = hist / (np.sum(hist) + 1e-8)
prob = prob[prob > 0]
metrics['entropy'] = -np.sum(prob * np.log2(prob))
# 3. Noise (Simple SNR estimate)
# Signal = Mean, Noise = Std(High Pass)
# Simple High Pass: Image - Blurred
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
noise_img = gray.astype(float) - blurred.astype(float)
noise_std = np.std(noise_img) + 1e-8
signal_mean = np.mean(gray)
metrics['snr'] = signal_mean / noise_std
# 4. Saturation
# % pixels at 0 or 255
n_pixels = gray.size
n_sat = np.sum(gray <= 5) + np.sum(gray >= 250)
metrics['saturation_pct'] = n_sat / n_pixels
# 5. Spatial
h, w = gray.shape
metrics['aspect_ratio'] = w / h
return metrics
def run_quality_check(self, image_input: Union[Image.Image, np.ndarray, pydicom.dataset.FileDataset]) -> Dict[str, Any]:
"""
Main Entry Point.
Returns: {
"passed": bool,
"quality_score": float (0-1),
"reasons": List[str],
"metrics": Dict
}
"""
reasons = []
scores = {}
# --- PHASE 1: DICOM STRUCTURE (If DICOM) ---
dicom_score = 1.0
if isinstance(image_input, pydicom.dataset.FileDataset):
res_struct = self.evaluate_dicom(image_input)
if not res_struct['passed']:
return {
"passed": False,
"quality_score": 0.0,
"reasons": res_struct['reasons'],
"metrics": {}
}
# Convert to numpy for image analysis using standard processor logic (simplified here or assume pre-converted)
# ideally the caller passes the converted image.
# If input is DICOM, we assume we can't analyze image metrics easily here without converting.
# To simplify integration: Check DICOM Structure, then rely on caller to pass Image object for Visual QC.
# For this implementation, we assume input is PIL Image or Numpy Array for Visual QC.
pass
# Prepare Image
if isinstance(image_input, Image.Image):
img_np = np.array(image_input)
elif isinstance(image_input, np.ndarray):
img_np = image_input
else:
# If strictly DICOM passed without conversion capability, we only did struct check
return {"passed": True, "quality_score": 1.0, "reasons": [], "metrics": {}}
# --- PHASE 2: VISUAL METRICS ---
m = self.compute_metrics(img_np)
# 1. Blur Check
# Sigmoid-like soft score or Hard Threshold? User implies Hard Rules composed into Score.
# "Structure: weight 3, Blur: weight 2..."
# Let's assign 0 or 1 per category based on threshold, then weight.
# Blur
if m['blur_var'] < self.thresholds['blur_var']:
scores['blur'] = 0.0
reasons.append("Image Floue (Netteté insuffisante)")
else:
scores['blur'] = 1.0
# Contrast / Intensity
if m['std_dev'] < self.thresholds['contrast_std'] or m['entropy'] < self.thresholds['entropy']:
scores['contrast'] = 0.0
reasons.append("Contraste Insuffisant (Image plate/sombre)")
else:
scores['contrast'] = 1.0
# Noise
if m['snr'] < self.thresholds['snr_min']:
scores['noise'] = 0.0
reasons.append("Bruit Excessif (SNR faible)")
else:
scores['noise'] = 1.0
# Saturation
if m['saturation_pct'] > self.thresholds['saturation_max']:
scores['saturation'] = 0.0
reasons.append("Saturation Excessive (>5% clipping)")
else:
scores['saturation'] = 1.0
# Spatial
if not (self.thresholds['aspect_min'] <= m['aspect_ratio'] <= self.thresholds['aspect_max']):
scores['spatial'] = 0.0
reasons.append(f"Format Anatomique Invalide (Ratio {m['aspect_ratio']:.2f})")
else:
scores['spatial'] = 1.0
# Structural (Implicitly 1 if we got here with an image)
scores['structure'] = 1.0
# --- PHASE 3: GLOBAL SCORE ---
# QC_score = Sum(w * s)
final_score = (
self.weights['structure'] * scores.get('structure', 1.0) +
self.weights['blur'] * scores.get('blur', 1.0) +
self.weights['contrast'] * scores.get('contrast', 1.0) +
self.weights['noise'] * scores.get('noise', 1.0) +
self.weights['saturation'] * scores.get('saturation', 1.0) +
self.weights['spatial'] * scores.get('spatial', 1.0)
)
# Normalize weights sum just in case
total_weight = sum(self.weights.values())
final_score = final_score / total_weight
# DECISION
is_passed = final_score >= 0.75
status = "PASSED" if is_passed else "REJECTED"
logger.info(f"QC Evaluation: {status} (Score: {final_score:.2f}) - Reasons: {reasons}")
return {
"passed": is_passed,
"quality_score": round(final_score, 2),
"reasons": reasons,
"metrics": m
}