Spaces:
Running
Running
| """ | |
| image_functions.py (OPTIMIZED) | |
| Functions for computing and processing image statistics for synthetic image detection - 64 Features | |
| """ | |
| import numpy as np | |
| import cv2 | |
| from skimage.measure import shannon_entropy | |
| from scipy.stats import skew, kurtosis | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
def preprocess_color_spaces(img_array):
    """Convert an RGB image into every color space the extractors use.

    Each conversion is performed exactly once here so that the individual
    feature functions can share the results.

    Args:
        img_array: RGB image array (uint8, 0-255).

    Returns:
        Dictionary with the keys 'rgb' (the input object itself, not a
        copy), 'hsv', 'ycbcr' and 'gray'.
    """
    hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
    # NOTE(review): OpenCV's RGB2YCrCb conversion emits channels in the
    # order Y, Cr, Cb, while the functions that consume 'ycbcr' label
    # channel 1 as "cb" and channel 2 as "cr". The layout is left untouched
    # because reordering would change established feature values - confirm
    # whether the swapped labels are intended.
    ycrcb = cv2.cvtColor(img_array, cv2.COLOR_RGB2YCrCb)
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    return dict(rgb=img_array, hsv=hsv, ycbcr=ycrcb, gray=gray)
def hsv_features(hsv_array):
    """Return the mean and variance of each HSV channel.

    Args:
        hsv_array: Array indexed as [row, col, channel]; channels 0, 1, 2
            are treated as H, S and V respectively.

    Returns:
        Dictionary with the keys 'h_mean', 'h_var', 's_mean', 's_var',
        'v_mean' and 'v_var', in that order.
    """
    stats = {}
    for index, prefix in enumerate("hsv"):
        channel = hsv_array[:, :, index]
        stats[f"{prefix}_mean"] = np.mean(channel)
        stats[f"{prefix}_var"] = np.var(channel)
    return stats
def _safe_correlation(first, first_var, second, second_var):
    """Return the Pearson correlation of two 1-D arrays, or 0.0 if undefined."""
    # A zero-variance input makes the correlation 0/0; report "no linear
    # relationship" instead of letting NaN leak into the feature vector.
    if first_var == 0 or second_var == 0:
        return 0.0
    return np.corrcoef(first, second)[0, 1]


def ycbcr_basic_features(ycbcr_array):
    """Compute per-channel mean/variance and pairwise channel correlations.

    Channels 0, 1 and 2 of the input are treated as Y, Cb and Cr.

    A constant channel (for example the chroma planes of a grayscale
    image) has zero variance, for which the Pearson correlation is
    undefined; the affected correlations are reported as 0.0 rather than
    NaN so the resulting feature vector stays finite.

    Args:
        ycbcr_array: Array indexed as [row, col, channel] with three
            channels.

    Returns:
        Dictionary with the keys 'y_mean', 'y_var', 'cb_mean', 'cb_var',
        'cr_mean', 'cr_var', 'cbcr_corr', 'y_cb_corr' and 'y_cr_corr'.
    """
    # Flatten each channel once; every statistic below works on 1-D data.
    y_flat = ycbcr_array[:, :, 0].ravel()
    cb_flat = ycbcr_array[:, :, 1].ravel()
    cr_flat = ycbcr_array[:, :, 2].ravel()

    y_mean = np.mean(y_flat)
    y_var = np.var(y_flat)
    cb_mean = np.mean(cb_flat)
    cb_var = np.var(cb_flat)
    cr_mean = np.mean(cr_flat)
    cr_var = np.var(cr_flat)

    # The variances computed above double as the "is this channel constant"
    # check for the correlations.
    cbcr_corr = _safe_correlation(cb_flat, cb_var, cr_flat, cr_var)
    y_cb_corr = _safe_correlation(y_flat, y_var, cb_flat, cb_var)
    y_cr_corr = _safe_correlation(y_flat, y_var, cr_flat, cr_var)

    return {
        'y_mean': y_mean, 'y_var': y_var,
        'cb_mean': cb_mean, 'cb_var': cb_var,
        'cr_mean': cr_mean, 'cr_var': cr_var,
        'cbcr_corr': cbcr_corr,
        'y_cb_corr': y_cb_corr,
        'y_cr_corr': y_cr_corr,
    }
