# df2/src/tools/forensic/jpeg_tools.py
# Mustafa Akcanca
# Tidy up first tooling
# 29b829b
"""
JPEG-focused forensic helpers.

Implements quantization-step estimation and double-compression cues aligned
with the MATLAB reference toolbox (factor histograms, Sac score, block maps).

References:
- Factor histogram quantization step: fh_jpgstep.m
  Source: example_tools/matlab-forensics/00 - JPEG Quantization estimation/fh_jpgstep.m
- Sac/JPEGness score: fh_jpgdetect.m
  Source: example_tools/matlab-forensics/00 - JPEG Quantization estimation/fh_jpgdetect.m
  Note: the Sac score measures "JPEGness" - higher values indicate stronger JPEG
  compression artifacts. It is used to detect whether an image was previously
  JPEG-compressed, NOT directly for double-compression detection.
- Block-level tamper probability map: Extract_Features_JPEG.m
  Source: example_tools/matlab-forensics/01 - Fast, automatic and fine-grained tampered
  JPEG image detection via DCT coefficient analysis/Extract_Features_JPEG.m
- Zig-zag order: jpeg.py
  Source: example_tools/sherloq/gui/jpeg.py

Dependencies:
- jpeglib (optional): raw JPEG DCT coefficient access, matching MATLAB's jpeg_read().
  Install with: pip install jpeglib
  Supports libjpeg 6b-9e, libjpeg-turbo, and mozjpeg.
  When jpeglib is not available, the module falls back to pixel-domain DCT (less
  accurate for factor histogram estimation, especially for the DC coefficient).
"""
import json
from typing import Dict, List, Optional, Tuple
import numpy as np
from scipy.fftpack import dct
# Try to import jpeglib for raw coefficient access (like MATLAB's jpeg_read)
# https://github.com/martinbenes1996/jpeglib
try:
import jpeglib
_HAS_JPEGLIB = True
except ImportError:
_HAS_JPEGLIB = False
# Zig-zag order from sherloq/gui/jpeg.py (ZIG_ZAG constant, line 6-71)
_ZIG_ZAG: List[Tuple[int, int]] = [
(0, 0),
(0, 1),
(1, 0),
(2, 0),
(1, 1),
(0, 2),
(0, 3),
(1, 2),
(2, 1),
(3, 0),
(4, 0),
(3, 1),
(2, 2),
(1, 3),
(0, 4),
(0, 5),
(1, 4),
(2, 3),
(3, 2),
(4, 1),
(5, 0),
(6, 0),
(5, 1),
(4, 2),
(3, 3),
(2, 4),
(1, 5),
(0, 6),
(0, 7),
(1, 6),
(2, 5),
(3, 4),
(4, 3),
(5, 2),
(6, 1),
(7, 0),
(7, 1),
(6, 2),
(5, 3),
(4, 4),
(3, 5),
(2, 6),
(1, 7),
(2, 7),
(3, 6),
(4, 5),
(5, 4),
(6, 3),
(7, 2),
(7, 3),
(6, 4),
(5, 5),
(4, 6),
(3, 7),
(4, 7),
(5, 6),
(6, 5),
(7, 4),
(7, 5),
(6, 6),
(5, 7),
(6, 7),
(7, 6),
(7, 7),
]
def _std_luma_table() -> Tuple[int, ...]:
"""ITU-T81 standard luminance quantization table."""
return (
16, 11, 10, 16, 24, 40, 51, 61,
12, 12, 14, 19, 26, 58, 60, 55,
14, 13, 16, 24, 40, 57, 69, 56,
14, 17, 22, 29, 51, 87, 80, 62,
18, 22, 37, 56, 68, 109, 103, 77,
24, 35, 55, 64, 81, 104, 113, 92,
49, 64, 78, 87, 103, 121, 120, 101,
72, 92, 95, 98, 112, 100, 103, 99,
)
def _read_jpeg_raw(image_path: str) -> Optional[Dict]:
    """
    Read raw (already quantized) JPEG DCT coefficients using jpeglib.

    This is the Python counterpart of MATLAB's jpeg_read() from the JPEG Toolbox.
    Reference: https://github.com/martinbenes1996/jpeglib

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        None if jpeglib is not available, the luminance plane or quantization
        tables are missing/unusable, or the read fails for any reason.
        Otherwise a dict with:
        - "Y": luminance DCT blocks as an (n_blocks, 8, 8) array
        - "Cb", "Cr": chrominance planes exactly as exposed by jpeglib
          (None when the attribute is absent)
        - "qt": list of 8x8 int32 quantization tables
        - "height", "width": 8 * block counts for a 4-D luminance plane, or the
          plane's own shape for a 2-D plane
        - "grid_shape": (h_blocks, w_blocks) of the luminance block grid
    """
    if not _HAS_JPEGLIB:
        return None

    import logging  # local import: only used to report a failed read below

    try:
        im = jpeglib.read_dct(image_path)

        luma = getattr(im, "Y", None)
        if luma is None:
            return None
        luma = np.asarray(luma)

        if luma.ndim == 4:
            # Expected jpeglib layout: (h_blocks, w_blocks, 8, 8).
            h_blocks, w_blocks = int(luma.shape[0]), int(luma.shape[1])
            y_blocks = luma.reshape(-1, 8, 8)
            height, width = h_blocks * 8, w_blocks * 8
        elif luma.ndim == 2:
            # Spatially arranged coefficient plane: split it into 8x8 blocks here
            # so callers always receive real blocks and a real grid shape
            # (previously this branch handed back Y=None, which crashed callers).
            height, width = int(luma.shape[0]), int(luma.shape[1])
            y_blocks, (h_blocks, w_blocks) = _coef_array_to_blocks(luma)
        else:
            return None

        qt = getattr(im, "qt", None)
        if qt is None:
            return None
        # Any layout holding whole 64-entry tables -- (n, 8, 8), (8, 8), (n, 64)
        # or flat -- is cut into consecutive 8x8 tables.
        flat_qt = np.asarray(qt, dtype=np.int32).reshape(-1)
        n_tables = flat_qt.size // 64
        if n_tables == 0:
            return None
        qt_list = [flat_qt[i * 64:(i + 1) * 64].reshape(8, 8) for i in range(n_tables)]

        return {
            "Y": y_blocks,
            "Cb": getattr(im, "Cb", None),
            "Cr": getattr(im, "Cr", None),
            "qt": qt_list,
            "height": height,
            "width": width,
            "grid_shape": (h_blocks, w_blocks),
        }
    except Exception as exc:
        # Best effort: any failure (missing file, invalid JPEG, unexpected data)
        # means "no raw data" so callers can fall back to the pixel domain.
        logging.getLogger(__name__).debug("jpeglib read failed for %s: %s", image_path, exc)
        return None
