ImageForensics-AI / metrics /noise_analyzer.py
satyaki-mitra's picture
Initial commit: ImageForensics-AI statistical image screening system
9ff8ef6
# Dependencies
import numpy as np
from utils.logger import get_logger
from config.schemas import MetricResult
from config.constants import MetricType
from utils.image_processor import ImageProcessor
from config.constants import NOISE_ANALYSIS_PARAMS
# Suppress NumPy warning
np.seterr(divide = 'ignore',
invalid = 'ignore',
)
# Setup Logging
logger = get_logger(__name__)
class NoiseAnalyzer:
"""
Noise pattern analysis for AI detection
Core principle:
---------------
- Real photos : Sensor noise follows Poisson distribution (shot noise) + Gaussian (read noise)
- AI images : Too uniform, artificially smooth, or completely missing noise
Method:
-------
1. Extract local patches
2. Estimate noise variance in each patch
3. Analyze noise consistency and distribution
4. Check for unnatural uniformity
"""
def __init__(self):
self.image_processor = ImageProcessor()
def detect(self, image: np.ndarray) -> MetricResult:
"""
Run noise pattern analysis
Arguments:
----------
image { np.ndarray } : RGB image array (H, W, 3)
Returns:
--------
{ MetricResult } : Structured Noise-domain metric result containing:
- score : Suspicion score [0.0, 1.0]
- confidence : Reliability of noise evidence
- details : Noise related diagnostics
"""
try:
logger.debug(f"Running noise analysis on image shape {image.shape}")
# Convert to luminance
luminance = self.image_processor.rgb_to_luminance(image = image)
# Extract patches
patches = self._extract_patches(luminance = luminance)
if (len(patches) == 0):
logger.warning("No patches extracted for noise analysis")
return MetricResult(metric_type = MetricType.NOISE,
score = NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE,
confidence = 0.0,
details = {"reason": "no_patches_extracted"},
)
# Estimate noise in each patch
noise_estimates, mad_values, laplacian_energy = self._estimate_noise_per_patch(patches = patches)
# Filter Noise Estimates, MAD and Laplacian Energy for finite values only
filtered_mask = np.isfinite(noise_estimates)
filtered_noise_estimates = noise_estimates[filtered_mask]
filtered_mad = mad_values[filtered_mask]
filtered_laplacian_energy = laplacian_energy[filtered_mask]
if (len(filtered_noise_estimates) < NOISE_ANALYSIS_PARAMS.MIN_ESTIMATES):
logger.debug("Insufficient valid noise estimates after filtering")
return MetricResult(metric_type = MetricType.NOISE,
score = NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE,
confidence = 0.0,
details = {"reason" : "insufficient_noise_estimates",
"patches_total" : int(len(patches)),
"patches_valid" : int(len(filtered_noise_estimates)),
},
)
logger.debug(f"Noise patches: total={len(patches)}, valid={len(filtered_noise_estimates)}")
# Analyze noise distribution
noise_score, noise_details = self._analyze_noise_distribution(noise_estimates = filtered_noise_estimates,
mad_values = filtered_mad,
laplacian_energy = filtered_laplacian_energy,
)
# Confidence: distance from neutral
confidence = float(np.clip((abs(noise_score - NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE) * 2.0), 0.0, 1.0))
logger.debug(f"Noise analysis: score={noise_score:.3f}, patches={len(patches)}, valid={len(filtered_noise_estimates)}")
return MetricResult(metric_type = MetricType.NOISE,
score = float(noise_score),
confidence = confidence,
details = {"patches_total" : int(len(patches)),
"patches_valid" : int(len(filtered_noise_estimates)),
**noise_details,
},
)
except Exception as e:
logger.error(f"Noise analysis failed: {e}")
# Return neutral score on error
return MetricResult(metric_type = MetricType.NOISE,
score = NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE,
confidence = 0.0,
details = {"error": "noise_analysis_failed"},
)
def _extract_patches(self, luminance: np.ndarray) -> np.ndarray:
"""
Extract patches from image for local noise estimation
Arguments:
----------
luminance { np.ndarray } : Luminance channel (H, W)
Returns:
--------
{ np.ndarray } : Array of patches
"""
patches = self.image_processor.extract_patches(image = luminance,
patch_size = NOISE_ANALYSIS_PARAMS.PATCH_SIZE,
stride = NOISE_ANALYSIS_PARAMS.STRIDE,
max_patches = NOISE_ANALYSIS_PARAMS.SAMPLES,
)
return patches
def _estimate_noise_per_patch(self, patches: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Estimate noise variance in each patch using median absolute deviation
Uses Median Absolute Deviation (MAD) which is robust to edges/textures
Arguments:
----------
patches { np.ndarray } : Array of image patches (N, patch_size, patch_size)
Returns:
--------
{ tuple } : A tuple containing
- Array of noise estimates per patch
- Array of MAD values
- Array of Laplacian Energy Values
"""
noise_estimates = list()
mad_values = list()
laplacian_energy_values = list()
for patch in patches:
# Skip patches with too much structure (edges, textures)
variance = np.var(patch)
if (variance < NOISE_ANALYSIS_PARAMS.VARIANCE_LOW_THRESHOLD):
# Too uniform, skip
continue
if (variance > NOISE_ANALYSIS_PARAMS.VARIANCE_HIGH_THRESHOLD):
# Too much structure, skip
continue
# Use Median Absolute Deviation for robust noise estimation
laplacian = self._apply_laplacian(patch = patch)
mad = np.median(np.abs(laplacian - np.median(laplacian)))
# Convert MAD to noise standard deviation estimate: For Gaussian noise: σ ≈ 1.4826 × MAD
noise_std = NOISE_ANALYSIS_PARAMS.MAD_TO_STD_FACTOR * mad
# Calculate Laplacian Energy
lap_energy = float(np.mean(laplacian ** 2))
# Append corresponding values to their storages
mad_values.append(mad)
noise_estimates.append(noise_std)
laplacian_energy_values.append(lap_energy)
return np.array(noise_estimates), np.array(mad_values), np.array(laplacian_energy_values)
def _apply_laplacian(self, patch: np.ndarray) -> np.ndarray:
"""
Apply Laplacian filter to isolate high-frequency noise
Arguments:
----------
patch { np.ndarray } : Image patch
Returns:
--------
{ np.ndarray } : Laplacian-filtered patch
"""
# Simple 3x3 Laplacian kernel
kernel = np.array([[0, 1, 0],
[1, -4, 1],
[0, 1, 0]],
)
# Pad patch
padded = np.pad(patch, 1, mode = 'reflect')
# Apply convolution
h, w = patch.shape
result = np.zeros_like(patch)
for i in range(h):
for j in range(w):
region = padded[i:i+3, j:j+3]
result[i, j] = np.sum(region * kernel)
return result
def _analyze_noise_distribution(self, noise_estimates: np.ndarray, mad_values: np.ndarray, laplacian_energy: np.ndarray,) -> tuple[float, dict]:
"""
Analyze noise distribution for anomalies
Checks:
-------
1. Coefficient of variation (consistency)
2. Overall noise level (too low = suspicious)
3. Distribution shape (too uniform = suspicious)
Arguments:
----------
noise_estimates { np.ndarray } : Array of noise standard deviations
mad_values { np.ndarray } : Array of MAD values
laplacian_energy { np.ndarray } : Array of Laplacian Energy Values
Returns:
--------
{ tuple } : A tuple containing:
- Suspicion score [0.0, 1.0]
- Noise Distribution detailed diagnostics
"""
if (len(noise_estimates) < NOISE_ANALYSIS_PARAMS.MIN_ESTIMATES):
return (NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE,
{"reason": "insufficient_noise_samples"},
)
# Remove outliers (keep middle 80%)
q10 = np.percentile(noise_estimates, NOISE_ANALYSIS_PARAMS.OUTLIER_PERCENTILE_LOW)
q90 = np.percentile(noise_estimates, NOISE_ANALYSIS_PARAMS.OUTLIER_PERCENTILE_HIGH)
filtered = noise_estimates[(noise_estimates >= q10) & (noise_estimates <= q90)]
if (len(filtered) < NOISE_ANALYSIS_PARAMS.MIN_FILTERED_SAMPLES):
return (NOISE_ANALYSIS_PARAMS.NEUTRAL_SCORE,
{"reason": "insufficient_filtered_samples"},
)
mean_noise = np.mean(filtered)
std_noise = np.std(filtered)
# Coefficient of Variation (CV) Analysis
cv = std_noise / (mean_noise + 1e-10)
cv_anomaly = 0.0
if (cv < NOISE_ANALYSIS_PARAMS.CV_UNIFORM_THRESHOLD):
# Too uniform
cv_anomaly = (NOISE_ANALYSIS_PARAMS.CV_UNIFORM_THRESHOLD - cv) * NOISE_ANALYSIS_PARAMS.CV_UNIFORM_SCALE
elif (cv > NOISE_ANALYSIS_PARAMS.CV_VARIABLE_THRESHOLD):
# Too variable
cv_anomaly = min(1.0, (cv - NOISE_ANALYSIS_PARAMS.CV_VARIABLE_THRESHOLD) * NOISE_ANALYSIS_PARAMS.CV_VARIABLE_SCALE)
# Overall noise level Analysis
noise_level_anomaly = 0.0
if (mean_noise < NOISE_ANALYSIS_PARAMS.LEVEL_CLEAN_THRESHOLD):
# Too clean
noise_level_anomaly = (NOISE_ANALYSIS_PARAMS.LEVEL_CLEAN_THRESHOLD - mean_noise) / NOISE_ANALYSIS_PARAMS.LEVEL_CLEAN_THRESHOLD
elif (mean_noise < NOISE_ANALYSIS_PARAMS.LEVEL_LOW_THRESHOLD):
# Slightly low
noise_level_anomaly = (NOISE_ANALYSIS_PARAMS.LEVEL_LOW_THRESHOLD - mean_noise) / NOISE_ANALYSIS_PARAMS.LEVEL_LOW_THRESHOLD * 0.5
# Distribution shape Analysis
q25 = np.percentile(filtered, NOISE_ANALYSIS_PARAMS.IQR_PERCENTILE_LOW)
q75 = np.percentile(filtered, NOISE_ANALYSIS_PARAMS.IQR_PERCENTILE_HIGH)
iqr = q75 - q25
iqr_ratio = iqr / (mean_noise + 1e-10)
iqr_anomaly = 0.0
if (iqr_ratio < NOISE_ANALYSIS_PARAMS.IQR_THRESHOLD):
iqr_anomaly = (NOISE_ANALYSIS_PARAMS.IQR_THRESHOLD - iqr_ratio) * NOISE_ANALYSIS_PARAMS.IQR_SCALE
# Clip sub-anomalies for safety
cv_anomaly = np.clip(cv_anomaly, 0.0, 1.0)
noise_level_anomaly = np.clip(noise_level_anomaly, 0.0, 1.0)
iqr_anomaly = np.clip(iqr_anomaly, 0.0, 1.0)
# Combine scores
weights = NOISE_ANALYSIS_PARAMS.SUBMETRIC_WEIGHTS
combined_score = (weights['cv_anomaly'] * cv_anomaly + weights['noise_level_anomaly'] * noise_level_anomaly + weights['iqr_anomaly'] * iqr_anomaly)
final_score = float(np.clip(combined_score, 0.0, 1.0))
# Calculate Forensic Stats
mad_mean = float(np.mean(mad_values)) if len(mad_values) else 0.0
laplacian_energy_mu = float(np.mean(laplacian_energy)) if len(laplacian_energy) else 0.0
noise_details_dict = {"mean_noise" : float(mean_noise),
"std_noise" : float(std_noise),
"cv" : float(cv),
"cv_anomaly" : float(cv_anomaly),
"noise_level_anomaly" : float(noise_level_anomaly),
"iqr_ratio" : float(iqr_ratio),
"iqr_anomaly" : float(iqr_anomaly),
"mad_mean" : mad_mean,
"laplacian_energy" : laplacian_energy_mu,
}
logger.debug(f"Noise scores - CV: {cv:.3f}, mean: {mean_noise:.3f}, IQR ratio: {iqr_ratio:.3f}")
return final_score, noise_details_dict