|
|
|
|
|
""" |
|
|
Popescu–Farid CFA Interpolation Forensics (End-to-End) |
|
|
|
|
|
Implements the algorithm from: |
|
|
|
|
|
A. C. Popescu and H. Farid, |
|
|
"Exposing Digital Forgeries in Color Filter Array Interpolated Images," |
|
|
IEEE Trans. Signal Processing, 2005. |
|
|
|
|
|
Features: |
|
|
- EM estimation of the linear correlation model for each color channel |
|
|
(single-channel Gaussian vs uniform mixture). |
|
|
- Posterior probability map per channel. |
|
|
- Synthetic CFA maps s_r, s_g, s_b for a Bayer pattern. |
|
|
- Fourier-domain similarity M(p_c, s_c) per channel. |
|
|
- Sliding-window analysis with 50% overlap. |
|
|
- Threshold calibration on a set of negative images to get ~0% FPs. |
|
|
- Multi-channel fusion (default): window authentic if ANY channel is CFA; |
|
|
optional green-only mode: window authentic if GREEN channel is CFA. |
|
|
|
|
|
Usage: |
|
|
|
|
|
# 1) Calibrate thresholds on negative (non-CFA / tampered) images |
|
|
python pf_cfa_detector.py calibrate \ |
|
|
--neg-dir path/to/negative_images \ |
|
|
--output thresholds.json |
|
|
|
|
|
# 2) Detect on a single image (all channels, PF-style fusion) |
|
|
python pf_cfa_detector.py detect \ |
|
|
--image path/to/test_image.png \ |
|
|
--thresholds thresholds.json |
|
|
|
|
|
# 3) Detect on a directory, green-only mode |
|
|
python pf_cfa_detector.py detect \ |
|
|
--image-dir path/to/test_images \ |
|
|
--thresholds thresholds.json \ |
|
|
--green-only |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import math |
|
|
import os |
|
|
from typing import Dict, List, Sequence, Tuple |
|
|
|
|
|
import numpy as np |
|
|
from numpy.fft import fft2 |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
IMG_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff") |
|
|
|
|
|
|
|
|
def list_image_files(directory: str) -> List[str]:
    """Collect image files in *directory* (non-recursive), sorted by name.

    A file counts as an image when its (case-insensitive) extension is one
    of the known IMG_EXTS suffixes.
    """
    found = [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.lower().endswith(IMG_EXTS)
    ]
    return sorted(found)
|
|
|
|
|
|
|
|
def load_rgb_image(path: str) -> np.ndarray:
    """
    Load an image from disk and return as float64 RGB array in [0,1].

    Parameters
    ----------
    path : str
        Path to image file.

    Returns
    -------
    img : np.ndarray, shape (H, W, 3), dtype float64
        RGB image, intensities in [0, 1].

    Raises
    ------
    ValueError
        If the decoded array is not a 3-channel RGB image.
    """
    # BUG FIX: the original left the Image.open() file handle open for the
    # lifetime of the process. The context manager closes it; convert()
    # forces the pixel data to be loaded before the file is closed.
    with Image.open(path) as handle:
        im = handle.convert("RGB")
    arr = np.asarray(im, dtype=np.float64)
    if arr.ndim != 3 or arr.shape[2] != 3:
        raise ValueError(f"Expected an RGB image at {path}")
    return arr / 255.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def em_probability_map( |
|
|
f: np.ndarray, |
|
|
N: int = 1, |
|
|
sigma0: float = 0.0075, |
|
|
p0: float = 1.0 / 256.0, |
|
|
max_iter: int = 50, |
|
|
tol: float = 1e-5, |
|
|
seed: int = 0, |
|
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Estimate the posterior probability map w(x,y) that each sample belongs to |
|
|
the linearly correlated model M1 via EM (Gaussian vs uniform mixture). |
|
|
|
|
|
Model for a single channel f(x,y): |
|
|
f(x,y) = sum_{u,v} alpha_{u,v} f(x+u, y+v) + n(x,y) |
|
|
where n(x,y) ~ N(0, sigma^2), alpha_{0,0} = 0. |
|
|
""" |
|
|
|
|
|
if f.ndim != 2: |
|
|
raise ValueError("em_probability_map expects a single 2D channel.") |
|
|
|
|
|
f = f.astype(np.float64, copy=False) |
|
|
H, W = f.shape |
|
|
if H <= 2 * N or W <= 2 * N: |
|
|
raise ValueError("Image too small for N = %d neighborhood." % N) |
|
|
|
|
|
|
|
|
OFFSETS: List[Tuple[int, int]] = [] |
|
|
for dy in range(-N, N + 1): |
|
|
for dx in range(-N, N + 1): |
|
|
if dy == 0 and dx == 0: |
|
|
continue |
|
|
OFFSETS.append((dy, dx)) |
|
|
K = len(OFFSETS) |
|
|
|
|
|
|
|
|
num_pixels = (H - 2 * N) * (W - 2 * N) |
|
|
X = np.empty((num_pixels, K), dtype=np.float64) |
|
|
y = np.empty((num_pixels, 1), dtype=np.float64) |
|
|
|
|
|
idx = 0 |
|
|
for yy in range(N, H - N): |
|
|
for xx in range(N, W - N): |
|
|
y[idx, 0] = f[yy, xx] |
|
|
for k, (dy, dx) in enumerate(OFFSETS): |
|
|
X[idx, k] = f[yy + dy, xx + dx] |
|
|
idx += 1 |
|
|
|
|
|
rng = np.random.default_rng(seed) |
|
|
alpha = rng.normal(scale=0.01, size=(K, 1)) |
|
|
|
|
|
|
|
|
EPS_SIGMA = 1e-6 |
|
|
sigma = float(max(sigma0, EPS_SIGMA)) |
|
|
|
|
|
for _ in range(max_iter): |
|
|
|
|
|
r = y - X @ alpha |
|
|
|
|
|
|
|
|
if sigma < EPS_SIGMA: |
|
|
sigma = EPS_SIGMA |
|
|
|
|
|
coef = 1.0 / (sigma * math.sqrt(2.0 * math.pi)) |
|
|
P = coef * np.exp(-0.5 * (r / sigma) ** 2) |
|
|
|
|
|
|
|
|
w = P / (P + p0) |
|
|
|
|
|
|
|
|
WX = X * w |
|
|
A = X.T @ WX |
|
|
b = X.T @ (w * y) |
|
|
|
|
|
try: |
|
|
alpha_new = np.linalg.solve(A, b) |
|
|
except np.linalg.LinAlgError: |
|
|
alpha_new, *_ = np.linalg.lstsq(A, b, rcond=None) |
|
|
|
|
|
|
|
|
r = y - X @ alpha_new |
|
|
num = float(np.sum(w * (r ** 2))) |
|
|
den = float(np.sum(w)) |
|
|
|
|
|
if den > 0.0: |
|
|
sigma_new = math.sqrt(num / den) |
|
|
else: |
|
|
sigma_new = sigma |
|
|
|
|
|
|
|
|
if sigma_new < EPS_SIGMA: |
|
|
sigma_new = EPS_SIGMA |
|
|
|
|
|
|
|
|
diff = np.linalg.norm(alpha_new - alpha) |
|
|
norm = np.linalg.norm(alpha) |
|
|
if norm > 0 and diff < tol * norm: |
|
|
alpha = alpha_new |
|
|
sigma = sigma_new |
|
|
break |
|
|
|
|
|
alpha = alpha_new |
|
|
sigma = sigma_new |
|
|
|
|
|
|
|
|
if sigma < EPS_SIGMA: |
|
|
sigma = EPS_SIGMA |
|
|
|
|
|
r = y - X @ alpha |
|
|
coef = 1.0 / (sigma * math.sqrt(2.0 * math.pi)) |
|
|
P = coef * np.exp(-0.5 * (r / sigma) ** 2) |
|
|
w = P / (P + p0) |
|
|
|
|
|
prob_map = np.zeros_like(f) |
|
|
idx = 0 |
|
|
for yy in range(N, H - N): |
|
|
for xx in range(N, W - N): |
|
|
prob_map[yy, xx] = w[idx, 0] |
|
|
idx += 1 |
|
|
|
|
|
return prob_map, alpha.ravel() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def synthetic_cfa_map(
    shape: Tuple[int, int],
    channel: str,
    pattern: str = "RGGB",
) -> np.ndarray:
    """
    Build the synthetic binary map s_c(x,y) for a Bayer pattern:

        s_c(x,y) = 0 if CFA at (x,y) is color c
                 = 1 otherwise

    Bayer patterns are specified as a 4-character string:
        'RGGB', 'BGGR', 'GRBG', 'GBRG', etc.

    pattern[0] -> (row%2==0, col%2==0)
    pattern[1] -> (row%2==0, col%2==1)
    pattern[2] -> (row%2==1, col%2==0)
    pattern[3] -> (row%2==1, col%2==1)

    Parameters
    ----------
    shape : (H, W)
        Output map size.
    channel : str
        One of 'R', 'G', 'B' (case-insensitive).
    pattern : str
        4-character Bayer pattern string.

    Returns
    -------
    s : np.ndarray, shape (H, W), dtype float64

    Raises
    ------
    ValueError
        If the pattern length is not 4 or the channel is unknown.
    """
    H, W = shape
    if len(pattern) != 4:
        raise ValueError("Bayer pattern string must have length 4, e.g. 'RGGB'.")

    channel = channel.upper()
    if channel not in ("R", "G", "B"):
        raise ValueError("channel must be one of 'R', 'G', 'B'.")

    tile = np.array(list(pattern), dtype="<U1").reshape(2, 2)
    s = np.ones((H, W), dtype=np.float64)

    # The CFA sites of a given color are 2x2-periodic, so zero them with
    # four strided slice assignments instead of testing every pixel
    # (replaces an O(H*W) Python double loop).
    for cy in range(2):
        for cx in range(2):
            if tile[cy, cx] == channel:
                s[cy::2, cx::2] = 0.0

    return s
