# Deployment-log residue (Hugging Face "Spaces: Runtime error" banner) captured
# with the source — not application code; kept only as a comment so the file
# remains valid Python.
| from PIL import Image | |
| import numpy as np | |
| import base64 | |
| from transformers import AutoImageProcessor, Mask2FormerForUniversalSegmentation | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import matplotlib | |
| matplotlib.use('Agg') | |
| import matplotlib.pyplot as plt | |
| import google.generativeai as genai | |
| from langchain_core.messages import HumanMessage | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from reportlab.lib.utils import ImageReader | |
| from flask import send_file, jsonify, request | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import A4 | |
| from reportlab.lib.units import inch | |
| import io, torch, os | |
| os.environ["MPLCONFIGDIR"] = "/tmp" | |
| from reportlab.lib import colors | |
| from datetime import datetime | |
# SECURITY: never commit API keys to source control — the previous revision
# embedded a literal Google API key here (it must be considered leaked and
# revoked). The key is now supplied via the environment at deploy time; we
# fail fast with a clear message if it is absent, since both google.generativeai
# and langchain-google-genai read GOOGLE_API_KEY from the environment.
_google_api_key = os.environ.get("GOOGLE_API_KEY")
if not _google_api_key:
    raise RuntimeError("GOOGLE_API_KEY environment variable is not set")
genai.configure(api_key=_google_api_key)
# Setup: Flask application plus CORS so a separately-hosted frontend can call us.
app = Flask(__name__)
CORS(app)
# Initialize device — prefer GPU when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Load model and processor once at startup; both are shared by all requests.
processor = AutoImageProcessor.from_pretrained("facebook/mask2former-swin-tiny-ade-semantic")
model = Mask2FormerForUniversalSegmentation.from_pretrained("facebook/mask2former-swin-tiny-ade-semantic")
# Overlay fine-tuned splicing-detection weights (relative path: the .pth file
# must sit in the working directory at launch).
# NOTE(review): torch.load unpickles arbitrary objects — load only trusted
# checkpoints; consider weights_only=True if the installed torch supports it.
model.load_state_dict(torch.load(r"mask2former-ade-(splicing1_2).pth", map_location=device))
model = model.to(device)
# Inference-only: disable dropout/batch-norm training behavior.
model.eval()
| # ========== Flask routes ========== | |
# Without this decorator the function was never registered with Flask and the
# root URL returned 404 — no @app.route appeared anywhere in the file.
@app.route('/')
def home():
    """Health-check endpoint confirming the backend process is up."""
    return "Backend is running!"
# Route registration was missing — without it Flask never exposed this handler.
@app.route('/predict', methods=['POST'])
def predict():
    """Run manipulation detection on an uploaded image.

    Expects a multipart form with an 'image' file. Returns JSON containing the
    base64-encoded original image, the predicted mask, a side-by-side
    visualization, and a short Gemini-generated explanation; 400 when no image
    is uploaded, 500 on any processing failure.
    """
    if 'image' not in request.files:
        return jsonify({"error": "No image uploaded"}), 400
    try:
        file = request.files['image']
        image = Image.open(io.BytesIO(file.read()))
        # Model and visualization both expect 3-channel RGB.
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Encode the original image to base64 (PNG) for the response and the LLM.
        original_image_buffer = io.BytesIO()
        image.save(original_image_buffer, format="PNG")
        original_image_base64 = base64.b64encode(original_image_buffer.getvalue()).decode("utf-8")
        # Preprocess and run Mask2Former inference.
        inputs = processor(images=image, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Resize the per-pixel class map back to the original resolution
        # (PIL size is (w, h); the processor wants (h, w)).
        predicted_segmentation = processor.post_process_semantic_segmentation(
            outputs, target_sizes=[image.size[::-1]]
        )[0]
        segmentation_mask = predicted_segmentation.cpu().numpy()
        # ========== Create visualizations ==========
        fig, axes = plt.subplots(1, 2, figsize=(10, 5))
        axes[0].imshow(image)
        axes[0].set_title("Input Image")
        axes[1].imshow(segmentation_mask)
        axes[1].set_title("Prediction")
        for ax in axes:
            ax.axis("off")
        plt.tight_layout()
        buf = io.BytesIO()
        fig.savefig(buf, format="png", bbox_inches='tight', pad_inches=0)
        buf.seek(0)
        visualization_base64 = base64.b64encode(buf.read()).decode('utf-8')
        # Close this specific figure so figures do not accumulate across requests.
        plt.close(fig)
        # ========== Encode mask separately ==========
        # Normalize the mask to 0-255, guarding against a constant mask
        # (nothing detected): the previous code divided by max-min, which is
        # zero in that case and produced inf/NaN garbage in the PNG.
        mask_range = segmentation_mask.max() - segmentation_mask.min()
        if mask_range > 0:
            mask_normalized = (segmentation_mask - segmentation_mask.min()) * (255.0 / mask_range)
        else:
            mask_normalized = np.zeros_like(segmentation_mask, dtype=np.float64)
        mask_image = Image.fromarray(mask_normalized.astype(np.uint8))
        mask_buffer = io.BytesIO()
        mask_image.save(mask_buffer, format="PNG")
        mask_base64 = base64.b64encode(mask_buffer.getvalue()).decode("utf-8")
        # === VLM explanation ===
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
        message = HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": " This is an image and its predicted binary mask showing manipulated regions in white. "
                            "Please explain briefly in 2-3 lines where the manipulation occurred and what might have been altered."
                },
                {
                    "type": "image_url",
                    # Both payloads are PNG-encoded above; the previous mime
                    # type wrongly declared the original as image/jpeg.
                    "image_url": {
                        "url": f"data:image/png;base64,{original_image_base64}"
                    },
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{mask_base64}"
                    },
                },
            ]
        )
        response = llm.invoke([message])
        print(response.content)
        return jsonify({
            "original_image": original_image_base64,
            "mask": mask_base64,
            "visualization": visualization_base64,
            "message": response.content
        })
    except Exception as e:
        # Route-boundary catch: surface any failure as a JSON 500.
        return jsonify({"error": str(e)}), 500
