Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, Optional, Tuple | |
| import numpy as np | |
| from PIL import Image | |
| LOGGER = logging.getLogger(__name__) | |
| class TruForUnavailableError(RuntimeError): | |
| """Raised when the TruFor assets are missing or inference fails.""" | |
| class TruForResult: | |
| score: Optional[float] | |
| map_overlay: Optional[Image.Image] | |
| class TruForEngine: | |
| """Wrapper that executes TruFor inference through docker or python backends.""" | |
| def __init__( | |
| self, | |
| repo_root: Optional[Path] = None, | |
| weights_path: Optional[Path] = None, | |
| device: str = "cpu", | |
| ) -> None: | |
| self.base_dir = Path(__file__).resolve().parent | |
| self.device = device | |
| self.backend: Optional[str] = None | |
| self.status_message = "TruFor backend not initialized." | |
| backend_pref = os.environ.get("TRUFOR_BACKEND", "auto").lower() | |
| if backend_pref not in {"auto", "native", "docker"}: | |
| backend_pref = "auto" | |
| errors: list[str] = [] | |
| if backend_pref in {"auto", "native"}: | |
| try: | |
| self._configure_native_backend(repo_root, weights_path) | |
| self.backend = "native" | |
| self.status_message = "TruFor ready (bundled python backend)." | |
| except TruForUnavailableError as exc: | |
| errors.append(f"Native backend unavailable: {exc}") | |
| if backend_pref == "native": | |
| raise | |
| if self.backend is None and backend_pref in {"auto", "docker"}: | |
| try: | |
| self._configure_docker_backend() | |
| self.backend = "docker" | |
| self.status_message = f'TruFor ready (docker image "{self.docker_image}").' | |
| except TruForUnavailableError as exc: | |
| errors.append(f"Docker backend unavailable: {exc}") | |
| if backend_pref == "docker": | |
| raise | |
| if self.backend is None: | |
| raise TruForUnavailableError(" | ".join(errors) if errors else "TruFor backend unavailable.") | |
| # ------------------------------------------------------------------ | |
| # Backend configuration helpers | |
| # ------------------------------------------------------------------ | |
| def _configure_docker_backend(self) -> None: | |
| if shutil.which("docker") is None: | |
| raise TruForUnavailableError("docker CLI not found on PATH.") | |
| test_docker_dir = self.base_dir / "test_docker" | |
| if not test_docker_dir.exists(): | |
| raise TruForUnavailableError("test_docker directory not found in workspace.") | |
| image_name = os.environ.get("TRUFOR_DOCKER_IMAGE", "trufor") | |
| inspect = subprocess.run( | |
| ["docker", "image", "inspect", image_name], | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| check=False, | |
| ) | |
| if inspect.returncode != 0: | |
| raise TruForUnavailableError( | |
| f'Docker image "{image_name}" not found. Build it with "bash test_docker/docker_build.sh".' | |
| ) | |
| weights_candidate = Path(os.environ.get("TRUFOR_DOCKER_WEIGHTS", self.base_dir / "weights")).expanduser() | |
| weight_file = weights_candidate / "trufor.pth.tar" | |
| self.docker_weights_dir: Optional[Path] | |
| self.docker_weights_dir = weight_file.parent if weight_file.exists() else None | |
| self.docker_runtime = os.environ.get("TRUFOR_DOCKER_RUNTIME") | |
| gpu_pref = os.environ.get("TRUFOR_DOCKER_GPU") | |
| if gpu_pref is None: | |
| gpu_pref = "-1" if self.device == "cpu" else "0" | |
| self.docker_gpu = gpu_pref | |
| gpus_arg = os.environ.get("TRUFOR_DOCKER_GPUS_ARG") | |
| if not gpus_arg and gpu_pref not in {"-1", "cpu", "none"}: | |
| gpus_arg = "all" | |
| self.docker_gpus_arg = gpus_arg | |
| self.docker_image = image_name | |
| def _configure_native_backend(self, _repo_root: Optional[Path], weights_path: Optional[Path]) -> None: | |
| try: | |
| from trufor_native import TruForBundledModel | |
| except ImportError as exc: # pragma: no cover - packaging guard | |
| raise TruForUnavailableError("Bundled TruFor modules are not available.") from exc | |
| default_weights = self.base_dir / "weights" / "trufor.pth.tar" | |
| weight_candidate = weights_path or os.environ.get("TRUFOR_WEIGHTS") or default_weights | |
| weight_path = Path(weight_candidate).expanduser() | |
| if not weight_path.exists(): | |
| raise TruForUnavailableError( | |
| f"TruFor weights missing at {weight_path}. Place trufor.pth.tar under weights/ or set TRUFOR_WEIGHTS." | |
| ) | |
| try: | |
| self.native_model = TruForBundledModel(weight_path, device=self.device) | |
| except Exception as exc: # pragma: no cover - propagate detailed failure | |
| raise TruForUnavailableError(f"Failed to initialise bundled TruFor model: {exc}") from exc | |
| # ------------------------------------------------------------------ | |
| # Public API | |
| # ------------------------------------------------------------------ | |
| def infer(self, image: Image.Image) -> TruForResult: | |
| if image is None: | |
| raise TruForUnavailableError("No image supplied to TruFor inference.") | |
| prepared_image, cropped = self._strip_gps_overlay(image) | |
| if cropped: | |
| LOGGER.debug( | |
| "Cropping %d px GPS overlay before TruFor inference.", | |
| image.height - prepared_image.height, | |
| ) | |
| if self.backend == "docker": | |
| return self._infer_docker(prepared_image) | |
| if self.backend == "native": | |
| return self._infer_native(prepared_image) | |
| raise TruForUnavailableError("TruFor backend not configured.") | |
| # ------------------------------------------------------------------ | |
| # Inference helpers | |
| # ------------------------------------------------------------------ | |
| def _infer_native(self, image: Image.Image) -> TruForResult: | |
| outputs = self.native_model.predict(image) | |
| map_overlay = None | |
| try: | |
| map_overlay = self._apply_heatmap(image, outputs.tamper_map) | |
| except Exception as exc: # pragma: no cover - visualisation fallback | |
| LOGGER.debug("Failed to build tamper heatmap: %s", exc) | |
| return TruForResult( | |
| score=outputs.detection_score, | |
| map_overlay=map_overlay, | |
| ) | |
| def _infer_docker(self, image: Image.Image) -> TruForResult: | |
| with tempfile.TemporaryDirectory(prefix="trufor_docker_") as workdir: | |
| workdir_path = Path(workdir) | |
| input_dir = workdir_path / "data" | |
| output_dir = workdir_path / "data_out" | |
| input_dir.mkdir(parents=True, exist_ok=True) | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| input_path = input_dir / "input.png" | |
| image.convert("RGB").save(input_path) | |
| cmd = ["docker", "run", "--rm"] | |
| if self.docker_runtime: | |
| cmd.extend(["--runtime", self.docker_runtime]) | |
| gpu_flag = str(self.docker_gpu) | |
| if gpu_flag.lower() in {"cpu", "none"}: | |
| gpu_flag = "-1" | |
| if gpu_flag != "-1" and self.docker_gpus_arg: | |
| cmd.extend(["--gpus", self.docker_gpus_arg]) | |
| cmd.extend([ | |
| "-v", | |
| f"{input_dir.resolve()}:/data:ro", | |
| "-v", | |
| f"{output_dir.resolve()}:/data_out:rw", | |
| ]) | |
| if self.docker_weights_dir is not None: | |
| cmd.extend([ | |
| "-v", | |
| f"{self.docker_weights_dir.resolve()}:/weights:ro", | |
| ]) | |
| cmd.append(self.docker_image) | |
| cmd.extend( | |
| [ | |
| "-gpu", | |
| gpu_flag, | |
| "-in", | |
| "data/input.png", | |
| "-out", | |
| "data_out", | |
| ] | |
| ) | |
| LOGGER.debug("Running TruFor docker command: %s", " ".join(cmd)) | |
| result = subprocess.run( | |
| cmd, | |
| text=True, | |
| capture_output=True, | |
| check=False, | |
| ) | |
| return self._process_results(result, output_dir, image) | |
| # ------------------------------------------------------------------ | |
| # Result parsing | |
| # ------------------------------------------------------------------ | |
| def _process_results(self, run_result: subprocess.CompletedProcess[str], output_dir: Path, image: Image.Image) -> TruForResult: | |
| if run_result.returncode != 0: | |
| stderr_tail = "\n".join(run_result.stderr.strip().splitlines()[-8:]) if run_result.stderr else "" | |
| LOGGER.error("TruFor stderr: %s", stderr_tail) | |
| raise TruForUnavailableError( | |
| "TruFor inference failed. Inspect dependencies and stderr:\n" + stderr_tail | |
| ) | |
| npz_files = list(output_dir.rglob("*.npz")) | |
| if not npz_files: | |
| stdout_tail = "\n".join(run_result.stdout.strip().splitlines()[-8:]) if run_result.stdout else "" | |
| raise TruForUnavailableError( | |
| "TruFor inference produced no output files. Stdout tail:\n" + stdout_tail | |
| ) | |
| data = np.load(npz_files[0], allow_pickle=False) | |
| tamper_map = data.get("map") | |
| score = float(data["score"]) if "score" in data.files else None | |
| map_overlay = None | |
| try: | |
| map_overlay = self._apply_heatmap(image, tamper_map) if tamper_map is not None else None | |
| except Exception as exc: # pragma: no cover | |
| LOGGER.debug("Failed to build tamper heatmap: %s", exc) | |
| return TruForResult( | |
| score=score, | |
| map_overlay=map_overlay, | |
| ) | |
| def _apply_heatmap(base: Image.Image, data: np.ndarray, alpha: float = 0.55) -> Image.Image: | |
| base_rgb = base.convert("RGB") | |
| if data is None or data.ndim != 2: | |
| raise ValueError("Expected a 2D map from TruFor") | |
| data = np.asarray(data, dtype=np.float32) | |
| if np.allclose(data.max(), data.min()): | |
| norm = np.zeros_like(data, dtype=np.float32) | |
| else: | |
| norm = (data - data.min()) / (data.max() - data.min()) | |
| heat = np.zeros((*norm.shape, 3), dtype=np.uint8) | |
| heat[..., 0] = np.clip(norm * 255, 0, 255).astype(np.uint8) | |
| heat[..., 1] = np.clip(np.sqrt(norm) * 255, 0, 255).astype(np.uint8) | |
| heat[..., 2] = np.clip((1.0 - norm) * 255, 0, 255).astype(np.uint8) | |
| heat_img = Image.fromarray(heat, mode="RGB").resize(base_rgb.size, Image.BILINEAR) | |
| return Image.blend(base_rgb, heat_img, alpha) | |
| def _strip_gps_overlay(image: Image.Image) -> Tuple[Image.Image, bool]: | |
| gray = np.asarray(image.convert("L"), dtype=np.uint8) | |
| hsv = np.asarray(image.convert("HSV"), dtype=np.uint8) | |
| hue = hsv[..., 0] / 255.0 | |
| sat = hsv[..., 1] / 255.0 | |
| val = hsv[..., 2] / 255.0 | |
| height, width = gray.shape | |
| min_overlay = max(int(height * 0.08), 40) | |
| max_overlay = max(int(height * 0.45), min_overlay + 1) | |
| if height <= min_overlay: | |
| return image, False | |
| start_row = height - min_overlay | |
| stop_row = max(height - max_overlay, 1) | |
| row_means = gray.mean(axis=1) | |
| sat_means = sat.mean(axis=1) | |
| blue_mask = (hue >= 0.5) & (hue <= 0.75) & (sat >= 0.25) & (val <= 0.95) | |
| yellow_mask = (hue >= 0.08) & (hue <= 0.18) & (sat >= 0.35) & (val >= 0.45) | |
| white_mask = (val >= 0.87) & (sat <= 0.28) | |
| dark_mask = val <= 0.28 | |
| overlay_mask = blue_mask | yellow_mask | white_mask | dark_mask | |
| overlay_ratio_rows = overlay_mask.mean(axis=1) | |
| boundary = None | |
| best_score = 0.0 | |
| # First, try to detect a long contiguous overlay band using hysteresis on coverage. | |
| high_ratio = 0.52 | |
| low_ratio = 0.36 | |
| run_len = 0 | |
| run_top = height | |
| for row in range(height - 1, stop_row - 1, -1): | |
| ratio = overlay_ratio_rows[row] | |
| if ratio >= high_ratio or (run_len > 0 and ratio >= low_ratio): | |
| run_len += 1 | |
| run_top = row | |
| if run_len >= max_overlay: | |
| break | |
| elif run_len > 0: | |
| if run_len >= min_overlay: | |
| break | |
| run_len = 0 | |
| run_top = height | |
| elif height - row >= max_overlay: | |
| break | |
| if run_len >= min_overlay: | |
| boundary_candidate = run_top | |
| overlay_consistency = overlay_ratio_rows[boundary_candidate:height].mean() | |
| boundary_strength = abs(row_means[boundary_candidate - 1] - row_means[boundary_candidate]) if boundary_candidate > 0 else abs(row_means[boundary_candidate] - row_means[min(boundary_candidate + 1, height - 1)]) | |
| if overlay_consistency >= 0.45 and boundary_strength >= 4.0: | |
| overlay_height = height - boundary_candidate | |
| margin = min(max(int(overlay_height * 0.25), 18), boundary_candidate) | |
| crop_row = max(boundary_candidate - margin, 0) | |
| cropped_image = image.crop((0, 0, width, crop_row)) | |
| return cropped_image, True | |
| # Detect GPS Map Camera overlay at the bottom and crop it out if present. | |
| for row in range(start_row, stop_row - 1, -1): | |
| overlay_height = height - row | |
| if overlay_height < min_overlay: | |
| continue | |
| if overlay_height > max_overlay: | |
| break | |
| overlay_hue = hue[row:height, :] | |
| overlay_sat = sat[row:height, :] | |
| overlay_val = val[row:height, :] | |
| high_sat_ratio = float((overlay_sat > 0.3).mean()) | |
| dark_ratio = float((overlay_val < 0.3).mean()) | |
| bright_ratio = float((overlay_val > 0.88).mean()) | |
| colored_band_ratio = float(((overlay_sat > 0.32) & (overlay_val > 0.25) & (overlay_val < 0.85)).mean()) | |
| blue_ratio = float(((overlay_hue > 0.48) & (overlay_hue < 0.74) & (overlay_sat > 0.28)).mean()) | |
| yellow_ratio = float(((overlay_hue > 0.07) & (overlay_hue < 0.2) & (overlay_sat > 0.35) & (overlay_val > 0.45)).mean()) | |
| prev_mean = row_means[row - 1] if row > 0 else row_means[row] | |
| boundary_strength = abs(prev_mean - row_means[row]) | |
| saturation_jump = sat_means[row - 1] - sat_means[row] if row > 0 else 0.0 | |
| score = 0.0 | |
| if high_sat_ratio > 0.38: | |
| score += 0.8 | |
| if colored_band_ratio > 0.35: | |
| score += 0.7 | |
| if blue_ratio > 0.22: | |
| score += 0.8 | |
| if yellow_ratio > 0.12: | |
| score += 0.5 | |
| if dark_ratio > 0.32: | |
| score += 0.6 | |
| if bright_ratio > 0.07: | |
| score += 0.5 | |
| if boundary_strength > 5.5: | |
| score += 0.6 | |
| if saturation_jump < -0.05: | |
| score += 0.4 | |
| edge_ratio = boundary_strength / max(row_means[row], 1.0) | |
| if edge_ratio > 0.11: | |
| score += 0.3 | |
| if overlay_ratio_rows[row:height].mean() > 0.42: | |
| score += 0.3 | |
| if score > best_score: | |
| best_score = score | |
| boundary = row | |
| if boundary is None or best_score < 1.6: | |
| return image, False | |
| overlay_height = height - boundary | |
| margin = min(max(int(overlay_height * 0.25), 18), boundary) | |
| crop_row = max(boundary - margin, 0) | |
| cropped_image = image.crop((0, 0, width, crop_row)) | |
| return cropped_image, True | |