# Mustafa Akcanca
# Add CFA
# 5196d55
#!/usr/bin/env python3
r"""
Popescu–Farid CFA Interpolation Forensics (End-to-End)

Implements the algorithm from:

    A. C. Popescu and H. Farid,
    "Exposing Digital Forgeries in Color Filter Array Interpolated Images,"
    IEEE Trans. Signal Processing, 2005.

Features:
- EM estimation of the linear correlation model for each color channel
  (single-channel Gaussian vs uniform mixture).
- Posterior probability map per channel.
- Synthetic CFA maps s_r, s_g, s_b for a Bayer pattern.
- Fourier-domain similarity M(p_c, s_c) per channel.
- Sliding-window analysis with 50% overlap.
- Threshold calibration on a set of negative images to get ~0% FPs.
- Multi-channel fusion (default): window authentic if ANY channel is CFA;
  optional green-only mode: window authentic if GREEN channel is CFA.

Usage:
    # 1) Calibrate thresholds on negative (non-CFA / tampered) images
    python pf_cfa_detector.py calibrate \
        --neg-dir path/to/negative_images \
        --output thresholds.json

    # 2) Detect on a single image (all channels, PF-style fusion)
    python pf_cfa_detector.py detect \
        --image path/to/test_image.png \
        --thresholds thresholds.json

    # 3) Detect on a directory, green-only mode
    python pf_cfa_detector.py detect \
        --image-dir path/to/test_images \
        --thresholds thresholds.json \
        --green-only
"""
import argparse
import json
import math
import os
from typing import Dict, List, Sequence, Tuple
import numpy as np
from numpy.fft import fft2
from PIL import Image
# -------------------------------------------------------------------------
# Parameters and utilities
# -------------------------------------------------------------------------
# File-name suffixes (compared case-insensitively) that count as images.
IMG_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")


def list_image_files(directory: str) -> List[str]:
    """
    Return the image files located directly inside `directory`.

    The listing is non-recursive. An entry is kept when its lower-cased name
    ends with one of `IMG_EXTS` and it is a regular file (or a symlink to
    one).

    Parameters
    ----------
    directory : str
        Directory to scan.

    Returns
    -------
    paths : list of str
        `directory` joined with each matching name, sorted ascending.

    Raises
    ------
    OSError
        Propagated from `os.listdir` (e.g. the directory does not exist).
    """
    candidates = (
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.lower().endswith(IMG_EXTS)
    )
    # A sub-directory called e.g. "scans.png" matches the suffix test but
    # cannot be opened as an image, so keep regular files only.
    return sorted(path for path in candidates if os.path.isfile(path))
def load_rgb_image(path: str) -> np.ndarray:
    """
    Load an image from disk and return it as a float64 RGB array in [0, 1].

    Parameters
    ----------
    path : str
        Path to image file.

    Returns
    -------
    img : np.ndarray, shape (H, W, 3), dtype float64
        RGB image; the 8-bit values produced by the RGB conversion are
        divided by 255.

    Raises
    ------
    ValueError
        If the converted array is not 3-D with exactly three channels.
    """
    # Image.open keeps the underlying file open; the context manager closes
    # it deterministically once the pixels have been copied into `arr`,
    # which matters when a whole directory of images is processed.
    with Image.open(path) as im:
        arr = np.asarray(im.convert("RGB"), dtype=np.float64)
    if arr.ndim != 3 or arr.shape[2] != 3:
        raise ValueError(f"Expected an RGB image at {path}")
    return arr / 255.0