def _coef_array_to_blocks(coef_array: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
"""
Convert jpeglib coefficient array format to per-block format.
jpeglib stores DCT coefficients in a 2D array where blocks are arranged
spatially (each 8x8 region is one DCT block). This matches MATLAB's
jpeg_read().coef_arrays{1} format.
Returns:
- blocks: (n_blocks, 8, 8) array of DCT coefficients
- grid_shape: (h_blocks, w_blocks) tuple
"""
h, w = coef_array.shape
h_blocks = h // 8
w_blocks = w // 8
if h_blocks == 0 or w_blocks == 0:
return np.empty((0, 8, 8), dtype=coef_array.dtype), (0, 0)
# Crop to exact multiple of 8
cropped = coef_array[:h_blocks * 8, :w_blocks * 8]
# Reshape to blocks: (h_blocks, 8, w_blocks, 8) -> (h_blocks, w_blocks, 8, 8) -> (n_blocks, 8, 8)
blocks = cropped.reshape(h_blocks, 8, w_blocks, 8).transpose(0, 2, 1, 3).reshape(-1, 8, 8)
return blocks, (h_blocks, w_blocks)
def _blockify(gray: np.ndarray):
"""Crop to 8x8 grid and return blocks, grid shape, and cropped shape."""
blk = 8
h, w = gray.shape
h_crop = h - (h % blk)
w_crop = w - (w % blk)
if h_crop < blk or w_crop < blk:
return None, (0, 0), (0, 0)
cropped = gray[:h_crop, :w_crop]
blocks = cropped.reshape(h_crop // blk, blk, w_crop // blk, blk).transpose(0, 2, 1, 3).reshape(-1, blk, blk)
return blocks, (h_crop // blk, w_crop // blk), (h_crop, w_crop)
def _parse_request(input_str: str):
"""
Allow either plain path or JSON string:
{
"path": "/path/to/image.jpg",
"include": {
"primary_table": true,
"truncation_mask": true,
"block_map": true,
"per_frequency": false
}
}
"""
try:
data = json.loads(input_str)
path = data.get("path", input_str.strip())
include = data.get("include", {}) if isinstance(data, dict) else {}
include = include if isinstance(include, dict) else {}
return path, include
except Exception:
return input_str.strip(), {}
def _truncation_mask_from_pixels(image: np.ndarray, grid_shape: Tuple[int, int]) -> np.ndarray:
"""
True for blocks that are NOT truncated (no 0/255 clipping).
Reference: fh_jpgstep.m lines 17-29, fh_jpgdetect.m lines 10-23
"""
blk = 8
h_blocks, w_blocks = grid_shape
if h_blocks == 0 or w_blocks == 0:
return np.zeros((h_blocks, w_blocks), dtype=bool)
cropped = image[: h_blocks * blk, : w_blocks * blk]
blocks = cropped.reshape(h_blocks, blk, w_blocks, blk).transpose(0, 2, 1, 3)
block_max = blocks.max(axis=(2, 3))
block_min = blocks.min(axis=(2, 3))
# MATLAB: if pmax == 255 || pmin == 0 then exclude
return np.logical_and(block_max < 255, block_min > 0)
def _block_dcts_from_pixels(gray: np.ndarray) -> Tuple[np.ndarray, Tuple[int, int]]:
    """
    Compute the 2-D DCT of every 8x8 pixel block.

    Reference: bdct() in MATLAB, applied as bdct(pmtx - 128).
    Note: this is the fallback used when raw coefficients (jpeglib) are not
    available.

    Returns (dcts, grid_shape) where dcts is (n_blocks, 8, 8); an empty
    float32 array is returned when the image holds no complete block.
    """
    blocks, grid_shape, _ = _blockify(gray)
    if blocks is None:
        return np.empty((0, 8, 8), dtype=np.float32), grid_shape

    # Level shift, then an orthonormal DCT along each within-block axis in turn.
    shifted = blocks.astype(np.float32) - 128.0
    first_pass = dct(shifted, axis=1, norm="ortho")
    return dct(first_pass, axis=2, norm="ortho"), grid_shape
def _estimate_quality(qtable) -> Dict[str, object]:
    """
    Estimate the JPEG quality setting behind a quantization table.

    The standard luminance table is scaled for every quality 1..100 with the
    IJG formula and the candidate closest to `qtable` (smallest mean absolute
    difference; lowest quality wins ties) is reported.
    Reference: sherloq/gui/jpeg.py get_tables() and quality.py

    Args:
        qtable: 64 quantization steps, as an 8x8 array or any layout that
            reshapes to 8x8.

    Returns:
        Dict with "estimated_quality" (int), "mean_abs_error" (float),
        "exact_match" (bool, True when the error is exactly 0) and
        "best_fit_table" (8x8 nested list).
    """
    base = np.array(_std_luma_table(), dtype=np.int64).reshape(8, 8)
    target = np.asarray(qtable).reshape(8, 8)

    best_q = None
    best_err = float("inf")
    best_table = None
    for quality in range(1, 101):
        # IJG scaling is integer arithmetic throughout: 5000 / quality is an
        # integer division in libjpeg, and so is the final division by 100.
        # Float division here produced tables that differ from what encoders
        # emit for qualities below 50 that do not divide 5000 evenly.
        scale = 5000 // quality if quality < 50 else 200 - quality * 2
        cand = np.clip((base * scale + 50) // 100, 1, 255)
        err = float(np.mean(np.abs(cand - target)))
        if err < best_err:
            best_err = err
            best_q = quality
            best_table = cand

    return {
        "estimated_quality": int(best_q) if best_q is not None else None,
        "mean_abs_error": best_err if best_q is not None else None,
        "exact_match": bool(best_err == 0.0) if best_q is not None else False,
        "best_fit_table": best_table.tolist() if best_table is not None else None,
    }
def _factor_histogram_step(coeffs: np.ndarray, threshold: float = 0.7) -> int:
"""
Estimate quantization step via factor histogram.
Reference: fh_jpgstep.m lines 32-52
MATLAB code:
fhcell = coefhist(dctmtx, mask, 'factor_histogram');
cfh = cfh / cfh(1);
step(invpos(i)) = find(cfh>=t, 1, 'last');
"""
samples = np.abs(np.round(coeffs)).astype(np.int64)
# MATLAB coefhist.m line 31: samples = samples(samples>1)
samples = samples[samples > 1]
if samples.size == 0:
return 0
maxel = int(samples.max())
if maxel <= 1:
return 1
qsmax = min(100, maxel)
# MATLAB: mode_hist = hist(samples(:), 1:maxel)
# Creates array where mode_hist(1) = count of value 1, mode_hist(2) = count of value 2, etc.
# Since samples only contains values >= 2, mode_hist(1) = 0.
# Python: hist_full = np.bincount(samples) creates hist_full[0] = count(0), hist_full[1] = count(1), etc.
# So mode_hist(q) corresponds to hist_full[q] for q >= 1.
hist_full = np.bincount(samples, minlength=maxel + 1)
# Check if we have any samples (values >= 2)
if np.sum(hist_full[2:]) == 0:
return 1
# MATLAB: fh(q) = sum(mode_hist(q:q:end)) for q = 1:fhlen
# mode_hist(q:q:end) accesses indices q, 2q, 3q, ... up to maxel (MATLAB 1-based)
# In Python, hist_full[q::q] accesses indices q, 2q, 3q, ... which correspond to the same coefficient values
fhlen = min(qsmax, maxel)
fh = np.array([hist_full[q::q].sum() for q in range(1, fhlen + 1)], dtype=np.float64)
if fh[0] == 0:
return 1
fh /= fh[0]
# MATLAB: find(cfh>=t, 1, 'last')
above = np.where(fh >= threshold)[0]
if above.size == 0:
return 1
return int(above[-1] + 1)
def _estimate_primary_qtable_raw(coef_blocks: np.ndarray, grid_shape: Tuple[int, int],
                                 gray: Optional[np.ndarray] = None,
                                 threshold: float = 0.7) -> Dict[str, object]:
    """
    Estimate a quantization step for each of the 64 DCT frequencies.

    Reference: fh_jpgstep.m
    Every frequency's coefficients are passed through the factor-histogram
    estimator. When `gray` is given, blocks containing clipped pixels (0 or
    255) are excluded first; the exclusion is skipped if the mask length does
    not match the number of blocks.

    Returns a dict with "table" (8x8 nested list of steps) and "mask" (the
    per-block validity mask as an h_blocks x w_blocks nested list); both are
    None when `coef_blocks` is empty.
    """
    if coef_blocks.size == 0:
        return {"table": None, "mask": None}

    h_blocks, w_blocks = grid_shape
    n_blocks = coef_blocks.shape[0]

    if gray is None:
        mask_valid = np.ones(n_blocks, dtype=bool)
    else:
        mask_valid = _truncation_mask_from_pixels(gray, grid_shape).reshape(-1)
    # Only filter when the mask lines up one-to-one with the blocks.
    mask_applies = mask_valid.size == n_blocks

    steps = np.zeros((8, 8), dtype=np.int32)
    for u, v in _ZIG_ZAG:
        samples = coef_blocks[:, u, v].astype(np.float64)
        if mask_applies:
            samples = samples[mask_valid]
        steps[u, v] = _factor_histogram_step(samples, threshold=threshold)

    return {"table": steps.tolist(), "mask": mask_valid.reshape(h_blocks, w_blocks).tolist()}
def _sac_score_raw(coef_blocks: np.ndarray) -> Dict[str, object]:
"""
Sac/JPEGness score using raw DCT coefficients.
Reference: fh_jpgdetect.m
Higher score = stronger JPEG artifacts = more likely to be JPEG-compressed.
MATLAB code (fh_jpgdetect.m):
dctmtx(1:8:end,1:8:end) = 0; % only AC coefficients
samples = abs(round(dctmtx(:)));
samples = samples(samples>1); % exclude 0, -1, and 1
coef_histo = hist(samples, 1:maxel);
fh(q) = sum(coef_histo(q:q:end)); % factor histogram
fh = fh / fh(1); % normalize
deriv1 = fh(2:end) - fh(1:end-1); % first derivative
S = max(deriv1); % Sac score
"""
if coef_blocks.size == 0:
return {"score": None, "note": "No DCT blocks available."}
dcts = coef_blocks.copy().astype(np.float64)
# MATLAB line 27: dctmtx(1:8:end,1:8:end) = 0 (ignore DC)
dcts[:, 0, 0] = 0
samples = np.abs(np.round(dcts.reshape(-1))).astype(np.int64)
# MATLAB line 30-31: exclude 0, -1, and 1 -> samples = samples(samples>1)
samples = samples[samples > 1]
if samples.size == 0:
return {"score": 0.0, "note": "Insufficient AC energy (no |coef| > 1)."}
maxel = int(samples.max())
if maxel <= 1:
return {"score": 0.0, "note": "Max coefficient <= 1."}
# MATLAB line 34: coef_histo = hist(samples, 1:maxel)
coef_histo = np.bincount(samples, minlength=maxel + 1)[1:maxel + 1] # bins 1 to maxel
if coef_histo.size == 0:
return {"score": 0.0, "note": "Empty histogram."}
# MATLAB lines 42-46: factor histogram
qsmax = 100
fhlen = min(qsmax, maxel)
fh = np.zeros(fhlen, dtype=np.float64)
for q in range(1, fhlen + 1):
# MATLAB: fh(q) = sum(coef_histo(q:q:end))
fh[q - 1] = coef_histo[q - 1::q].sum()
if fh[0] == 0:
return {"score": 0.0, "note": "Factor histogram empty at q=1."}
# MATLAB line 49: normalize
fh /= fh[0]
# MATLAB lines 51-52: S = max(deriv1)
deriv1 = fh[1:] - fh[:-1]
score = float(np.max(deriv1)) if deriv1.size > 0 else 0.0
return {"score": score, "histogram_length": int(maxel), "source": "raw_coefficients"}
def _sac_score_pixels(gray: np.ndarray) -> Dict[str, object]:
    """
    Sac/JPEGness score from pixel-domain block DCTs.

    Fallback for when raw coefficients (jpeglib) are unavailable; less accurate
    than the raw-coefficient version. The result is the raw-score dict with
    "source" set to "pixel_domain".
    """
    pixel_dcts, _ = _block_dcts_from_pixels(gray)
    if pixel_dcts.size == 0:
        return {"score": None, "note": "Image too small for DCT grid."}
    return {**_sac_score_raw(pixel_dcts), "source": "pixel_domain"}
def _period_from_histogram(hist: np.ndarray) -> int:
"""
Find dominant period via FFT peak.
Reference: Extract_Features_JPEG.m lines 67-94
MATLAB code:
FFT=abs(fft(coeffHist));
DC=FFT(1);
FreqValley=1;
while (FreqValley<length(FFT)-1) && (FFT(FreqValley)>= FFT(FreqValley+1))
FreqValley=FreqValley+1;
end
FFT=FFT(FreqValley:floor(length(FFT)/2));
[maxPeak,FFTPeak]=max(FFT);
FFTPeak=FFTPeak+FreqValley-1-1;
if length(FFTPeak)==0 | maxPeak<DC/5 | min(FFT)/maxPeak>0.9
p_h_fft(coeffIndex)=1;
else
p_h_fft(coeffIndex)=round(length(coeffHist)/FFTPeak);
end
"""
if hist.size == 0 or hist.sum() == 0:
return 1
fft_vals = np.abs(np.fft.fft(hist))
if fft_vals.size < 3:
return 1
dc = fft_vals[0]
# Find first local minimum to remove DC peak
freq_valley = 0
while freq_valley < fft_vals.size - 1 and fft_vals[freq_valley] >= fft_vals[freq_valley + 1]:
freq_valley += 1
# MATLAB: FFT=FFT(FreqValley:floor(length(FFT)/2))
fft_slice = fft_vals[freq_valley: max(freq_valley + 1, fft_vals.size // 2)]
if fft_slice.size == 0:
return 1
max_peak = fft_slice.max()
fft_peak_local = int(np.argmax(fft_slice))
# MATLAB: FFTPeak=FFTPeak+FreqValley-1-1
fft_peak = fft_peak_local + freq_valley
# MATLAB thresholds: maxPeak<DC/5 | min(FFT)/maxPeak>0.9
if max_peak < dc / 5:
return 1
if fft_slice.size > 0 and fft_slice.min() / max(max_peak, 1e-9) > 0.9:
return 1
if fft_peak == 0:
return 1
# MATLAB: round(length(coeffHist)/FFTPeak)
period = int(round(hist.size / fft_peak))
return max(period, 1)
def _block_level_map(
    coef_blocks: np.ndarray,
    grid_shape: Tuple[int, int],
    max_coeffs: int = 15,
    include_per_frequency: bool = True,
) -> Dict[str, object]:
    """
    Block-level tamper probability map.

    Reference: Extract_Features_JPEG.m lines 101-143
    For each of the first `max_coeffs` zig-zag frequencies, a per-block
    tampering probability is derived from how well the block's coefficient fits
    the periodicity of that frequency's global histogram.

    MATLAB code:
        P_u=num./denom;
        P_t=1./p_final(coeffIndex);
        P_tampered(:,:,coeffIndex)=P_t./(P_u+P_t);
        P_untampered(:,:,coeffIndex)=P_u./(P_u+P_t);
        ...
        P_tampered_Overall=prod(P_tampered,3)./(prod(P_tampered,3)+prod(P_untampered,3));

    Returns a dict with "map" (h_blocks x w_blocks nested list of
    probabilities, or None for an empty grid) and "per_frequency" (one
    {"idx", "coord", "period"} entry per processed frequency when
    `include_per_frequency` is set, otherwise an empty list).
    """
    h_blocks, w_blocks = grid_shape
    if h_blocks == 0 or w_blocks == 0:
        return {"map": None, "per_frequency": []}

    # Frequencies are combined in log-odds space, which is numerically safer
    # than multiplying many probabilities.
    log_odds = np.zeros((h_blocks, w_blocks), dtype=np.float64)
    per_freq_meta = []

    for position, (u, v) in enumerate(_ZIG_ZAG[:max_coeffs], start=1):
        values = np.round(coef_blocks[:, u, v]).reshape(h_blocks, w_blocks).astype(np.int32)
        if values.size == 0:
            continue

        # Histogram range padded by one bin on each side
        # (MATLAB: coeffHist=hist(coeffList,minHistValue:maxHistValue)).
        lowest = int(values.min()) - 1
        highest = int(values.max()) + 1
        if highest <= lowest:
            continue

        # Bin index of every block's coefficient.
        shifted = values - lowest
        hist = np.bincount(shifted.ravel(), minlength=highest - lowest + 1)
        if hist.sum() == 0:
            continue

        period = _period_from_histogram(hist)
        if period <= 1:
            # No periodicity detected - neutral probability.
            tampered = np.full(values.shape, 0.5, dtype=np.float64)
        else:
            # MATLAB lines 103-127: per-block probabilities.
            mode_bin = int(np.argmax(hist))
            # Start of the period window (anchored at the histogram mode)
            # that contains each block's bin.
            window_start = shifted - ((shifted - mode_bin) % period)
            window_bins = (window_start[..., None] + np.arange(period)) % hist.size
            denom = hist[window_bins].sum(axis=-1).astype(np.float64)
            num = hist[np.clip(shifted, 0, hist.size - 1)].astype(np.float64)
            # MATLAB: P_u = num./denom; P_t = 1./period
            p_untampered = num / np.maximum(denom, 1e-9)
            p_tampered = 1.0 / period
            # MATLAB: P_tampered = P_t./(P_u+P_t)
            tampered = p_tampered / (p_untampered + p_tampered)

        # Keep probabilities strictly inside (0, 1) so the logs stay finite.
        tampered = np.clip(tampered, 1e-9, 1 - 1e-9)
        log_odds += np.log(tampered) - np.log(1.0 - tampered)

        if include_per_frequency:
            per_freq_meta.append({"idx": position, "coord": [u, v], "period": period})

    # Sigmoid turns the accumulated log-odds back into a probability.
    prob_map = 1.0 / (1.0 + np.exp(-log_odds))
    return {"map": prob_map.tolist(), "per_frequency": per_freq_meta}
def analyze_jpeg_compression(input_str: str) -> str:
    """
    Report basic image facts plus the Sac score (JPEGness indicator).

    The Sac score is computed from raw JPEG DCT coefficients when jpeglib can
    provide them, and from pixel-domain block DCTs otherwise.

    Args:
        input_str: Path to the image (surrounding whitespace is ignored).

    Returns:
        A JSON string. On success it holds "tool", "status" ("completed"),
        "format", "mode", "size", "jpeglib_available", "is_jpeg" and either
        "sac_score" (JPEG input) or "note" (other formats). Any failure,
        including a non-string argument, yields "status": "error" with the
        message in "error" instead of raising.
    """
    try:
        from PIL import Image

        image_path = input_str.strip()
        # The context manager closes the underlying file once analysis is done.
        with Image.open(image_path) as img:
            result = {
                "tool": "analyze_jpeg_compression",
                "status": "completed",
                "format": img.format,
                "mode": img.mode,
                "size": img.size,
                "jpeglib_available": _HAS_JPEGLIB,
            }
            if img.format == "JPEG":
                # Try raw coefficient access first (like MATLAB's jpeg_read).
                jpeg_data = _read_jpeg_raw(image_path)
                raw_blocks = jpeg_data.get("Y") if jpeg_data is not None else None
                if raw_blocks is not None:
                    sac = _sac_score_raw(raw_blocks)
                else:
                    # No usable raw blocks: fall back to pixel-domain DCTs.
                    gray = np.array(img.convert("L"), dtype=np.float32)
                    sac = _sac_score_pixels(gray)
                result.update({"is_jpeg": True, "sac_score": sac})
            else:
                result.update({"is_jpeg": False, "note": f"Image format is {img.format}, not JPEG"})
        return json.dumps(result)
    except Exception as e:  # pragma: no cover - defensive
        return json.dumps(
            {
                "tool": "analyze_jpeg_compression",
                "status": "error",
                "error": str(e),
            }
        )
def _summarize_quant_tables(indexed_tables):
    """Turn (index, table) pairs into JSON-ready tables plus a quality estimate for table 0."""
    parsed_tables = {}
    quality_estimates = {}
    for idx, table in indexed_tables:
        arr = np.array(table, dtype=np.int32)
        if arr.size == 64:
            arr = arr.reshape(8, 8)
        parsed_tables[str(idx)] = arr.tolist()
        if idx == 0:
            quality_estimates[str(idx)] = _estimate_quality(arr)
    return parsed_tables, quality_estimates


def detect_jpeg_quantization(input_str: str) -> str:
    """
    Extract JPEG quantization tables, estimate quality, and optionally compute
    a primary-quantization estimate and a block-level tamper probability map.

    Raw JPEG DCT coefficients are used when jpeglib can provide them (matching
    MATLAB's jpeg_read() behavior); otherwise pixel-domain block DCTs are used.

    Accepts either a plain path string or a JSON payload:
    {
        "path": "...",
        "include": {
            "primary_table": true,
            "truncation_mask": true,
            "block_map": true,
            "per_frequency": false
        }
    }
    "primary_table", "truncation_mask" and "block_map" default to false, so the
    heavy fields are skipped unless requested. "per_frequency" defaults to
    true and only matters when "block_map" is requested.

    Output fields (JSON string):
    - quantization_tables: Extracted JPEG quantization tables
    - quality_estimates: Estimated JPEG quality for table 0
    - sac_score: JPEGness indicator (higher = stronger JPEG artifacts)
    - estimated_primary_quantization: (optional) Factor-histogram based quant
      estimation and/or truncation mask, always from pixel-domain DCTs
    - block_map: (optional) Per-block tamper probability map
    - coefficient_source: "raw_coefficients" or "pixel_domain"
    Non-JPEG input yields "is_jpeg": false with a note; any failure yields
    "status": "error" with the message in "error".
    """
    try:
        from PIL import Image

        image_path, include = _parse_request(input_str)
        want_primary_table = bool(include.get("primary_table"))
        want_trunc_mask = bool(include.get("truncation_mask"))
        want_block_map = bool(include.get("block_map"))
        want_per_freq = bool(include.get("per_frequency", True))

        # Everything that needs the open file happens inside the with-block so
        # the handle is released before the heavier numeric work.
        with Image.open(image_path) as img:
            result = {
                "tool": "detect_jpeg_quantization",
                "status": "completed",
                "format": img.format,
                "mode": img.mode,
                "size": img.size,
                "jpeglib_available": _HAS_JPEGLIB,
            }
            if img.format != "JPEG":
                result["is_jpeg"] = False
                result["note"] = f"Image format is {img.format}, not JPEG."
                return json.dumps(result)

            # Try raw coefficient access (like MATLAB's jpeg_read).
            jpeg_data = _read_jpeg_raw(image_path)
            if jpeg_data is not None:
                indexed_tables = enumerate(jpeg_data["qt"])
            else:
                indexed_tables = (img.quantization or {}).items()
            parsed_tables, quality_estimates = _summarize_quant_tables(indexed_tables)

            gray = np.array(img.convert("L"), dtype=np.float32)

        # Raw coefficients are only usable with a block grid that matches the
        # number of blocks; guessing a grid would break the reshape in the
        # block map, so anything else falls back to the pixel domain.
        raw_blocks = raw_grid = None
        if jpeg_data is not None:
            raw_blocks = jpeg_data.get("Y")
            raw_grid = jpeg_data.get("grid_shape")
        use_raw = (
            raw_blocks is not None
            and raw_grid is not None
            and raw_grid[0] * raw_grid[1] == raw_blocks.shape[0]
        )

        # Pixel-domain DCTs (bdct) feed the factor-histogram estimation, which
        # in the MATLAB reference runs on decompressed pixels, and the block
        # map when no raw coefficients exist. Skip the transform otherwise.
        need_pixel_dcts = want_primary_table or want_trunc_mask or (want_block_map and not use_raw)
        if need_pixel_dcts:
            pixel_blocks, pixel_grid = _block_dcts_from_pixels(gray)
        else:
            pixel_blocks, pixel_grid = None, None

        if use_raw:
            sac = _sac_score_raw(raw_blocks)
            coef_source = "raw_coefficients"
            blocks_for_map, grid_for_map = raw_blocks, raw_grid
        else:
            sac = _sac_score_pixels(gray)
            coef_source = "pixel_domain"
            blocks_for_map, grid_for_map = pixel_blocks, pixel_grid

        # Primary table estimation (pixel-domain DCTs per MATLAB fh_jpgstep.m).
        if want_primary_table or want_trunc_mask:
            primary_q = _estimate_primary_qtable_raw(pixel_blocks, pixel_grid, gray=gray)
            primary_out = {}
            if want_primary_table:
                primary_out["table"] = primary_q.get("table")
            if want_trunc_mask:
                primary_out["mask"] = primary_q.get("mask")
            if primary_out:
                result["estimated_primary_quantization"] = primary_out

        result.update(
            {
                "is_jpeg": True,
                "quantization_tables": parsed_tables,
                "quality_estimates": quality_estimates,
                "sac_score": sac,
                "coefficient_source": coef_source,
            }
        )
        if want_block_map:
            result["block_map"] = _block_level_map(
                blocks_for_map, grid_for_map, include_per_frequency=want_per_freq
            )
        return json.dumps(result)
    except Exception as e:  # pragma: no cover - defensive
        return json.dumps(
            {
                "tool": "detect_jpeg_quantization",
                "status": "error",
                "error": str(e),
            }
        )
__all__ = ["analyze_jpeg_compression", "detect_jpeg_quantization"]