"""Prediction helpers for GenConViT: model loading, face extraction from
videos/image folders, preprocessing, and result bookkeeping."""

import glob
import os

import cv2
import dlib
import face_recognition
import numpy as np
import torch
from decord import VideoReader, cpu
from PIL import Image
from torchvision import transforms
from tqdm import tqdm

from dataset.loader import normalize_data
from .config import load_config
from .genconvit import GenConViT

# Single device used for all tensors and model inference in this module.
device = "cuda" if torch.cuda.is_available() else "cpu"


def load_genconvit(config, net, ed_weight, vae_weight, fp16):
    """Build a GenConViT model on `device` in eval mode.

    Args:
        config: model configuration (see .config.load_config).
        net: sub-network selector forwarded to GenConViT.
        ed_weight: encoder-decoder checkpoint identifier.
        vae_weight: VAE checkpoint identifier.
        fp16: when True, convert the model weights to half precision.

    Returns:
        The ready-to-use GenConViT model.
    """
    model = GenConViT(config, ed=ed_weight, vae=vae_weight, net=net, fp16=fp16)
    model.to(device)
    model.eval()
    if fp16:
        model.half()
    return model


def face_rec(frames):
    """Detect faces in `frames` and return 224x224 RGB crops.

    At most len(frames) crops are collected across all frames.

    Args:
        frames: iterable of HxWx3 uint8 images (assumed RGB, e.g. from
            decord or PIL -- TODO confirm at call sites).

    Returns:
        (faces, count): faces is a (count, 224, 224, 3) uint8 array;
        ([], 0) when no face was found.
    """
    temp_face = np.zeros((len(frames), 224, 224, 3), dtype=np.uint8)
    count = 0
    # The CNN detector is more accurate but only practical with CUDA-enabled dlib.
    mod = "cnn" if dlib.DLIB_USE_CUDA else "hog"

    for frame in tqdm(frames, total=len(frames)):
        # NOTE(review): detection runs on the BGR-swapped image; crops are
        # swapped back to RGB below, so output color order is consistent.
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        face_locations = face_recognition.face_locations(
            frame, number_of_times_to_upsample=0, model=mod
        )

        for face_location in face_locations:
            if count >= len(frames):
                break
            top, right, bottom, left = face_location
            face_image = frame[top:bottom, left:right]
            face_image = cv2.resize(
                face_image, (224, 224), interpolation=cv2.INTER_AREA
            )
            face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
            temp_face[count] = face_image
            count += 1

    return ([], 0) if count == 0 else (temp_face[:count], count)


def preprocess_frame(frame):
    """Convert NHWC uint8 face crops into a normalized NCHW float tensor.

    Args:
        frame: array-like of shape (N, 224, 224, 3), values in [0, 255].

    Returns:
        Float tensor of shape (N, 3, 224, 224) on `device`, scaled to
        [0, 1] and normalized by the dataset's "vid" transform.
    """
    df_tensor = torch.tensor(frame, device=device).float()
    df_tensor = df_tensor.permute((0, 3, 1, 2))
    # Hoisted out of the loop: normalize_data() is loop-invariant and was
    # previously rebuilt on every iteration.
    normalize = normalize_data()["vid"]
    for i in range(len(df_tensor)):
        df_tensor[i] = normalize(df_tensor[i] / 255.0)
    return df_tensor


def pred_vid(df, model, net=None):
    """Run the model on a batch of face tensors and aggregate a prediction.

    Args:
        df: preprocessed NCHW tensor (see preprocess_frame).
        model: GenConViT model in eval mode.
        net: optional sub-network selector forwarded to the model.

    Returns:
        (class_index, confidence) from max_prediction_value.
    """
    with torch.no_grad():
        output = model(df, net=net).squeeze()
        # A single-frame batch squeezes to 1-D; restore the batch dim.
        if len(output.shape) == 1:
            output = output.unsqueeze(0)
        # Softmax converts logits to per-frame class probabilities.
        probabilities = torch.softmax(output, dim=1)
        return max_prediction_value(probabilities)


