| | import gradio as gr |
| | import numpy as np |
| | import cv2 |
| | from PIL import Image, ImageChops, ImageEnhance |
| | import io |
| | import os |
| | import random |
| | import matplotlib.pyplot as plt |
| | from matplotlib.colors import LinearSegmentedColormap |
| | import tempfile |
| | import json |
| | import base64 |
| | from sklearn.metrics.pairwise import cosine_similarity |
| | import shutil |
| | from typing import Dict, Any |
| | from scipy.spatial import cKDTree |
| | from multiprocessing import Pool, cpu_count |
| | import nest_asyncio |
| |
|
| | |
import atexit

# nest_asyncio patches asyncio so an already-running event loop can be
# re-entered; applied once at import time, before any UI code runs.
nest_asyncio.apply()

# Scratch directory shared by the helpers below for intermediate image files.
TEMP_DIR = tempfile.mkdtemp()
print(f"Using temporary directory: {TEMP_DIR}")

# tempfile.mkdtemp() leaves removal to the caller, so without this hook every
# run of the app would leave a directory behind. ignore_errors keeps shutdown
# quiet if the directory has already been removed.
atexit.register(shutil.rmtree, TEMP_DIR, ignore_errors=True)
| |
|
| | |
| | |
| | |
| |
|
def save_pil_image(img, path):
    """Write ``img`` to ``path`` and hand the same path back.

    Args:
        img: Object exposing a PIL-style ``save(path)`` method.
        path: Destination passed straight through to ``img.save``.

    Returns:
        The ``path`` argument, unchanged, so the call can be chained.
    """
    destination = path
    img.save(destination)
    return destination
| |
|
def pil_to_base64(img):
    """Serialize an image as PNG and return it as a base64 text string.

    Args:
        img: Object exposing a PIL-style ``save(fp, format=...)`` method.

    Returns:
        The PNG bytes, base64-encoded and decoded to ``str`` so the value
        can be embedded in a JSON response.
    """
    with io.BytesIO() as png_buffer:
        img.save(png_buffer, format="PNG")
        raw_png = png_buffer.getvalue()
    encoded = base64.b64encode(raw_png)
    return encoded.decode('utf-8')
| |
|
def base64_to_pil(base64_str):
    """Decode a base64 string and open the resulting bytes as a PIL image.

    Args:
        base64_str: Base64-encoded image file contents.

    Returns:
        The object returned by ``Image.open`` on an in-memory buffer holding
        the decoded bytes.
    """
    decoded_stream = io.BytesIO(base64.b64decode(base64_str))
    return Image.open(decoded_stream)
| |
|
| | |
| | |
| | |
| |
|
| | |
def find_matches(args):
    """Collect pairs of near-identical feature vectors for a slice of blocks.

    Takes a single tuple so it can be used directly with ``Pool.map``.

    Args:
        args: Tuple ``(block_indices, blocks, tree, similarity_threshold)``:
            the indices this call is responsible for, the full feature
            array, a KD-tree built over that array, and the maximum
            distance at which two blocks count as a match.

    Returns:
        A set of ``(low_index, high_index)`` tuples, one per matching pair.
    """
    block_indices, blocks, tree, similarity_threshold = args
    total_blocks = len(blocks)
    found = set()

    for idx in block_indices:
        # Up to 10 nearest neighbours within the threshold. Slots with no
        # neighbour come back with an out-of-range index, which the
        # ``< total_blocks`` test below filters out.
        dists, neighbours = tree.query(
            blocks[idx], k=10, distance_upper_bound=similarity_threshold
        )
        found.update(
            # Sorting the pair makes (a, b) and (b, a) collapse to one entry.
            tuple(sorted([idx, other]))
            for other, dist in zip(neighbours, dists)
            if other != idx and other < total_blocks and dist <= similarity_threshold
        )

    return found
