| """ | |
| Fake Image Detection Ensemble - Model Definitions | |
| 9 specialized models for detecting AI-generated/fake images | |
| """ | |
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from scipy.ndimage import sobel
from sklearn.ensemble import IsolationForest
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
class EnhancedFreqVAE(nn.Module):
    """Frequency-domain VAE for fake-image detection.

    Images are mapped to a combined log-magnitude/phase FFT representation,
    encoded to a latent Gaussian, and decoded back.  The anomaly score is the
    summed frequency-domain reconstruction error plus a weighted KL penalty.
    Expects 3x256x256 input (five stride-2 convs reduce 256 -> 8).
    """

    def __init__(self, ld=256):
        """ld: latent dimensionality."""
        super().__init__()
        # Encoder: 5 stride-2 convs, 3x256x256 -> 512x8x8.
        self.enc = nn.Sequential(
            nn.Conv2d(3, 64, 4, 2, 1), nn.BatchNorm2d(64), nn.LeakyReLU(0.2), nn.Dropout2d(0.1),
            nn.Conv2d(64, 128, 4, 2, 1), nn.BatchNorm2d(128), nn.LeakyReLU(0.2), nn.Dropout2d(0.1),
            nn.Conv2d(128, 256, 4, 2, 1), nn.BatchNorm2d(256), nn.LeakyReLU(0.2), nn.Dropout2d(0.1),
            nn.Conv2d(256, 512, 4, 2, 1), nn.BatchNorm2d(512), nn.LeakyReLU(0.2), nn.Dropout2d(0.1),
            nn.Conv2d(512, 512, 4, 2, 1), nn.BatchNorm2d(512), nn.LeakyReLU(0.2),
        )
        self.mu = nn.Linear(512 * 8 * 8, ld)
        self.lv = nn.Linear(512 * 8 * 8, ld)
        self.dec_fc = nn.Linear(ld, 512 * 8 * 8)
        # Decoder mirrors the encoder: 512x8x8 -> 3x256x256.
        self.dec = nn.Sequential(
            nn.ConvTranspose2d(512, 512, 4, 2, 1), nn.BatchNorm2d(512), nn.ReLU(),
            nn.ConvTranspose2d(512, 256, 4, 2, 1), nn.BatchNorm2d(256), nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 4, 2, 1), nn.BatchNorm2d(128), nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1), nn.BatchNorm2d(64), nn.ReLU(),
            nn.ConvTranspose2d(64, 3, 4, 2, 1)
        )

    def _freq_repr(self, x):
        """Weighted blend of FFT log-magnitude (0.8) and phase (0.2).

        Shared by encode() and score() so both always use the same
        representation (this was previously duplicated in two places).
        """
        xf = torch.fft.fft2(x)
        return torch.log(torch.abs(xf) + 1e-8) * 0.8 + torch.angle(xf) * 0.2

    def encode(self, x):
        """Return (mu, logvar) of the latent Gaussian for image batch x."""
        h = self.enc(self._freq_repr(x)).view(x.size(0), -1)
        return self.mu(h), self.lv(h)

    def _decode(self, z):
        """Decode latent batch z back to the frequency representation."""
        return self.dec(self.dec_fc(z).view(z.size(0), 512, 8, 8))

    def forward(self, x):
        """Training path: sample z via the reparameterization trick."""
        mu, lv = self.encode(x)
        z = mu + torch.randn_like(mu) * torch.exp(0.5 * lv)
        return self._decode(z), mu, lv

    def score(self, img, dev):
        """Anomaly score = summed freq-domain MSE + 0.15 * KL (higher = more anomalous).

        Fixed: decodes from the posterior mean instead of a random sample, so
        repeated calls on the same image return identical scores; also moves
        the module to `dev`, matching the other detectors' score() methods.
        """
        self.eval()
        self.to(dev)
        img = img.to(dev)
        with torch.no_grad():
            if img.dim() == 3:
                img = img.unsqueeze(0)
            mu, lv = self.encode(img)
            rc = self._decode(mu)  # deterministic reconstruction at inference
            recon = F.mse_loss(rc, self._freq_repr(img), reduction='sum')
            kl = -0.5 * torch.sum(1 + lv - mu.pow(2) - lv.exp())
            return (recon + 0.15 * kl).item()
