df2 / src /tools /forensic /trufor_tools.py
Mustafa Akcanca
Fix imports
625a285
"""
TruFor: AI-driven image forgery detection and localization.
TruFor is an AI-driven solution for digital image forensics that combines
high-level RGB features with low-level noise-sensitive fingerprints (Noiseprint++)
through a transformer-based fusion architecture.
Original TruFor Work:
Research Group of University Federico II of Naples ('GRIP-UNINA')
https://github.com/grip-unina/TruFor
Reference Bibtex:
@InProceedings{Guillaro_2023_CVPR,
author = {Guillaro, Fabrizio and Cozzolino, Davide and Sud, Avneesh and Dufour, Nicholas and Verdoliva, Luisa},
title = {TruFor: Leveraging All-Round Clues for Trustworthy Image Forgery Detection and Localization},
booktitle = {Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition (CVPR)},
month = {June},
year = {2023},
pages = {20606-20615}
}
"""
import base64
import json
import os
from io import BytesIO
from pathlib import Path
from typing import Tuple
import numpy as np
from PIL import Image
# Try to import spaces module for ZeroGPU support (Hugging Face Spaces)
# This must be at module level for HF Spaces to detect @spaces.GPU decorators
try:
import spaces
SPACES_AVAILABLE = True
except ImportError:
SPACES_AVAILABLE = False
spaces = None
def _parse_request(input_str: str) -> Tuple[str, int, bool]:
"""
Accept plain path or JSON payload:
{
"path": "/path/to/image.jpg",
"gpu": 0, # GPU device (-1 for CPU, 0+ for GPU)
"return_map": true # Include base64 PNG localization map
}
"""
default_gpu = 0
default_return_map = False
try:
data = json.loads(input_str)
if isinstance(data, dict):
path = data.get("path", input_str).strip()
gpu = int(data.get("gpu", default_gpu))
return_map = bool(data.get("return_map", default_return_map))
return path, gpu, return_map
except Exception:
pass
return input_str.strip(), default_gpu, default_return_map
def _encode_png(arr: np.ndarray) -> str:
"""Encode numpy array as base64 PNG."""
# Normalize to 0-255
if arr.max() <= 1.0:
arr = (arr * 255).astype(np.uint8)
else:
arr = np.clip(arr, 0, 255).astype(np.uint8)
img = Image.fromarray(arr, mode='L')
buf = BytesIO()
img.save(buf, format="PNG")
buf.seek(0)
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
return f"data:image/png;base64,{b64}"
def _perform_trufor_internal(image_path: str, gpu_device: int, want_map: bool) -> str:
"""
Internal function that performs TruFor analysis.
This is separated so it can be wrapped with @spaces.GPU for ZeroGPU support.
"""
# Determine device
device = f'cuda:{gpu_device}' if gpu_device >= 0 else 'cpu'
# Find weights file
workspace_root = Path(__file__).parent.parent.parent.parent
weights_path = workspace_root / "weights" / "trufor" / "trufor.pth.tar"
# Try to download weights if missing
if not weights_path.exists():
try:
from src.utils.weight_downloader import ensure_trufor_weights
success, message = ensure_trufor_weights(workspace_root=workspace_root, auto_download=True)
if not success:
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"TruFor weights not found and download failed.\n{message}",
})
# Weights should now be available
if not weights_path.exists():
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"TruFor weights still not found at {weights_path} after download attempt.",
})
except ImportError:
# Fallback if downloader not available
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": (
f"TruFor weights not found at {weights_path}.\n"
f"Please download from: https://www.grip.unina.it/download/prog/TruFor/TruFor_weights.zip\n"
f"Extract and place at: {weights_path}"
),
})
# Try to download weights if missing
if not weights_path.exists():
try:
from src.utils.weight_downloader import ensure_trufor_weights
success, message = ensure_trufor_weights(workspace_root=workspace_root, auto_download=True)
if not success:
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"TruFor weights not found and download failed.\n{message}",
})
# Weights should now be available
if not weights_path.exists():
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"TruFor weights still not found at {weights_path} after download attempt.",
})
except ImportError:
# Fallback if downloader not available
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": (
f"TruFor weights not found at {weights_path}.\n"
f"Please download from: https://www.grip.unina.it/download/prog/TruFor/TruFor_weights.zip\n"
f"Extract and place at: {weights_path}"
),
})
try:
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader
# Import TruFor components
import sys
project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
try:
from src.tools.forensic.trufor_support.config import update_config, _C as config
from src.tools.forensic.trufor_support.data_core import myDataset
from src.tools.forensic.trufor_support.models.cmx.builder_np_conf import myEncoderDecoder as confcmx
except ImportError as e:
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"TruFor model architecture not found: {str(e)}",
"note": (
"To use TruFor, you need the TruFor repository architecture files. "
"The files should be located at: src/tools/forensic/trufor_support/\n"
"Required files:\n" +
"\n".join(f" - {f}" for f in [
"config.py",
"data_core.py",
"models/cmx/builder_np_conf.py",
"models/DnCNN.py"
]) +
"\n\nIf files are missing, copy them from the TruFor repository:\n"
"https://github.com/grip-unina/TruFor"
),
})
# Set up config
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-gpu', '--gpu', type=int, default=gpu_device)
parser.add_argument('-in', '--input', type=str, default=image_path)
parser.add_argument('-out', '--output', type=str, default='')
parser.add_argument('opts', nargs=argparse.REMAINDER, default=[])
args = parser.parse_args([])
update_config(config, args)
# Set device-specific settings
if device != 'cpu':
import torch.backends.cudnn as cudnn
cudnn.benchmark = config.CUDNN.BENCHMARK
cudnn.deterministic = config.CUDNN.DETERMINISTIC
cudnn.enabled = config.CUDNN.ENABLED
# Create dataset
test_dataset = myDataset(list_img=[image_path])
testloader = DataLoader(test_dataset, batch_size=1)
# Load model
if config.MODEL.NAME == 'detconfcmx':
model = confcmx(cfg=config)
else:
raise ValueError(f"Unsupported model: {config.MODEL.NAME}")
# The checkpoint includes pickled tensors; loading with weights_only=False is expected here.
# weights file is checked into repo, so we trust the source.
checkpoint = torch.load(
str(weights_path),
map_location=torch.device(device),
weights_only=False,
)
model.load_state_dict(checkpoint['state_dict'])
model = model.to(device)
model.eval()
# Process image
with torch.no_grad():
for rgb, path in testloader:
rgb = rgb.to(device)
pred, conf, det, npp = model(rgb)
det_score = torch.sigmoid(det).item()
pred = torch.squeeze(pred, 0)
pred = F.softmax(pred, dim=0)[1]
pred_np = pred.cpu().numpy()
# Clean up GPU memory
if device != 'cpu':
torch.cuda.empty_cache()
# Prepare results
manipulation_prob = float(det_score)
detection_score = float(det_score)
localization_map_encoded = None
map_size = None
if want_map:
# Normalize prediction map to 0-1 range for visualization
pred_normalized = pred_np
if pred_normalized.max() > 1.0:
pred_normalized = pred_normalized / pred_normalized.max()
localization_map_encoded = _encode_png((pred_normalized * 255).astype(np.uint8))
map_size = (pred_np.shape[1], pred_np.shape[0]) # (width, height)
result = {
"tool": "perform_trufor",
"status": "completed",
"image_path": image_path,
"manipulation_probability": manipulation_prob,
"detection_score": detection_score,
"localization_map": localization_map_encoded,
"localization_map_size": map_size,
"note": (
"TruFor combines RGB features with Noiseprint++ for forgery detection. "
"manipulation_probability indicates the likelihood of image manipulation (0-1). "
"Higher values suggest greater probability of forgery."
),
}
return json.dumps(result)
# If we get here, no image was processed
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": "No image was processed",
})
except ImportError as e:
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": f"Missing dependencies: {e}. Please install PyTorch: pip install torch torchvision",
})
except Exception as e:
return json.dumps({
"tool": "perform_trufor",
"status": "error",
"error": str(e),
})
# Module-level GPU function for ZeroGPU detection at startup
# This must be at module level for Hugging Face Spaces to detect @spaces.GPU
if SPACES_AVAILABLE:
@spaces.GPU(duration=120) # 120 seconds max for TruFor inference
def _trufor_gpu_wrapper(image_path: str, gpu_device: int, want_map: bool) -> str:
"""
GPU-wrapped version of TruFor for ZeroGPU Spaces.
This function is detected by HF Spaces at startup.
"""
return _perform_trufor_internal(image_path, gpu_device, want_map)
else:
# Fallback for non-Spaces environments
def _trufor_gpu_wrapper(image_path: str, gpu_device: int, want_map: bool) -> str:
"""Fallback wrapper when spaces module is not available."""
return _perform_trufor_internal(image_path, gpu_device, want_map)
def perform_trufor(input_str: str) -> str:
"""
Run TruFor forgery detection and localization on an image.
TruFor combines RGB features with Noiseprint++ to detect and localize
image forgeries using a transformer-based fusion architecture.
Returns JSON with:
- manipulation_probability: Overall probability of manipulation (0-1)
- detection_score: Detection confidence score
- localization_map: Base64 PNG of forgery localization map (optional)
- localization_map_size: Size of the map (width, height)
This function supports both traditional GPU Spaces and ZeroGPU Spaces on Hugging Face.
For ZeroGPU, GPU resources are allocated dynamically only when needed.
"""
image_path, gpu_device, want_map = _parse_request(input_str)
# Use GPU wrapper if GPU is requested and spaces module is available
if SPACES_AVAILABLE and gpu_device >= 0:
# Use the module-level decorated function for ZeroGPU
# This ensures HF Spaces detects @spaces.GPU at startup
return _trufor_gpu_wrapper(image_path, gpu_device, want_map)
else:
# CPU mode or spaces module not available - use internal function directly
return _perform_trufor_internal(image_path, gpu_device, want_map)
__all__ = ["perform_trufor"]