import io
import os
import tempfile

import streamlit as st
from PIL import Image, ImageChops, ImageEnhance
import numpy as np
import cv2
import torch
import timm
import cv2 as cv
from mtcnn import MTCNN
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image as keras_image
from torchvision import transforms
import keras
|
|
|
|
|
@st.cache_resource
def load_image_forgery_model():
    """Load the image-forgery classifier stored in ``imageforgerydetection.h5``.

    The function is wrapped in ``st.cache_resource``; the path is resolved
    relative to the current working directory.
    """
    forgery_model = load_model("imageforgerydetection.h5")
    return forgery_model
|
|
|
@st.cache_resource
def load_deepfake_image_model():
    """Load the deepfake-image classifier stored in ``deepfake_image_detection.h5``.

    The function is wrapped in ``st.cache_resource``; the path is resolved
    relative to the current working directory.
    """
    image_model = load_model("deepfake_image_detection.h5")
    return image_model
|
|
|
@st.cache_resource
def load_video_forgery_model():
    """Load the video-forgery classifier stored in ``videoforgerydetection.keras``.

    The function is wrapped in ``st.cache_resource``; the path is resolved
    relative to the current working directory.
    """
    video_model = load_model("videoforgerydetection.keras")
    return video_model
|
|
|
|
|
# Side length, in pixels, of the square frames fed to the feature extractor;
# also the default resize target in load_video().
IMG_SIZE: int = 224

# Maximum number of frames per video that prepare_single_video() encodes.
MAX_SEQ_LENGTH: int = 20

# Width of the per-frame feature vector; must match the feature extractor's
# output size, since its predictions are stored in slots of this width.
NUM_FEATURES: int = 2048
|
|
|
@st.cache_resource
def load_deepfake_model():
    """Load the deepfake-video classifier from ``video_classifier_full_model.h5``.

    The function is wrapped in ``st.cache_resource``; the path is resolved
    relative to the current working directory.
    """
    sequence_model = load_model('video_classifier_full_model.h5')
    return sequence_model
|
|
|
|
|
# Classifier used by the "Deepfake Video Detection" task.
deepfake_model = load_deepfake_model()

# Class-label vocabulary saved next to the model.
# NOTE: allow_pickle=True lets np.load unpickle object arrays, so this file
# must come from a trusted source.
vocabulary2 = np.load('label_processor_vocabulary.npy', allow_pickle=True)

# Lookup layer built from that vocabulary; get_vocabulary() on it supplies the
# label names used when reporting predictions.
label_processor2 = keras.layers.StringLookup(
    vocabulary=vocabulary2.tolist(),
    num_oov_indices=0,
)
|
|
|
|
|
|
|
|
|
def convert_to_ela_image(image, quality=90):
    """Return the Error Level Analysis (ELA) image of ``image``.

    The image is re-encoded as JPEG at ``quality``, the per-pixel absolute
    difference between the input and the re-encoded copy is taken, and the
    difference is brightened so that its largest channel value maps to 255.

    The JPEG round-trip happens in an in-memory buffer, so nothing is written
    to disk and concurrent callers cannot overwrite each other's data.

    Args:
        image: A PIL image. Non-RGB images are converted to RGB first.
        quality: JPEG quality used for the re-encoding step.

    Returns:
        A PIL image holding the brightness-scaled difference.
    """
    if image.mode != 'RGB':
        image = image.convert('RGB')

    buffer = io.BytesIO()
    image.save(buffer, 'JPEG', quality=quality)
    buffer.seek(0)

    # The difference is a new image, so the re-opened copy can be closed as
    # soon as it has been computed.
    with Image.open(buffer) as recompressed:
        ela_image = ImageChops.difference(image, recompressed)

    # getextrema() yields one (min, max) pair per band; fall back to 1 so an
    # all-zero difference does not cause a division by zero.
    max_diff = max(high for _, high in ela_image.getextrema()) or 1
    scale = 255.0 / max_diff

    return ImageEnhance.Brightness(ela_image).enhance(scale)
