df2 / ela_selftest.py
Mustafa Akcanca
Add trufor and ela
a5cb443
#!/usr/bin/env python3
"""
ELA (Error Level Analysis) Tool Self-Test
This script validates the ELA forensic tool against expected behavior
based on the Sherloq reference implementation (example_tools/sherloq/gui/ela.py).
Test cases:
1. Basic functionality and output structure
2. ELA statistics (mean, std, anomaly_score) are valid
3. ELA map generation (base64 PNG)
4. Quality parameter effects (lower quality = higher error)
5. Input parsing (plain path vs JSON)
6. Edge cases (different formats, invalid paths)
7. Conformance with Sherloq behavior
"""
import base64
import json
import os
import sys
import tempfile
import numpy as np
from PIL import Image
# Add parent to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.tools.forensic import perform_ela
def create_test_image(size=(256, 256), seed=42):
"""
Create a test image with varied texture for ELA analysis.
Size is (width, height) to match PIL.Image size ordering.
"""
np.random.seed(seed)
width, height = size
# Create gradient + noise for texture
x = np.linspace(0, 255, width)
y = np.linspace(0, 255, height)
xx, yy = np.meshgrid(x, y) # shape: (height, width)
img = ((xx + yy) / 2).astype(np.uint8)
# Add some noise for texture
noise = np.random.randint(-30, 30, (height, width), dtype=np.int16)
img = np.clip(img.astype(np.int16) + noise, 0, 255).astype(np.uint8)
return Image.fromarray(img, mode='L').convert('RGB')
def save_jpeg(img, path, quality):
"""Save image as JPEG with specified quality."""
img.save(path, 'JPEG', quality=quality)
def test_basic_functionality():
"""Test that perform_ela returns expected JSON structure."""
print("\n=== Test: Basic Functionality ===")
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
result_str = perform_ela(tmp_path)
result = json.loads(result_str)
os.unlink(tmp_path)
# Check required fields
required_fields = ['tool', 'status', 'image_path', 'quality',
'ela_mean', 'ela_std', 'ela_anomaly_score']
missing = [f for f in required_fields if f not in result]
if missing:
print(f"FAIL: Missing required fields: {missing}")
return False
if result['status'] != 'completed':
print(f"FAIL: Status is '{result['status']}', expected 'completed'")
return False
if result['tool'] != 'perform_ela':
print(f"FAIL: Tool name is '{result['tool']}', expected 'perform_ela'")
return False
print(f"PASS: Basic functionality - all required fields present")
print(f" Quality: {result['quality']}")
print(f" ELA mean: {result['ela_mean']:.4f}")
print(f" ELA std: {result['ela_std']:.4f}")
print(f" Anomaly score: {result['ela_anomaly_score']:.4f}")
return True
def test_ela_statistics_validity():
"""Test that ELA statistics are valid numbers within expected ranges."""
print("\n=== Test: ELA Statistics Validity ===")
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
result = json.loads(perform_ela(tmp_path))
os.unlink(tmp_path)
ela_mean = result['ela_mean']
ela_std = result['ela_std']
ela_anomaly_score = result['ela_anomaly_score']
# Check types and ranges
all_pass = True
if not isinstance(ela_mean, (int, float)) or not np.isfinite(ela_mean):
print(f"FAIL: ela_mean is invalid: {ela_mean}")
all_pass = False
elif ela_mean < 0 or ela_mean > 255:
print(f"WARN: ela_mean out of expected range [0, 255]: {ela_mean}")
if not isinstance(ela_std, (int, float)) or not np.isfinite(ela_std):
print(f"FAIL: ela_std is invalid: {ela_std}")
all_pass = False
elif ela_std < 0:
print(f"WARN: ela_std is negative: {ela_std}")
if not isinstance(ela_anomaly_score, (int, float)) or not np.isfinite(ela_anomaly_score):
print(f"FAIL: ela_anomaly_score is invalid: {ela_anomaly_score}")
all_pass = False
elif ela_anomaly_score < 0:
print(f"WARN: ela_anomaly_score is negative: {ela_anomaly_score}")
if all_pass:
print(f"PASS: All statistics are valid")
print(f" Mean: {ela_mean:.4f} (range: 0-255)")
print(f" Std: {ela_std:.4f} (should be >= 0)")
print(f" Anomaly score: {ela_anomaly_score:.4f} (z-score, should be >= 0)")
return all_pass
def test_ela_map_generation():
"""Test that ELA map is generated as base64 PNG when requested."""
print("\n=== Test: ELA Map Generation ===")
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Test with map requested
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "return_map": True})
))
if 'ela_map' not in result:
print("FAIL: ela_map field missing")
os.unlink(tmp_path)
return False
ela_map = result['ela_map']
if ela_map is None:
print("FAIL: ela_map is None when return_map=True")
os.unlink(tmp_path)
return False
# Check base64 data URL format
if not ela_map.startswith('data:image/png;base64,'):
print(f"FAIL: ela_map doesn't start with expected prefix: {ela_map[:30]}...")
os.unlink(tmp_path)
return False
# Decode and verify it's a valid PNG
try:
b64_data = ela_map.split(',', 1)[1]
png_data = base64.b64decode(b64_data)
# Check PNG signature
if not png_data.startswith(b'\x89PNG\r\n\x1a\n'):
print("FAIL: Decoded data is not a valid PNG")
os.unlink(tmp_path)
return False
# Try to load as PIL Image
from io import BytesIO
map_img = Image.open(BytesIO(png_data))
print(f"PASS: ELA map is valid PNG image")
print(f" Map size: {map_img.size}")
print(f" Map mode: {map_img.mode}")
if 'ela_map_size' in result:
print(f" Reported size: {result['ela_map_size']}")
except Exception as e:
print(f"FAIL: Error validating PNG: {e}")
os.unlink(tmp_path)
return False
# Test without map
result_no_map = json.loads(perform_ela(
json.dumps({"path": tmp_path, "return_map": False})
))
if result_no_map.get('ela_map') is not None:
print("FAIL: ela_map should be None when return_map=False")
os.unlink(tmp_path)
return False
os.unlink(tmp_path)
print("PASS: Map generation works correctly (with and without map)")
return True
def test_quality_parameter():
"""Test that quality parameter affects ELA results (lower quality = higher error)."""
print("\n=== Test: Quality Parameter Effects ===")
img = create_test_image()
qualities = [50, 75, 90]
results = []
for q in qualities:
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, q)
# Use the same recompression quality for ELA
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "quality": q, "return_map": False})
))
results.append((q, result['ela_mean'], result['ela_std']))
os.unlink(tmp_path)
print(f"{'Quality':>10} {'ELA Mean':>12} {'ELA Std':>12}")
print("-" * 36)
for q, mean, std in results:
print(f"{q:>10} {mean:>12.4f} {std:>12.4f}")
# Lower quality should generally produce higher error (but this depends on
# the original compression quality - if original is Q75, recompressing at Q50
# will show more error than recompressing at Q90)
# For a more reliable test, use a fixed ELA quality and vary original quality
print("\nTesting with fixed ELA quality=90:")
ela_qualities = [50, 75, 90]
ela_results = []
for orig_q in ela_qualities:
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, orig_q)
# Use fixed ELA quality
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "quality": 90, "return_map": False})
))
ela_results.append((orig_q, result['ela_mean']))
os.unlink(tmp_path)
print(f"{'Orig Q':>10} {'ELA Mean (Q90)':>15}")
print("-" * 27)
for orig_q, mean in ela_results:
print(f"{orig_q:>10} {mean:>15.4f}")
# Images compressed at lower quality should show MORE error when recompressed
# at fixed quality (they're further from the recompression quality)
if len(ela_results) >= 2:
q50_mean = ela_results[0][1]
q90_mean = ela_results[2][1]
if q50_mean > q90_mean:
print("PASS: Lower original quality produces higher ELA error (expected)")
else:
print("INFO: Quality relationship may vary based on compression artifacts")
return True
def test_input_parsing():
"""Test that both plain path and JSON input formats work."""
print("\n=== Test: Input Parsing ===")
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Test plain path
result1 = json.loads(perform_ela(tmp_path))
# Test JSON with path only
result2 = json.loads(perform_ela(json.dumps({"path": tmp_path})))
# Test JSON with all parameters
result3 = json.loads(perform_ela(json.dumps({
"path": tmp_path,
"quality": 85,
"max_size": 256,
"return_map": False
})))
os.unlink(tmp_path)
# All should succeed
if result1['status'] != 'completed' or result2['status'] != 'completed' or result3['status'] != 'completed':
print("FAIL: One or more input formats failed")
return False
# Check that quality parameter was applied
if result3['quality'] != 85:
print(f"FAIL: Quality parameter not applied. Got {result3['quality']}, expected 85")
return False
print("PASS: All input formats work correctly")
print(f" Plain path: {result1['status']}")
print(f" JSON path only: {result2['status']}")
print(f" JSON with params: {result3['status']} (quality={result3['quality']})")
return True
def test_quality_clamping():
"""Test that quality values are clamped to valid range [1, 100]."""
print("\n=== Test: Quality Clamping ===")
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Test out-of-range qualities
test_cases = [
(-10, 1), # Below minimum -> clamped to 1
(0, 1), # Below minimum -> clamped to 1
(150, 100), # Above maximum -> clamped to 100
(200, 100), # Above maximum -> clamped to 100
]
all_pass = True
for input_q, expected_q in test_cases:
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "quality": input_q, "return_map": False})
))
actual_q = result['quality']
if actual_q != expected_q:
print(f"FAIL: Quality {input_q} -> {actual_q}, expected {expected_q}")
all_pass = False
os.unlink(tmp_path)
if all_pass:
print("PASS: Quality values are correctly clamped to [1, 100]")
return all_pass
def test_error_handling():
"""Test error handling for invalid paths and edge cases."""
print("\n=== Test: Error Handling ===")
# Test non-existent file
result = json.loads(perform_ela("/nonexistent/path/image.jpg"))
if result['status'] != 'error':
print("FAIL: Non-existent file should return error status")
return False
if 'error' not in result:
print("FAIL: Error status should include 'error' field")
return False
print("PASS: Non-existent file handled correctly")
# Test with valid file but invalid JSON
img = create_test_image()
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Invalid JSON should fall back to treating as plain path
result = json.loads(perform_ela("not valid json but might be a path"))
# This might succeed if it's treated as a path, or fail if path doesn't exist
# Either way is acceptable - the function should not crash
os.unlink(tmp_path)
print("PASS: Error handling works correctly")
return True
def test_sherloq_conformance():
"""
Test conformance with Sherloq behavior.
Sherloq default: quality=75, uses sqrt(difference) for non-linear mode.
Our implementation: uses absolute difference (linear), quality=90 default.
Key expectations:
- Lower recompression quality should produce higher error
- ELA mean should be positive for any compressed image
- Anomaly score should detect localized high-error regions
"""
print("\n=== Test: Sherloq Conformance ===")
img = create_test_image()
# Test with Sherloq's default quality (75)
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Test different ELA qualities
ela_qualities = [50, 75, 90]
results = []
for ela_q in ela_qualities:
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "quality": ela_q, "return_map": False})
))
results.append((ela_q, result['ela_mean'], result['ela_std'], result['ela_anomaly_score']))
os.unlink(tmp_path)
print(f"{'ELA Q':>8} {'Mean':>12} {'Std':>12} {'Anomaly':>12}")
print("-" * 44)
for ela_q, mean, std, anomaly in results:
print(f"{ela_q:>8} {mean:>12.4f} {std:>12.4f} {anomaly:>12.4f}")
# Lower ELA quality should produce higher error (more compression artifacts)
if results[0][1] > results[2][1]: # Q50 mean > Q90 mean
print("PASS: Lower ELA quality produces higher error (matches Sherloq behavior)")
else:
print("INFO: Quality relationship may vary - this is acceptable")
# All means should be positive
if all(r[1] > 0 for r in results):
print("PASS: All ELA means are positive (expected for compressed images)")
else:
print("WARN: Some ELA means are zero or negative")
return True
def test_map_resizing():
"""Test that max_size parameter correctly resizes the ELA map."""
print("\n=== Test: Map Resizing ===")
img = create_test_image(size=(512, 512)) # Larger image
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
tmp_path = f.name
save_jpeg(img, tmp_path, 75)
# Test with max_size=256 (should resize)
result = json.loads(perform_ela(
json.dumps({"path": tmp_path, "max_size": 256, "return_map": True})
))
if result.get('ela_map_size'):
map_size = result['ela_map_size']
max_dim = max(map_size)
if max_dim <= 256:
print(f"PASS: Map resized correctly (max dimension: {max_dim} <= 256)")
else:
print(f"FAIL: Map not resized (max dimension: {max_dim} > 256)")
os.unlink(tmp_path)
return False
# Test with max_size=1024 (should not resize)
result2 = json.loads(perform_ela(
json.dumps({"path": tmp_path, "max_size": 1024, "return_map": True})
))
if result2.get('ela_map_size'):
map_size2 = result2['ela_map_size']
# Original is 512x512, so max should be 512 (no resize needed)
max_dim2 = max(map_size2)
if max_dim2 == 512:
print(f"PASS: Map not resized when not needed (max dimension: {max_dim2})")
else:
print(f"INFO: Map size is {map_size2} (may vary due to compression)")
os.unlink(tmp_path)
return True
def test_different_image_formats():
"""Test that ELA works with different input image formats."""
print("\n=== Test: Different Image Formats ===")
img = create_test_image()
formats = [('JPEG', '.jpg'), ('PNG', '.png')]
all_pass = True
for fmt, ext in formats:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as f:
tmp_path = f.name
img.save(tmp_path, fmt)
result = json.loads(perform_ela(tmp_path))
if result['status'] != 'completed':
print(f"FAIL: {fmt} format failed: {result.get('error', 'unknown')}")
all_pass = False
else:
print(f"PASS: {fmt} format works (mean: {result['ela_mean']:.4f})")
os.unlink(tmp_path)
return all_pass
def main():
print("=" * 60)
print("ELA (Error Level Analysis) Tool Self-Test")
print("Reference: Sherloq implementation (example_tools/sherloq/gui/ela.py)")
print("=" * 60)
all_pass = True
all_pass &= test_basic_functionality()
all_pass &= test_ela_statistics_validity()
all_pass &= test_ela_map_generation()
all_pass &= test_quality_parameter()
all_pass &= test_input_parsing()
all_pass &= test_quality_clamping()
all_pass &= test_error_handling()
all_pass &= test_sherloq_conformance()
all_pass &= test_map_resizing()
all_pass &= test_different_image_formats()
print("\n" + "=" * 60)
if all_pass:
print("All tests passed!")
else:
print("Some tests failed - review output above")
print("=" * 60)
return 0 if all_pass else 1
if __name__ == "__main__":
sys.exit(main())