|
|
""" |
|
|
Popescu–Farid CFA Consistency Analyzer for the agent. |
|
|
|
|
|
This tool analyzes Color Filter Array (CFA) demosaicing artifacts to detect |
|
|
inconsistencies within an image. It is designed for SPLICE DETECTION and |
|
|
SOURCE CONSISTENCY analysis, NOT for whole-image authenticity classification. |
|
|
|
|
|
Scientific basis: |
|
|
- Real camera images have CFA interpolation artifacts from Bayer demosaicing |
|
|
- Spliced regions from different sources (AI, screenshots, different cameras) |
|
|
may have different or absent CFA patterns |
|
|
- By analyzing the DISTRIBUTION of CFA metrics across windows, we can identify |
|
|
regions that are inconsistent with the rest of the image |
|
|
|
|
|
What this tool DOES: |
|
|
- Detects CFA pattern consistency across image regions |
|
|
- Identifies outlier windows that differ from the image baseline |
|
|
- Provides distribution analysis (unimodal vs bimodal) |
|
|
|
|
|
What this tool does NOT do: |
|
|
- Classify whole images as "authentic" or "fake" |
|
|
- Work reliably on heavily compressed images |
|
|
- Detect AI-generated images (use TruFor for that) |
|
|
|
|
|
Supports two modes: |
|
|
- analyze: run CFA consistency analysis on a single image |
|
|
- calibrate: optional; build reference thresholds from a set of camera images |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Sequence, Tuple |
|
|
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
|
|
# Fourth ancestor directory of this file (parents[0] is the containing
# directory). Presumably the repository root, per the error message below.
ROOT = Path(__file__).resolve().parents[3]
# Put that directory on sys.path (once) so `example_tools` can be imported.
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

try:
    from example_tools import cfa
except Exception as exc:
    # Any failure while importing is surfaced as ImportError with a hint;
    # the original exception stays attached as the cause.
    raise ImportError(
        "Unable to import example_tools.cfa. Ensure repository root is on sys.path."
    ) from exc
|
|
|
|
|
|
|
|
# Bayer pattern used when a request does not supply "pattern".
DEFAULT_PATTERN = "RGGB"
# Analysis window size used when a request does not supply "window".
DEFAULT_WINDOW = 256
# Number of strongest/weakest windows reported when "top_k" is not supplied.
DEFAULT_TOP_K = 5
# Default z-score cutoff for _find_outliers.
DEFAULT_OUTLIER_ZSCORE = 2.0
|
|
|
|
|
|
|
|
def _parse_request(input_str: str) -> Dict[str, Any]: |
|
|
"""Parse JSON or treat input_str as image_path for analyze mode.""" |
|
|
try: |
|
|
data = json.loads(input_str) |
|
|
if isinstance(data, dict): |
|
|
return data |
|
|
if isinstance(data, str): |
|
|
return {"mode": "analyze", "image_path": data} |
|
|
except Exception: |
|
|
pass |
|
|
return {"mode": "analyze", "image_path": input_str} |
|
|
|
|
|
|
|
|
def _compute_stats(values: Sequence[float]) -> Dict[str, float]: |
|
|
"""Compute basic statistics for a list of values.""" |
|
|
arr = np.asarray(values, dtype=np.float64) |
|
|
if arr.size == 0: |
|
|
return {"min": 0.0, "max": 0.0, "mean": 0.0, "median": 0.0, "std": 0.0} |
|
|
return { |
|
|
"min": float(np.min(arr)), |
|
|
"max": float(np.max(arr)), |
|
|
"mean": float(np.mean(arr)), |
|
|
"median": float(np.median(arr)), |
|
|
"std": float(np.std(arr)), |
|
|
} |
|
|
|
|
|
|
|
|
def _detect_bimodality(values: Sequence[float]) -> Dict[str, Any]: |
|
|
""" |
|
|
Detect if the distribution of values is bimodal using Hartigan's dip test |
|
|
approximation and coefficient of bimodality. |
|
|
|
|
|
Returns: |
|
|
Dictionary with bimodality analysis results |
|
|
""" |
|
|
arr = np.asarray(values, dtype=np.float64) |
|
|
if arr.size < 10: |
|
|
return { |
|
|
"is_bimodal": False, |
|
|
"bimodality_coefficient": 0.0, |
|
|
"distribution_type": "insufficient_data", |
|
|
"note": "Need at least 10 windows for distribution analysis", |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
mean = np.mean(arr) |
|
|
std = np.std(arr) |
|
|
if std < 1e-10: |
|
|
return { |
|
|
"is_bimodal": False, |
|
|
"bimodality_coefficient": 0.0, |
|
|
"distribution_type": "constant", |
|
|
"note": "All values are nearly identical", |
|
|
} |
|
|
|
|
|
normalized = (arr - mean) / std |
|
|
skewness = float(np.mean(normalized ** 3)) |
|
|
kurtosis = float(np.mean(normalized ** 4)) |
|
|
|
|
|
|
|
|
excess_kurtosis = kurtosis - 3.0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bc = (skewness ** 2 + 1) / (kurtosis + 3 * ((arr.size - 1) ** 2) / ((arr.size - 2) * (arr.size - 3))) |
|
|
bc = float(bc) |
|
|
|
|
|
|
|
|
cv = std / mean if mean > 0 else 0.0 |
|
|
|
|
|
|
|
|
if bc > 0.6: |
|
|
dist_type = "bimodal" |
|
|
is_bimodal = True |
|
|
elif bc > 0.5: |
|
|
dist_type = "possibly_bimodal" |
|
|
is_bimodal = False |
|
|
elif cv > 0.3: |
|
|
dist_type = "high_variance" |
|
|
is_bimodal = False |
|
|
else: |
|
|
dist_type = "unimodal" |
|
|
is_bimodal = False |
|
|
|
|
|
return { |
|
|
"is_bimodal": is_bimodal, |
|
|
"bimodality_coefficient": bc, |
|
|
"coefficient_of_variation": float(cv), |
|
|
"skewness": skewness, |
|
|
"excess_kurtosis": excess_kurtosis, |
|
|
"distribution_type": dist_type, |
|
|
} |
|
|
|
|
|
|
|
|
def _find_outliers(
    values: Sequence[float],
    positions: Sequence[Tuple[int, int, int, int]],
    z_threshold: float = DEFAULT_OUTLIER_ZSCORE,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split windows into low and high outliers using a median/MAD z-score.

    The score of each value is ``(value - median) / (1.4826 * MAD)``, where MAD
    is the median absolute deviation from the median. When MAD is zero a tiny
    scale (1e-10) is substituted so the division is still defined.

    Args:
        values: M value of each window.
        positions: ``(y, x, h, w)`` of each window, parallel to *values*.
        z_threshold: Absolute score beyond which a window counts as an outlier.

    Returns:
        ``(low_outliers, high_outliers)``. Each entry is a dict with ``y``,
        ``x``, ``h``, ``w``, ``M_value``, ``z_score`` and ``interpretation``.
        Low outliers are ordered most negative score first, high outliers
        largest score first. Both lists are empty for fewer than 5 values.
    """
    samples = np.asarray(values, dtype=np.float64)
    if samples.size < 5:
        return [], []

    center = float(np.median(samples))
    raw_mad = float(np.median(np.abs(samples - center)))
    scale = raw_mad * 1.4826 if raw_mad > 0 else 1e-10

    def _record(pos, value, score, text):
        # One output row; shared by both outlier kinds.
        return {
            "y": pos[0],
            "x": pos[1],
            "h": pos[2],
            "w": pos[3],
            "M_value": float(value),
            "z_score": float(score),
            "interpretation": text,
        }

    below: List[Dict[str, Any]] = []
    above: List[Dict[str, Any]] = []

    for value, pos in zip(values, positions):
        score = (value - center) / scale
        if score < -z_threshold:
            # An (almost) exactly zero M is reported differently from a merely weak one.
            text = (
                "Zero CFA signal - likely flat/uniform region (sky, wall) or synthetic"
                if value < 1e-6
                else "Weak CFA signal - possible splice, synthetic region, or heavy processing"
            )
            below.append(_record(pos, value, score, text))
        elif score > z_threshold:
            above.append(
                _record(
                    pos,
                    value,
                    score,
                    "Unusually strong CFA - possible different camera source",
                )
            )

    # Most extreme windows first in each list.
    below = sorted(below, key=lambda rec: rec["z_score"])
    above = sorted(above, key=lambda rec: -rec["z_score"])
    return below, above