| |
|
| |
|
def detect_clones(image_path, max_dimension=2000):
    """Detect cloned (copy-pasted) regions in an image.

    The image is read as grayscale, optionally downscaled, and cut into
    overlapping 16x16 blocks. Each block is described by the 4x4 low-frequency
    corner of its DCT, normalised to unit length; blocks whose descriptors lie
    within a small distance of each other are reported as matches and outlined
    on a copy of the image.

    Args:
        image_path: Path to the image file.
        max_dimension: Images whose height or width exceeds this are scaled
            down (keeping aspect ratio) before analysis.

    Returns:
        Tuple ``(result_image, match_count)``: a PIL image with matched blocks
        outlined, at the original image's size, and the number of matching
        block pairs found.

    Raises:
        ValueError: If the file cannot be read as an image.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError(f"Could not read image at {image_path}")

    # Remember the true input size so the result can be restored exactly;
    # recomputing it from the scale factor can be off by a pixel.
    orig_height, orig_width = img.shape
    height, width = orig_height, orig_width

    resized = False
    if height > max_dimension or width > max_dimension:
        scale = max_dimension / max(height, width)
        # Clamp to 1 so extreme aspect ratios never ask for a 0-pixel side.
        new_height = max(1, int(height * scale))
        new_width = max(1, int(width * scale))
        img = cv2.resize(img, (new_width, new_height))
        height, width = img.shape
        resized = True

    # Colour canvas on which the matches are drawn.
    clone_img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

    block_size = 16
    stride = 8
    # Coarser sampling for very large images to bound the block count.
    if (height * width) > 4000000:
        stride = 16

    features = []
    positions = []
    # "+ 1" so the last block that still fits flush against the bottom/right
    # edge is included.
    for y in range(0, height - block_size + 1, stride):
        for x in range(0, width - block_size + 1, stride):
            block = img[y:y + block_size, x:x + block_size].astype(np.float32)
            dct = cv2.dct(block)
            # Keep only the low-frequency coefficients as the descriptor.
            features.append(dct[:4, :4].flatten())
            positions.append((x, y))

    matches = set()
    # At least two blocks are needed to form a pair. This also covers images
    # smaller than one block, where the feature list is empty.
    if len(features) > 1:
        blocks = np.array(features, dtype=np.float32)

        # Normalise each descriptor; all-zero rows are left as-is.
        norms = np.linalg.norm(blocks, axis=1)
        norms[norms == 0] = 1
        blocks = blocks / norms[:, np.newaxis]

        tree = cKDTree(blocks)
        similarity_threshold = 0.04

        # Split the block indices into contiguous chunks, one per worker.
        num_processes = min(8, cpu_count())
        chunk_size = len(blocks) // num_processes + 1
        block_chunks = [
            range(start, min(start + chunk_size, len(blocks)))
            for start in range(0, len(blocks), chunk_size)
        ]
        args_list = [
            (chunk, blocks, tree, similarity_threshold) for chunk in block_chunks
        ]

        with Pool(num_processes) as pool:
            results = pool.map(find_matches, args_list)

        for result in results:
            matches.update(result)

    # Outline each matched pair: one colour for the first block, another for
    # the second.
    for i, j in matches:
        x1, y1 = positions[i]
        x2, y2 = positions[j]
        cv2.rectangle(clone_img, (x1, y1), (x1 + block_size, y1 + block_size), (0, 0, 255), 1)
        cv2.rectangle(clone_img, (x2, y2), (x2 + block_size, y2 + block_size), (255, 0, 0), 1)

    clone_result = Image.fromarray(cv2.cvtColor(clone_img, cv2.COLOR_BGR2RGB))

    if resized:
        clone_result = clone_result.resize((orig_width, orig_height), Image.LANCZOS)

    return clone_result, len(matches)
| |
|
def error_level_analysis(image_path, quality=90, scale=10):
    """Perform Error Level Analysis (ELA) on an image.

    The image is re-encoded as JPEG at the given quality, the per-pixel
    difference to the original is amplified, averaged over the colour
    channels and rendered with the 'jet' colormap.

    Args:
        image_path: Path to the image file.
        quality: JPEG quality level used for the recompression.
        scale: Brightness factor applied to the difference image.

    Returns:
        RGB PIL image containing the colour-mapped ELA result.
    """
    with Image.open(image_path) as source:
        original = source.convert('RGB')

    # Recompress in memory rather than through a fixed file on disk: a shared
    # file name can be overwritten by a concurrent call, and the reopened
    # file handle was never closed.
    jpeg_buffer = io.BytesIO()
    original.save(jpeg_buffer, 'JPEG', quality=quality)
    jpeg_buffer.seek(0)
    with Image.open(jpeg_buffer) as reopened:
        recompressed = reopened.convert('RGB')

    diff = ImageChops.difference(original, recompressed)

    # Amplify the (usually faint) differences so they become visible.
    diff = ImageEnhance.Brightness(diff).enhance(scale)

    # Both inputs are RGB, so the difference always has a channel axis.
    diff_gray = np.array(diff).mean(axis=2)

    colormap = plt.get_cmap('jet')
    colored_diff = (colormap(diff_gray / 255.0) * 255).astype(np.uint8)

    # The colormap yields RGBA; drop the alpha channel.
    return Image.fromarray(colored_diff[:, :, :3])
| |
|
def extract_exif_metadata(image_path):
    """Extract EXIF metadata and flag potential manipulation indicators.

    Args:
        image_path: Path to the image file.

    Returns:
        Dictionary with the keys:
            ``metadata``: tag name (or numeric tag id as a string) mapped to
                the stringified tag value.
            ``indicators``: list of human-readable findings.
            ``summary``: one-line verdict.
            ``analysis_count``: number of metadata entries read.
        If anything goes wrong, ``metadata`` carries an ``"Error"`` entry and
        ``summary`` is ``"Analysis failed"``.
    """
    try:
        # Use the public getexif() API. The private _getexif() only exists on
        # some image classes, so other formats ended up in the error path
        # below and were reported as an extraction failure.
        with Image.open(image_path) as img:
            exif = img.getexif()
            exif_data = dict(exif)
            # Tags such as DateTimeOriginal live in the Exif sub-IFD (0x8769),
            # not in the top-level directory; merge them in when the
            # installed Pillow exposes get_ifd().
            get_ifd = getattr(exif, "get_ifd", None)
            if get_ifd is not None:
                exif_data.update(get_ifd(0x8769))

        # Human-readable names for the tags this analysis cares about.
        exif_tags = {
            271: 'Make', 272: 'Model', 306: 'DateTime',
            36867: 'DateTimeOriginal', 36868: 'DateTimeDigitized',
            37510: 'UserComment', 40964: 'RelatedSoundFile',
            305: 'Software', 315: 'Artist', 33432: 'Copyright'
        }

        # Unknown tags keep their numeric id (as a string) as the key.
        metadata = {
            exif_tags.get(tag_id, str(tag_id)): str(value)
            for tag_id, value in exif_data.items()
        }

        indicators = []

        # Software tag naming a known editing tool.
        editing_software = ['photoshop', 'lightroom', 'gimp', 'paint', 'editor', 'filter']
        if 'Software' in metadata:
            software = metadata['Software'].lower()
            if any(editor in software for editor in editing_software):
                indicators.append(f"Image edited with {metadata['Software']}")

        # Capture and modification timestamps disagree.
        if 'DateTimeOriginal' in metadata and 'DateTime' in metadata:
            if metadata['DateTimeOriginal'] != metadata['DateTime']:
                indicators.append("Capture time differs from modification time")

        # A modification time without the original capture time.
        if 'DateTime' in metadata and 'DateTimeOriginal' not in metadata:
            indicators.append("Original capture time missing")

        return {
            "metadata": metadata,
            "indicators": indicators,
            "summary": "Potential manipulation detected" if indicators else "No obvious manipulation indicators",
            "analysis_count": len(metadata)
        }

    except Exception as e:
        # Best-effort boundary: callers always get a well-formed result.
        return {
            "metadata": {"Error": str(e)},
            "indicators": ["Error extracting metadata"],
            "summary": "Analysis failed",
            "analysis_count": 0
        }
| |
|
def noise_analysis(image_path, amplification=15):
    """Extract and visualise the high-frequency noise residual of an image.

    The grayscale image is compared with a Gaussian-blurred copy of itself;
    the residual is amplified and rendered with OpenCV's JET colormap.

    Args:
        image_path: Path to the image file.
        amplification: Factor by which the residual is multiplied for
            visualisation.

    Returns:
        RGB PIL image containing the colour-mapped noise residual.

    Raises:
        ValueError: If the file cannot be read as an image.
    """
    img = cv2.imread(image_path)
    if img is None:
        # Same failure mode and message as detect_clones, instead of an
        # opaque OpenCV error from cvtColor on None.
        raise ValueError(f"Could not read image at {image_path}")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # A blurred copy approximates the noise-free image content.
    blur = cv2.GaussianBlur(gray, (5, 5), 0)

    # absdiff keeps the residual on both sides of the blurred value. A plain
    # saturating subtract on uint8 clamps every pixel darker than its
    # surroundings to 0 and so discards half of the noise.
    noise = cv2.absdiff(gray, blur)

    noise = cv2.multiply(noise, amplification)

    noise_colored = cv2.applyColorMap(noise, cv2.COLORMAP_JET)

    # OpenCV works in BGR; PIL expects RGB.
    return Image.fromarray(cv2.cvtColor(noise_colored, cv2.COLOR_BGR2RGB))
| |
|
def manipulation_likelihood(image_path):
    """Simulate a model that scores the likelihood of image manipulation.

    For demo purposes only: the "suspicious" regions and the base score are
    random. The score is biased upward when the EXIF analysis reports
    indicators and when the file has a JPEG extension.

    Args:
        image_path: Path to the image file.

    Returns:
        Dictionary with the keys:
            ``probability``: score, capped at 0.95.
            ``heatmap_image``: PIL image with the random regions tinted red.
            ``explanation``: text from ``get_probability_explanation``.
            ``confidence``: ``"medium"`` for scores strictly between 0.3 and
                0.7, otherwise ``"high"``.
    """
    # Decode once; the same array provides the dimensions and the pixels
    # used for the overlay.
    with Image.open(image_path) as source:
        orig_img = np.array(source.convert('RGB'))

    height, width = orig_img.shape[:2]

    heatmap = np.zeros((height, width), dtype=np.float32)

    # Coordinate grids do not depend on the region, so build them once.
    y_indices, x_indices = np.ogrid[:height, :width]

    num_regions = random.randint(1, 4)
    for _ in range(num_regions):
        x = random.randint(0, width - 1)
        y = random.randint(0, height - 1)
        # Narrow images can draw a radius of 0, which would divide by zero
        # in the Gaussian falloff below; clamp to at least one pixel.
        radius = max(1, random.randint(width // 10, width // 5))

        dist_from_center = (y_indices - y) ** 2 + (x_indices - x) ** 2
        mask = dist_from_center <= radius ** 2

        # Gaussian falloff from the region centre, merged by maximum so
        # overlapping regions do not add up.
        intensity = random.uniform(0.5, 1.0)
        falloff = intensity * np.exp(-dist_from_center[mask] / (2 * (radius / 2) ** 2))
        heatmap[mask] = np.maximum(heatmap[mask], falloff)

    if np.max(heatmap) > 0:
        heatmap = heatmap / np.max(heatmap)

    # Transparent-to-red colormap; the alpha channel marks flagged pixels.
    cmap = LinearSegmentedColormap.from_list('custom', [(0, 0, 0, 0), (1, 0, 0, 0.7)])
    heatmap_rgb = (cmap(heatmap) * 255).astype(np.uint8)

    # Blend: flagged pixels are dimmed to 50%, and the red channel additionally
    # receives 50% of the heatmap's red value.
    flagged = heatmap_rgb[:, :, 3] > 0
    dimmed = (orig_img * 0.5).astype(np.uint8)
    red = (orig_img[:, :, 0] * 0.5 + heatmap_rgb[:, :, 0] * 0.5).astype(np.uint8)

    overlay = orig_img.copy()
    overlay[:, :, 0] = np.where(flagged, red, overlay[:, :, 0])
    overlay[:, :, 1:] = np.where(flagged[:, :, np.newaxis], dimmed[:, :, 1:], overlay[:, :, 1:])

    exif_result = extract_exif_metadata(image_path)
    exif_factor = 0.3 if exif_result["indicators"] else 0.0

    # Look at the real file extension: a substring test would also match
    # ".jpg" inside a directory name and would miss ".jpeg".
    extension = os.path.splitext(image_path)[1].lower()
    img_factor = 0.1 if extension in ('.jpg', '.jpeg') else 0.0

    base_probability = random.uniform(0.2, 0.8)
    manipulation_probability = min(0.95, base_probability + exif_factor + img_factor)

    return {
        "probability": manipulation_probability,
        "heatmap_image": Image.fromarray(overlay),
        "explanation": get_probability_explanation(manipulation_probability),
        "confidence": "medium" if 0.3 < manipulation_probability < 0.7 else "high"
    }
| |
|
def get_probability_explanation(prob):
    """Map a manipulation probability to a short human-readable verdict.

    Args:
        prob: Manipulation probability.

    Returns:
        The message for the first tier whose upper bound exceeds ``prob``
        (below 0.3, below 0.6), or the "strong indicators" message otherwise.
    """
    tiers = (
        (0.3, "The image appears to be authentic with no significant signs of manipulation."),
        (0.6, "Some inconsistencies detected that might indicate limited manipulation."),
    )
    for upper_bound, message in tiers:
        if prob < upper_bound:
            return message
    return "Strong indicators of digital manipulation detected in this image."
| |
|
def get_clone_explanation(count):
    """Describe what a given number of clone matches suggests.

    Args:
        count: Number of matching block pairs found by clone detection.

    Returns:
        A sentence for "none", "a few (fewer than 10)" or "many" matches.
    """
    if count == 0:
        return "No copy-paste manipulations detected in the image."

    few = "Few potential copy-paste regions detected, might be false positives."
    many = "Significant number of copy-paste regions detected, suggesting manipulation."
    return few if count < 10 else many
| |
|
def save_uploaded_image(image):
    """Save an uploaded PIL image into the scratch directory as a JPEG.

    Args:
        image: PIL image to persist.

    Returns:
        Path of the written file inside ``TEMP_DIR``.
    """
    temp_path = os.path.join(TEMP_DIR, "temp_analyze.jpg")
    # The target is a JPEG, which cannot store alpha or palette modes
    # (RGBA, LA, P, ...). Saving those directly raises OSError, so convert
    # anything that is not plain RGB, grayscale or CMYK first.
    if image.mode not in ("RGB", "L", "CMYK"):
        image = image.convert("RGB")
    image.save(temp_path)
    return temp_path
| |
|
def analyze_complete_image(image_path):
    """Run every forensic check on one image and bundle the results.

    Args:
        image_path: Path to the image file.

    Returns:
        Dictionary holding the manipulation probability, a Markdown report,
        the EXIF metadata, the clone count, and the original, ELA, noise,
        heatmap and clone-detection images.
    """
    image = Image.open(image_path)

    # Individual analyses, run in a fixed order.
    exif_result = extract_exif_metadata(image_path)
    manipulation_result = manipulation_likelihood(image_path)
    clone_result, clone_count = detect_clones(image_path)
    ela_result = error_level_analysis(image_path)
    noise_result = noise_analysis(image_path)

    probability = manipulation_result["probability"]
    indicators = exif_result["indicators"]

    # Markdown report shown in the UI.
    analysis_text = f"""
