| |
| """ |
| ELA (Error Level Analysis) Tool Self-Test |
| |
| This script validates the ELA forensic tool against expected behavior |
| based on the Sherloq reference implementation (example_tools/sherloq/gui/ela.py). |
| |
| Test cases: |
| 1. Basic functionality and output structure |
| 2. ELA statistics (mean, std, anomaly_score) are valid |
| 3. ELA map generation (base64 PNG) |
| 4. Quality parameter effects (lower quality = higher error) |
| 5. Input parsing (plain path vs JSON) |
| 6. Edge cases (different formats, invalid paths) |
| 7. Conformance with Sherloq behavior |
| """ |
|
|
| import base64 |
| import json |
| import os |
| import sys |
| import tempfile |
|
|
| import numpy as np |
| from PIL import Image |
|
|
| |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| from src.tools.forensic import perform_ela |
|
|
|
|
| def create_test_image(size=(256, 256), seed=42): |
| """ |
| Create a test image with varied texture for ELA analysis. |
| Size is (width, height) to match PIL.Image size ordering. |
| """ |
| np.random.seed(seed) |
| width, height = size |
| |
| x = np.linspace(0, 255, width) |
| y = np.linspace(0, 255, height) |
| xx, yy = np.meshgrid(x, y) |
| img = ((xx + yy) / 2).astype(np.uint8) |
| |
| noise = np.random.randint(-30, 30, (height, width), dtype=np.int16) |
| img = np.clip(img.astype(np.int16) + noise, 0, 255).astype(np.uint8) |
| return Image.fromarray(img, mode='L').convert('RGB') |
|
|
|
|
| def save_jpeg(img, path, quality): |
| """Save image as JPEG with specified quality.""" |
| img.save(path, 'JPEG', quality=quality) |
|
|
|
|
| def test_basic_functionality(): |
| """Test that perform_ela returns expected JSON structure.""" |
| print("\n=== Test: Basic Functionality ===") |
| img = create_test_image() |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| result_str = perform_ela(tmp_path) |
| result = json.loads(result_str) |
| os.unlink(tmp_path) |
| |
| |
| required_fields = ['tool', 'status', 'image_path', 'quality', |
| 'ela_mean', 'ela_std', 'ela_anomaly_score'] |
| missing = [f for f in required_fields if f not in result] |
| |
| if missing: |
| print(f"FAIL: Missing required fields: {missing}") |
| return False |
| |
| if result['status'] != 'completed': |
| print(f"FAIL: Status is '{result['status']}', expected 'completed'") |
| return False |
| |
| if result['tool'] != 'perform_ela': |
| print(f"FAIL: Tool name is '{result['tool']}', expected 'perform_ela'") |
| return False |
| |
| print(f"PASS: Basic functionality - all required fields present") |
| print(f" Quality: {result['quality']}") |
| print(f" ELA mean: {result['ela_mean']:.4f}") |
| print(f" ELA std: {result['ela_std']:.4f}") |
| print(f" Anomaly score: {result['ela_anomaly_score']:.4f}") |
| return True |
|
|
|
|
| def test_ela_statistics_validity(): |
| """Test that ELA statistics are valid numbers within expected ranges.""" |
| print("\n=== Test: ELA Statistics Validity ===") |
| img = create_test_image() |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| result = json.loads(perform_ela(tmp_path)) |
| os.unlink(tmp_path) |
| |
| ela_mean = result['ela_mean'] |
| ela_std = result['ela_std'] |
| ela_anomaly_score = result['ela_anomaly_score'] |
| |
| |
| all_pass = True |
| |
| if not isinstance(ela_mean, (int, float)) or not np.isfinite(ela_mean): |
| print(f"FAIL: ela_mean is invalid: {ela_mean}") |
| all_pass = False |
| elif ela_mean < 0 or ela_mean > 255: |
| print(f"WARN: ela_mean out of expected range [0, 255]: {ela_mean}") |
| |
| if not isinstance(ela_std, (int, float)) or not np.isfinite(ela_std): |
| print(f"FAIL: ela_std is invalid: {ela_std}") |
| all_pass = False |
| elif ela_std < 0: |
| print(f"WARN: ela_std is negative: {ela_std}") |
| |
| if not isinstance(ela_anomaly_score, (int, float)) or not np.isfinite(ela_anomaly_score): |
| print(f"FAIL: ela_anomaly_score is invalid: {ela_anomaly_score}") |
| all_pass = False |
| elif ela_anomaly_score < 0: |
| print(f"WARN: ela_anomaly_score is negative: {ela_anomaly_score}") |
| |
| if all_pass: |
| print(f"PASS: All statistics are valid") |
| print(f" Mean: {ela_mean:.4f} (range: 0-255)") |
| print(f" Std: {ela_std:.4f} (should be >= 0)") |
| print(f" Anomaly score: {ela_anomaly_score:.4f} (z-score, should be >= 0)") |
| |
| return all_pass |
|
|
|
|
| def test_ela_map_generation(): |
| """Test that ELA map is generated as base64 PNG when requested.""" |
| print("\n=== Test: ELA Map Generation ===") |
| img = create_test_image() |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "return_map": True}) |
| )) |
| |
| if 'ela_map' not in result: |
| print("FAIL: ela_map field missing") |
| os.unlink(tmp_path) |
| return False |
| |
| ela_map = result['ela_map'] |
| |
| if ela_map is None: |
| print("FAIL: ela_map is None when return_map=True") |
| os.unlink(tmp_path) |
| return False |
| |
| |
| if not ela_map.startswith('data:image/png;base64,'): |
| print(f"FAIL: ela_map doesn't start with expected prefix: {ela_map[:30]}...") |
| os.unlink(tmp_path) |
| return False |
| |
| |
| try: |
| b64_data = ela_map.split(',', 1)[1] |
| png_data = base64.b64decode(b64_data) |
| |
| if not png_data.startswith(b'\x89PNG\r\n\x1a\n'): |
| print("FAIL: Decoded data is not a valid PNG") |
| os.unlink(tmp_path) |
| return False |
| |
| |
| from io import BytesIO |
| map_img = Image.open(BytesIO(png_data)) |
| print(f"PASS: ELA map is valid PNG image") |
| print(f" Map size: {map_img.size}") |
| print(f" Map mode: {map_img.mode}") |
| if 'ela_map_size' in result: |
| print(f" Reported size: {result['ela_map_size']}") |
| except Exception as e: |
| print(f"FAIL: Error validating PNG: {e}") |
| os.unlink(tmp_path) |
| return False |
| |
| |
| result_no_map = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "return_map": False}) |
| )) |
| |
| if result_no_map.get('ela_map') is not None: |
| print("FAIL: ela_map should be None when return_map=False") |
| os.unlink(tmp_path) |
| return False |
| |
| os.unlink(tmp_path) |
| print("PASS: Map generation works correctly (with and without map)") |
| return True |
|
|
|
|
| def test_quality_parameter(): |
| """Test that quality parameter affects ELA results (lower quality = higher error).""" |
| print("\n=== Test: Quality Parameter Effects ===") |
| img = create_test_image() |
| |
| qualities = [50, 75, 90] |
| results = [] |
| |
| for q in qualities: |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, q) |
| |
| |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "quality": q, "return_map": False}) |
| )) |
| results.append((q, result['ela_mean'], result['ela_std'])) |
| os.unlink(tmp_path) |
| |
| print(f"{'Quality':>10} {'ELA Mean':>12} {'ELA Std':>12}") |
| print("-" * 36) |
| for q, mean, std in results: |
| print(f"{q:>10} {mean:>12.4f} {std:>12.4f}") |
| |
| |
| |
| |
| |
| print("\nTesting with fixed ELA quality=90:") |
| ela_qualities = [50, 75, 90] |
| ela_results = [] |
| |
| for orig_q in ela_qualities: |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, orig_q) |
| |
| |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "quality": 90, "return_map": False}) |
| )) |
| ela_results.append((orig_q, result['ela_mean'])) |
| os.unlink(tmp_path) |
| |
| print(f"{'Orig Q':>10} {'ELA Mean (Q90)':>15}") |
| print("-" * 27) |
| for orig_q, mean in ela_results: |
| print(f"{orig_q:>10} {mean:>15.4f}") |
| |
| |
| |
| if len(ela_results) >= 2: |
| q50_mean = ela_results[0][1] |
| q90_mean = ela_results[2][1] |
| if q50_mean > q90_mean: |
| print("PASS: Lower original quality produces higher ELA error (expected)") |
| else: |
| print("INFO: Quality relationship may vary based on compression artifacts") |
| |
| return True |
|
|
|
|
| def test_input_parsing(): |
| """Test that both plain path and JSON input formats work.""" |
| print("\n=== Test: Input Parsing ===") |
| img = create_test_image() |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| result1 = json.loads(perform_ela(tmp_path)) |
| |
| |
| result2 = json.loads(perform_ela(json.dumps({"path": tmp_path}))) |
| |
| |
| result3 = json.loads(perform_ela(json.dumps({ |
| "path": tmp_path, |
| "quality": 85, |
| "max_size": 256, |
| "return_map": False |
| }))) |
| |
| os.unlink(tmp_path) |
| |
| |
| if result1['status'] != 'completed' or result2['status'] != 'completed' or result3['status'] != 'completed': |
| print("FAIL: One or more input formats failed") |
| return False |
| |
| |
| if result3['quality'] != 85: |
| print(f"FAIL: Quality parameter not applied. Got {result3['quality']}, expected 85") |
| return False |
| |
| print("PASS: All input formats work correctly") |
| print(f" Plain path: {result1['status']}") |
| print(f" JSON path only: {result2['status']}") |
| print(f" JSON with params: {result3['status']} (quality={result3['quality']})") |
| return True |
|
|
|
|
| def test_quality_clamping(): |
| """Test that quality values are clamped to valid range [1, 100].""" |
| print("\n=== Test: Quality Clamping ===") |
| img = create_test_image() |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| test_cases = [ |
| (-10, 1), |
| (0, 1), |
| (150, 100), |
| (200, 100), |
| ] |
| |
| all_pass = True |
| for input_q, expected_q in test_cases: |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "quality": input_q, "return_map": False}) |
| )) |
| actual_q = result['quality'] |
| if actual_q != expected_q: |
| print(f"FAIL: Quality {input_q} -> {actual_q}, expected {expected_q}") |
| all_pass = False |
| |
| os.unlink(tmp_path) |
| |
| if all_pass: |
| print("PASS: Quality values are correctly clamped to [1, 100]") |
| |
| return all_pass |
|
|
|
|
| def test_error_handling(): |
| """Test error handling for invalid paths and edge cases.""" |
| print("\n=== Test: Error Handling ===") |
| |
| |
| result = json.loads(perform_ela("/nonexistent/path/image.jpg")) |
| if result['status'] != 'error': |
| print("FAIL: Non-existent file should return error status") |
| return False |
| |
| if 'error' not in result: |
| print("FAIL: Error status should include 'error' field") |
| return False |
| |
| print("PASS: Non-existent file handled correctly") |
| |
| |
| img = create_test_image() |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| result = json.loads(perform_ela("not valid json but might be a path")) |
| |
| |
| |
| os.unlink(tmp_path) |
| print("PASS: Error handling works correctly") |
| return True |
|
|
|
|
| def test_sherloq_conformance(): |
| """ |
| Test conformance with Sherloq behavior. |
| |
| Sherloq default: quality=75, uses sqrt(difference) for non-linear mode. |
| Our implementation: uses absolute difference (linear), quality=90 default. |
| |
| Key expectations: |
| - Lower recompression quality should produce higher error |
| - ELA mean should be positive for any compressed image |
| - Anomaly score should detect localized high-error regions |
| """ |
| print("\n=== Test: Sherloq Conformance ===") |
| img = create_test_image() |
| |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| ela_qualities = [50, 75, 90] |
| results = [] |
| |
| for ela_q in ela_qualities: |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "quality": ela_q, "return_map": False}) |
| )) |
| results.append((ela_q, result['ela_mean'], result['ela_std'], result['ela_anomaly_score'])) |
| |
| os.unlink(tmp_path) |
| |
| print(f"{'ELA Q':>8} {'Mean':>12} {'Std':>12} {'Anomaly':>12}") |
| print("-" * 44) |
| for ela_q, mean, std, anomaly in results: |
| print(f"{ela_q:>8} {mean:>12.4f} {std:>12.4f} {anomaly:>12.4f}") |
| |
| |
| if results[0][1] > results[2][1]: |
| print("PASS: Lower ELA quality produces higher error (matches Sherloq behavior)") |
| else: |
| print("INFO: Quality relationship may vary - this is acceptable") |
| |
| |
| if all(r[1] > 0 for r in results): |
| print("PASS: All ELA means are positive (expected for compressed images)") |
| else: |
| print("WARN: Some ELA means are zero or negative") |
| |
| return True |
|
|
|
|
| def test_map_resizing(): |
| """Test that max_size parameter correctly resizes the ELA map.""" |
| print("\n=== Test: Map Resizing ===") |
| img = create_test_image(size=(512, 512)) |
| |
| with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f: |
| tmp_path = f.name |
| save_jpeg(img, tmp_path, 75) |
| |
| |
| result = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "max_size": 256, "return_map": True}) |
| )) |
| |
| if result.get('ela_map_size'): |
| map_size = result['ela_map_size'] |
| max_dim = max(map_size) |
| if max_dim <= 256: |
| print(f"PASS: Map resized correctly (max dimension: {max_dim} <= 256)") |
| else: |
| print(f"FAIL: Map not resized (max dimension: {max_dim} > 256)") |
| os.unlink(tmp_path) |
| return False |
| |
| |
| result2 = json.loads(perform_ela( |
| json.dumps({"path": tmp_path, "max_size": 1024, "return_map": True}) |
| )) |
| |
| if result2.get('ela_map_size'): |
| map_size2 = result2['ela_map_size'] |
| |
| max_dim2 = max(map_size2) |
| if max_dim2 == 512: |
| print(f"PASS: Map not resized when not needed (max dimension: {max_dim2})") |
| else: |
| print(f"INFO: Map size is {map_size2} (may vary due to compression)") |
| |
| os.unlink(tmp_path) |
| return True |
|
|
|
|
| def test_different_image_formats(): |
| """Test that ELA works with different input image formats.""" |
| print("\n=== Test: Different Image Formats ===") |
| img = create_test_image() |
| |
| formats = [('JPEG', '.jpg'), ('PNG', '.png')] |
| all_pass = True |
| |
| for fmt, ext in formats: |
| with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as f: |
| tmp_path = f.name |
| img.save(tmp_path, fmt) |
| |
| result = json.loads(perform_ela(tmp_path)) |
| |
| if result['status'] != 'completed': |
| print(f"FAIL: {fmt} format failed: {result.get('error', 'unknown')}") |
| all_pass = False |
| else: |
| print(f"PASS: {fmt} format works (mean: {result['ela_mean']:.4f})") |
| |
| os.unlink(tmp_path) |
| |
| return all_pass |
|
|
|
|
| def main(): |
| print("=" * 60) |
| print("ELA (Error Level Analysis) Tool Self-Test") |
| print("Reference: Sherloq implementation (example_tools/sherloq/gui/ela.py)") |
| print("=" * 60) |
| |
| all_pass = True |
| |
| all_pass &= test_basic_functionality() |
| all_pass &= test_ela_statistics_validity() |
| all_pass &= test_ela_map_generation() |
| all_pass &= test_quality_parameter() |
| all_pass &= test_input_parsing() |
| all_pass &= test_quality_clamping() |
| all_pass &= test_error_handling() |
| all_pass &= test_sherloq_conformance() |
| all_pass &= test_map_resizing() |
| all_pass &= test_different_image_formats() |
| |
| print("\n" + "=" * 60) |
| if all_pass: |
| print("All tests passed!") |
| else: |
| print("Some tests failed - review output above") |
| print("=" * 60) |
| |
| return 0 if all_pass else 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |
|
|