| """ |
| TruFor: AI-driven image forgery detection and localization. |
| |
| TruFor is an AI-driven solution for digital image forensics that combines |
| high-level RGB features with low-level noise-sensitive fingerprints (Noiseprint++) |
| through a transformer-based fusion architecture. |
| |
| Original TruFor Work: |
| Research Group of University Federico II of Naples ('GRIP-UNINA') |
| https://github.com/grip-unina/TruFor |
| |
| Reference Bibtex: |
| @InProceedings{Guillaro_2023_CVPR, |
| author = {Guillaro, Fabrizio and Cozzolino, Davide and Sud, Avneesh and Dufour, Nicholas and Verdoliva, Luisa}, |
| title = {TruFor: Leveraging All-Round Clues for Trustworthy Image Forgery Detection and Localization}, |
| booktitle = {Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition (CVPR)}, |
| month = {June}, |
| year = {2023}, |
| pages = {20606-20615} |
| } |
| """ |
|
|
| import base64 |
| import json |
| import os |
| from io import BytesIO |
| from pathlib import Path |
| from typing import Tuple |
|
|
| import numpy as np |
| from PIL import Image |
|
|
| |
| |
# Optional Hugging Face ``spaces`` dependency (provides the ZeroGPU decorator).
# When it cannot be imported, ``spaces`` is bound to None and the flag is False
# so the rest of the module can branch on SPACES_AVAILABLE.
try:
    import spaces
except ImportError:
    spaces = None
    SPACES_AVAILABLE = False
else:
    SPACES_AVAILABLE = True
