Spaces:
Sleeping
Sleeping
File size: 10,675 Bytes
e7f1d57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# Dependencies
import numpy as np
from scipy import fft
from utils.logger import get_logger
from config.schemas import MetricResult
from config.constants import MetricType
from utils.image_processor import ImageProcessor
from config.constants import FREQUENCY_ANALYSIS_PARAMS
# Suppress NumPy warning
np.seterr(divide = 'ignore',
invalid = 'ignore',
)
# Setup Logging
logger = get_logger(__name__)
class FrequencyAnalyzer:
"""
FFT-based frequency domain analysis for AI detection
Core principle:
---------------
- Real photos : Smooth frequency falloff (natural optical blur)
- AI images : Unnatural frequency spikes or gaps (artifacts from generation)
Method:
-------
1. Convert to luminance
2. Compute 2D FFT
3. Compute radial frequency spectrum
4. Analyze high-frequency content and distribution patterns
"""
def __init__(self):
self.image_processor = ImageProcessor()
def detect(self, image: np.ndarray) -> MetricResult:
"""
Run frequency domain analysis
Arguments:
----------
image { np.ndarray } : RGB image array (H, W, 3)
Returns:
--------
{ MetricResult } : Structured frequency-domain metric result containing:
- score : Suspicion score [0.0, 1.0]
- confidence : Reliability of frequency evidence
- details : FFT and spectrum diagnostics
"""
try:
logger.debug(f"Running frequency analysis on image shape {image.shape}")
# Convert to luminance
luminance = self.image_processor.rgb_to_luminance(image = image)
# Normalize luminance (remove DC component for FFT stability)
normalized_luminance = luminance - np.mean(luminance)
if not np.any(normalized_luminance):
logger.debug("FFT skipped: zero-variance luminance")
return MetricResult(metric_type = MetricType.FREQUENCY,
score = FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE,
confidence = 0.0,
details = {"reason": "zero_variance_luminance"}
)
# Compute FFT on normalized_luminance
fft_magnitude = self._compute_fft_magnitude(luminance = normalized_luminance)
# Analyze radial frequency spectrum
radial_spectrum = self._compute_radial_spectrum(fft_magnitude = fft_magnitude)
# Detect anomalies
anomaly_score, freq_details = self._analyze_frequency_anomalies(radial_spectrum = radial_spectrum)
logger.debug(f"Frequency analysis: Anomaly Score={anomaly_score:.3f}")
# Distance from neutral = stronger evidence = higher confidence
confidence = float(np.clip((abs(anomaly_score - FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE) * 2.0), 0.0, 1.0))
return MetricResult(metric_type = MetricType.FREQUENCY,
score = float(anomaly_score),
confidence = confidence,
details = {"spectrum_bins" : int(len(radial_spectrum)),
**freq_details,
}
)
except Exception as e:
logger.error(f"Frequency analysis failed: {e}")
# Return neutral score on error
return MetricResult(metric_type = MetricType.FREQUENCY,
score = FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE,
confidence = 0.0,
details = {"error" : "frequency_analysis_failed"},
)
def _compute_fft_magnitude(self, luminance: np.ndarray) -> np.ndarray:
"""
Compute 2D FFT magnitude spectrum
Arguments:
----------
luminance { np.ndarray } : Luminance channel (H, W)
Returns:
--------
{ np.ndarray } : FFT magnitude spectrum (centered)
"""
# Compute 2D FFT
f = fft.fft2(luminance)
# Shift zero frequency to center
f_shifted = fft.fftshift(f)
# Compute magnitude spectrum
magnitude = np.abs(f_shifted)
# Log scale for better visualization
magnitude_log = np.log1p(magnitude)
return magnitude_log
def _compute_radial_spectrum(self, fft_magnitude: np.ndarray) -> np.ndarray:
"""
Compute radial average of frequency spectrum
Arguments:
----------
fft_magnitude { np.ndarray } : FFT magnitude spectrum
Returns:
--------
{ np.ndarray } : Radial spectrum (1D array)
"""
h, w = fft_magnitude.shape
center_y, center_x = h // 2, w // 2
# Create coordinate grids
y, x = np.ogrid[:h, :w]
# Compute radial distances from center
r = np.sqrt((x - center_x)**2 + (y - center_y)**2).astype(int)
# Maximum radius
max_radius = min(center_x, center_y)
# Compute radial bins
bins = np.linspace(0, max_radius, FREQUENCY_ANALYSIS_PARAMS.BINS + 1)
radial_spectrum = np.zeros(FREQUENCY_ANALYSIS_PARAMS.BINS)
# Average magnitude in each radial bin
for i in range(FREQUENCY_ANALYSIS_PARAMS.BINS):
mask = (r >= bins[i]) & (r < bins[i + 1])
if np.any(mask):
radial_spectrum[i] = np.mean(fft_magnitude[mask])
return radial_spectrum
def _analyze_frequency_anomalies(self, radial_spectrum: np.ndarray) -> tuple[float, dict]:
"""
Analyze frequency spectrum for AI generation artifacts
Checks:
-------
1. High-frequency content (AI images often have unnatural HF energy)
2. Frequency distribution smoothness
3. Spectral slope deviation from natural images
Arguments:
----------
radial_spectrum { np.ndarray } : Radial frequency spectrum
Returns:
--------
{ tuple } : A tuple containing
- Suspicion score [0.0, 1.0], and
- frequency details in a dictionary
"""
if (len(radial_spectrum) < FREQUENCY_ANALYSIS_PARAMS.MIN_SPECTRUM_SAMPLES):
return (FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE,
{"reason" : "insufficient_frequency_samples",
"spectrum_bins" : int(len(radial_spectrum)),
}
)
# Normalize spectrum
spectrum_norm = radial_spectrum / (np.max(radial_spectrum) + 1e-10)
# High-frequency Energy Analysis
high_freq_start = int(len(spectrum_norm) * FREQUENCY_ANALYSIS_PARAMS.HIGH_FREQ_THRESHOLD)
if (high_freq_start >= len(spectrum_norm) - 1):
return (FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE,
{"reason" : "invalid_frequency_partition"}
)
high_freq_energy = np.mean(spectrum_norm[high_freq_start:])
low_freq_energy = np.mean(spectrum_norm[:high_freq_start])
hf_ratio = high_freq_energy / (low_freq_energy + 1e-10)
# Natural images : HF ratio typically 0.1-0.3
# AI images : Can be higher (0.3-0.6) or lower (<0.1)
hf_anomaly = 0.0
if (hf_ratio > FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_UPPER):
hf_anomaly = min(1.0, (hf_ratio - FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_UPPER) * FREQUENCY_ANALYSIS_PARAMS.HF_UPPER_SCALE)
elif (hf_ratio < FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_LOWER):
hf_anomaly = min(1.0, (FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_LOWER - hf_ratio) * FREQUENCY_ANALYSIS_PARAMS.HF_LOWER_SCALE)
# Spectral Smoothness Analysis
spectral_diff = np.abs(np.diff(spectrum_norm))
roughness = np.mean(spectral_diff)
roughness_score = np.clip(roughness * FREQUENCY_ANALYSIS_PARAMS.ROUGHNESS_SCALE, 0.0, 1.0)
# Power Law Deviation Analysis
x = np.arange(1, len(spectrum_norm) + 1)
log_spectrum = np.log(spectrum_norm + 1e-10)
log_x = np.log(x)
# Linear fit in log-log space
coeffs = np.polyfit(log_x, log_spectrum, 1)
fitted = np.polyval(coeffs, log_x)
deviation = np.mean(np.abs(log_spectrum - fitted))
deviation_score = np.clip(deviation * FREQUENCY_ANALYSIS_PARAMS.DEVIATION_SCALE, 0.0, 1.0)
# Combine scores
weights = FREQUENCY_ANALYSIS_PARAMS.SUBMETRIC_WEIGHTS
combined_score = (weights['hf_anomaly'] * hf_anomaly + weights['roughness'] * roughness_score + weights['deviation'] * deviation_score)
final_score = float(np.clip(combined_score, 0.0, 1.0))
frequency_dict = {"low_freq_energy" : float(low_freq_energy),
"high_freq_energy" : float(high_freq_energy),
"hf_ratio" : float(hf_ratio),
"hf_anomaly" : float(hf_anomaly),
"roughness" : float(roughness),
"roughness_score" : float(roughness_score),
"spectral_deviation" : float(deviation),
"deviation_score" : float(deviation_score),
"high_freq_start_bin" : int(high_freq_start),
}
logger.debug(f"FFT scores - HF anomaly: {hf_anomaly:.3f}, roughness: {roughness_score:.3f}, deviation: {deviation_score:.3f}")
return (final_score, frequency_dict)
|