## Manipulation Analysis Results

**Overall Assessment: {probability*100:.1f}% likelihood of manipulation**

{manipulation_result['explanation']}

### Clone Detection Analysis:
Found {clone_count} potential cloned regions in the image.
{get_clone_explanation(clone_count)}

### EXIF Metadata Analysis:
{exif_result['summary']}

Indicators found: {len(indicators)}
"""

    if indicators:
        # One bullet per EXIF finding, appended under its own heading.
        bullet_lines = "".join(f"- {indicator}\n" for indicator in indicators)
        analysis_text = f"{analysis_text}\nDetailed indicators:\n{bullet_lines}"

    return {
        "manipulation_probability": probability,
        "analysis_text": analysis_text,
        "exif_data": exif_result["metadata"],
        "clone_count": clone_count,
        "original_image": image,
        "ela_image": ela_result,
        "noise_image": noise_result,
        "heatmap_image": manipulation_result["heatmap_image"],
        "clone_image": clone_result
    }
| |
|
| | |
| | |
| | |
| |
|
def analyze_image(image):
    """Gradio callback: analyse the uploaded image and return all UI outputs.

    Args:
        image: Uploaded PIL image, or ``None`` when nothing was uploaded.

    Returns:
        8-tuple in the order the UI outputs are wired: original image, ELA
        image, noise image, heatmap image, clone image, EXIF JSON text,
        Markdown report, manipulation probability. On failure the image
        slots (except the original) are ``None``, the two text slots carry
        the error message, and the probability is 0.
    """
    if image is None:
        return None, None, None, None, None, "{}", "Please upload an image first.", 0

    try:
        # Saving sits inside the try block so that a failure to persist the
        # upload is reported through the UI like any other analysis error
        # instead of escaping as an unhandled exception.
        temp_path = save_uploaded_image(image)
        results = analyze_complete_image(temp_path)

        return (
            image,
            results["ela_image"],
            results["noise_image"],
            results["heatmap_image"],
            results["clone_image"],
            json.dumps(results["exif_data"], indent=2),
            results["analysis_text"],
            results["manipulation_probability"]
        )
    except Exception as e:
        # Top-level UI boundary: show the error instead of crashing the app.
        error_message = f"Error occurred during analysis: {str(e)}"
        print(error_message)
        return image, None, None, None, None, f"Error: {str(e)}", error_message, 0
| |
|
| | |
| | |
| | |
| |
|
# ---------------------------------------------------------------------------
# Gradio UI: upload and summary widgets on the left, one tab per analysis
# result on the right.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Image Forensic & Fraud Detection Tool") as demo:
    gr.Markdown("""