def _skew_and_kurtosis(flat_values, value_range):
    """Return (skewness, excess kurtosis), using 0.0 for constant data."""
    # Both moments divide by a power of the variance, so they are undefined
    # for constant data; a zero range identifies that case exactly.
    if value_range == 0:
        return 0.0, 0.0
    return skew(flat_values), kurtosis(flat_values)


def ycbcr_higher_moments(ycbcr_array):
    """Compute skewness, kurtosis, median, MAD and range statistics.

    Channels 0, 1 and 2 of the input are treated as Y, Cb and Cr. Skewness,
    kurtosis, mean absolute deviation and range are computed for Cb and Cr;
    the median is computed for all three channels.

    Skewness and kurtosis are undefined for a constant channel (for
    example the chroma planes of a grayscale image). They are reported as
    0.0 in that case so the feature vector stays finite and the result
    does not depend on how the installed SciPy handles constant input.

    Args:
        ycbcr_array: Array indexed as [row, col, channel] with three
            channels.

    Returns:
        Dictionary with the keys 'cb_skew', 'cb_kurt', 'cr_skew',
        'cr_kurt', 'cb_median', 'cr_median', 'y_median', 'cb_mad',
        'cr_mad', 'cb_range' and 'cr_range'.
    """
    y_flat = ycbcr_array[:, :, 0].ravel()
    cb_flat = ycbcr_array[:, :, 1].ravel()
    cr_flat = ycbcr_array[:, :, 2].ravel()

    # Range (max - min); also used below to detect constant channels.
    cb_range = np.ptp(ycbcr_array[:, :, 1])
    cr_range = np.ptp(ycbcr_array[:, :, 2])

    # Chrominance higher moments.
    cb_skew, cb_kurt = _skew_and_kurtosis(cb_flat, cb_range)
    cr_skew, cr_kurt = _skew_and_kurtosis(cr_flat, cr_range)

    # Medians.
    cb_median = np.median(cb_flat)
    cr_median = np.median(cr_flat)
    y_median = np.median(y_flat)

    # Mean absolute deviation around the channel mean. The means are
    # recomputed here; nothing is shared with other feature functions.
    cb_mean = np.mean(cb_flat)
    cr_mean = np.mean(cr_flat)
    cb_mad = np.mean(np.abs(cb_flat - cb_mean))
    cr_mad = np.mean(np.abs(cr_flat - cr_mean))

    return {
        'cb_skew': cb_skew, 'cb_kurt': cb_kurt,
        'cr_skew': cr_skew, 'cr_kurt': cr_kurt,
        'cb_median': cb_median, 'cr_median': cr_median, 'y_median': y_median,
        'cb_mad': cb_mad, 'cr_mad': cr_mad,
        'cb_range': cb_range, 'cr_range': cr_range,
    }
def ycbcr_entropy_features(ycbcr_array):
    """Return the Shannon entropy of each of the three channels.

    Channels 0, 1 and 2 of the input are reported under the keys
    'y_entropy', 'cb_entropy' and 'cr_entropy' respectively.
    """
    labelled_channels = (('y', 0), ('cb', 1), ('cr', 2))
    return {
        f"{label}_entropy": shannon_entropy(ycbcr_array[:, :, index])
        for label, index in labelled_channels
    }
def variance_ratio_features(y_var, cb_var, cr_var):
    """Return ratios between the supplied channel variances.

    A small constant is added to every denominator so that a zero
    variance does not cause a division by zero.

    Args:
        y_var, cb_var, cr_var: Pre-computed variances.

    Returns:
        Dictionary with the keys 'cb_y_var_ratio', 'cr_y_var_ratio' and
        'cb_cr_var_ratio'.
    """
    eps = 1e-10
    y_denominator = y_var + eps
    cr_denominator = cr_var + eps

    ratios = {}
    ratios['cb_y_var_ratio'] = cb_var / y_denominator
    ratios['cr_y_var_ratio'] = cr_var / y_denominator
    ratios['cb_cr_var_ratio'] = cb_var / cr_denominator
    return ratios
