|
|
import streamlit as st |
|
|
from PIL import Image, ImageChops, ImageEnhance, ImageDraw, ImageFilter |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib.patches as patches |
|
|
from scipy import ndimage |
|
|
from skimage import feature, measure |
|
|
import io |
|
|
import cv2 |
|
|
import os |
|
|
import cv2 as cv |
|
|
from mtcnn import MTCNN |
|
|
from tensorflow.keras.models import load_model |
|
|
from tensorflow.keras.preprocessing import image as keras_image |
|
|
import keras |
|
|
|
|
|
|
|
|
@st.cache_resource
def load_image_forgery_model():
    """Load the image forgery classifier from ``imageforgerydetection.h5``.

    Wrapped in ``st.cache_resource`` so the model file is read once and the
    same object is reused across Streamlit reruns.
    """
    forgery_model = load_model("imageforgerydetection.h5")
    return forgery_model
|
|
|
|
|
@st.cache_resource
def load_deepfake_image_model():
    """Load the deepfake image classifier from ``deepfake_image_detection.h5``.

    Wrapped in ``st.cache_resource`` so the model file is read once and the
    same object is reused across Streamlit reruns.
    """
    deepfake_image_model = load_model("deepfake_image_detection.h5")
    return deepfake_image_model
|
|
|
|
|
@st.cache_resource
def load_video_forgery_model():
    """Load the per-frame video forgery model from ``videoforgerydetection.keras``.

    Wrapped in ``st.cache_resource`` so the model file is read once and the
    same object is reused across Streamlit reruns.
    """
    video_forgery_model = load_model("videoforgerydetection.keras")
    return video_forgery_model
|
|
|
|
|
|
|
|
# Settings shared by the deepfake-video pipeline.
IMG_SIZE = 224          # side length (pixels) of the square frames fed to the feature extractor
MAX_SEQ_LENGTH = 20     # number of frame slots in the feature/mask arrays built per video
NUM_FEATURES = 2048     # length of the feature vector stored for each frame
|
|
|
|
|
@st.cache_resource
def load_deepfake_model():
    """Load the deepfake video sequence model from ``video_classifier_full_model.h5``.

    Wrapped in ``st.cache_resource`` so the model file is read once and the
    same object is reused across Streamlit reruns.
    """
    sequence_model = load_model('video_classifier_full_model.h5')
    return sequence_model
|
|
|
|
|
|
|
|
# Sequence model used by the "Deepfake Video Detection" task (cached loader).
deepfake_model = load_deepfake_model()

# Class labels saved next to the model.
# NOTE(review): allow_pickle=True executes pickled objects while loading;
# only ever point this at a vocabulary file you produced yourself.
vocabulary2 = np.load('label_processor_vocabulary.npy', allow_pickle=True)