def max_prediction_value(y_pred):
    """Return (index, value) of the max of the per-class mean of `y_pred`.

    Args:
        y_pred: (N, num_classes) probability tensor.
    """
    mean_val = torch.mean(y_pred, dim=0)
    max_val, max_idx = torch.max(mean_val, dim=0)
    return max_idx.item(), max_val.item()


def real_or_fake(prediction):
    """Map a class index to its label: 0 -> "FAKE", 1 -> "REAL"."""
    return {0: "FAKE", 1: "REAL"}[prediction]


def extract_frames(video_file, num_frames=15):
    """Decode `num_frames` evenly spaced frames from a video file.

    Args:
        video_file: path to the video.
        num_frames: frames to sample; -1 means every frame.

    Returns:
        uint8 numpy array of shape (num_frames, H, W, 3).
    """
    vr = VideoReader(video_file, ctx=cpu(0))
    total_frames = len(vr)
    if num_frames == -1:
        indices = np.arange(total_frames).astype(int)
    else:
        indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
    return vr.get_batch(indices).asnumpy()


def df_face_from_folder(vid, num_frames):
    """Detect and preprocess faces from a folder of images.

    Args:
        vid: folder path containing image files.
        num_frames: maximum number of images to use.

    Returns:
        Preprocessed face tensor, or [] when no face was found.
    """
    img = []
    for path in glob.glob(vid + "/*"):
        try:
            img.append(np.asarray(Image.open(path).convert("RGB")))
        except Exception:
            # Best-effort: skip unreadable / non-image files. Narrowed from a
            # bare except so KeyboardInterrupt/SystemExit still propagate.
            continue
    face, count = face_rec(img[:num_frames])
    return preprocess_frame(face) if count > 0 else []


def df_face(vid, num_frames):
    """Detect and preprocess faces sampled from a video file.

    Args:
        vid: path to the video file.
        num_frames: number of frames to sample.

    Returns:
        Preprocessed face tensor, or [] when no face was found.
    """
    img = extract_frames(vid, num_frames)
    face, count = face_rec(img)
    return preprocess_frame(face) if count > 0 else []


def is_video(vid):
    """Return True when `vid` is an existing file with a known video suffix."""
    return os.path.isfile(vid) and vid.endswith(
        (".avi", ".mp4", ".mpg", ".mpeg", ".mov")
    )


def is_video_folder(vid_folder):
    """Return True when `vid_folder` looks like a folder of image frames.

    NOTE(review): only the first globbed file's extension is inspected, so a
    mixed folder may be misclassified.
    """
    img_list = glob.glob(vid_folder + "/*")
    return len(img_list) >= 1 and img_list[0].endswith(("png", "jpeg", "jpg"))


def set_result():
    """Create an empty result accumulator for store_result."""
    return {
        "video": {
            "name": [],
            "pred": [],
            "klass": [],
            "pred_label": [],
            "correct_label": [],
        }
    }


def store_result(
    result, filename, y, y_val, klass, correct_label=None, compression=None
):
    """Append one video's prediction to the `result` accumulator.

    Args:
        result: dict produced by set_result.
        filename: video file name.
        y: predicted class index (0/1).
        y_val: prediction confidence.
        klass: dataset class name (stored lowercased).
        correct_label: optional ground-truth label.
        compression: optional compression tag.

    Returns:
        The updated `result` dict.
    """
    result["video"]["name"].append(filename)
    result["video"]["pred"].append(y_val)
    result["video"]["klass"].append(klass.lower())
    result["video"]["pred_label"].append(real_or_fake(y))
    if correct_label is not None:
        result["video"]["correct_label"].append(correct_label)
    if compression is not None:
        # Bug fix: set_result never creates a "compression" list, so the
        # original append raised KeyError; create it lazily instead.
        result["video"].setdefault("compression", []).append(compression)
    return result