# -------------------------------------------------------------------------
# EM algorithm for the linear CFA correlation model (single channel)
# -------------------------------------------------------------------------
def em_probability_map(
    f: np.ndarray,
    N: int = 1,
    sigma0: float = 0.0075,
    p0: float = 1.0 / 256.0,
    max_iter: int = 50,
    tol: float = 1e-5,
    seed: int = 0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimate the posterior probability map w(x,y) that each sample belongs to
    the linearly correlated model M1 via EM (Gaussian vs uniform mixture).

    Model for a single channel f(x,y):
        f(x,y) = sum_{u,v} alpha_{u,v} f(x+u, y+v) + n(x,y)
    where n(x,y) ~ N(0, sigma^2), alpha_{0,0} = 0.

    Parameters
    ----------
    f : np.ndarray, shape (H, W)
        Single colour channel.
    N : int
        Neighbourhood radius; the model uses the (2N+1)^2 - 1 neighbours of
        each pixel. Must be >= 1.
    sigma0 : float
        Initial standard deviation of the Gaussian residual model
        (clamped below at 1e-6).
    p0 : float
        Constant likelihood assigned to the outlier model.
    max_iter : int
        Maximum number of EM iterations.
    tol : float
        Stop once ||alpha_new - alpha|| < tol * ||alpha||.
    seed : int
        Seed for the random initialisation of alpha.

    Returns
    -------
    prob_map : np.ndarray, shape (H, W), dtype float64
        Posterior of M1 for every interior pixel; the border of width N,
        where the neighbourhood is incomplete, is left at 0.
    alpha : np.ndarray, shape ((2N+1)^2 - 1,)
        Estimated coefficients, ordered by (dy, dx) in row-major order with
        the centre (0, 0) omitted.

    Raises
    ------
    ValueError
        If `f` is not 2-D, if N < 1, or if the image is not larger than
        2N along both axes.
    """
    if f.ndim != 2:
        raise ValueError("em_probability_map expects a single 2D channel.")
    if N < 1:
        # N = 0 leaves no neighbours, so there is nothing to regress on.
        raise ValueError("Neighborhood radius N must be >= 1.")
    f = f.astype(np.float64, copy=False)
    H, W = f.shape
    if H <= 2 * N or W <= 2 * N:
        raise ValueError("Image too small for N = %d neighborhood." % N)

    # For N=1, there are (2N+1)^2 - 1 = 8 neighbors.
    offsets = [
        (dy, dx)
        for dy in range(-N, N + 1)
        for dx in range(-N, N + 1)
        if (dy, dx) != (0, 0)
    ]
    K = len(offsets)
    inner_h, inner_w = H - 2 * N, W - 2 * N

    # Observation vector y (interior pixels, row-major) and design matrix X
    # (one column per neighbour offset). Each column is a shifted view of
    # the interior, so no per-pixel Python loop is needed.
    y = f[N:H - N, N:W - N].reshape(-1, 1)
    X = np.empty((inner_h * inner_w, K), dtype=np.float64)
    for k, (dy, dx) in enumerate(offsets):
        X[:, k] = f[N + dy:H - N + dy, N + dx:W - N + dx].ravel()

    def posterior(coeffs: np.ndarray, std: float) -> np.ndarray:
        """Posterior of M1 given coefficients and residual std (E-step)."""
        resid = y - X @ coeffs
        coef = 1.0 / (std * math.sqrt(2.0 * math.pi))
        likelihood = coef * np.exp(-0.5 * (resid / std) ** 2)
        return likelihood / (likelihood + p0)

    rng = np.random.default_rng(seed)
    alpha = rng.normal(scale=0.01, size=(K, 1))

    # Lower bound on sigma: avoids division by zero when the model fits
    # (numerically) perfectly.
    eps_sigma = 1e-6
    sigma = max(float(sigma0), eps_sigma)

    for _iteration in range(max_iter):
        # E-step: posterior weights under the current parameters.
        w = posterior(alpha, sigma)

        # M-step: weighted least squares for alpha.
        A = X.T @ (X * w)
        b = X.T @ (w * y)
        try:
            alpha_new = np.linalg.solve(A, b)
        except np.linalg.LinAlgError:
            # Singular normal equations (e.g. a flat window): fall back to
            # the least-squares solution.
            alpha_new = np.linalg.lstsq(A, b, rcond=None)[0]

        # Update sigma^2 = sum w_i r_i^2 / sum w_i
        resid = y - X @ alpha_new
        weight_sum = float(np.sum(w))
        if weight_sum > 0.0:
            weighted_sq = float(np.sum(w * (resid ** 2)))
            sigma_new = max(math.sqrt(weighted_sq / weight_sum), eps_sigma)
        else:
            # No sample supports M1; keep the previous (already clamped) sigma.
            sigma_new = sigma

        # Relative change of alpha, measured against the previous iterate.
        step = np.linalg.norm(alpha_new - alpha)
        scale = np.linalg.norm(alpha)
        alpha, sigma = alpha_new, sigma_new
        if scale > 0 and step < tol * scale:
            break

    # Final posterior with the last alpha / sigma.
    w = posterior(alpha, sigma)
    prob_map = np.zeros_like(f)
    prob_map[N:H - N, N:W - N] = w.reshape(inner_h, inner_w)
    return prob_map, alpha.ravel()
# -------------------------------------------------------------------------
# CFA synthetic maps and Fourier-domain similarity
# -------------------------------------------------------------------------
def synthetic_cfa_map(
    shape: Tuple[int, int],
    channel: str,
    pattern: str = "RGGB",
) -> np.ndarray:
    """
    Build the synthetic binary map s_c(x,y) for a Bayer pattern:
        s_c(x,y) = 0 if CFA at (x,y) is color c
                 = 1 otherwise

    Bayer patterns are specified as a 4-character string:
    'RGGB', 'BGGR', 'GRBG', 'GBRG', etc.
        pattern[0] -> (row%2==0, col%2==0)
        pattern[1] -> (row%2==0, col%2==1)
        pattern[2] -> (row%2==1, col%2==0)
        pattern[3] -> (row%2==1, col%2==1)

    Parameters
    ----------
    shape : (int, int)
        Output size (H, W).
    channel : str
        'R', 'G' or 'B' (case-insensitive).
    pattern : str
        2x2 tile description as above (case-insensitive).

    Returns
    -------
    s : np.ndarray, shape (H, W), dtype float64
        Map containing only 0.0 and 1.0.

    Raises
    ------
    ValueError
        If `pattern` does not have length 4 or `channel` is not R/G/B.
    """
    H, W = shape
    if len(pattern) != 4:
        raise ValueError("Bayer pattern string must have length 4, e.g. 'RGGB'.")
    channel = channel.upper()
    if channel not in ("R", "G", "B"):
        raise ValueError("channel must be one of 'R', 'G', 'B'.")
    # The channel is compared in upper case, so the pattern must be too;
    # otherwise a lower-case pattern such as "rggb" never matches and the
    # map degenerates to all ones.
    pattern = pattern.upper()

    s = np.ones((H, W), dtype=np.float64)
    for position, colour in enumerate(pattern):
        if colour == channel:
            # position 0..3 -> (row parity, column parity) of the 2x2 tile.
            row_parity, col_parity = divmod(position, 2)
            s[row_parity::2, col_parity::2] = 0.0
    return s
def similarity_measure(prob_map: np.ndarray, synthetic_map: np.ndarray) -> float:
    """
    Compare a probability map with a synthetic CFA map in the Fourier domain.

    The score is the sum over all frequencies of the product of the two
    2-D FFT magnitudes,
        M(p,s) = sum |F(p)| * |F(s)|
    Only magnitudes are used, so the phase of either spectrum does not
    affect the result.

    Raises
    ------
    ValueError
        If the two maps differ in shape.
    """
    if prob_map.shape == synthetic_map.shape:
        magnitude_p = np.abs(fft2(prob_map))
        magnitude_s = np.abs(fft2(synthetic_map))
        return float((magnitude_p * magnitude_s).sum())
    raise ValueError("prob_map and synthetic_map must have the same shape.")
# -------------------------------------------------------------------------
# Sliding-window analysis
# -------------------------------------------------------------------------
def sliding_window_indices(H: int, W: int, window: int) -> List[Tuple[int, int]]:
    """
    List the top-left (y, x) corners of square windows tiled over an H x W grid.

    Windows advance by `window // 2` (at least 1) along each axis, i.e. with
    50% overlap, and only positions where the window fits entirely inside
    the grid are produced, in row-major order. When the window is larger
    than either dimension the single corner (0, 0) is returned.
    """
    if window > H or window > W:
        return [(0, 0)]
    step = max(1, window // 2)
    tops = range(0, H - window + 1, step)
    lefts = range(0, W - window + 1, step)
    return [(top, left) for top in tops for left in lefts]
def analyze_window(
    window: np.ndarray,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Analyse one RGB window channel by channel.

    For each of R, G, B (planes 0, 1, 2 of `window`) the EM probability map
    is estimated, the matching synthetic CFA map is built for `pattern`, and
    the two are compared with `similarity_measure`.

    Returns a dict keyed by 'R', 'G', 'B'; each value holds
        'prob_map', 'synthetic', 'M' (0-d float64 array), 'alpha'.

    Raises ValueError when `window` is not of shape (H, W, 3).
    """
    if not (window.ndim == 3 and window.shape[2] == 3):
        raise ValueError("Expected RGB window of shape (H,W,3).")
    em_options = {} if em_kwargs is None else em_kwargs
    height, width = window.shape[:2]

    per_channel: Dict[str, Dict[str, np.ndarray]] = {}
    for plane_index, cname in enumerate(("R", "G", "B")):
        plane = window[:, :, plane_index]
        prob_map, alpha = em_probability_map(plane, **em_options)
        reference = synthetic_cfa_map((height, width), cname, pattern=pattern)
        score = similarity_measure(prob_map, reference)
        per_channel[cname] = {
            "prob_map": prob_map,
            "synthetic": reference,
            "M": np.array(score, dtype=np.float64),
            "alpha": alpha,
        }
    return per_channel
def analyze_image_windows(
    img: np.ndarray,
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> List[Dict]:
    """
    Run the per-window CFA analysis over every sliding window of an image.

    When the image is smaller than `window` along either axis, the whole
    image is analysed as a single window. Each returned entry carries the
    window geometry ('y', 'x', 'h', 'w') together with the per-channel
    results produced by `analyze_window`.

    Raises ValueError when `img` does not have exactly three channels.
    """
    H, W, C = img.shape
    if C != 3:
        raise ValueError("Expected RGB image with 3 channels.")
    if em_kwargs is None:
        em_kwargs = {}

    if H >= window and W >= window:
        origins = sliding_window_indices(H, W, window)
        tile_h = tile_w = window
    else:
        # Too small to tile: analyse the full image once.
        origins = [(0, 0)]
        tile_h, tile_w = H, W

    results = []
    for top, left in origins:
        sub = img[top : top + tile_h, left : left + tile_w, :]
        channel_results = analyze_window(sub, pattern=pattern, em_kwargs=em_kwargs)
        entry = {"y": top, "x": left, "h": sub.shape[0], "w": sub.shape[1]}
        entry.update(channel_results)
        results.append(entry)
    return results
# -------------------------------------------------------------------------
# Threshold calibration and classification
# -------------------------------------------------------------------------
def calibrate_thresholds(
    negative_image_paths: Sequence[str],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, float]:
    """
    Derive per-channel thresholds T_R, T_G, T_B from a negative image set.

    Every image is loaded and analysed window by window; the threshold of a
    channel is the largest similarity score M observed for that channel over
    all windows of all images, so that none of the negatives exceeds it
    (~0% false positives on that set).

    Raises RuntimeError when no score was collected for a channel (for
    example when `negative_image_paths` is empty).
    """
    if em_kwargs is None:
        em_kwargs = {}
    channel_names = ("R", "G", "B")
    scores = {name: [] for name in channel_names}

    for path in negative_image_paths:
        img = load_rgb_image(path)
        per_window = analyze_image_windows(
            img,
            window=window,
            pattern=pattern,
            em_kwargs=em_kwargs,
        )
        for res in per_window:
            for name in channel_names:
                scores[name].append(float(res[name]["M"]))

    thresholds: Dict[str, float] = {}
    for c in channel_names:
        if not scores[c]:
            raise RuntimeError(f"No M values collected for channel {c}.")
        thresholds[c] = max(scores[c])
    return thresholds
def classify_windows(
    window_results: List[Dict],
    thresholds: Dict[str, float],
    green_only: bool = False,
) -> List[Dict]:
    """
    Label each analysed window as authentic (CFA present) or not.

    A channel counts as CFA-interpolated when its score strictly exceeds the
    channel threshold (M_c > T_c).
        green_only=False (default, PF-style): the window is authentic when
            any of the three channels is CFA-interpolated.
        green_only=True: all three flags are still reported, but only the
            GREEN flag decides whether the window is authentic.

    Each output entry is a shallow copy of the input entry with two extra
    keys, 'channel_cfa' (dict of flags) and 'authentic'.
    """
    names = ("R", "G", "B")
    labelled = []
    for entry in window_results:
        scores = {c: float(entry[c]["M"]) for c in names}
        flags = {c: scores[c] > thresholds[c] for c in names}
        if green_only:
            verdict = flags["G"]
        else:
            verdict = flags["R"] or flags["G"] or flags["B"]
        annotated = dict(entry)
        annotated["channel_cfa"] = flags
        annotated["authentic"] = verdict
        labelled.append(annotated)
    return labelled
def classify_image(
    img_path: str,
    thresholds: Dict[str, float],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
    green_only: bool = False,
) -> Dict:
    """
    Run the sliding-window Popescu–Farid-style detector on one image file.

    The image is loaded, analysed window by window and each window is
    labelled against `thresholds`. The image as a whole is reported as
    authentic when at least one window is labelled authentic.

    Returns:
        {
            "image_path": str,
            "windows": [...],        # labelled window entries
            "image_authentic": bool,
        }
    """
    pixels = load_rgb_image(img_path)
    per_window = analyze_image_windows(
        pixels,
        window=window,
        pattern=pattern,
        em_kwargs=em_kwargs,
    )
    labelled = classify_windows(per_window, thresholds, green_only=green_only)

    report = {"image_path": img_path, "windows": labelled}
    report["image_authentic"] = any(entry["authentic"] for entry in labelled)
    return report
# -------------------------------------------------------------------------
# CLI plumbing
# -------------------------------------------------------------------------
def add_em_args(parser: argparse.ArgumentParser) -> None:
    """Register the EM tuning options (--N, --sigma0, --p0, --max-iter, --tol, --seed)."""
    # (flag, type, default, help) — registered in this order.
    option_table = (
        ("--N", int, 1, "Neighborhood radius N for EM (default: 1)."),
        ("--sigma0", float, 0.0075, "Initial sigma_0 for EM (default: 0.0075)."),
        (
            "--p0",
            float,
            1.0 / 256.0,
            "Outlier likelihood p0. Default 1/256 (PF-style for 8-bit data). "
            "For a uniform on [0,1], consider --p0 1.0.",
        ),
        ("--max-iter", int, 50, "Maximum EM iterations (default: 50)."),
        (
            "--tol",
            float,
            1e-5,
            "Relative convergence tolerance on alpha (default: 1e-5).",
        ),
        ("--seed", int, 0, "Random seed for EM initialization (default: 0)."),
    )
    for flag, value_type, default_value, help_text in option_table:
        parser.add_argument(
            flag,
            type=value_type,
            default=default_value,
            help=help_text,
        )
def em_args_to_kwargs(args: argparse.Namespace) -> Dict:
    """Collect the parsed EM options into a kwargs dict for em_probability_map."""
    option_names = ("N", "sigma0", "p0", "max_iter", "tol", "seed")
    return {name: getattr(args, name) for name in option_names}
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the `calibrate` and `detect` subcommands."""
    parser = argparse.ArgumentParser(
        description="Popescu–Farid CFA interpolation detector (EM + spectral similarity)."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Calibrate subcommand
    cal_parser = subparsers.add_parser(
        "calibrate",
        help="Calibrate per-channel thresholds on negative (non-CFA) images.",
    )
    cal_parser.add_argument(
        "--neg-dir",
        required=True,
        help="Directory with negative (non-CFA / tampered) images for calibration.",
    )
    cal_parser.add_argument(
        "--window",
        type=int,
        default=256,
        help="Window size (square) for analysis (default: 256).",
    )
    cal_parser.add_argument(
        "--pattern",
        type=str,
        default="RGGB",
        help="Bayer pattern string (default: RGGB).",
    )
    cal_parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Path to JSON file to save thresholds (optional).",
    )
    add_em_args(cal_parser)

    # Detect subcommand
    det_parser = subparsers.add_parser(
        "detect",
        help="Run detector on images using pre-calibrated thresholds.",
    )
    det_parser.add_argument(
        "--image",
        type=str,
        default=None,
        help="Path to a single image.",
    )
    det_parser.add_argument(
        "--image-dir",
        type=str,
        default=None,
        help="Directory with images to process.",
    )
    det_parser.add_argument(
        "--thresholds",
        type=str,
        required=True,
        help="Path to JSON file with thresholds from 'calibrate'.",
    )
    det_parser.add_argument(
        "--window",
        type=int,
        default=256,
        help="Window size (square) for analysis (default: 256).",
    )
    det_parser.add_argument(
        "--pattern",
        type=str,
        default="RGGB",
        help="Bayer pattern string (default: RGGB).",
    )
    det_parser.add_argument(
        "--green-only",
        action="store_true",
        help="Use only GREEN channel for window/image decisions.",
    )
    add_em_args(det_parser)
    return parser


def _run_calibrate(args: argparse.Namespace) -> None:
    """Calibrate thresholds on --neg-dir, print them and optionally save them as JSON."""
    neg_files = list_image_files(args.neg_dir)
    if not neg_files:
        raise SystemExit(f"No images found in negative directory: {args.neg_dir}")
    em_kwargs = em_args_to_kwargs(args)
    thresholds = calibrate_thresholds(
        negative_image_paths=neg_files,
        window=args.window,
        pattern=args.pattern,
        em_kwargs=em_kwargs,
    )

    # Print thresholds. This is a plain string (no %-formatting is applied),
    # so the percent sign must not be doubled.
    print("# Calibrated thresholds (0% FPs on provided negatives):")
    for c in ("R", "G", "B"):
        print(f"T_{c} = {thresholds[c]:.6e}")

    # Optionally save to JSON, together with the settings they depend on.
    if args.output is not None:
        out_obj = {
            "thresholds": thresholds,
            "pattern": args.pattern,
            "window": args.window,
            "em_params": em_kwargs,
        }
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(out_obj, f, indent=2)
        print(f"# Saved thresholds to {args.output}")


def _run_detect(args: argparse.Namespace) -> None:
    """Classify --image / --image-dir against a thresholds file and print a per-window report."""
    import sys  # local import: only needed for the warning stream below

    # Load thresholds JSON
    with open(args.thresholds, "r", encoding="utf-8") as f:
        data = json.load(f)
    if "thresholds" in data:
        thresholds = data["thresholds"]
    else:
        thresholds = data  # assume raw {"R": ..., "G": ..., "B": ...}

    # Sanity check
    for c in ("R", "G", "B"):
        if c not in thresholds:
            raise SystemExit(f"Thresholds JSON missing channel '{c}'.")

    # The thresholds are only meaningful for the window size / pattern they
    # were calibrated with; warn (without failing) when the file records
    # different settings than the ones used for this run.
    if isinstance(data, dict):
        for key, used in (("window", args.window), ("pattern", args.pattern)):
            calibrated = data.get(key)
            if calibrated is not None and calibrated != used:
                print(
                    f"# Warning: thresholds were calibrated with {key}={calibrated!r} "
                    f"but detection uses {key}={used!r}.",
                    file=sys.stderr,
                )

    # Decide images to process
    images: List[str] = []
    if args.image is not None and args.image_dir is not None:
        raise SystemExit("Specify either --image or --image-dir, not both.")
    elif args.image is not None:
        images = [args.image]
    elif args.image_dir is not None:
        images = list_image_files(args.image_dir)
        if not images:
            raise SystemExit(
                f"No images found in directory: {args.image_dir}"
            )
    else:
        raise SystemExit("You must specify either --image or --image-dir.")

    em_kwargs = em_args_to_kwargs(args)
    green_only = bool(args.green_only)
    for img_path in images:
        result = classify_image(
            img_path,
            thresholds=thresholds,
            window=args.window,
            pattern=args.pattern,
            em_kwargs=em_kwargs,
            green_only=green_only,
        )
        label = "AUTHENTIC" if result["image_authentic"] else "TAMPERED"
        mode = "GREEN_ONLY" if green_only else "ALL_CHANNELS"
        print(f"\n# Image: {result['image_path']}")
        print(f"# Overall label: {label} (mode={mode})")
        print("# y x h w M_R M_G M_B CFA_R CFA_G CFA_B WINDOW_LABEL")
        for w in result["windows"]:
            y = w["y"]
            x = w["x"]
            h = w["h"]
            wd = w["w"]
            M_R = float(w["R"]["M"])
            M_G = float(w["G"]["M"])
            M_B = float(w["B"]["M"])
            cfaR = w["channel_cfa"]["R"]
            cfaG = w["channel_cfa"]["G"]
            cfaB = w["channel_cfa"]["B"]
            wlabel = "AUTH" if w["authentic"] else "TAMP"
            print(
                f"{y:5d} {x:5d} {h:4d} {wd:4d} "
                f"{M_R:.6e} {M_G:.6e} {M_B:.6e} "
                f"{int(cfaR)} {int(cfaG)} {int(cfaB)} {wlabel}"
            )


def main():
    """Command-line entry point: parse arguments and dispatch to the chosen subcommand."""
    args = _build_parser().parse_args()
    if args.command == "calibrate":
        _run_calibrate(args)
    elif args.command == "detect":
        _run_detect(args)


if __name__ == "__main__":
    main()