|
|
|
|
def _parse_request(input_str: str) -> Tuple[str, int, bool]:
    """
    Parse a tool request into ``(image_path, gpu_device, return_map)``.

    Accept plain path or JSON payload:
    {
        "path": "/path/to/image.jpg",
        "gpu": 0,              # GPU device (-1 for CPU, 0+ for GPU)
        "return_map": true     # Include base64 PNG localization map
    }

    Each JSON field is validated on its own, so one malformed field no longer
    discards the rest of the payload:

    - ``path``: used when it is a string; otherwise the stripped raw input is
      used as the path (same fallback as for a missing key).
    - ``gpu``: converted with ``int()``; falls back to 0 when the value cannot
      be converted (e.g. ``null`` or ``"abc"``).
    - ``return_map``: booleans/numbers use normal truthiness; strings are true
      only for "1", "true", "yes" or "on" (case-insensitive), so the string
      "false" is no longer treated as true.

    Input that is not JSON, or is JSON but not an object, is treated as a
    plain path with the default GPU (0) and no localization map.
    """
    default_gpu = 0
    default_return_map = False

    try:
        data = json.loads(input_str)
    except (TypeError, ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError: the input is simply not JSON.
        data = None

    if not isinstance(data, dict):
        return input_str.strip(), default_gpu, default_return_map

    raw_path = data.get("path", input_str)
    path = raw_path.strip() if isinstance(raw_path, str) else input_str.strip()

    try:
        gpu = int(data.get("gpu", default_gpu))
    except (TypeError, ValueError, OverflowError):
        gpu = default_gpu

    raw_flag = data.get("return_map", default_return_map)
    if isinstance(raw_flag, str):
        return_map = raw_flag.strip().lower() in {"1", "true", "yes", "on"}
    else:
        return_map = bool(raw_flag)

    return path, gpu, return_map
|
|
|
|
def _to_uint8_gray(arr: np.ndarray) -> np.ndarray:
    """Convert an array to ``uint8`` pixel values, scaling only unit-range data.

    - ``uint8`` input is assumed to already be on the 0-255 scale and is
      returned unchanged. Previously a dark ``uint8`` map whose maximum was
      0 or 1 was multiplied by 255 a second time.
    - Boolean input maps True/False to 255/0.
    - Floating-point input whose maximum is <= 1.0 is treated as a [0, 1] map:
      NaNs become 0, values are clipped to [0, 1] (so negatives cannot wrap
      around on the cast) and then scaled by 255.
    - Anything else is clipped to [0, 255] and cast.
    """
    arr = np.asarray(arr)
    if arr.dtype == np.uint8:
        return arr
    if arr.dtype == np.bool_:
        return np.where(arr, 255, 0).astype(np.uint8)
    if np.issubdtype(arr.dtype, np.floating):
        arr = np.nan_to_num(arr, nan=0.0)
        if arr.size == 0 or arr.max() <= 1.0:
            return (np.clip(arr, 0.0, 1.0) * 255).astype(np.uint8)
    return np.clip(arr, 0, 255).astype(np.uint8)


def _encode_png(arr: np.ndarray) -> str:
    """Encode numpy array as base64 PNG.

    Args:
        arr: Grayscale image data; see ``_to_uint8_gray`` for how the values
            are mapped to 8-bit pixels.

    Returns:
        A ``data:image/png;base64,...`` URI containing the 8-bit grayscale PNG.
    """
    img = Image.fromarray(_to_uint8_gray(arr), mode='L')
    with BytesIO() as buf:
        img.save(buf, format="PNG")
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
    return f"data:image/png;base64,{b64}"
|
|
|
|
|
|
|
|
def _trufor_error(message: str, **extra) -> str:
    """Serialize a ``perform_trufor`` error payload (plus optional extra keys) to JSON."""
    payload = {
        "tool": "perform_trufor",
        "status": "error",
        "error": message,
    }
    payload.update(extra)
    return json.dumps(payload)


def _check_trufor_weights(weights_path: Path, workspace_root: Path) -> "str | None":
    """Make sure the checkpoint exists, attempting an automatic download if needed.

    Returns None when ``weights_path`` exists (possibly after the download),
    otherwise a JSON error string ready to be returned to the caller.
    """
    if weights_path.exists():
        return None

    try:
        from src.utils.weight_downloader import ensure_trufor_weights
    except ImportError:
        # No downloader available: tell the user how to install the weights by hand.
        return _trufor_error(
            f"TruFor weights not found at {weights_path}.\n"
            f"Please download from: https://www.grip.unina.it/download/prog/TruFor/TruFor_weights.zip\n"
            f"Extract and place at: {weights_path}"
        )

    success, message = ensure_trufor_weights(workspace_root=workspace_root, auto_download=True)
    if not success:
        return _trufor_error(f"TruFor weights not found and download failed.\n{message}")
    if not weights_path.exists():
        return _trufor_error(
            f"TruFor weights still not found at {weights_path} after download attempt."
        )
    return None


def _perform_trufor_internal(image_path: str, gpu_device: int, want_map: bool) -> str:
    """
    Internal function that performs TruFor analysis.
    This is separated so it can be wrapped with @spaces.GPU for ZeroGPU support.

    Args:
        image_path: Path of the image handed to the TruFor dataset loader.
        gpu_device: CUDA device index; a negative value selects the CPU.
        want_map: When true, include the localization map as a base64 PNG
            data URI together with its (width, height).

    Returns:
        A JSON string. On success ``status`` is "completed" and the payload
        carries ``manipulation_probability`` / ``detection_score`` (sigmoid of
        the model's detection output) and the optional localization map.
        Every failure (missing weights, missing dependencies or architecture
        files, any exception during inference) is reported as a JSON payload
        with ``status`` "error" instead of raising.
    """
    device = f'cuda:{gpu_device}' if gpu_device >= 0 else 'cpu'

    workspace_root = Path(__file__).parent.parent.parent.parent
    weights_path = workspace_root / "weights" / "trufor" / "trufor.pth.tar"

    # The original code ran this check twice back to back; once is enough.
    weights_error = _check_trufor_weights(weights_path, workspace_root)
    if weights_error is not None:
        return weights_error

    try:
        import torch
        from torch.nn import functional as F
        from torch.utils.data import DataLoader

        # Make the ``src.*`` package importable regardless of the working directory.
        import sys
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        if str(project_root) not in sys.path:
            sys.path.insert(0, str(project_root))

        try:
            from src.tools.forensic.trufor_support.config import update_config, _C as config
            from src.tools.forensic.trufor_support.data_core import myDataset
            from src.tools.forensic.trufor_support.models.cmx.builder_np_conf import myEncoderDecoder as confcmx
        except ImportError as e:
            required_files = (
                "config.py",
                "data_core.py",
                "models/cmx/builder_np_conf.py",
                "models/DnCNN.py",
            )
            return _trufor_error(
                f"TruFor model architecture not found: {str(e)}",
                note=(
                    "To use TruFor, you need the TruFor repository architecture files. "
                    "The files should be located at: src/tools/forensic/trufor_support/\n"
                    "Required files:\n" +
                    "\n".join(f" - {f}" for f in required_files) +
                    "\n\nIf files are missing, copy them from the TruFor repository:\n"
                    "https://github.com/grip-unina/TruFor"
                ),
            )

        # update_config expects an argparse namespace, so build one from the
        # request values without reading the real command line.
        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('-gpu', '--gpu', type=int, default=gpu_device)
        parser.add_argument('-in', '--input', type=str, default=image_path)
        parser.add_argument('-out', '--output', type=str, default='')
        parser.add_argument('opts', nargs=argparse.REMAINDER, default=[])
        args = parser.parse_args([])
        update_config(config, args)

        if device != 'cpu':
            import torch.backends.cudnn as cudnn
            cudnn.benchmark = config.CUDNN.BENCHMARK
            cudnn.deterministic = config.CUDNN.DETERMINISTIC
            cudnn.enabled = config.CUDNN.ENABLED

        test_dataset = myDataset(list_img=[image_path])
        testloader = DataLoader(test_dataset, batch_size=1)

        if config.MODEL.NAME == 'detconfcmx':
            model = confcmx(cfg=config)
        else:
            raise ValueError(f"Unsupported model: {config.MODEL.NAME}")

        # SECURITY NOTE: weights_only=False lets torch.load unpickle arbitrary
        # objects. Only load checkpoints obtained from a trusted source.
        checkpoint = torch.load(
            str(weights_path),
            map_location=torch.device(device),
            weights_only=False,
        )
        model.load_state_dict(checkpoint['state_dict'])
        model = model.to(device)
        model.eval()

        with torch.no_grad():
            # The dataset holds a single image, so the first batch is the result.
            for rgb, _ in testloader:
                rgb = rgb.to(device)

                pred, _conf, det, _npp = model(rgb)

                det_score = float(torch.sigmoid(det).item())

                # Drop the batch axis, softmax over the first remaining axis
                # and keep index 1 of it as the localization map.
                pred = torch.squeeze(pred, 0)
                pred_np = F.softmax(pred, dim=0)[1].cpu().numpy()

                if device != 'cpu':
                    torch.cuda.empty_cache()

                localization_map_encoded = None
                map_size = None

                if want_map:
                    # Pass the float [0, 1] map straight to _encode_png so the
                    # 0-255 scaling happens exactly once. Pre-scaling to uint8
                    # here made dark maps (max pixel <= 1) get scaled twice.
                    prob_map = np.clip(pred_np, 0.0, 1.0)
                    localization_map_encoded = _encode_png(prob_map)
                    map_size = (pred_np.shape[1], pred_np.shape[0])

                result = {
                    "tool": "perform_trufor",
                    "status": "completed",
                    "image_path": image_path,
                    "manipulation_probability": det_score,
                    "detection_score": det_score,
                    "localization_map": localization_map_encoded,
                    "localization_map_size": map_size,
                    "note": (
                        "TruFor combines RGB features with Noiseprint++ for forgery detection. "
                        "manipulation_probability indicates the likelihood of image manipulation (0-1). "
                        "Higher values suggest greater probability of forgery."
                    ),
                }

                return json.dumps(result)

        # Reached only when the loader yielded no batch at all.
        return _trufor_error("No image was processed")

    except ImportError as e:
        return _trufor_error(
            f"Missing dependencies: {e}. Please install PyTorch: pip install torch torchvision"
        )
    except Exception as e:
        # Tool boundary: report any runtime failure as JSON instead of raising.
        return _trufor_error(str(e))
|
|
|
|
| |
| |
def _trufor_gpu_wrapper(image_path: str, gpu_device: int, want_map: bool) -> str:
    """
    Thin pass-through to ``_perform_trufor_internal``.

    When the ``spaces`` package is available this function is wrapped with
    ``spaces.GPU(duration=120)`` at import time (see below) so it can run on
    ZeroGPU Spaces; otherwise it stays a plain fallback wrapper.
    """
    return _perform_trufor_internal(image_path, gpu_device, want_map)


if SPACES_AVAILABLE:
    # Decorate at module import so the GPU function is registered at startup.
    _trufor_gpu_wrapper = spaces.GPU(duration=120)(_trufor_gpu_wrapper)
|
|
|
|
def perform_trufor(input_str: str) -> str:
    """
    Detect and localize image forgeries with TruFor.

    ``input_str`` is either a plain image path or a JSON payload understood by
    ``_parse_request`` (keys ``path``, ``gpu``, ``return_map``).

    The returned JSON string contains:
    - manipulation_probability: Overall probability of manipulation (0-1)
    - detection_score: Detection confidence score
    - localization_map: Base64 PNG of forgery localization map (optional)
    - localization_map_size: Size of the map (width, height)

    Dispatch: when the ``spaces`` package is available and a GPU device
    (index >= 0) was requested, the call goes through ``_trufor_gpu_wrapper``
    (the ZeroGPU-decorated entry point); in every other case the internal
    implementation is called directly.
    """
    image_path, gpu_device, want_map = _parse_request(input_str)

    use_gpu_wrapper = SPACES_AVAILABLE and gpu_device >= 0
    runner = _trufor_gpu_wrapper if use_gpu_wrapper else _perform_trufor_internal
    return runner(image_path, gpu_device, want_map)
|
|
|
|
| __all__ = ["perform_trufor"] |
|
|