# Lookup layer whose vocabulary maps model output indices back to label strings.
label_processor2 = keras.layers.StringLookup(
    num_oov_indices=0,
    vocabulary=vocabulary2.tolist(),
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_ela_image(image, quality=90):
    """Return the Error Level Analysis (ELA) image for *image*.

    The image is re-encoded as JPEG at the given quality, the absolute
    per-pixel difference between the input and the re-encoded copy is taken,
    and the difference is brightened so that its largest channel value is
    scaled to 255.

    Args:
        image: PIL image; converted to RGB first when it has another mode.
        quality: JPEG quality used for the re-encoding step.

    Returns:
        A PIL image holding the brightness-scaled difference.
    """
    if image.mode != 'RGB':
        image = image.convert('RGB')

    # Re-encode in memory rather than through a fixed file name in the working
    # directory: nothing is left on disk, no file handle stays open, and
    # concurrent sessions cannot overwrite each other's intermediate JPEG.
    jpeg_buffer = io.BytesIO()
    image.save(jpeg_buffer, 'JPEG', quality=quality)
    jpeg_buffer.seek(0)
    recompressed = Image.open(jpeg_buffer)

    ela_image = ImageChops.difference(image, recompressed)

    # getextrema() yields one (min, max) pair per band for an RGB image.
    max_diff = max(band_max for _, band_max in ela_image.getextrema())
    if max_diff == 0:
        # Identical images: avoid dividing by zero below.
        max_diff = 1
    scale = 255.0 / max_diff

    return ImageEnhance.Brightness(ela_image).enhance(scale)
|
|
|
|
|
def prepare_image_for_forgery(image):
    """Turn *image* into the flat, scaled ELA vector used by the forgery model.

    The ELA image (JPEG quality 90) is resized to 128x128, converted to a
    numpy array, flattened, and divided by 255.0.
    """
    ela_thumbnail = convert_to_ela_image(image, 90).resize((128, 128))
    pixel_values = np.array(ela_thumbnail)
    flat_pixels = pixel_values.flatten()
    return flat_pixels / 255.0
|
|
|
|
|
|
|
|
|
|
|
def create_ela_analysis(image):
    """Render the original image next to its ELA result as a PNG.

    Args:
        image: PIL image to analyse.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 containing the PNG figure.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
    try:
        fig.suptitle('Error Level Analysis (ELA)', fontsize=14, fontweight='bold')

        # Left panel: the image as uploaded.
        ax1.imshow(image)
        ax1.set_title('Original Image')
        ax1.axis('off')

        # Right panel: the ELA difference image.
        ela_image = convert_to_ela_image(image, 90)
        ax2.imshow(ela_image)
        ax2.set_title('ELA Result (Bright areas indicate potential editing)')
        ax2.axis('off')

        # Work on the figure object itself instead of pyplot's implicit
        # "current figure", so figures created elsewhere cannot be affected.
        fig.tight_layout()

        buffer = io.BytesIO()
        fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
        buffer.seek(0)
        return buffer
    finally:
        # Always release the figure, including when ELA or saving fails.
        plt.close(fig)
|
|
|
|
|
|
|
|
|
|
|
def predict_deepfake_image(image_path, model):
    """Classify the image at *image_path* as ``'Real'`` or ``'Fake'``.

    The image is loaded at 256x256, scaled by 1/255, wrapped in a batch of
    one, and passed to ``model.predict``. A first output above 0.5 is
    reported as ``'Real'``; anything else as ``'Fake'``.
    """
    loaded = keras_image.load_img(image_path, target_size=(256, 256))
    scaled = keras_image.img_to_array(loaded) / 255.0
    batch = np.expand_dims(scaled, axis=0)
    prediction = model.predict(batch)

    if prediction[0] > 0.5:
        return 'Real'
    return 'Fake'
|
|
|
|
|
|
|
|
|
|
|
# Frame size (pixels) that video frames are resized to before the CNN sees them.
target_height = 240
target_width = 320

# A consecutive-frame pair with fewer than this many changed pixels is flagged
# by the frame-difference analysis (and drawn as the red line in its plot).
threshold = 30
|
|
|
|
|
def predict_video_forgery_cnn(video_path, model):
    """CNN-based video forgery detection.

    Every frame of the video is resized to ``target_width`` x
    ``target_height`` and the whole stack is passed to ``model.predict`` in
    one call. Output values above 0.5 are counted as forged.

    Args:
        video_path: Path of the video file to read with OpenCV.
        model: Object exposing ``predict`` that accepts the stacked frames.
            NOTE(review): frames are passed as read by OpenCV, with no
            channel reordering or scaling — confirm this matches training.

    Returns:
        ``(is_forged, forged_frames, total_frames)``. ``(False, 0, 0)`` is
        returned when no frame could be read.
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.resize(frame, (target_width, target_height)))
    finally:
        # Release the capture even if a frame fails to resize.
        cap.release()

    total_frames = len(frames)
    if total_frames == 0:
        return False, 0, 0

    scores = model.predict(np.array(frames)).reshape(-1)

    # One vectorised pass replaces two Python-level loops over the scores.
    forged_frames = int(np.count_nonzero(scores > 0.5))
    is_forged = forged_frames > 0

    return is_forged, forged_frames, total_frames
