Spaces:
Running
Running
| """ | |
| Video Face Manipulation Detection Through Ensemble of CNNs | |
| Image and Sound Processing Lab - Politecnico di Milano | |
| Nicolò Bonettini | |
| Edoardo Daniele Cannas | |
| Sara Mandelli | |
| Luca Bondi | |
| Paolo Bestagini | |
| """ | |
| import argparse | |
| import gc | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| import albumentations as A | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| from torch.utils.data import DataLoader | |
| from tqdm import tqdm | |
| from architectures import fornet | |
| from architectures.fornet import FeatureExtractor | |
| from isplutils import utils, split | |
| from isplutils.data import FrameFaceDatasetTest | |
| def main(): | |
| # Args | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--testsets', type=str, help='Testing datasets', nargs='+', choices=split.available_datasets, | |
| required=True) | |
| parser.add_argument('--testsplits', type=str, help='Test split', nargs='+', default=['val', 'test'], | |
| choices=['train', 'val', 'test']) | |
| parser.add_argument('--dfdc_faces_df_path', type=str, action='store', | |
| help='Path to the Pandas Dataframe obtained from extract_faces.py on the DFDC dataset. ' | |
| 'Required for training/validating on the DFDC dataset.') | |
| parser.add_argument('--dfdc_faces_dir', type=str, action='store', | |
| help='Path to the directory containing the faces extracted from the DFDC dataset. ' | |
| 'Required for training/validating on the DFDC dataset.') | |
| parser.add_argument('--ffpp_faces_df_path', type=str, action='store', | |
| help='Path to the Pandas Dataframe obtained from extract_faces.py on the FF++ dataset. ' | |
| 'Required for training/validating on the FF++ dataset.') | |
| parser.add_argument('--ffpp_faces_dir', type=str, action='store', | |
| help='Path to the directory containing the faces extracted from the FF++ dataset. ' | |
| 'Required for training/validating on the FF++ dataset.') | |
| # Specify trained model path | |
| parser.add_argument('--model_path', type=Path, help='Full path of the trained model', required=True) | |
| # Common params | |
| parser.add_argument('--batch', type=int, help='Batch size to fit in GPU memory', default=128) | |
| parser.add_argument('--workers', type=int, help='Num workers for data loaders', default=6) | |
| parser.add_argument('--device', type=int, help='GPU id', default=0) | |
| parser.add_argument('--debug', action='store_true', help='Debug flag', ) | |
| parser.add_argument('--num_video', type=int, help='Number of real-fake videos to test') | |
| parser.add_argument('--results_dir', type=Path, help='Output folder', | |
| default='results/') | |
| parser.add_argument('--override', action='store_true', help='Override existing results', ) | |
| args = parser.parse_args() | |
| device = torch.device('cuda:{}'.format(args.device)) if torch.cuda.is_available() else torch.device('cpu') | |
| num_workers: int = args.workers | |
| batch_size: int = args.batch | |
| max_num_videos_per_label: int = args.num_video # number of real-fake videos to test | |
| model_path: Path = args.model_path | |
| results_dir: Path = args.results_dir | |
| debug: bool = args.debug | |
| override: bool = args.override | |
| test_sets = args.testsets | |
| test_splits = args.testsplits | |
| dfdc_df_path = args.dfdc_faces_df_path | |
| ffpp_df_path = args.ffpp_faces_df_path | |
| dfdc_faces_dir = args.dfdc_faces_dir | |
| ffpp_faces_dir = args.ffpp_faces_dir | |
| # get arguments from the model path | |
| face_policy = str(model_path).split('face-')[1].split('_')[0] | |
| patch_size = int(str(model_path).split('size-')[1].split('_')[0]) | |
| net_name = str(model_path).split('net-')[1].split('_')[0] | |
| model_name = '_'.join(model_path.with_suffix('').parts[-2:]) | |
| # Load net | |
| net_class = getattr(fornet, net_name) | |
| # load model | |
| print('Loading model...') | |
| state_tmp = torch.load(model_path, map_location='cpu') | |
| if 'net' not in state_tmp.keys(): | |
| state = OrderedDict({'net': OrderedDict()}) | |
| [state['net'].update({'model.{}'.format(k): v}) for k, v in state_tmp.items()] | |
| else: | |
| state = state_tmp | |
| net: FeatureExtractor = net_class().eval().to(device) | |
| incomp_keys = net.load_state_dict(state['net'], strict=True) | |
| print(incomp_keys) | |
| print('Model loaded!') | |
| # val loss per-frame | |
| criterion = nn.BCEWithLogitsLoss(reduction='none') | |
| # Define data transformers | |
| test_transformer = utils.get_transformer(face_policy, patch_size, net.get_normalizer(), train=False) | |
| # datasets and dataloaders (from train_binclass.py) | |
| print('Loading data...') | |
| # Check if paths for DFDC and FF++ extracted faces and DataFrames are provided | |
| for dataset in test_sets: | |
| if dataset.split('-')[0] == 'dfdc' and (dfdc_df_path is None or dfdc_faces_dir is None): | |
| raise RuntimeError('Specify DataFrame and directory for DFDC faces for testing!') | |
| elif dataset.split('-')[0] == 'ff' and (ffpp_df_path is None or ffpp_faces_dir is None): | |
| raise RuntimeError('Specify DataFrame and directory for FF++ faces for testing!') | |
| splits = split.make_splits(dfdc_df=dfdc_df_path, ffpp_df=ffpp_df_path, dfdc_dir=dfdc_faces_dir, | |
| ffpp_dir=ffpp_faces_dir, dbs={'train': test_sets, 'val': test_sets, 'test': test_sets}) | |
| train_dfs = [splits['train'][db][0] for db in splits['train']] | |
| train_roots = [splits['train'][db][1] for db in splits['train']] | |
| val_roots = [splits['val'][db][1] for db in splits['val']] | |
| val_dfs = [splits['val'][db][0] for db in splits['val']] | |
| test_dfs = [splits['test'][db][0] for db in splits['test']] | |
| test_roots = [splits['test'][db][1] for db in splits['test']] | |
| # Output paths | |
| out_folder = results_dir.joinpath(model_name) | |
| out_folder.mkdir(mode=0o775, parents=True, exist_ok=True) | |
| # Samples selection | |
| if max_num_videos_per_label and max_num_videos_per_label > 0: | |
| dfs_out_train = [select_videos(df, max_num_videos_per_label) for df in train_dfs] | |
| dfs_out_val = [select_videos(df, max_num_videos_per_label) for df in val_dfs] | |
| dfs_out_test = [select_videos(df, max_num_videos_per_label) for df in test_dfs] | |
| else: | |
| dfs_out_train = train_dfs | |
| dfs_out_val = val_dfs | |
| dfs_out_test = test_dfs | |
| # Extractions list | |
| extr_list = [] | |
| # Append train and validation set first | |
| if 'train' in test_splits: | |
| for idx, dataset in enumerate(test_sets): | |
| extr_list.append( | |
| (dfs_out_train[idx], out_folder.joinpath(dataset + '_train.pkl'), train_roots[idx], dataset + ' TRAIN') | |
| ) | |
| if 'val' in test_splits: | |
| for idx, dataset in enumerate(test_sets): | |
| extr_list.append( | |
| (dfs_out_val[idx], out_folder.joinpath(dataset + '_val.pkl'), val_roots[idx], dataset + ' VAL') | |
| ) | |
| if 'test' in test_splits: | |
| for idx, dataset in enumerate(test_sets): | |
| extr_list.append( | |
| (dfs_out_test[idx], out_folder.joinpath(dataset + '_test.pkl'), test_roots[idx], dataset + ' TEST') | |
| ) | |
| for df, df_path, df_root, tag in extr_list: | |
| if override or not df_path.exists(): | |
| print('\n##### PREDICT VIDEOS FROM {} #####'.format(tag)) | |
| print('Real frames: {}'.format(sum(df['label'] == False))) | |
| print('Fake frames: {}'.format(sum(df['label'] == True))) | |
| print('Real videos: {}'.format(df[df['label'] == False]['video'].nunique())) | |
| print('Fake videos: {}'.format(df[df['label'] == True]['video'].nunique())) | |
| dataset_out = process_dataset(root=df_root, df=df, net=net, criterion=criterion, | |
| patch_size=patch_size, | |
| face_policy=face_policy, transformer=test_transformer, | |
| batch_size=batch_size, | |
| num_workers=num_workers, device=device, ) | |
| df['score'] = dataset_out['score'].astype(np.float32) | |
| df['loss'] = dataset_out['loss'].astype(np.float32) | |
| print('Saving results to: {}'.format(df_path)) | |
| df.to_pickle(str(df_path)) | |
| if debug: | |
| plt.figure() | |
| plt.title(tag) | |
| plt.hist(df[df.label == True].score, bins=100, alpha=0.6, label='FAKE frames') | |
| plt.hist(df[df.label == False].score, bins=100, alpha=0.6, label='REAL frames') | |
| plt.legend() | |
| del (dataset_out) | |
| del (df) | |
| gc.collect() | |
| if debug: | |
| plt.show() | |
| print('Completed!') | |
| def process_dataset(df: pd.DataFrame, | |
| root: str, | |
| net: FeatureExtractor, | |
| criterion, | |
| patch_size: int, | |
| face_policy: str, | |
| transformer: A.BasicTransform, | |
| batch_size: int, | |
| num_workers: int, | |
| device: torch.device, | |
| ) -> dict: | |
| if isinstance(device, (int, str)): | |
| device = torch.device(device) | |
| dataset = FrameFaceDatasetTest( | |
| root=root, | |
| df=df, | |
| size=patch_size, | |
| scale=face_policy, | |
| transformer=transformer, | |
| ) | |
| # Preallocate | |
| score = np.zeros(len(df)) | |
| loss = np.zeros(len(df)) | |
| loader = DataLoader(dataset, batch_size=batch_size, num_workers=num_workers, shuffle=False, drop_last=False) | |
| with torch.no_grad(): | |
| idx0 = 0 | |
| for batch_data in tqdm(loader): | |
| batch_images = batch_data[0].to(device) | |
| batch_labels = batch_data[1].to(device) | |
| batch_samples = len(batch_images) | |
| batch_out = net(batch_images) | |
| batch_loss = criterion(batch_out, batch_labels) | |
| score[idx0:idx0 + batch_samples] = batch_out.cpu().numpy()[:, 0] | |
| loss[idx0:idx0 + batch_samples] = batch_loss.cpu().numpy()[:, 0] | |
| idx0 += batch_samples | |
| out_dict = {'score': score, 'loss': loss} | |
| return out_dict | |
| def select_videos(df: pd.DataFrame, max_videos_per_label: int) -> pd.DataFrame: | |
| """ | |
| Select up to a maximum number of videos | |
| :param df: DataFrame of frames. Required columns: 'video','label' | |
| :param max_videos_per_label: maximum number of real and fake videos | |
| :return: DataFrame of selected frames | |
| """ | |
| # Save random state | |
| st0 = np.random.get_state() | |
| # Set seed for this selection only | |
| np.random.seed(42) | |
| df_fake = df[df.label == True] | |
| fake_videos = df_fake['video'].unique() | |
| selected_fake_videos = np.random.choice(fake_videos, min(max_videos_per_label, len(fake_videos)), replace=False) | |
| df_selected_fake_frames = df_fake[df_fake['video'].isin(selected_fake_videos)] | |
| df_real = df[df.label == False] | |
| real_videos = df_real['video'].unique() | |
| selected_real_videos = np.random.choice(real_videos, min(max_videos_per_label, len(real_videos)), replace=False) | |
| df_selected_real_frames = df_real[df_real['video'].isin(selected_real_videos)] | |
| # Restore random state | |
| np.random.set_state(st0) | |
| return pd.concat((df_selected_fake_frames, df_selected_real_frames), axis=0, verify_integrity=True).copy() | |
| if __name__ == '__main__': | |
| main() | |