def gradient_magnitude_features(ycbcr_array):
    """Return mean and standard deviation of the Sobel gradient magnitude.

    The statistics are computed for channel 1 (reported as "cb") and
    channel 2 (reported as "cr") of the input, using 3x3 Sobel kernels
    with 64-bit float output.

    Returns:
        Dictionary with the keys 'cb_grad_mean', 'cb_grad_std',
        'cr_grad_mean' and 'cr_grad_std'.
    """
    features = {}
    for label, index in (('cb', 1), ('cr', 2)):
        plane = ycbcr_array[:, :, index]
        # Horizontal and vertical first derivatives.
        dx = cv2.Sobel(plane, cv2.CV_64F, 1, 0, ksize=3)
        dy = cv2.Sobel(plane, cv2.CV_64F, 0, 1, ksize=3)
        magnitude = np.sqrt(dx**2 + dy**2)
        features[f"{label}_grad_mean"] = np.mean(magnitude)
        features[f"{label}_grad_std"] = np.std(magnitude)
    return features
def _leading_digits(values):
    """Return the leading decimal digit of each positive integer in *values*."""
    digits = values.copy()
    multi_digit = digits >= 10
    # Strip one trailing digit per pass until every entry is a single
    # digit. Integer division keeps this exact, with no log10 rounding.
    while multi_digit.any():
        digits[multi_digit] //= 10
        multi_digit = digits >= 10
    return digits


