|
|
import os |
|
|
import cv2 |
|
|
import torch |
|
|
import zipfile |
|
|
import librosa |
|
|
import numpy as np |
|
|
import tensorflow_addons |
|
|
import tensorflow as tf |
|
|
from facenet_pytorch import MTCNN |
|
|
from rawnet import RawNet |
|
|
|
|
|
|
|
|
# Deterministic TF ops where supported (seeded graph-level RNG).
tf.random.set_seed(42)

# Unpack the pretrained EfficientNet-B0 SavedModel shipped alongside the app.
# Context manager guarantees the archive handle is closed even if extraction fails
# (the original opened/closed it manually).
local_zip = "./efficientnet-b0.zip"
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall()

# Image/video deepfake classifier; assumed to output [real_prob, fake_prob]
# per the indexing in deepfakes_video_predict / deepfakes_image_predict.
model = tf.keras.models.load_model("efficientnet-b0/")
|
|
|
|
|
|
|
|
|
|
|
class DetectionPipeline:
    """Pipeline class for preparing video frames, images, or audio for the models.

    NOTE(review): despite the original docstring, no face *detection* happens
    here — video frames are sampled, converted to RGB, and resized to 224x224.
    MTCNN is imported at module level but never used in this block.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        """Constructor for DetectionPipeline class.

        Keyword Arguments:
            n_frames {int} -- Total number of frames to load. These will be evenly spaced
                throughout the video. If not specified (i.e., None), all frames will be
                loaded. (default: {None})
            batch_size {int} -- Number of frames accumulated before one is emitted
                in the video path. (default: {60})
            resize {float} -- Fraction by which to resize frames from original prior to
                the final 224x224 resize. A value less than 1 results in downsampling
                and a value greater than 1 results in upsampling. (default: {None})
            input_modality {str} -- One of 'video', 'image' or 'audio'. (default: {'video'})
        """
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Process one input according to ``self.input_modality``.

        Arguments:
            filename -- Path to a video file for 'video'; an already-decoded BGR
                image array for 'image' (NOTE(review): not a path, per the
                cv2.cvtColor call — confirm at callers); path to an audio file
                for 'audio'.

        Returns:
            'video' -- list of 224x224 RGB uint8 frames.
            'image' -- a single 224x224 RGB image array.
            'audio' -- a (1, n_samples) torch.Tensor of the waveform.

        Raises:
            ValueError -- if ``input_modality`` is not 'video', 'image' or 'audio'.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            v_cap = cv2.VideoCapture(filename)
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Indices of the frames to keep (all frames when n_frames is None).
            if self.n_frames is None:
                sample = np.arange(0, v_len)
            else:
                sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)
            # O(1) membership per frame instead of scanning the numpy array.
            wanted = set(sample.tolist())

            faces = []
            frames = []
            for j in range(v_len):
                # Advance the stream cheaply; decode only the sampled frames.
                v_cap.grab()
                if j in wanted:
                    success, frame = v_cap.retrieve()
                    if not success:
                        continue
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    if self.resize is not None:
                        # BUGFIX: the original used PIL's Image.resize idiom on a
                        # numpy ndarray (`frame.size` is a scalar element count for
                        # ndarrays), which raised as soon as resize was set.
                        frame = cv2.resize(frame, (0, 0), fx=self.resize, fy=self.resize)
                    frames.append(frame)

                    # Emit the current frame at every full batch and on the last
                    # sampled index (preserves the original emission pattern).
                    if len(frames) % self.batch_size == 0 or j == sample[-1]:
                        face2 = cv2.resize(frame, (224, 224))
                        faces.append(face2)

            v_cap.release()
            return faces

        elif self.input_modality == 'image':
            print('Input modality is image.')
            print('Reading image')
            image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (224, 224))
            return image

        elif self.input_modality == 'audio':
            print("INput modality is audio.")
            # librosa.load returns (waveform, sample_rate); the rate is unused.
            x, sr = librosa.load(filename)
            x_pt = torch.Tensor(x)
            # Add a batch dimension for the RawNet forward pass.
            x_pt = torch.unsqueeze(x_pt, dim=0)
            return x_pt

        else:
            # BUGFIX: original message omitted the supported 'audio' modality.
            raise ValueError("Invalid input modality. Must be 'video', 'image' or 'audio'")
|
|
|
|
|
# Shared pipeline instances: the video pipeline samples 5 evenly spaced frames
# per clip; the image pipeline passes a single image straight through.
detection_video_pipeline = DetectionPipeline(
    input_modality='video',
    n_frames=5,
    batch_size=1,
)
detection_image_pipeline = DetectionPipeline(
    input_modality='image',
    batch_size=1,
)
|
|
|
|
|
def deepfakes_video_predict(input_video):
    """Classify a video as REAL or FAKE by averaging per-frame predictions.

    Arguments:
        input_video {str} -- Path to the video file.

    Returns:
        str -- Human-readable verdict with a "Deepfakes Confidence" percentage.

    NOTE(review): assumes the model outputs [real_prob, fake_prob] per frame
    (softmax-style, summing to ~1) — confirm against the trained model.
    """
    faces = detection_video_pipeline(input_video)
    real_res = []
    fake_res = []

    # Run the classifier on each sampled frame independently.
    # (Dead code removed: the original kept an unused `total` counter, an
    # unused `pred2` of only the *last* frame, and incremented the float
    # probabilities `real`/`fake` as if they were counters.)
    for face in faces:
        face2 = face / 255  # scale uint8 pixels to [0, 1]
        pred = model.predict(np.expand_dims(face2, axis=0))[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    # Verdict by majority of mean real-probability; confidence is always
    # expressed as the fake-ness percentage.
    if real_mean >= 0.5:
        text = "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean*100), 3)) + "%"
    else:
        text = "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean*100, 3)) + "%"

    return text
|
|
|
|
|
|
|
|
def deepfakes_image_predict(input_image):
    """Classify a single image as REAL or FAKE.

    Arguments:
        input_image -- Decoded BGR image array fed to the image pipeline.

    Returns:
        str -- Human-readable verdict with a "Deepfakes Confidence" percentage.
    """
    prepared = detection_image_pipeline(input_image)
    # Scale uint8 pixels to [0, 1] before inference.
    scaled = prepared / 255
    batch = np.expand_dims(scaled, axis=0)
    probs = model.predict(batch)[0]
    real, fake = probs[0], probs[1]
    if real > 0.5:
        text2 = "The image is REAL. \n Deepfakes Confidence: " + str(round(100 - (real*100), 3)) + "%"
    else:
        text2 = "The image is FAKE. \n Deepfakes Confidence: " + str(round(fake*100, 3)) + "%"
    return text2
|
|
|
|
|
def load_audio_model():
    """Build a RawNet2 audio anti-spoofing model and load its checkpoint.

    Returns:
        RawNet -- The model on CPU, in eval mode, with weights from 'RawNet2.pth'.
    """
    # Architecture hyperparameters matching the published RawNet2 checkpoint.
    d_args = {
        "nb_samp": 64600,
        "first_conv": 1024,
        "in_channels": 1,
        "filts": [20, [20, 20], [20, 128], [128, 128]],
        "blocks": [2, 4],
        "nb_fc_node": 1024,
        "gru_node": 1024,
        "nb_gru_layer": 3,
        "nb_classes": 2,
    }

    model = RawNet(d_args=d_args, device='cpu')

    ckpt = torch.load('RawNet2.pth', map_location=torch.device('cpu'))
    # BUGFIX: the original passed the model's own state_dict as the second
    # positional argument, i.e. as the boolean `strict` flag. A dict is truthy,
    # so this accidentally behaved as strict=True (the default) — call it
    # correctly with the checkpoint only.
    model.load_state_dict(ckpt)
    # Inference only: disable dropout / put batch-norm in eval mode.
    model.eval()
    return model
|
|
|
|
|
# Maps the RawNet output class index (argmax over the logits) to a
# human-readable label for the audio verdict.
audio_label_map = {
    0: "Real audio",
    1: "Fake audio"
}
|
|
|
|
|
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real or spoofed with RawNet2.

    Arguments:
        input_audio {tuple} -- Unpacked as (waveform, sample_rate); the rate is
            unused. NOTE(review): this matches librosa.load's return order, but
            some callers (e.g. gradio Audio inputs) supply (sample_rate, data)
            — confirm at the call site.

    Returns:
        str -- "Real audio" or "Fake audio" via ``audio_label_map``.
    """
    x, sr = input_audio
    waveform = torch.Tensor(x)
    # Add a batch dimension: RawNet expects (batch, n_samples).
    waveform = torch.unsqueeze(waveform, dim=0)

    model = load_audio_model()

    # Inference only — skip autograd graph construction (the original ran the
    # forward pass with gradients enabled and misnamed the logits "grads").
    with torch.no_grad():
        logits = model(waveform)

    result = np.argmax(logits.detach().numpy())
    return audio_label_map[result]
|
|
|