|
|
|
|
|
def analyze_video_tampering(video_path):
    """Frame difference analysis for tampering detection.

    Consecutive frames are converted to grayscale and compared with
    ``cv2.absdiff``. The number of non-zero pixels in each difference is
    recorded, and a frame is flagged when that count is below the
    module-level ``threshold``, i.e. when it is almost identical to its
    predecessor.

    Args:
        video_path: Path of the video file to read with OpenCV.

    Returns:
        ``(is_tampered, frame_differences, suspected_frames)`` where
        ``frame_differences`` has one count per consecutive frame pair and
        ``suspected_frames`` holds the ``CAP_PROP_POS_FRAMES`` value read
        right after each flagged frame. ``(False, [], [])`` is returned when
        the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    frame_differences = []
    suspected_frames = []

    try:
        if not cap.isOpened():
            return False, [], []

        prev_gray = None
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if prev_gray is not None:
                changed_pixels = np.count_nonzero(cv2.absdiff(gray, prev_gray))
                frame_differences.append(changed_pixels)

                if changed_pixels < threshold:
                    suspected_frames.append(int(cap.get(cv2.CAP_PROP_POS_FRAMES)))

            prev_gray = gray
    finally:
        # Covers the early return above and any failure while decoding.
        cap.release()

    is_tampered = len(suspected_frames) > 0
    return is_tampered, frame_differences, suspected_frames
|
|
|
|
|
def plot_frame_analysis(frame_differences):
    """Plot per-frame pixel differences against the detection threshold.

    A new pyplot figure is created and drawn on. When *frame_differences* is
    non-empty, a box with its mean and standard deviation is added in the
    top-left corner.

    Returns:
        The ``matplotlib.pyplot`` module itself (not a Figure); the drawing
        lives on pyplot's current figure.
    """
    plt.figure(figsize=(10, 4))

    # Series and reference line.
    plt.plot(frame_differences, color='blue', linewidth=1)
    plt.axhline(y=threshold, color='red', linestyle='--', label=f"Threshold ({threshold})")

    # Labels and decoration.
    plt.xlabel("Frame Number")
    plt.ylabel("Pixel Differences")
    plt.title("Frame Difference Analysis")
    plt.legend()
    plt.grid(True, alpha=0.3)

    if not frame_differences:
        # Nothing to summarise.
        return plt

    mean_val = np.mean(frame_differences)
    std_val = np.std(frame_differences)
    box_style = {'boxstyle': 'round', 'facecolor': 'white', 'alpha': 0.8}
    plt.text(
        0.02,
        0.98,
        f"Mean: {mean_val:.1f}\nStd: {std_val:.1f}",
        transform=plt.gca().transAxes,
        verticalalignment='top',
        bbox=box_style,
    )

    return plt
|
|
|
|
|
def combined_video_forgery_detection(video_path, model):
    """Combined detection using both CNN and frame analysis.

    Runs the CNN detector and the frame-difference analysis on the same
    video and merges their outcomes.

    Returns:
        ``(verdict, confidence, results)``: a verdict string, a confidence
        label (``"High"`` or ``"Medium"``), and a dict with the raw outputs
        of both detectors.

    NOTE(review): when no frame can be read, both detectors report
    "not forged", so the verdict is "NOT TAMPERED" with "High" confidence.
    """
    # Detector 1: per-frame CNN scores.
    cnn_forged, cnn_forged_frames, total_frames = predict_video_forgery_cnn(video_path, model)

    # Detector 2: consecutive-frame differences.
    frame_tampered, frame_differences, suspected_frames = analyze_video_tampering(video_path)

    results = {
        'cnn_forged': cnn_forged,
        'cnn_forged_frames': cnn_forged_frames,
        'frame_tampered': frame_tampered,
        'suspected_frames': len(suspected_frames),
        'total_frames': total_frames,
        'frame_differences': frame_differences,
    }

    # (CNN says forged, frame analysis says tampered) -> (verdict, confidence)
    outcome_table = {
        (True, True): ("FORGED - Detected by both CNN and Frame Analysis", "High"),
        (True, False): ("FORGED - Detected by CNN", "Medium"),
        (False, True): ("FORGED - Detected by Frame Analysis", "Medium"),
        (False, False): ("NOT TAMPERED - No Forgery detected", "High"),
    }
    verdict, confidence = outcome_table[(bool(cnn_forged), bool(frame_tampered))]

    return verdict, confidence, results
|
|
|
|
|
|
|
|
def build_feature_extractor():
    """Build the per-frame feature extractor model.

    An InceptionV3 network (ImageNet weights, no classification head,
    average pooling) is wrapped behind an input layer of shape
    ``(IMG_SIZE, IMG_SIZE, 3)`` followed by InceptionV3's own
    ``preprocess_input``.

    Returns:
        A ``keras.Model`` named ``"feature_extractor"``.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)

    backbone = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=frame_shape,
    )

    inputs = keras.Input(frame_shape)
    normalised = keras.applications.inception_v3.preprocess_input(inputs)
    features = backbone(normalised)

    return keras.Model(inputs, features, name="feature_extractor")
