|
|
""" |
|
|
Frequency-domain forensic helpers. |
|
|
""" |
|
|
|
|
|
import json |
|
|
|
|
|
|
|
|
def analyze_frequency_domain(input_str: str) -> str: |
|
|
""" |
|
|
Analyze DCT/FFT frequency domain features. |
|
|
|
|
|
Extracts comprehensive frequency domain features including: |
|
|
- DCT coefficient statistics (mean, std) |
|
|
- FFT radial profile statistics (mean, std, decay rate) |
|
|
- Frequency band energies (low, mid, high) |
|
|
- Peakiness metric for detecting upsampling artifacts |
|
|
""" |
|
|
image_path = input_str.strip() |
|
|
try: |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from PIL import Image |
|
|
from scipy.fftpack import dct, fft2, fftshift |
|
|
from scipy import stats |
|
|
|
|
|
img = Image.open(image_path).convert("RGB") |
|
|
image = np.array(img) |
|
|
|
|
|
|
|
|
if len(image.shape) == 3: |
|
|
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) |
|
|
else: |
|
|
gray = image |
|
|
|
|
|
features = {} |
|
|
|
|
|
|
|
|
|
|
|
dct_map = dct(dct(gray.T, norm='ortho').T, norm='ortho') |
|
|
|
|
|
|
|
|
dct_coeffs = dct_map.flatten() |
|
|
dct_coeffs = dct_coeffs[1:] |
|
|
|
|
|
|
|
|
dct_abs = np.abs(dct_coeffs) |
|
|
features['dct_mean'] = float(np.mean(dct_abs)) |
|
|
features['dct_std'] = float(np.std(dct_abs)) |
|
|
|
|
|
|
|
|
|
|
|
f = fft2(gray.astype(np.float64)) |
|
|
fshift = fftshift(f) |
|
|
|
|
|
|
|
|
|
|
|
magnitude_spectrum = 20 * np.log10(np.abs(fshift) + 1e-10) |
|
|
|
|
|
|
|
|
|
|
|
h, w = magnitude_spectrum.shape |
|
|
cy, cx = h // 2, w // 2 |
|
|
y, x = np.ogrid[-cy:h-cy, -cx:w-cx] |
|
|
r = np.sqrt(x**2 + y**2) |
|
|
r = r.astype(int) |
|
|
|
|
|
|
|
|
tbin = np.bincount(r.ravel(), magnitude_spectrum.ravel()) |
|
|
nr = np.bincount(r.ravel()) |
|
|
radial_profile = tbin / np.maximum(nr, 1) |
|
|
|
|
|
|
|
|
if len(radial_profile) > 1: |
|
|
radial_profile_nonzero = radial_profile[1:] |
|
|
else: |
|
|
radial_profile_nonzero = radial_profile |
|
|
|
|
|
|
|
|
features['fft_radial_mean'] = float(np.mean(radial_profile_nonzero)) |
|
|
features['fft_radial_std'] = float(np.std(radial_profile_nonzero)) |
|
|
|
|
|
|
|
|
|
|
|
n = len(radial_profile_nonzero) |
|
|
if n >= 3: |
|
|
|
|
|
|
|
|
radii = np.arange(1, n + 1, dtype=np.float64) |
|
|
|
|
|
log_radii = np.log(radii + 1e-10) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
slope, intercept, r_value, p_value, std_err = stats.linregress( |
|
|
log_radii, radial_profile_nonzero |
|
|
) |
|
|
features['fft_radial_decay'] = float(slope) |
|
|
features['fft_radial_decay_r2'] = float(r_value**2) |
|
|
except: |
|
|
|
|
|
features['fft_radial_decay'] = float( |
|
|
radial_profile_nonzero[0] - radial_profile_nonzero[-1] |
|
|
) |
|
|
features['fft_radial_decay_r2'] = 0.0 |
|
|
else: |
|
|
features['fft_radial_decay'] = 0.0 |
|
|
features['fft_radial_decay_r2'] = 0.0 |
|
|
|
|
|
|
|
|
if n >= 9: |
|
|
|
|
|
edges = np.linspace(0, n, 4, dtype=int) |
|
|
low_band = radial_profile_nonzero[edges[0]:edges[1]] |
|
|
mid_band = radial_profile_nonzero[edges[1]:edges[2]] |
|
|
high_band = radial_profile_nonzero[edges[2]:edges[3]] |
|
|
|
|
|
features['fft_low_energy'] = float(np.mean(low_band)) |
|
|
features['fft_mid_energy'] = float(np.mean(mid_band)) |
|
|
features['fft_high_energy'] = float(np.mean(high_band)) |
|
|
|
|
|
|
|
|
|
|
|
profile_mean = np.mean(radial_profile_nonzero) |
|
|
profile_max = np.max(radial_profile_nonzero) |
|
|
features['fft_peakiness'] = float(profile_max / (profile_mean + 1e-10)) |
|
|
else: |
|
|
|
|
|
features['fft_low_energy'] = 0.0 |
|
|
features['fft_mid_energy'] = 0.0 |
|
|
features['fft_high_energy'] = 0.0 |
|
|
features['fft_peakiness'] = 0.0 |
|
|
|
|
|
result = { |
|
|
"tool": "analyze_frequency_domain", |
|
|
"status": "completed", |
|
|
**features, |
|
|
} |
|
|
|
|
|
return json.dumps(result) |
|
|
except Exception as e: |
|
|
return json.dumps( |
|
|
{ |
|
|
"tool": "analyze_frequency_domain", |
|
|
"status": "error", |
|
|
"error": str(e), |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
__all__ = ["analyze_frequency_domain"] |
|
|
|
|
|
|
|
|
|