|
|
|
|
|
|
|
|
def _classify_window_populations( |
|
|
values: Sequence[float], |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Classify windows into populations based on M value magnitude. |
|
|
|
|
|
Real camera images typically show: |
|
|
- Low M (~0): Flat/uniform regions (sky, walls) - no texture to detect CFA |
|
|
- High M (>1e9): Textured regions with strong CFA signal |
|
|
|
|
|
This is content-dependent, not evidence of manipulation. |
|
|
Manipulation would show as textured regions WITHOUT CFA signal. |
|
|
""" |
|
|
arr = np.asarray(values, dtype=np.float64) |
|
|
if arr.size == 0: |
|
|
return {"flat_regions": 0, "textured_regions": 0, "intermediate": 0} |
|
|
|
|
|
|
|
|
|
|
|
flat_threshold = 1e6 |
|
|
textured_threshold = 1e9 |
|
|
|
|
|
flat_count = int(np.sum(arr < flat_threshold)) |
|
|
textured_count = int(np.sum(arr >= textured_threshold)) |
|
|
intermediate_count = len(arr) - flat_count - textured_count |
|
|
|
|
|
return { |
|
|
"flat_regions": flat_count, |
|
|
"textured_regions": textured_count, |
|
|
"intermediate": intermediate_count, |
|
|
"flat_pct": flat_count / len(arr) * 100, |
|
|
"textured_pct": textured_count / len(arr) * 100, |
|
|
} |
|
|
|
|
|
|
|
|
def _window_brief(entry: Dict[str, Any], channel: str = "G") -> Dict[str, Any]: |
|
|
"""Extract brief window info for a specific channel.""" |
|
|
return { |
|
|
"y": entry["y"], |
|
|
"x": entry["x"], |
|
|
"h": entry["h"], |
|
|
"w": entry["w"], |
|
|
"M_value": float(entry[channel]["M"]), |
|
|
} |
|
|
|
|
|
|
|
|
def _analyze(params: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Run CFA consistency analysis on a single image.""" |
|
|
image_path = params.get("image_path") |
|
|
if not image_path: |
|
|
return {"error": "image_path is required for analyze mode."} |
|
|
|
|
|
window = int(params.get("window", DEFAULT_WINDOW)) |
|
|
pattern = params.get("pattern", DEFAULT_PATTERN) |
|
|
em_kwargs = params.get("em") or params.get("em_kwargs") or {} |
|
|
top_k = int(params.get("top_k", DEFAULT_TOP_K)) |
|
|
channel = params.get("channel", "G").upper() |
|
|
|
|
|
if channel not in ("R", "G", "B"): |
|
|
channel = "G" |
|
|
|
|
|
try: |
|
|
img = cfa.load_rgb_image(str(image_path)) |
|
|
except Exception as e: |
|
|
return {"error": f"Failed to load image: {e}"} |
|
|
|
|
|
try: |
|
|
window_results = cfa.analyze_image_windows( |
|
|
img, window=window, pattern=pattern, em_kwargs=em_kwargs |
|
|
) |
|
|
except Exception as e: |
|
|
return {"error": f"CFA analysis failed: {e}"} |
|
|
|
|
|
if not window_results: |
|
|
return {"error": "No windows analyzed (image may be too small)."} |
|
|
|
|
|
|
|
|
m_values = [float(r[channel]["M"]) for r in window_results] |
|
|
positions = [(r["y"], r["x"], r["h"], r["w"]) for r in window_results] |
|
|
|
|
|
|
|
|
stats = _compute_stats(m_values) |
|
|
|
|
|
|
|
|
bimodality = _detect_bimodality(m_values) |
|
|
|
|
|
|
|
|
populations = _classify_window_populations(m_values) |
|
|
|
|
|
|
|
|
sorted_indices = np.argsort(m_values)[::-1] |
|
|
top_windows = [ |
|
|
_window_brief(window_results[i], channel) |
|
|
for i in sorted_indices[:top_k] |
|
|
] |
|
|
|
|
|
|
|
|
bottom_windows = [ |
|
|
_window_brief(window_results[i], channel) |
|
|
for i in sorted_indices[-top_k:][::-1] |
|
|
] |
|
|
|
|
|
|
|
|
has_cfa_signal = populations["textured_regions"] > 0 |
|
|
textured_pct = populations["textured_pct"] |
|
|
|
|
|
|
|
|
if not has_cfa_signal: |
|
|
interpretation = ( |
|
|
"No strong CFA signal detected in any region. " |
|
|
"This could indicate: (1) AI-generated image, (2) heavily processed image, " |
|
|
"(3) screenshot, or (4) image with only flat/uniform content." |
|
|
) |
|
|
elif textured_pct > 50: |
|
|
interpretation = ( |
|
|
f"Strong CFA signal detected in {textured_pct:.0f}% of windows. " |
|
|
"Consistent with camera-captured image. Flat regions (sky, walls) " |
|
|
"naturally show weaker CFA signal due to lack of texture." |
|
|
) |
|
|
elif textured_pct > 20: |
|
|
interpretation = ( |
|
|
f"CFA signal detected in {textured_pct:.0f}% of windows (textured regions). " |
|
|
"Remaining windows are flat/uniform regions where CFA cannot be detected. " |
|
|
"This distribution is normal for photos with sky or uniform backgrounds." |
|
|
) |
|
|
else: |
|
|
interpretation = ( |
|
|
f"Weak CFA signal - only {textured_pct:.0f}% of windows show strong CFA. " |
|
|
"Image may be heavily processed, low-texture, or partially synthetic." |
|
|
) |
|
|
|
|
|
|
|
|
result: Dict[str, Any] = { |
|
|
"tool": "perform_cfa_detection", |
|
|
"status": "completed", |
|
|
"image_path": str(image_path), |
|
|
"analysis_channel": channel, |
|
|
"window_size": window, |
|
|
"window_count": len(window_results), |
|
|
"pattern": pattern, |
|
|
|
|
|
|
|
|
"has_cfa_signal": has_cfa_signal, |
|
|
"interpretation": interpretation, |
|
|
|
|
|
|
|
|
"window_populations": { |
|
|
"textured_with_cfa": populations["textured_regions"], |
|
|
"flat_no_texture": populations["flat_regions"], |
|
|
"intermediate": populations["intermediate"], |
|
|
"textured_pct": populations["textured_pct"], |
|
|
"flat_pct": populations["flat_pct"], |
|
|
}, |
|
|
|
|
|
|
|
|
"distribution": { |
|
|
"type": bimodality["distribution_type"], |
|
|
"is_bimodal": bimodality["is_bimodal"], |
|
|
"bimodality_coefficient": bimodality["bimodality_coefficient"], |
|
|
"note": "Bimodal distribution is NORMAL for photos with mixed content (sky + texture)", |
|
|
}, |
|
|
|
|
|
|
|
|
"m_value_stats": stats, |
|
|
|
|
|
|
|
|
"strongest_cfa_windows": top_windows, |
|
|
"weakest_cfa_windows": bottom_windows, |
|
|
|
|
|
"note": ( |
|
|
"CFA analysis detects demosaicing artifacts from camera sensors. " |
|
|
"Flat regions (sky, walls) naturally have weak/no CFA signal. " |
|
|
"Look for TEXTURED regions with weak CFA - those may be spliced. " |
|
|
"This tool complements TruFor for localization, not whole-image classification." |
|
|
), |
|
|
} |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def _calibrate(params: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Calibrate reference statistics from a set of camera images.""" |
|
|
neg_dir = params.get("neg_dir") or params.get("ref_dir") |
|
|
if not neg_dir: |
|
|
return {"error": "neg_dir (or ref_dir) is required for calibrate mode."} |
|
|
|
|
|
window = int(params.get("window", DEFAULT_WINDOW)) |
|
|
pattern = params.get("pattern", DEFAULT_PATTERN) |
|
|
em_kwargs = params.get("em") or params.get("em_kwargs") or {} |
|
|
save_to = params.get("save_to") or params.get("output") |
|
|
|
|
|
neg_files = cfa.list_image_files(str(neg_dir)) |
|
|
if not neg_files: |
|
|
return {"error": f"No images found in directory: {neg_dir}"} |
|
|
|
|
|
|
|
|
all_m_values: Dict[str, List[float]] = {"R": [], "G": [], "B": []} |
|
|
|
|
|
for path in neg_files: |
|
|
try: |
|
|
img = cfa.load_rgb_image(str(path)) |
|
|
window_results = cfa.analyze_image_windows( |
|
|
img, window=window, pattern=pattern, em_kwargs=em_kwargs |
|
|
) |
|
|
for r in window_results: |
|
|
all_m_values["R"].append(float(r["R"]["M"])) |
|
|
all_m_values["G"].append(float(r["G"]["M"])) |
|
|
all_m_values["B"].append(float(r["B"]["M"])) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
if not all_m_values["G"]: |
|
|
return {"error": "No valid windows collected from reference images."} |
|
|
|
|
|
|
|
|
reference_stats = { |
|
|
c: _compute_stats(vals) for c, vals in all_m_values.items() |
|
|
} |
|
|
|
|
|
payload = { |
|
|
"reference_stats": reference_stats, |
|
|
"pattern": pattern, |
|
|
"window": window, |
|
|
"em_params": em_kwargs, |
|
|
"num_images": len(neg_files), |
|
|
"num_windows": len(all_m_values["G"]), |
|
|
} |
|
|
|
|
|
if save_to: |
|
|
Path(save_to).write_text(json.dumps(payload, indent=2), encoding="utf-8") |
|
|
payload["saved_to"] = str(save_to) |
|
|
|
|
|
return payload |
|
|
|
|
|
|
|
|
def _json_default(obj: Any) -> Any:
    """Convert NumPy scalars/arrays and paths that ``json`` cannot encode natively."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, Path):
        return str(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def perform_cfa_detection(input_str: str) -> str:
    """
    LangChain tool entrypoint for CFA consistency analysis.

    This tool analyzes CFA (Color Filter Array) demosaicing patterns to detect
    INCONSISTENCIES within an image. It is designed for splice detection and
    source consistency analysis.

    Input (JSON object, or a bare image path which implies analyze mode):
        - mode: "analyze" (default; "detect" is accepted as an alias) or "calibrate"
        - image_path: required for analyze
        - window: int (default 256)
        - pattern: Bayer pattern (default RGGB)
        - channel: which channel to analyze (default "G")
        - em / em_kwargs: dict for EM params (N, sigma0, p0, max_iter, tol, seed)
        - top_k: int (default 5) - number of strongest/weakest windows to return
        - neg_dir/ref_dir: required for calibrate mode
        - save_to: optional path to write reference stats JSON (calibrate)

        Note: an "outlier_zscore" key is not read by the analyze or calibrate
        paths in this module.

    Output (JSON string):
        - analyze: includes has_cfa_signal, interpretation, window_populations,
          distribution (type / is_bimodal / bimodality_coefficient),
          m_value_stats, strongest_cfa_windows and weakest_cfa_windows
        - calibrate: includes reference_stats, pattern, window, em_params,
          num_images and num_windows (plus saved_to when written)
        - any failure: {"error": "<message>"}
    """
    params = _parse_request(input_str)
    # `mode` may be null or a non-string in user-supplied JSON; normalise
    # before comparing instead of calling .lower() on it directly.
    mode = str(params.get("mode") or "analyze").strip().lower()

    # "detect" is a legacy/alternate spelling of the analyze mode.
    if mode == "detect":
        mode = "analyze"

    if mode == "calibrate":
        result = _calibrate(params)
    elif mode == "analyze":
        result = _analyze(params)
    else:
        result = {"error": "mode must be 'analyze' or 'calibrate'."}

    try:
        # NumPy integers/booleans coming back from the analysis are converted
        # instead of triggering the generic failure below.
        return json.dumps(result, indent=2, default=_json_default)
    except Exception:
        # Tool boundary: always hand the agent a JSON string.
        return json.dumps({"error": "Failed to serialize result."}, indent=2)
|
|
|
|
|
|
|
|
# Only the tool entry point is exported; the underscore-prefixed helpers are internal.
__all__ = ["perform_cfa_detection"]
|
|
|
|
|
|