|
|
|
|
|
# Streamlit re-executes this script on every widget interaction. Building
# InceptionV3 and MTCNN at module level therefore repeated the expensive
# setup on each rerun, whichever task was selected. Route both through
# st.cache_resource, like the other models in this file, so they are built
# once and reused.
@st.cache_resource
def _get_feature_extractor():
    """Return the shared InceptionV3 feature extractor (built once)."""
    return build_feature_extractor()


@st.cache_resource
def _get_face_detector():
    """Return the shared MTCNN face detector (built once)."""
    return MTCNN()


# Module-level names kept unchanged for the functions that use them.
feature_extractor = _get_feature_extractor()
detector = _get_face_detector()
|
|
|
|
|
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE), skip_frames=2):
    """Read face crops from a video as an array of resized frames.

    Every ``skip_frames``-th frame is passed to
    ``get_face_region_first_frame``; frames for which it returns a crop are
    resized to *resize*, have their channel order reversed, and are
    collected.

    Args:
        path: Video file path.
        max_frames: Maximum number of crops to collect. Values <= 0 mean
            "no limit". With a positive limit, a shorter result is padded
            by repeating the last crop.
        resize: ``(width, height)`` passed to ``cv.resize``.
        skip_frames: Sampling step; only frames whose index is a multiple
            of this value are examined.

    Returns:
        ``np.array`` of the collected crops; empty when nothing was found.
    """
    cap = cv.VideoCapture(path)
    frames = []
    previous_box = None
    frame_count = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % skip_frames == 0:
                face, previous_box = get_face_region_first_frame(frame, previous_box)
                # An empty crop would make cv.resize raise, so treat it like
                # "no face found" for this frame.
                if face is not None and face.size > 0:
                    face = cv.resize(face, resize)
                    frames.append(face[:, :, [2, 1, 0]])  # reverse channel order

                    # Apply the limit only when one was requested. Comparing
                    # against the default 0 with == could otherwise end the
                    # loop before any crop had been collected.
                    if max_frames > 0 and len(frames) >= max_frames:
                        break

            frame_count += 1
    finally:
        # Release the capture even if detection or resizing fails.
        cap.release()

    # Pad up to max_frames (no-op when there is no limit or no crop at all).
    while frames and len(frames) < max_frames:
        frames.append(frames[-1])

    return np.array(frames)
|
|
|
|
|
def get_face_region_first_frame(frame, previous_box=None):
    """Crop the face region from *frame*, reusing a known box when given.

    When *previous_box* is ``None`` the module-level ``detector`` is run on
    the frame and the box of its first detection is used; otherwise the
    supplied box is reused without running detection.

    Args:
        frame: Image array indexed as ``[row, column, ...]``.
        previous_box: ``(x, y, width, height)`` from an earlier call, or
            ``None`` to run detection.

    Returns:
        ``(face_region, box)``. ``(None, None)`` is returned when no face is
        detected or when the box does not overlap the frame, which lets the
        caller trigger a fresh detection on its next call.
    """
    if previous_box is None:
        detections = detector.detect_faces(frame)
        if not detections:
            return None, None
        x, y, width, height = detections[0]['box']
        previous_box = (x, y, width, height)
    else:
        x, y, width, height = previous_box

    # A negative slice bound counts from the end of the array in numpy, so a
    # box whose origin lies outside the frame would otherwise give an empty
    # or wrong crop. Clamp every bound to the frame instead.
    top, left = max(0, y), max(0, x)
    bottom, right = max(0, y + height), max(0, x + width)

    face_region = frame[top:bottom, left:right]
    if face_region.size == 0:
        # Box lies entirely outside the frame (or has no area).
        return None, None

    return face_region, previous_box