|
|
|
|
|
|
|
|
def similarity_measure(prob_map: np.ndarray, synthetic_map: np.ndarray) -> float:
    """
    Phase-insensitive spectral similarity between a probability map and its
    CFA synthetic map:

        M(p,s) = sum |F(p)| * |F(s)|

    Raises ValueError when the two maps differ in shape.
    """
    if prob_map.shape != synthetic_map.shape:
        raise ValueError("prob_map and synthetic_map must have the same shape.")

    magnitude_product = np.abs(fft2(prob_map)) * np.abs(fft2(synthetic_map))
    return float(magnitude_product.sum())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sliding_window_indices(H: int, W: int, window: int) -> List[Tuple[int, int]]:
    """
    Enumerate top-left (y, x) corners of square sliding windows with 50%
    overlap (stride = window // 2 along each axis).

    When the window does not fit in the image, a single (0, 0) entry is
    returned so the caller analyzes the full frame as one window.
    """
    if window > H or window > W:
        return [(0, 0)]

    step = max(1, window // 2)
    tops = range(0, H - window + 1, step)
    lefts = range(0, W - window + 1, step)
    return [(top, left) for top in tops for left in lefts]
|
|
|
|
|
|
|
|
def analyze_window(
    window: np.ndarray,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Run EM estimation plus CFA spectral similarity on one RGB window.

    Returns a dict keyed by channel name ('R', 'G', 'B'); each value holds:
        'prob_map'  -- EM posterior probability map,
        'synthetic' -- binary Bayer map for that channel,
        'M'         -- spectral similarity score (0-d float64 array),
        'alpha'     -- estimated linear-model coefficients.
    """
    if window.ndim != 3 or window.shape[2] != 3:
        raise ValueError("Expected RGB window of shape (H,W,3).")

    em_opts = {} if em_kwargs is None else em_kwargs
    H, W, _ = window.shape

    analysis: Dict[str, Dict[str, np.ndarray]] = {}
    for band, cname in enumerate(("R", "G", "B")):
        prob_map, alpha = em_probability_map(window[:, :, band], **em_opts)
        syn = synthetic_cfa_map((H, W), cname, pattern=pattern)
        score = similarity_measure(prob_map, syn)
        analysis[cname] = {
            "prob_map": prob_map,
            "synthetic": syn,
            "M": np.array(score, dtype=np.float64),
            "alpha": alpha,
        }

    return analysis
|
|
|
|
|
|
|
|
def analyze_image_windows(
    img: np.ndarray,
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> List[Dict]:
    """
    Run the CFA EM analysis over every sliding window of an RGB image.

    Images smaller than `window` in either dimension are processed as a
    single full-frame window. Each result entry records the window
    position/size plus the per-channel analysis dictionaries.
    """
    H, W, C = img.shape
    if C != 3:
        raise ValueError("Expected RGB image with 3 channels.")

    opts = {} if em_kwargs is None else em_kwargs

    if H < window or W < window:
        # Degenerate case: whole image as one window.
        corners = [(0, 0)]
        win_h, win_w = H, W
    else:
        corners = sliding_window_indices(H, W, window)
        win_h = win_w = window

    out: List[Dict] = []
    for top, left in corners:
        patch = img[top : top + win_h, left : left + win_w, :]
        per_channel = analyze_window(patch, pattern=pattern, em_kwargs=opts)
        record = {"y": top, "x": left, "h": patch.shape[0], "w": patch.shape[1]}
        record.update(per_channel)
        out.append(record)

    return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calibrate_thresholds(
    negative_image_paths: Sequence[str],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, float]:
    """
    Derive per-channel thresholds T_R, T_G, T_B that give ~0% false positives
    on a negative set (non-CFA / tampered images).

    Each channel's threshold is the maximum similarity M observed for that
    channel over all windows of all negative images.

    Raises RuntimeError when no M values were collected for some channel.
    """
    opts = {} if em_kwargs is None else em_kwargs

    scores: Dict[str, List[float]] = {"R": [], "G": [], "B": []}
    for path in negative_image_paths:
        img = load_rgb_image(path)
        window_results = analyze_image_windows(
            img,
            window=window,
            pattern=pattern,
            em_kwargs=opts,
        )
        for entry in window_results:
            for c in ("R", "G", "B"):
                scores[c].append(float(entry[c]["M"]))

    thresholds: Dict[str, float] = {}
    for c in ("R", "G", "B"):
        if not scores[c]:
            raise RuntimeError(f"No M values collected for channel {c}.")
        thresholds[c] = max(scores[c])

    return thresholds
|
|
|
|
|
|
|
|
def classify_windows(
    window_results: List[Dict],
    thresholds: Dict[str, float],
    green_only: bool = False,
) -> List[Dict]:
    """
    Label each analyzed window as CFA-consistent (authentic) or not.

    A channel counts as CFA-interpolated when its similarity exceeds the
    channel threshold (M_c > T_c).

    green_only=False (default, PF-style):
        window authentic <=> any channel is CFA-interpolated.
    green_only=True:
        per-channel flags are still reported, but the window is authentic
        <=> the GREEN channel is CFA-interpolated.
    """
    labeled = []
    for entry in window_results:
        flags = {
            c: float(entry[c]["M"]) > thresholds[c] for c in ("R", "G", "B")
        }
        verdict = flags["G"] if green_only else any(flags.values())

        annotated = dict(entry)
        annotated["channel_cfa"] = flags
        annotated["authentic"] = verdict
        labeled.append(annotated)

    return labeled
|
|
|
|
|
|
|
|
def classify_image(
    img_path: str,
    thresholds: Dict[str, float],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
    green_only: bool = False,
) -> Dict:
    """
    Run the full sliding-window Popescu–Farid detector on one image.

    The image is declared authentic when at least one window is labeled
    authentic under the chosen fusion mode.

    Returns
    -------
    dict with keys:
        "image_path"      -- the input path,
        "windows"         -- classified per-window results,
        "image_authentic" -- overall boolean verdict.
    """
    image = load_rgb_image(img_path)
    raw_results = analyze_image_windows(
        image,
        window=window,
        pattern=pattern,
        em_kwargs=em_kwargs,
    )
    labeled = classify_windows(raw_results, thresholds, green_only=green_only)

    return {
        "image_path": img_path,
        "windows": labeled,
        "image_authentic": any(w["authentic"] for w in labeled),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_em_args(parser: argparse.ArgumentParser) -> None:
    """Register the EM-related command-line options on *parser*."""
    # (flag, type, default, help) for every EM tuning knob.
    specs = [
        ("--N", int, 1, "Neighborhood radius N for EM (default: 1)."),
        ("--sigma0", float, 0.0075, "Initial sigma_0 for EM (default: 0.0075)."),
        (
            "--p0",
            float,
            1.0 / 256.0,
            "Outlier likelihood p0. Default 1/256 (PF-style for 8-bit data). "
            "For a uniform on [0,1], consider --p0 1.0.",
        ),
        ("--max-iter", int, 50, "Maximum EM iterations (default: 50)."),
        ("--tol", float, 1e-5, "Relative convergence tolerance on alpha (default: 1e-5)."),
        ("--seed", int, 0, "Random seed for EM initialization (default: 0)."),
    ]
    for flag, cast, default, help_text in specs:
        parser.add_argument(flag, type=cast, default=default, help=help_text)
|
|
|
|
|
|
|
|
def em_args_to_kwargs(args: argparse.Namespace) -> Dict:
    """Translate parsed command-line EM options into em_probability_map kwargs."""
    return {
        "N": args.N,
        "sigma0": args.sigma0,
        "p0": args.p0,
        "max_iter": args.max_iter,
        "tol": args.tol,
        "seed": args.seed,
    }
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point with two subcommands.

    calibrate: scan a directory of negative (non-CFA) images, derive
        per-channel similarity thresholds, and optionally save them as JSON.
    detect: load previously calibrated thresholds and label one image or a
        directory of images window-by-window as authentic/tampered.
    """
    parser = argparse.ArgumentParser(
        description="Popescu–Farid CFA interpolation detector (EM + spectral similarity)."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # --- 'calibrate' subcommand -------------------------------------------
    cal_parser = subparsers.add_parser(
        "calibrate",
        help="Calibrate per-channel thresholds on negative (non-CFA) images.",
    )
    cal_parser.add_argument(
        "--neg-dir",
        required=True,
        help="Directory with negative (non-CFA / tampered) images for calibration.",
    )
    cal_parser.add_argument(
        "--window",
        type=int,
        default=256,
        help="Window size (square) for analysis (default: 256).",
    )
    cal_parser.add_argument(
        "--pattern",
        type=str,
        default="RGGB",
        help="Bayer pattern string (default: RGGB).",
    )
    cal_parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Path to JSON file to save thresholds (optional).",
    )
    add_em_args(cal_parser)

    # --- 'detect' subcommand ----------------------------------------------
    det_parser = subparsers.add_parser(
        "detect",
        help="Run detector on images using pre-calibrated thresholds.",
    )
    det_parser.add_argument(
        "--image",
        type=str,
        default=None,
        help="Path to a single image.",
    )
    det_parser.add_argument(
        "--image-dir",
        type=str,
        default=None,
        help="Directory with images to process.",
    )
    det_parser.add_argument(
        "--thresholds",
        type=str,
        required=True,
        help="Path to JSON file with thresholds from 'calibrate'.",
    )
    det_parser.add_argument(
        "--window",
        type=int,
        default=256,
        help="Window size (square) for analysis (default: 256).",
    )
    det_parser.add_argument(
        "--pattern",
        type=str,
        default="RGGB",
        help="Bayer pattern string (default: RGGB).",
    )
    det_parser.add_argument(
        "--green-only",
        action="store_true",
        help="Use only GREEN channel for window/image decisions.",
    )
    add_em_args(det_parser)

    args = parser.parse_args()

    if args.command == "calibrate":
        neg_files = list_image_files(args.neg_dir)
        if not neg_files:
            raise SystemExit(f"No images found in negative directory: {args.neg_dir}")

        em_kwargs = em_args_to_kwargs(args)
        thresholds = calibrate_thresholds(
            negative_image_paths=neg_files,
            window=args.window,
            pattern=args.pattern,
            em_kwargs=em_kwargs,
        )

        # BUG FIX: the original wrote "0%%" inside a plain print() call,
        # which printed a literal double percent sign ("%%" only escapes
        # under %-formatting, not in print output).
        print("# Calibrated thresholds (0% FPs on provided negatives):")
        for c in ("R", "G", "B"):
            print(f"T_{c} = {thresholds[c]:.6e}")

        if args.output is not None:
            # Persist everything needed to reproduce the calibration setup.
            out_obj = {
                "thresholds": thresholds,
                "pattern": args.pattern,
                "window": args.window,
                "em_params": em_kwargs,
            }
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(out_obj, f, indent=2)
            print(f"# Saved thresholds to {args.output}")

    elif args.command == "detect":
        with open(args.thresholds, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Accept both the wrapped format written by 'calibrate' and a bare
        # {"R": ..., "G": ..., "B": ...} mapping.
        if "thresholds" in data:
            thresholds = data["thresholds"]
        else:
            thresholds = data

        for c in ("R", "G", "B"):
            if c not in thresholds:
                raise SystemExit(f"Thresholds JSON missing channel '{c}'.")

        # Resolve the image list: exactly one of --image / --image-dir.
        images: List[str] = []
        if args.image is not None and args.image_dir is not None:
            raise SystemExit("Specify either --image or --image-dir, not both.")
        elif args.image is not None:
            images = [args.image]
        elif args.image_dir is not None:
            images = list_image_files(args.image_dir)
            if not images:
                raise SystemExit(
                    f"No images found in directory: {args.image_dir}"
                )
        else:
            raise SystemExit("You must specify either --image or --image-dir.")

        em_kwargs = em_args_to_kwargs(args)
        green_only = bool(args.green_only)

        for img_path in images:
            result = classify_image(
                img_path,
                thresholds=thresholds,
                window=args.window,
                pattern=args.pattern,
                em_kwargs=em_kwargs,
                green_only=green_only,
            )

            label = "AUTHENTIC" if result["image_authentic"] else "TAMPERED"
            mode = "GREEN_ONLY" if green_only else "ALL_CHANNELS"
            print(f"\n# Image: {result['image_path']}")
            print(f"# Overall label: {label} (mode={mode})")
            print("# y x h w M_R M_G M_B CFA_R CFA_G CFA_B WINDOW_LABEL")

            # One row per analyzed window.
            for w in result["windows"]:
                y = w["y"]
                x = w["x"]
                h = w["h"]
                wd = w["w"]
                M_R = float(w["R"]["M"])
                M_G = float(w["G"]["M"])
                M_B = float(w["B"]["M"])
                cfaR = w["channel_cfa"]["R"]
                cfaG = w["channel_cfa"]["G"]
                cfaB = w["channel_cfa"]["B"]
                wlabel = "AUTH" if w["authentic"] else "TAMP"

                print(
                    f"{y:5d} {x:5d} {h:4d} {wd:4d} "
                    f"{M_R:.6e} {M_G:.6e} {M_B:.6e} "
                    f"{int(cfaR)} {int(cfaG)} {int(cfaB)} {wlabel}"
                )
|
|
|
|
|
|
|
|
# Script entry point: dispatch to the CLI defined in main().
if __name__ == "__main__":
    main()
|
|
|