class EdgeNormalizingFlow(nn.Module):
    """Density model over multi-scale Sobel edge statistics.

    NOTE(review): despite the name, the residual MLP stack is not invertible
    and log_det is identically 0, so log_prob() is the base-Gaussian density
    of the transformed features rather than a true normalizing-flow
    likelihood.  Scores remain usable for ranking; confirm before
    interpreting them as calibrated likelihoods.
    """

    def __init__(self, feature_dim=32):
        """feature_dim: width of the edge-feature vector (default 8 stats x 4 scales)."""
        super().__init__()
        self.feature_dim = feature_dim
        # Four residual MLP blocks applied sequentially in forward().
        self.flows = nn.ModuleList([
            nn.Sequential(
                nn.Linear(feature_dim, feature_dim * 2), nn.ReLU(),
                nn.Linear(feature_dim * 2, feature_dim * 2), nn.ReLU(),
                nn.Linear(feature_dim * 2, feature_dim)
            ) for _ in range(4)
        ])
        # Learnable diagonal-Gaussian base distribution.
        self.base_mean = nn.Parameter(torch.zeros(feature_dim))
        self.base_logstd = nn.Parameter(torch.zeros(feature_dim))

    def extract_edge_features(self, img):
        """Return a `feature_dim` vector of edge statistics (8 stats x 4 scales).

        Accepts a normalized CHW tensor (assumed ImageNet mean/std — TODO
        confirm against the caller's transform) or an HxWx3 / HxW array-like.
        The stat list is truncated or zero-padded to exactly `feature_dim`.
        """
        if torch.is_tensor(img):
            im = img.permute(1, 2, 0).cpu().numpy()
            im = im * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
            im = np.clip(im, 0, 1)
        else:
            im = np.array(img)
        # Fixed: accept already-grayscale 2-D input instead of crashing on axis 2.
        gray = im if im.ndim == 2 else np.mean(im, 2)
        ex, ey = sobel(gray, 0), sobel(gray, 1)
        em = np.sqrt(ex ** 2 + ey ** 2)
        features = []
        for scale in [1, 2, 4, 8]:
            if scale > 1:
                # Subsample, then recompute edges at the coarser scale.
                scaled = gray[::scale, ::scale]
                ex_s, ey_s = sobel(scaled, 0), sobel(scaled, 1)
                em_s = np.sqrt(ex_s ** 2 + ey_s ** 2)
            else:
                em_s = em
            features.extend([
                np.mean(em_s), np.std(em_s), np.max(em_s),
                np.percentile(em_s, 50), np.percentile(em_s, 75),
                np.percentile(em_s, 90), np.percentile(em_s, 95),
                np.sum(em_s > 0.1) / em_s.size  # fraction of strong-edge pixels
            ])
        # Fixed: zero-pad when feature_dim > number of computed stats so the
        # Linear layers always receive the expected width (previously a
        # silent size mismatch that crashed in forward()).
        feats = features[:self.feature_dim]
        feats.extend([0.0] * (self.feature_dim - len(feats)))
        return torch.tensor(feats, dtype=torch.float32)

    def forward(self, x):
        """Apply the residual blocks; log_det is 0 by construction (see class note)."""
        log_det = 0
        for flow in self.flows:
            x = x + flow(x)
        return x, log_det

    def log_prob(self, x):
        """Diagonal-Gaussian log density of forward(x), up to the additive constant."""
        z, log_det = self.forward(x)
        log_pz = -0.5 * torch.sum(
            (z - self.base_mean) ** 2 / torch.exp(2 * self.base_logstd) + 2 * self.base_logstd,
            dim=-1,
        )
        return log_pz + log_det

    def score(self, img, dev):
        """Anomaly score = negative log-probability of the edge features."""
        self.eval()
        self.to(dev)
        with torch.no_grad():
            feat = self.extract_edge_features(img).unsqueeze(0).to(dev)
            return -self.log_prob(feat).item()