|
|
|
|
|
def prepare_single_video(frames):
    """Extract per-frame features and a validity mask for one video.

    Args:
        frames: Array of frames with the frame index on the first axis.
            # assumes each frame is (IMG_SIZE, IMG_SIZE, 3) as produced by
            # load_video — TODO confirm for other callers

    Returns:
        ``(frame_features, frame_mask)`` with shapes
        ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)`` (float32) and
        ``(1, MAX_SEQ_LENGTH)`` (bool). Only the first
        ``min(MAX_SEQ_LENGTH, number of frames)`` slots are filled; the
        remaining slots stay zero / False.
    """
    frames = frames[None, ...]  # add a leading "video" axis of size 1
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        length = min(MAX_SEQ_LENGTH, batch.shape[0])
        if length == 0:
            # Nothing to extract; leave features and mask at zero.
            continue

        # One batched predict() call instead of one call per frame: same
        # features, without the per-call overhead repeated for every frame.
        frame_features[i, :length, :] = feature_extractor.predict(batch[:length])
        frame_mask[i, :length] = 1

    return frame_features, frame_mask
|
|
|
|
|
def sequence_prediction(video_path):
    """Run the deepfake sequence model on a video and rank its labels.

    Returns:
        A dict mapping each label to its probability in percent, inserted
        from most to least probable. ``None`` is returned, after showing a
        Streamlit error, when no frames could be extracted from the video.
    """
    class_vocab = label_processor2.get_vocabulary()

    frames = load_video(video_path)
    if len(frames) == 0:
        st.error("Could not process video. Please try another file.")
        return None

    features, mask = prepare_single_video(frames)
    probabilities = deepfake_model.predict([features, mask])[0]

    # Walk the class indices from highest to lowest probability.
    ranked_indices = np.argsort(probabilities)[::-1]
    predictions = {}
    for idx in ranked_indices:
        predictions[class_vocab[idx]] = probabilities[idx] * 100
    return predictions
|
|
|
|
|
|
|
|
|
|
|
# Page header.
st.title("Fraudulent Image and Video Detection System")

# Sidebar selector; the chosen label decides which section below is rendered.
task = st.sidebar.selectbox(
    "Choose a detection task:",
    [
        "Image Forgery Detection",
        "Deepfake Image Detection",
        "Video Forgery Detection",
        "Deepfake Video Detection",
    ],
)
|
|
|
|
|
|
|
|
# Local import: only the deepfake-image branch below needs a temporary file.
import tempfile

if task == "Image Forgery Detection":
    uploaded_file = st.file_uploader("Upload an image", type=['jpg', 'jpeg', 'png'])

    if uploaded_file:
        image = Image.open(uploaded_file)
        st.image(image, caption="Uploaded Image", width=400)

        # The model consumes the ELA representation as a batch of 128x128 RGB images.
        prepared_image = prepare_image_for_forgery(image).reshape(-1, 128, 128, 3)
        model = load_image_forgery_model()
        prediction = model.predict(prepared_image)

        # Output index 1 is reported as "real" and index 0 as "forged".
        confidence_real = prediction[0][1] * 100
        confidence_fake = prediction[0][0] * 100

        if confidence_real > confidence_fake:
            st.success(f"Result: Real Image with {confidence_real:.2f}% confidence")
        else:
            st.error(f"Result: Forged Image with {confidence_fake:.2f}% confidence")

        st.markdown("---")
        # The heading's emoji had been reduced to a stray Greek letter by an
        # encoding mix-up; restored here.
        st.subheader("🔍 Additional Analysis")

        show_ela = st.checkbox("View Error Level Analysis (ELA)", value=False)

        if show_ela:
            st.markdown("### Error Level Analysis")
            st.info("**ELA**: Reveals compression artifacts. Bright areas indicate potential editing or manipulation.")

            # Narrow first column keeps the button compact; the second column is a spacer.
            col1, _ = st.columns([1, 3])
            with col1:
                analyze_button = st.button("Run ELA Analysis", type="primary", use_container_width=True)

            if analyze_button:
                with st.spinner("Running Error Level Analysis..."):
                    try:
                        analysis_buffer = create_ela_analysis(image)

                        st.image(analysis_buffer, caption="ELA Analysis Results", width=500)

                        st.download_button(
                            label="Download ELA Results",
                            data=analysis_buffer.getvalue(),
                            file_name="ela_analysis.png",
                            mime="image/png",
                            use_container_width=True
                        )
                    except Exception as e:
                        # UI boundary: report the failure instead of crashing the page.
                        st.error(f"Error during ELA analysis: {str(e)}")
                        st.info("ELA analysis may not work with all image types. Try with a different image if needed.")