# Image Forensic & Fraud Detection Tool

Upload an image to analyze it for potential manipulation using various forensic techniques.
""")

    with gr.Row():
        # Left column: input, trigger button, score and EXIF dump.
        with gr.Column(scale=1):
            input_image = gr.Image(type="pil", label="Upload Image for Analysis")
            analyze_button = gr.Button("Analyze Image", variant="primary")

            gr.Markdown("### Manipulation Probability")
            probability_slider = gr.Slider(
                minimum=0,
                maximum=1,
                value=0,
                label="Manipulation Probability",
                interactive=False,
            )

            gr.Markdown("### EXIF Metadata")
            exif_data = gr.Code(language="json", label="EXIF Data", lines=10)

        # Right column: one tab per analysis output.
        with gr.Column(scale=2):
            with gr.Tab("Analysis Results"):
                analysis_results = gr.Markdown()

            with gr.Tab("Original Image"):
                original_image = gr.Image(type="pil", label="Original Image")

            with gr.Tab("Error Level Analysis (ELA)"):
                gr.Markdown("""
Error Level Analysis reveals differences in compression levels. Areas with different compression levels
often indicate modifications. Brighter regions in the visualization suggest potential manipulations.
""")
                ela_image = gr.Image(type="pil", label="ELA Result")

            with gr.Tab("Noise Analysis"):
                gr.Markdown("""
Noise Analysis examines the noise patterns in the image. Inconsistent noise patterns often indicate
areas that have been manipulated or added from different sources.
""")
                noise_image = gr.Image(type="pil", label="Noise Pattern Analysis")

            with gr.Tab("Clone Detection"):
                gr.Markdown("""
Clone Detection identifies duplicated areas within the image. Red and blue rectangles highlight
matching regions that may indicate copy-paste manipulation.
""")
                clone_image = gr.Image(type="pil", label="Clone Detection Result")

            with gr.Tab("AI Detection Heatmap"):
                gr.Markdown("""
This heatmap highlights regions identified by our AI model as potentially manipulated.
Red areas indicate suspicious regions with a higher likelihood of manipulation.
""")
                heatmap_image = gr.Image(type="pil", label="AI-Detected Suspicious Regions")

    # Order must match the tuple returned by analyze_image.
    result_components = [
        original_image,
        ela_image,
        noise_image,
        heatmap_image,
        clone_image,
        exif_data,
        analysis_results,
        probability_slider,
    ]
    analyze_button.click(
        fn=analyze_image,
        inputs=[input_image],
        outputs=result_components,
    )


# Start the web server only when run as a script, listening on all
# interfaces at port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)