def benford_law_features(gray_array, block_size=8, quantization_step=10):
    """Compare first digits of quantized block-DCT coefficients to Benford's Law.

    The image is cropped to a whole number of blocks. Each block is
    DCT-transformed, the first row and first column of coefficients are
    dropped, and the remainder is divided by ``quantization_step`` and
    rounded. The leading decimal digit of every non-zero quantized value
    is collected across all blocks.

    Args:
        gray_array: Grayscale (2-D) image array.
        block_size: Side length of the square DCT blocks (default 8).
        quantization_step: Divisor applied to the DCT coefficients before
            rounding.

    Returns:
        Dictionary with:
            'benford_ks_stat': maximum absolute difference between the
                cumulative observed digit frequencies and the cumulative
                Benford distribution.
            'benford_mean_digit': mean of the collected first digits.
            'benford_digit_std': standard deviation of the collected
                first digits.
        If no non-zero coefficient exists (including images smaller than
        one block) the fixed values 0.5, 5.0 and 0.0 are returned.
    """
    h, w = gray_array.shape
    h = (h // block_size) * block_size
    w = (w // block_size) * block_size
    gray_array = gray_array[:h, :w]

    # One integer array of leading digits per block, kept in block order.
    digit_chunks = []
    for i in range(0, h, block_size):
        for j in range(0, w, block_size):
            block = gray_array[i:i + block_size, j:j + block_size].astype(np.float32)
            dct_block = cv2.dct(block)
            quantized = np.round(dct_block[1:, 1:] / quantization_step).ravel()
            # Values are already rounded, so the integer cast is exact.
            magnitudes = np.abs(quantized[quantized != 0]).astype(np.int64)
            if magnitudes.size:
                digit_chunks.append(_leading_digits(magnitudes))

    if not digit_chunks:
        return {
            'benford_ks_stat': 0.5,
            'benford_mean_digit': 5.0,
            'benford_digit_std': 0.0,
        }

    first_digits = np.concatenate(digit_chunks)

    benford_theoretical = np.array([np.log10(1 + 1 / d) for d in range(1, 10)])
    observed_counts = np.array([np.sum(first_digits == d) for d in range(1, 10)])
    observed_freq = observed_counts / first_digits.size
    ks_stat = np.max(np.abs(np.cumsum(observed_freq) - np.cumsum(benford_theoretical)))

    return {
        'benford_ks_stat': ks_stat,
        'benford_mean_digit': np.mean(first_digits),
        'benford_digit_std': np.std(first_digits),
    }
def saturation_clipping_features(rgb_array):
    """Return the percentage of pixels at 0 and at 255 for each channel.

    Args:
        rgb_array: Array indexed as [row, col, channel]; channels 0, 1, 2
            are reported with the prefixes 'r', 'g' and 'b'.

    Returns:
        Dictionary with the keys '<c>_clip_low' and '<c>_clip_high' for
        c in r, g, b, each a percentage of the total pixel count.
    """
    rows, cols = rgb_array.shape[0], rgb_array.shape[1]
    total_pixels = rows * cols

    # Count over both spatial axes at once; one entry per channel.
    low_pct = np.sum(rgb_array == 0, axis=(0, 1)) / total_pixels * 100
    high_pct = np.sum(rgb_array == 255, axis=(0, 1)) / total_pixels * 100

    clipped = {}
    for index, prefix in enumerate("rgb"):
        clipped[f"{prefix}_clip_low"] = low_pct[index]
        clipped[f"{prefix}_clip_high"] = high_pct[index]
    return clipped
def histogram_features(hsv_array, rgb_array):
    """Compute RGB entropy plus skewness and kurtosis of the hue channel.

    Skewness and kurtosis are undefined when every hue value is identical
    (for example a grayscale or single-color image) and SciPy may report
    NaN for such input. They are reported as 0.0 in that case so the
    feature vector stays finite.

    Args:
        hsv_array: Array indexed as [row, col, channel]; channel 0 is
            used as hue.
        rgb_array: Image array passed unchanged to ``shannon_entropy``.

    Returns:
        Dictionary with the keys 'entropy' (Shannon entropy of
        ``rgb_array``), 'skewness' and 'kurtosis' (both of the hue
        channel).
    """
    h_flat = hsv_array[:, :, 0].ravel()

    # min == max identifies a constant channel exactly; the size check
    # keeps empty input on the original SciPy code path.
    if h_flat.size and h_flat.min() == h_flat.max():
        hue_skew, hue_kurt = 0.0, 0.0
    else:
        hue_skew, hue_kurt = skew(h_flat), kurtosis(h_flat)

    return {
        'entropy': shannon_entropy(rgb_array),
        'skewness': hue_skew,
        'kurtosis': hue_kurt,
    }
def covariance_features(ycbcr_array):
    """Return the off-diagonal entries of the 3x3 channel covariance matrix.

    Every pixel is treated as one observation of three variables (the
    channels). Only the upper-triangle entries are reported, under the
    keys 'cov_01', 'cov_02' and 'cov_12'.
    """
    # One row per channel, one column per pixel, as np.cov expects.
    channels_by_row = ycbcr_array.reshape(-1, 3).T
    cov = np.cov(channels_by_row)
    channel_pairs = ((0, 1), (0, 2), (1, 2))
    return {f"cov_{a}{b}": cov[a, b] for a, b in channel_pairs}
def color_entropy_feature(hsv_array):
    """Return the mean Shannon entropy of the first three channels.

    Returns:
        Dictionary with the single key 'color_entropy'.
    """
    per_channel = []
    for index in range(3):
        per_channel.append(shannon_entropy(hsv_array[:, :, index]))
    return {'color_entropy': np.mean(per_channel)}
def residual_features(rgb_array, blur_kernel_size=5):
    """Return per-channel mean and variance of the blur residual.

    The residual is the image minus its Gaussian-blurred version, both
    converted to float32 before subtraction.

    Args:
        rgb_array: Image array indexed as [row, col, channel].
        blur_kernel_size: Side length of the square Gaussian kernel.

    Returns:
        Dictionary with 'mean_res' and 'var_res', each an array holding
        one value per channel.
    """
    kernel = (blur_kernel_size, blur_kernel_size)
    # Sigma 0 lets OpenCV derive the standard deviation from the kernel size.
    smoothed = cv2.GaussianBlur(rgb_array, kernel, 0)
    high_freq = rgb_array.astype(np.float32) - smoothed.astype(np.float32)

    spatial_axes = (0, 1)
    channel_mean = np.mean(high_freq, axis=spatial_axes)
    channel_var = np.var(high_freq, axis=spatial_axes)
    return {'mean_res': channel_mean, 'var_res': channel_var}
def features_to_vector(features):
    """Flatten a feature dictionary into a 1D vector for ML classifiers.

    Values are emitted in a fixed order: the scalar features below, then
    the covariance entries, then the elements of 'mean_res' followed by
    those of 'var_res'. Keys missing from ``features`` are skipped
    silently, so the vector is shorter when the dictionary is incomplete;
    keys not listed here are ignored.

    NOTE(review): the original documentation advertises 64 features, but
    the keys listed here yield 49 scalars + 3 covariances + the residual
    entries (3 + 3 for a three-channel image) = 58. Confirm the intended
    count.
    """
    ordered_scalar_keys = (
        # HSV features (6)
        'h_mean', 'h_var', 's_mean', 's_var', 'v_mean', 'v_var',
        # YCbCr basic statistics (9)
        'y_mean', 'y_var', 'cb_mean', 'cb_var', 'cr_mean', 'cr_var',
        'cbcr_corr', 'y_cb_corr', 'y_cr_corr',
        # Histogram features (4)
        'entropy', 'skewness', 'kurtosis', 'color_entropy',
        # Higher-order moments (11)
        'cb_skew', 'cb_kurt', 'cr_skew', 'cr_kurt',
        'cb_median', 'cr_median', 'y_median',
        'cb_mad', 'cr_mad',
        'cb_range', 'cr_range',
        # Entropy features (3)
        'cb_entropy', 'cr_entropy', 'y_entropy',
        # Variance ratios (3)
        'cb_y_var_ratio', 'cr_y_var_ratio', 'cb_cr_var_ratio',
        # Gradient features (4)
        'cb_grad_mean', 'cb_grad_std', 'cr_grad_mean', 'cr_grad_std',
        # Benford's Law features (3)
        'benford_ks_stat', 'benford_mean_digit', 'benford_digit_std',
        # Saturation clipping features (6)
        'r_clip_low', 'r_clip_high', 'g_clip_low', 'g_clip_high',
        'b_clip_low', 'b_clip_high',
        # Covariance off-diagonal (3)
        'cov_01', 'cov_02', 'cov_12',
    )
    values = [features[name] for name in ordered_scalar_keys if name in features]

    # Residual statistics are sequences (one value per channel) and are
    # spliced in element by element.
    for name in ('mean_res', 'var_res'):
        if name in features:
            values.extend(features[name])

    return np.array(values)
def extract_features_for_ml(img_array, blur_kernel_size=5):
    """Extract every feature group and flatten the result for ML use.

    Color space conversions happen once up front; each feature function
    then receives the already-converted array it needs.

    Args:
        img_array: RGB image array (uint8, 0-255).
        blur_kernel_size: Kernel size for the residual feature computation.

    Returns:
        1D numpy array produced by ``features_to_vector``.
        NOTE(review): the original documentation states 64 features, while
        the key list in ``features_to_vector`` appears to yield 58 -
        confirm.
    """
    spaces = preprocess_color_spaces(img_array)
    rgb = spaces['rgb']
    hsv = spaces['hsv']
    ycbcr = spaces['ycbcr']
    gray = spaces['gray']

    # HSV-based groups.
    groups = [
        hsv_features(hsv),
        histogram_features(hsv, rgb),
        color_entropy_feature(hsv),
    ]

    # YCbCr-based groups; the basic statistics are kept because their
    # variances feed the ratio features.
    basic = ycbcr_basic_features(ycbcr)
    groups += [
        basic,
        ycbcr_higher_moments(ycbcr),
        ycbcr_entropy_features(ycbcr),
        gradient_magnitude_features(ycbcr),
        covariance_features(ycbcr),
        variance_ratio_features(basic['y_var'], basic['cb_var'], basic['cr_var']),
    ]

    # RGB-based groups, then the grayscale-based Benford features.
    groups += [
        residual_features(rgb, blur_kernel_size),
        saturation_clipping_features(rgb),
        benford_law_features(gray),
    ]

    features = {}
    for group in groups:
        features.update(group)
    return features_to_vector(features)
def process_single_image(img_path):
    """Load one image from disk and extract its feature vector.

    Intended as a best-effort worker for batch/parallel processing: any
    failure is reported on stdout and turned into ``None`` instead of
    propagating.

    Args:
        img_path: Path to the image file, either a ``str`` or a path
            object.

    Returns:
        The feature vector from ``extract_features_for_ml``, or ``None``
        if the image could not be read or processing raised an error.
    """
    try:
        img_array = cv2.imread(str(img_path))
        if img_array is None:
            # The file could not be loaded as an image.
            return None
        # OpenCV loads images as BGR; the feature functions expect RGB.
        img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)
        return extract_features_for_ml(img_array)
    except Exception as e:
        # Plain strings have no ``.name``; fall back to the path itself so
        # that reporting the error can never raise a second exception.
        label = getattr(img_path, "name", img_path)
        print(f"Error processing {label}: {e}")
        return None