elif task == "Deepfake Image Detection":
    uploaded_file = st.file_uploader("Upload an image", type=['jpg', 'jpeg', 'png'])

    if uploaded_file:
        # A unique temporary file replaces the fixed "temp_image.jpg", so
        # concurrent sessions cannot overwrite each other's upload.
        suffix = os.path.splitext(uploaded_file.name)[1] or ".jpg"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(uploaded_file.getbuffer())
        temp_image_path = tmp.name

        try:
            st.image(uploaded_file, caption="Uploaded Image", width=400)
            model = load_deepfake_image_model()
            result = predict_deepfake_image(temp_image_path, model)
        finally:
            # Remove the temporary file even when loading or prediction fails.
            os.remove(temp_image_path)

        if result == 'Real':
            st.success("Prediction: Real")
        else:
            st.error("Prediction: Fake")
|
|
|
|
|
def _save_upload_to_temp(uploaded_file, fallback_suffix=".mp4"):
    """Write a Streamlit upload to a unique temporary file and return its path.

    The upload's own extension is kept when it has one. The caller is
    responsible for deleting the file.
    """
    import tempfile  # local import keeps this section self-contained

    suffix = os.path.splitext(uploaded_file.name)[1] or fallback_suffix
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(uploaded_file.getbuffer())
    return tmp.name


def _remove_if_exists(path):
    """Delete *path*; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


if task == "Video Forgery Detection":
    uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov', 'mkv'])

    if uploaded_file:
        # A unique temporary file replaces the fixed "temp_video.mp4", so
        # concurrent sessions cannot overwrite each other's upload.
        video_path = _save_upload_to_temp(uploaded_file)
        try:
            st.video(video_path)
            st.write("Analyzing the video for forgery...")

            model = load_video_forgery_model()
            verdict, confidence, results = combined_video_forgery_detection(video_path, model)
        finally:
            # The analysis results no longer need the file on disk.
            _remove_if_exists(video_path)

        # Status emoji had been reduced to stray Greek letters by an
        # encoding mix-up; restored here.
        if "FORGED" in verdict:
            st.error(f"🚨 {verdict}")
        else:
            st.success(f"✅ {verdict}")

        st.write(f"**Confidence Level:** {confidence}")

        col1, col2 = st.columns(2)

        with col1:
            st.write("**CNN Analysis:**")
            if results['cnn_forged']:
                st.write("- Status: Forged ❌")
                st.write(f"- Forged Frames: {results['cnn_forged_frames']}/{results['total_frames']}")
            else:
                st.write("- Status: Not Forged ✅")

        with col2:
            st.write("**Frame Analysis:**")
            if results['frame_tampered']:
                st.write("- Status: Tampered ❌")
                st.write(f"- Suspected Frames: {results['suspected_frames']}")
            else:
                st.write("- Status: Not Tampered ✅")

        if results['frame_differences']:
            st.write("**Frame Difference Analysis:**")
            fig = plot_frame_analysis(results['frame_differences'])
            st.pyplot(fig)
            plt.close()

elif task == "Deepfake Video Detection":
    uploaded_file = st.file_uploader("Upload a video", type=["mp4", "avi", "mov"])

    if uploaded_file is not None:
        video_path = _save_upload_to_temp(uploaded_file)
        try:
            st.video(video_path)
            st.write("Analyzing the video...")

            # sequence_prediction runs the same extract/predict/rank steps
            # this branch used to repeat inline. It shows the "Could not
            # process video" error itself and returns None in that case.
            predictions = sequence_prediction(video_path)
        finally:
            # This branch previously left its temporary video on disk.
            _remove_if_exists(video_path)

        if predictions:
            highest_label = max(predictions, key=predictions.get)
            highest_prob = predictions[highest_label]

            if highest_label.lower() == "real":
                st.success(f"The video is real with a confidence of {highest_prob:.2f}%.")
            elif highest_label.lower() == "fake":
                st.error(f"This video is a deepfake with a confidence of {highest_prob:.2f}%.")
            else:
                st.warning(f"Uncertain prediction: {highest_label} with {highest_prob:.2f}% confidence.")
|
|