class SemanticDeepSVDD(nn.Module):
    """Deep SVDD over semantic features from a pretrained ResNet-50.

    The anomaly score is the squared distance of the projected embedding from
    a hypersphere center (`self.center`), which must be set externally after
    training/calibration.
    """

    def __init__(self):
        super().__init__()
        from torchvision.models import resnet50
        resnet = resnet50(weights='IMAGENET1K_V1')
        # Backbone without the final FC layer; output is (B, 2048, 1, 1).
        self.features = nn.Sequential(*list(resnet.children())[:-1])
        # Freeze early parameters; fine-tune only those with index >= 100.
        # NOTE(review): index-based cutoff is fragile across torchvision
        # versions — confirm which layers this actually unfreezes.
        for i, param in enumerate(self.features.parameters()):
            param.requires_grad = (i >= 100)
        # Projection head: 2048-d pooled features -> 256-d embedding.
        self.proj = nn.Sequential(
            nn.Flatten(),
            nn.Linear(2048, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(0.4),
            nn.Linear(1024, 512), nn.BatchNorm1d(512), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(512, 256)
        )
        # SVDD hypersphere center (256-d tensor); set externally before scoring.
        self.center = None

    def forward(self, x):
        """Return the 256-d embedding for image batch x."""
        return self.proj(self.features(x))

    def score(self, img, dev):
        """Squared distance of the embedding from the SVDD center (higher = more anomalous).

        Raises:
            RuntimeError: if `self.center` has not been set (previously this
            surfaced as a cryptic TypeError inside the subtraction).
        """
        if self.center is None:
            raise RuntimeError(
                "SemanticDeepSVDD.center is not set; fit/calibrate before scoring")
        self.eval()
        self.to(dev)
        img = img.to(dev)
        center = self.center.to(dev)  # keep the center on the model's device
        with torch.no_grad():
            if img.dim() == 3:
                img = img.unsqueeze(0)
            return torch.sum((self(img) - center) ** 2, 1).mean().item()
class Ensemble:
    """Weighted 9-model ensemble for fake-image detection.

    Per-model raw scores are z-normalized with statistics fitted externally
    (`self.norms`) and combined with fixed weights summing to 1.0; the
    combined score is compared against `self.thresh`.
    """

    def __init__(self, models_dict):
        """models_dict: maps the weight keys below to fitted detector objects."""
        self.models = models_dict
        # Fixed combination weights (sum to 1.0).
        self.wts = {
            'freq_vae': 0.18,
            'texture_ocsvm': 0.13,
            'color_model': 0.09,
            'edge_flow': 0.13,
            'semantic_svdd': 0.17,
            'stat': 0.09,
            'iforest': 0.09,
            'lof': 0.07,
            'gmm': 0.05
        }
        # Per-model {'mean': ..., 'std': ...} normalization stats; must be
        # set (e.g. during calibration on real images) before predict().
        self.norms = None
        # Decision threshold on the combined normalized score.
        self.thresh = 0.0

    def get_scores(self, img, dev):
        """Raw anomaly score from every detector (higher = more anomalous).

        Torch-based detectors take the device; the sklearn/statistical ones
        only take the image.
        """
        return {
            'freq_vae': self.models['freq_vae'].score(img, dev),
            'texture_ocsvm': self.models['texture_ocsvm'].score(img),
            'color_model': self.models['color_model'].score(img),
            'edge_flow': self.models['edge_flow'].score(img, dev),
            'semantic_svdd': self.models['semantic_svdd'].score(img, dev),
            'stat': self.models['stat'].score(img),
            'iforest': self.models['iforest'].score(img),
            'lof': self.models['lof'].score(img),
            'gmm': self.models['gmm'].score(img)
        }

    def predict(self, img, dev):
        """Return (is_fake, combined_score, raw_scores).

        Raises:
            RuntimeError: if normalization statistics have not been set
            (previously this surfaced as a TypeError on `None` subscripting).
        """
        if self.norms is None:
            raise RuntimeError(
                "Ensemble.norms is not set; calibrate normalization stats before predict()")
        sc = self.get_scores(img, dev)
        nsc = {k: (v - self.norms[k]['mean']) / (self.norms[k]['std'] + 1e-8)
               for k, v in sc.items()}
        final = sum(self.wts[k] * nsc[k] for k in sc)
        return final > self.thresh, final, sc