|
|
|
def prepare_image_for_forgery(image):
    """Turn ``image`` into the flat, scaled ELA vector used for forgery detection.

    The ELA image is computed at JPEG quality 90, resized to 128x128, flattened
    to one dimension, and divided by 255.0.
    """
    ela = convert_to_ela_image(image, 90)
    resized = ela.resize((128, 128))
    pixels = np.array(resized).flatten()
    return pixels / 255.0
|
|
|
|
|
def predict_deepfake_image(image_path, model):
    """Classify the image at ``image_path`` as ``'Real'`` or ``'Fake'``.

    The image is loaded at 256x256, divided by 255.0, given a leading batch
    axis, and passed to ``model.predict``. A first prediction above 0.5 is
    reported as ``'Real'``; anything else as ``'Fake'``.
    """
    loaded = keras_image.load_img(image_path, target_size=(256, 256))
    scaled = keras_image.img_to_array(loaded) / 255.0
    batch = np.expand_dims(scaled, axis=0)

    score = model.predict(batch)[0]
    if score > 0.5:
        return 'Real'
    return 'Fake'
|
|
|
|
|
# Frame size (in pixels) that predict_video_forgery() resizes every frame to.
target_height = 240
target_width = 320
|
|
|
def predict_video_forgery(video_path, model):
    """Run the frame-level forgery model over every frame of a video.

    Each frame is resized to ``(target_width, target_height)`` and the whole
    stack is passed to ``model.predict`` in one call. A frame counts as forged
    when its score is above 0.5. The frame count is also written to the
    Streamlit page.

    Args:
        video_path: Path of the video file to analyze.
        model: Model whose ``predict`` returns one score per frame.

    Returns:
        A ``(message, forged_frame_count, total_frame_count)`` tuple. The
        message is ``"The video is forged"`` if at least one frame is flagged
        and ``"The video is not forged"`` otherwise.

    Raises:
        ValueError: If no frame could be read from ``video_path``.
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.resize(frame, (target_width, target_height)))
    finally:
        # Release the capture even if reading or resizing fails.
        cap.release()

    total_frames = len(frames)
    st.write(f"No. Of Frames in the Video: {total_frames}")

    if not frames:
        # An empty batch would otherwise fail inside model.predict with an
        # unrelated shape error.
        raise ValueError(f"No frames could be read from video: {video_path}")

    scores = model.predict(np.array(frames)).reshape(-1)
    forged_count = int(np.count_nonzero(scores > 0.5))

    if forged_count == 0:
        return "The video is not forged", 0, total_frames
    return "The video is forged", forged_count, total_frames
|
|
|
|
|
|
|
def build_feature_extractor():
    """Build the per-frame feature extractor model.

    Wraps an InceptionV3 backbone (ImageNet weights, no classification head,
    average pooling) behind its own ``preprocess_input`` step, taking
    ``IMG_SIZE`` x ``IMG_SIZE`` 3-channel inputs.

    Returns:
        A ``keras.Model`` named ``"feature_extractor"``.
    """
    backbone = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )

    image_input = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    scaled = keras.applications.inception_v3.preprocess_input(image_input)
    features = backbone(scaled)
    return keras.Model(image_input, features, name="feature_extractor")
|
|
|
@st.cache_resource
def _load_feature_extractor():
    """Build the frame feature extractor once and let Streamlit cache it."""
    return build_feature_extractor()


@st.cache_resource
def _load_face_detector():
    """Create the MTCNN face detector once and let Streamlit cache it."""
    return MTCNN()


# Streamlit re-executes this script on every interaction. Going through cached
# loaders, like the other models in this file, avoids rebuilding InceptionV3
# and MTCNN on each rerun.
# NOTE(review): cached resources are shared between sessions - confirm that
# both objects are safe to use concurrently.
feature_extractor = _load_feature_extractor()
detector = _load_face_detector()
|
|
|
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE), skip_frames=2):
    """Read a video and return an array of cropped, resized face frames.

    Every ``skip_frames``-th frame is passed to
    ``get_face_region_first_frame``; usable crops are resized to ``resize``
    and their channel order is reversed (OpenCV's BGR to RGB).

    Args:
        path: Path of the video file.
        max_frames: Stop after this many frames were collected. ``0`` means
            no limit. If the video yields fewer frames than ``max_frames``
            (but at least one), the last frame is repeated as padding.
        resize: ``(width, height)`` passed to ``cv.resize``.
        skip_frames: Sampling step; ``0`` is treated as ``1`` (every frame).

    Returns:
        ``np.array`` of the collected frames; empty if no face crop was found.
    """
    step = skip_frames or 1  # avoid a modulo-by-zero for skip_frames=0
    frames = []
    previous_box = None
    frame_count = 0

    cap = cv.VideoCapture(path)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % step == 0:
                face, previous_box = get_face_region_first_frame(frame, previous_box)
                if face is not None and face.size:
                    face = cv.resize(face, resize)
                    frames.append(face[:, :, [2, 1, 0]])
                    if max_frames > 0 and len(frames) == max_frames:
                        break
                elif face is not None:
                    # The remembered box produced an empty crop, which
                    # cv.resize cannot handle; forget it so detection is
                    # retried on the next sampled frame.
                    previous_box = None
            frame_count += 1
    finally:
        # Release the capture even if detection or resizing fails.
        cap.release()

    while len(frames) < max_frames and frames:
        frames.append(frames[-1])

    return np.array(frames)
|
|
|
def get_face_region_first_frame(frame, previous_box=None):
    """Crop the face region out of ``frame``.

    When ``previous_box`` is ``None`` the module-level ``detector`` is run on
    the frame and the first detection's box is used; otherwise the given box
    is reused without running detection again.

    Args:
        frame: Image array indexed as ``frame[row, column, ...]``.
        previous_box: Optional ``(x, y, width, height)`` box from an earlier
            call.

    Returns:
        ``(face_region, box)``. ``face_region`` is the slice of ``frame``
        covered by the box, clipped to the frame, and ``box`` is the
        unmodified ``(x, y, width, height)`` tuple to pass to the next call.
        Returns ``(None, None)`` when detection finds no face.
    """
    if previous_box is None:
        # NOTE(review): frames read by OpenCV are BGR - confirm which channel
        # order the detector expects.
        detections = detector.detect_faces(frame)
        if not detections:
            return None, None
        x, y, width, height = detections[0]['box']
        previous_box = (x, y, width, height)
    else:
        x, y, width, height = previous_box

    # Clamp to the frame origin: a negative start or stop would be read by
    # NumPy as an offset from the end and give an empty or wrong crop.
    top, left = max(y, 0), max(x, 0)
    bottom, right = max(y + height, 0), max(x + width, 0)

    face_region = frame[top:bottom, left:right]
    return face_region, previous_box
|
|
|
def prepare_single_video(frames):
    """Encode one video's frames into padded feature and mask arrays.

    At most ``MAX_SEQ_LENGTH`` frames are encoded with the module-level
    ``feature_extractor``; remaining positions stay zero in the features and
    ``False`` in the mask.

    Args:
        frames: Array of frames for a single video, frames along axis 0.

    Returns:
        ``(frame_features, frame_mask)`` with shapes
        ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)`` (float32) and
        ``(1, MAX_SEQ_LENGTH)`` (bool).
    """
    frames = frames[None, ...]  # add a leading batch axis of size 1
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        length = min(MAX_SEQ_LENGTH, batch.shape[0])
        if length:
            # One batched predict() call replaces one call per frame, which
            # pays the per-call overhead only once.
            frame_features[i, :length, :] = feature_extractor.predict(batch[:length])
        frame_mask[i, :length] = 1

    return frame_features, frame_mask
|
|
|
def sequence_prediction(video_path):
    """Predict class probabilities for the video at ``video_path``.

    Returns:
        A dict mapping each label to its probability in percent, inserted from
        most to least likely. If no frames could be loaded, an error is shown
        on the Streamlit page and ``None`` is returned.
    """
    video_frames = load_video(video_path)
    if len(video_frames) == 0:
        st.error("Could not process video. Please try another file.")
        return None

    features, mask = prepare_single_video(video_frames)
    probabilities = deepfake_model.predict([features, mask])[0]

    labels = label_processor2.get_vocabulary()
    ranked = {}
    for idx in np.argsort(probabilities)[::-1]:
        ranked[labels[idx]] = probabilities[idx] * 100
    return ranked
|
|
|
|
|
|
|
st.title("Fraudulent Image and Video Detection System")

# Sidebar selector; the chosen label decides which detection branch below runs.
task_options = [
    "Image Forgery Detection",
    "Deepfake Image Detection",
    "Video Forgery Detection",
    "Deepfake Video Detection",
]
task = st.sidebar.selectbox("Choose a detection task:", task_options)
|
|
|
if task == "Image Forgery Detection":
    uploaded_file = st.file_uploader("Upload an image", type=['jpg', 'jpeg', 'png'])
    if uploaded_file:
        image = Image.open(uploaded_file)
        st.image(image, caption="Uploaded Image", use_column_width=True)

        # The flat ELA vector is reshaped into a batch of 128x128 RGB images.
        prepared_image = prepare_image_for_forgery(image).reshape(-1, 128, 128, 3)
        model = load_image_forgery_model()
        prediction = model.predict(prepared_image)
        # Output index 1 is reported as "real", index 0 as "forged".
        confidence_real = prediction[0][1] * 100
        confidence_fake = prediction[0][0] * 100

        if confidence_real > confidence_fake:
            st.success(f"Result: Real Image with {confidence_real:.2f}% confidence")
        else:
            st.error(f"Result: Forged Image with {confidence_fake:.2f}% confidence")

elif task == "Deepfake Image Detection":
    uploaded_file = st.file_uploader("Upload an image", type=['jpg', 'jpeg', 'png'])
    if uploaded_file:
        st.image(uploaded_file, caption="Uploaded Image", use_column_width=True)
        model = load_deepfake_image_model()

        # predict_deepfake_image() needs a path. A unique temporary file keeps
        # concurrent sessions from overwriting each other's upload, and the
        # finally block removes it even when prediction fails.
        suffix = os.path.splitext(uploaded_file.name)[1] or ".jpg"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(uploaded_file.getbuffer())
            temp_image_path = tmp.name
        try:
            result = predict_deepfake_image(temp_image_path, model)
        finally:
            os.remove(temp_image_path)

        if result == 'Real':
            st.success("Prediction: Real")
        else:
            st.error("Prediction: Fake")
|
|
|
if task == "Video Forgery Detection":
    uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov', 'mkv'])
    if uploaded_file:
        # A unique temporary file keeps concurrent sessions from overwriting
        # each other's upload; the finally block removes it even on failure.
        suffix = os.path.splitext(uploaded_file.name)[1] or ".mp4"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(uploaded_file.getbuffer())
            temp_video_path = tmp.name
        try:
            st.video(temp_video_path)
            st.write("Analyzing the video for forgery...")

            model = load_video_forgery_model()
            result_message, forged_frames, total_frames = predict_video_forgery(temp_video_path, model)
        finally:
            os.remove(temp_video_path)

        if forged_frames == 0:
            st.success(result_message)
        else:
            st.error(result_message)

        st.write(f"Forged Frames: {forged_frames}/{total_frames}")

elif task == "Deepfake Video Detection":
    uploaded_file = st.file_uploader("Upload a video", type=["mp4", "avi", "mov"])
    if uploaded_file is not None:
        suffix = os.path.splitext(uploaded_file.name)[1] or ".mp4"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(uploaded_file.getbuffer())
            temp_video_path = tmp.name
        try:
            st.video(temp_video_path)
            st.write("Analyzing the video...")

            # sequence_prediction() reports an unreadable video on the page
            # itself and returns None in that case.
            predictions = sequence_prediction(temp_video_path)
        finally:
            # The temporary upload is removed here on success and on failure.
            os.remove(temp_video_path)

        if predictions:
            highest_label = max(predictions, key=predictions.get)
            highest_prob = predictions[highest_label]

            if highest_label.lower() == "real":
                st.success(f"The video is real with a confidence of {highest_prob:.2f}%.")
            elif highest_label.lower() == "fake":
                st.error(f"This video is a deepfake with a confidence of {highest_prob:.2f}%.")
            else:
                st.warning(f"Uncertain prediction: {highest_label} with {highest_prob:.2f}% confidence.")
|
|
|