# NOTE(review): mid-file imports — conventionally these belong with the
# header imports at the top of the module.
import json
from threading import Lock
# JSON file persisting the per-day report counter used to build case IDs.
counter_file = "counter.json"
# Serializes read-modify-write of counter_file across request threads
# within this process (does not protect against multiple worker processes).
counter_lock = Lock()
def get_case_id():
    """Return a unique, date-scoped case identifier, e.g. "DFD-20240131-007".

    A per-day counter is persisted in ``counter_file`` (a JSON mapping of
    YYYYMMDD -> count) and guarded by ``counter_lock`` so concurrent requests
    in this process cannot be handed the same id.
    """
    today = datetime.now().strftime('%Y%m%d')
    with counter_lock:
        data = {}
        if os.path.exists(counter_file):
            # A corrupt or unreadable counter file previously crashed report
            # generation; degrade gracefully by restarting the numbering.
            try:
                with open(counter_file, "r") as f:
                    data = json.load(f)
            except (json.JSONDecodeError, OSError):
                data = {}
        count = data.get(today, 0) + 1
        data[today] = count
        with open(counter_file, "w") as f:
            json.dump(data, f)
    return f"DFD-{today}-{count:03d}"
# Route registration was missing — without it Flask never exposed this handler.
@app.route('/download_report', methods=['POST'])
def download_report():
    """Build and return a PDF forensic report for an uploaded image.

    Expects a multipart form with an 'image' file. Runs the segmentation
    model, asks Gemini for a short written analysis, lays both out in a
    styled A4 PDF, and streams the PDF back as a file download; 400 when no
    image is uploaded, 500 on any processing failure.
    """
    if 'image' not in request.files:
        # Consistent with /predict: reject requests that carry no image
        # (previously this fell through to a KeyError and a generic 500).
        return jsonify({"error": "No image uploaded"}), 400
    try:
        file = request.files['image']
        image = Image.open(io.BytesIO(file.read())).convert("RGB")
        # === Process Image ===
        inputs = processor(images=image, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        predicted_segmentation = processor.post_process_semantic_segmentation(
            outputs, target_sizes=[image.size[::-1]]
        )[0]
        segmentation_mask = predicted_segmentation.cpu().numpy()
        # === Create Mask Image ===
        # Normalize to 0-255, guarding against a constant mask (nothing
        # detected) which previously caused a divide-by-zero.
        mask_range = segmentation_mask.max() - segmentation_mask.min()
        if mask_range > 0:
            mask_normalized = (segmentation_mask - segmentation_mask.min()) * (255.0 / mask_range)
        else:
            mask_normalized = np.zeros_like(segmentation_mask, dtype=np.float64)
        mask_image = Image.fromarray(mask_normalized.astype(np.uint8)).convert("L")
        # === Encode images for the LLM ===
        original_buffer = io.BytesIO()
        image.save(original_buffer, format="PNG")
        original_base64 = base64.b64encode(original_buffer.getvalue()).decode("utf-8")
        mask_buffer = io.BytesIO()
        mask_image.save(mask_buffer, format="PNG")
        mask_base64 = base64.b64encode(mask_buffer.getvalue()).decode("utf-8")
        # === Get professional analysis from Gemini ===
        llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
        message = HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": " This is an image and its predicted binary mask showing manipulated regions in white. "
                            "Please explain briefly where the manipulation occurred and what might have been altered."
                },
                {
                    "type": "image_url",
                    # Both payloads are PNG-encoded above; the previous mime
                    # type wrongly declared the original as image/jpeg.
                    "image_url": {"url": f"data:image/png;base64,{original_base64}"},
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{mask_base64}"},
                },
            ]
        )
        llm_response = llm.invoke([message]).content
        # === Generate PDF Report ===
        buffer = io.BytesIO()
        c = canvas.Canvas(buffer, pagesize=A4)
        width, height = A4
        # Light blue page background.
        c.setFillColorRGB(0.96, 0.96, 1)
        c.rect(0, 0, width, height, fill=1, stroke=0)
        # Dark blue header band.
        c.setFillColorRGB(0, 0.2, 0.4)
        c.rect(0, height-80, width, 80, fill=1, stroke=0)
        # Title.
        c.setFillColorRGB(1, 1, 1)
        c.setFont("Helvetica-Bold", 18)
        c.drawCentredString(width/2, height-50, "DIGITAL IMAGE AUTHENTICITY REPORT")
        c.setFont("Helvetica", 10)
        c.drawCentredString(width/2, height-70, "Forensic Analysis Report")
        # Metadata row.
        c.setFillColorRGB(0, 0, 0)
        c.setFont("Helvetica", 9)
        c.drawString(40, height-100, f"Report Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        case_id = get_case_id()
        c.drawString(width-200, height-100, f"Case ID: {case_id}")
        # Divider.
        c.setStrokeColorRGB(0, 0.4, 0.6)
        c.setLineWidth(1)
        c.line(40, height-110, width-40, height-110)
        # === Analysis Summary ===
        c.setFillColorRGB(0, 0.3, 0.6)
        c.setFont("Helvetica-Bold", 12)
        c.drawString(40, height-140, "EXECUTIVE SUMMARY")
        c.setFillColorRGB(0, 0, 0)
        c.setFont("Helvetica", 10)
        summary_text = [
            "This report presents forensic analysis of potential digital manipulations",
            "using state-of-the-art AI detection models. Key findings are summarized below."
        ]
        text_object = c.beginText(40, height-160)
        text_object.setFont("Helvetica", 10)
        text_object.setLeading(14)
        for line in summary_text:
            text_object.textLine(line)
        c.drawText(text_object)
        # === Image Evidence ===
        img_y = height-420
        img_width = 220
        img_height = 220
        # Images are drawn straight from memory via ImageReader: the previous
        # fixed-name temp files ("temp_input.png"/"temp_mask.png") collided
        # between concurrent requests and leaked on any exception.
        c.drawImage(ImageReader(image), 40, img_y, width=img_width, height=img_height)
        c.setFillColorRGB(0, 0.3, 0.6)
        c.setFont("Helvetica-Bold", 10)
        c.drawString(40, img_y-20, "ORIGINAL IMAGE")
        c.drawImage(ImageReader(mask_image.convert("RGB")), width-260, img_y, width=img_width, height=img_height)
        c.drawString(width-260, img_y-20, "DETECTION HEATMAP")
        # === AI Analysis Section ===
        c.setFillColorRGB(0, 0.3, 0.6)
        c.setFont("Helvetica-Bold", 12)
        c.drawString(40, img_y-50, "AI FORENSIC ANALYSIS")
        # Wrap the LLM response to the printable width, paragraph by paragraph.
        from textwrap import wrap
        analysis_lines = []
        for paragraph in llm_response.split('\n'):
            analysis_lines.extend(wrap(paragraph, width=90))
        text_object = c.beginText(40, img_y-70)
        text_object.setFont("Helvetica", 10)
        text_object.setLeading(14)
        # Only the first 10 lines fit in the reserved area.
        for line in analysis_lines[:10]:
            text_object.textLine(line)
        if len(analysis_lines) > 10:
            # No embedded "\n" here: textLine() does not interpret newlines
            # and previously rendered it as a stray glyph.
            text_object.textLine("[Full analysis available in digital report]")
        c.drawText(text_object)
        # === Technical Details ===
        c.setFillColorRGB(0, 0.3, 0.6)
        c.setFont("Helvetica-Bold", 12)
        c.drawString(40, img_y-180, "TECHNICAL SPECIFICATIONS")
        c.setFillColorRGB(0, 0, 0)
        c.setFont("Helvetica", 10)
        tech_details = [
            "Analysis Model: Mask2Former-Swin (ADE20K Fine-tuned)",
            f"Processing Date: {datetime.now().strftime('%Y-%m-%d')}",
            "Report Version: 1.1"
        ]
        text_object = c.beginText(40, img_y-200)
        text_object.setFont("Helvetica", 10)
        text_object.setLeading(14)
        for line in tech_details:
            text_object.textLine(line)
        c.drawText(text_object)
        # === Footer ===
        c.setFillColorRGB(0, 0.2, 0.4)
        c.rect(0, 40, width, 40, fill=1, stroke=0)
        c.setFillColorRGB(1, 1, 1)
        c.setFont("Helvetica", 8)
        c.drawCentredString(width/2, 65, "This report was generated by AI forensic tools and should be verified by human experts")
        c.drawCentredString(width/2, 55, "Sukkur IBA University | Digital Forensics Lab | © 2024 Deepfake Research Project")
        c.save()
        buffer.seek(0)
        return send_file(
            buffer,
            mimetype='application/pdf',
            as_attachment=True,
            download_name=f"forensic_report_{datetime.now().strftime('%Y%m%d_%H%M')}.pdf"
        )
    except Exception as e:
        # Route-boundary catch: surface any failure as a JSON 500.
        return jsonify({"error": str(e)}), 500
# Script entry point: listen on all interfaces, port 7860, debug off.
# NOTE(review): 7860 is the port conventionally routed by Hugging Face Spaces
# (the file carries a Spaces log banner) — confirm the deployment target.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)