Spaces:
Running
Running
| # Dependencies | |
| import numpy as np | |
| from scipy import fft | |
| from utils.logger import get_logger | |
| from config.schemas import MetricResult | |
| from config.constants import MetricType | |
| from utils.image_processor import ImageProcessor | |
| from config.constants import FREQUENCY_ANALYSIS_PARAMS | |
| # Suppress NumPy warning | |
| np.seterr(divide = 'ignore', | |
| invalid = 'ignore', | |
| ) | |
| # Setup Logging | |
| logger = get_logger(__name__) | |
| class FrequencyAnalyzer: | |
| """ | |
| FFT-based frequency domain analysis for AI detection | |
| Core principle: | |
| --------------- | |
| - Real photos : Smooth frequency falloff (natural optical blur) | |
| - AI images : Unnatural frequency spikes or gaps (artifacts from generation) | |
| Method: | |
| ------- | |
| 1. Convert to luminance | |
| 2. Compute 2D FFT | |
| 3. Compute radial frequency spectrum | |
| 4. Analyze high-frequency content and distribution patterns | |
| """ | |
| def __init__(self): | |
| self.image_processor = ImageProcessor() | |
| def detect(self, image: np.ndarray) -> MetricResult: | |
| """ | |
| Run frequency domain analysis | |
| Arguments: | |
| ---------- | |
| image { np.ndarray } : RGB image array (H, W, 3) | |
| Returns: | |
| -------- | |
| { MetricResult } : Structured frequency-domain metric result containing: | |
| - score : Suspicion score [0.0, 1.0] | |
| - confidence : Reliability of frequency evidence | |
| - details : FFT and spectrum diagnostics | |
| """ | |
| try: | |
| logger.debug(f"Running frequency analysis on image shape {image.shape}") | |
| # Convert to luminance | |
| luminance = self.image_processor.rgb_to_luminance(image = image) | |
| # Normalize luminance (remove DC component for FFT stability) | |
| normalized_luminance = luminance - np.mean(luminance) | |
| if not np.any(normalized_luminance): | |
| logger.debug("FFT skipped: zero-variance luminance") | |
| return MetricResult(metric_type = MetricType.FREQUENCY, | |
| score = FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE, | |
| confidence = 0.0, | |
| details = {"reason": "zero_variance_luminance"} | |
| ) | |
| # Compute FFT on normalized_luminance | |
| fft_magnitude = self._compute_fft_magnitude(luminance = normalized_luminance) | |
| # Analyze radial frequency spectrum | |
| radial_spectrum = self._compute_radial_spectrum(fft_magnitude = fft_magnitude) | |
| # Detect anomalies | |
| anomaly_score, freq_details = self._analyze_frequency_anomalies(radial_spectrum = radial_spectrum) | |
| logger.debug(f"Frequency analysis: Anomaly Score={anomaly_score:.3f}") | |
| # Distance from neutral = stronger evidence = higher confidence | |
| confidence = float(np.clip((abs(anomaly_score - FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE) * 2.0), 0.0, 1.0)) | |
| return MetricResult(metric_type = MetricType.FREQUENCY, | |
| score = float(anomaly_score), | |
| confidence = confidence, | |
| details = {"spectrum_bins" : int(len(radial_spectrum)), | |
| **freq_details, | |
| } | |
| ) | |
| except Exception as e: | |
| logger.error(f"Frequency analysis failed: {e}") | |
| # Return neutral score on error | |
| return MetricResult(metric_type = MetricType.FREQUENCY, | |
| score = FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE, | |
| confidence = 0.0, | |
| details = {"error" : "frequency_analysis_failed"}, | |
| ) | |
| def _compute_fft_magnitude(self, luminance: np.ndarray) -> np.ndarray: | |
| """ | |
| Compute 2D FFT magnitude spectrum | |
| Arguments: | |
| ---------- | |
| luminance { np.ndarray } : Luminance channel (H, W) | |
| Returns: | |
| -------- | |
| { np.ndarray } : FFT magnitude spectrum (centered) | |
| """ | |
| # Compute 2D FFT | |
| f = fft.fft2(luminance) | |
| # Shift zero frequency to center | |
| f_shifted = fft.fftshift(f) | |
| # Compute magnitude spectrum | |
| magnitude = np.abs(f_shifted) | |
| # Log scale for better visualization | |
| magnitude_log = np.log1p(magnitude) | |
| return magnitude_log | |
| def _compute_radial_spectrum(self, fft_magnitude: np.ndarray) -> np.ndarray: | |
| """ | |
| Compute radial average of frequency spectrum | |
| Arguments: | |
| ---------- | |
| fft_magnitude { np.ndarray } : FFT magnitude spectrum | |
| Returns: | |
| -------- | |
| { np.ndarray } : Radial spectrum (1D array) | |
| """ | |
| h, w = fft_magnitude.shape | |
| center_y, center_x = h // 2, w // 2 | |
| # Create coordinate grids | |
| y, x = np.ogrid[:h, :w] | |
| # Compute radial distances from center | |
| r = np.sqrt((x - center_x)**2 + (y - center_y)**2).astype(int) | |
| # Maximum radius | |
| max_radius = min(center_x, center_y) | |
| # Compute radial bins | |
| bins = np.linspace(0, max_radius, FREQUENCY_ANALYSIS_PARAMS.BINS + 1) | |
| radial_spectrum = np.zeros(FREQUENCY_ANALYSIS_PARAMS.BINS) | |
| # Average magnitude in each radial bin | |
| for i in range(FREQUENCY_ANALYSIS_PARAMS.BINS): | |
| mask = (r >= bins[i]) & (r < bins[i + 1]) | |
| if np.any(mask): | |
| radial_spectrum[i] = np.mean(fft_magnitude[mask]) | |
| return radial_spectrum | |
| def _analyze_frequency_anomalies(self, radial_spectrum: np.ndarray) -> tuple[float, dict]: | |
| """ | |
| Analyze frequency spectrum for AI generation artifacts | |
| Checks: | |
| ------- | |
| 1. High-frequency content (AI images often have unnatural HF energy) | |
| 2. Frequency distribution smoothness | |
| 3. Spectral slope deviation from natural images | |
| Arguments: | |
| ---------- | |
| radial_spectrum { np.ndarray } : Radial frequency spectrum | |
| Returns: | |
| -------- | |
| { tuple } : A tuple containing | |
| - Suspicion score [0.0, 1.0], and | |
| - frequency details in a dictionary | |
| """ | |
| if (len(radial_spectrum) < FREQUENCY_ANALYSIS_PARAMS.MIN_SPECTRUM_SAMPLES): | |
| return (FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE, | |
| {"reason" : "insufficient_frequency_samples", | |
| "spectrum_bins" : int(len(radial_spectrum)), | |
| } | |
| ) | |
| # Normalize spectrum | |
| spectrum_norm = radial_spectrum / (np.max(radial_spectrum) + 1e-10) | |
| # High-frequency Energy Analysis | |
| high_freq_start = int(len(spectrum_norm) * FREQUENCY_ANALYSIS_PARAMS.HIGH_FREQ_THRESHOLD) | |
| if (high_freq_start >= len(spectrum_norm) - 1): | |
| return (FREQUENCY_ANALYSIS_PARAMS.NEUTRAL_SCORE, | |
| {"reason" : "invalid_frequency_partition"} | |
| ) | |
| high_freq_energy = np.mean(spectrum_norm[high_freq_start:]) | |
| low_freq_energy = np.mean(spectrum_norm[:high_freq_start]) | |
| hf_ratio = high_freq_energy / (low_freq_energy + 1e-10) | |
| # Natural images : HF ratio typically 0.1-0.3 | |
| # AI images : Can be higher (0.3-0.6) or lower (<0.1) | |
| hf_anomaly = 0.0 | |
| if (hf_ratio > FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_UPPER): | |
| hf_anomaly = min(1.0, (hf_ratio - FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_UPPER) * FREQUENCY_ANALYSIS_PARAMS.HF_UPPER_SCALE) | |
| elif (hf_ratio < FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_LOWER): | |
| hf_anomaly = min(1.0, (FREQUENCY_ANALYSIS_PARAMS.HF_RATIO_LOWER - hf_ratio) * FREQUENCY_ANALYSIS_PARAMS.HF_LOWER_SCALE) | |
| # Spectral Smoothness Analysis | |
| spectral_diff = np.abs(np.diff(spectrum_norm)) | |
| roughness = np.mean(spectral_diff) | |
| roughness_score = np.clip(roughness * FREQUENCY_ANALYSIS_PARAMS.ROUGHNESS_SCALE, 0.0, 1.0) | |
| # Power Law Deviation Analysis | |
| x = np.arange(1, len(spectrum_norm) + 1) | |
| log_spectrum = np.log(spectrum_norm + 1e-10) | |
| log_x = np.log(x) | |
| # Linear fit in log-log space | |
| coeffs = np.polyfit(log_x, log_spectrum, 1) | |
| fitted = np.polyval(coeffs, log_x) | |
| deviation = np.mean(np.abs(log_spectrum - fitted)) | |
| deviation_score = np.clip(deviation * FREQUENCY_ANALYSIS_PARAMS.DEVIATION_SCALE, 0.0, 1.0) | |
| # Combine scores | |
| weights = FREQUENCY_ANALYSIS_PARAMS.SUBMETRIC_WEIGHTS | |
| combined_score = (weights['hf_anomaly'] * hf_anomaly + weights['roughness'] * roughness_score + weights['deviation'] * deviation_score) | |
| final_score = float(np.clip(combined_score, 0.0, 1.0)) | |
| frequency_dict = {"low_freq_energy" : float(low_freq_energy), | |
| "high_freq_energy" : float(high_freq_energy), | |
| "hf_ratio" : float(hf_ratio), | |
| "hf_anomaly" : float(hf_anomaly), | |
| "roughness" : float(roughness), | |
| "roughness_score" : float(roughness_score), | |
| "spectral_deviation" : float(deviation), | |
| "deviation_score" : float(deviation_score), | |
| "high_freq_start_bin" : int(high_freq_start), | |
| } | |
| logger.debug(f"FFT scores - HF anomaly: {hf_anomaly:.3f}, roughness: {roughness_score:.3f}, deviation: {deviation_score:.3f}") | |
| return (final_score, frequency_dict) | |