Spaces:
Running
Running
| from typing import List, Dict, Tuple | |
| """ | |
| Video Face Manipulation Detection Through Ensemble of CNNs | |
| Image and Sound Processing Lab - Politecnico di Milano | |
| Nicolò Bonettini | |
| Edoardo Daniele Cannas | |
| Sara Mandelli | |
| Luca Bondi | |
| Paolo Bestagini | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
# Dataset names accepted by load_df / get_split_df below.
available_datasets = [
    'dfdc-35-5-10',  # DFDC split by 'folder': 0-34 train, 35-39 val, 40-49 test
    # FF++ c23: 720 train / 140 val / 140 test original YouTube videos
    'ff-c23-720-140-140',
    # '-Nfpv' variants additionally subsample N rows per video in get_split_df
    # (presumably N face frames per video — see the fpv branch there)
    'ff-c23-720-140-140-5fpv',
    'ff-c23-720-140-140-10fpv',
    'ff-c23-720-140-140-15fpv',
    'ff-c23-720-140-140-20fpv',
    'ff-c23-720-140-140-25fpv',
    'celebdf',  # just for convenience, not used in the original paper
]
def load_df(dfdc_df_path: str, ffpp_df_path: str, dfdc_faces_dir: str, ffpp_faces_dir: str,
            dataset: str) -> Tuple[pd.DataFrame, str]:
    """
    Load the full faces DataFrame and the faces root directory for a dataset.

    :param dfdc_df_path: path to the pickled faces DataFrame of the DFDC dataset
    :param ffpp_df_path: path to the pickled faces DataFrame of the FF++ dataset
    :param dfdc_faces_dir: directory containing the faces extracted from DFDC
    :param ffpp_faces_dir: directory containing the faces extracted from FF++
    :param dataset: dataset name; anything starting with 'dfdc' selects DFDC,
        anything starting with 'ff-' selects FF++
    :return: (faces DataFrame, faces root directory)
    :raise NotImplementedError: for any other dataset name (including 'celebdf',
        which is listed in available_datasets but not handled here)
    """
    if dataset.startswith('dfdc'):
        df = pd.read_pickle(dfdc_df_path)
        root = dfdc_faces_dir
    elif dataset.startswith('ff-'):
        df = pd.read_pickle(ffpp_df_path)
        root = ffpp_faces_dir
    else:
        raise NotImplementedError('Unknown dataset: {}'.format(dataset))
    return df, root
def get_split_df(df: pd.DataFrame, dataset: str, split: str) -> pd.DataFrame:
    """
    Return the subset of df belonging to the requested split of the requested dataset.

    :param df: full faces DataFrame, as returned by load_df
    :param dataset: one of available_datasets
    :param split: 'train', 'val' or 'test'
    :return: DataFrame restricted to the rows of the requested split
    :raise NotImplementedError: if dataset or split is not recognized
    """
    if dataset == 'dfdc-35-5-10':
        return _get_dfdc_split(df, split)
    elif dataset.startswith('ff-c23-720-140-140'):
        return _get_ffpp_split(df, dataset, split)
    elif dataset == 'celebdf':
        return _get_celebdf_split(df, split)
    else:
        raise NotImplementedError('Unknown dataset: {}'.format(dataset))


def _get_dfdc_split(df: pd.DataFrame, split: str) -> pd.DataFrame:
    """Split DFDC by the 'folder' column: 0-34 train, 35-39 val, 40-49 test."""
    if split == 'train':
        folders = range(35)
    elif split == 'val':
        folders = range(35, 40)
    elif split == 'test':
        folders = range(40, 50)
    else:
        raise NotImplementedError('Unknown split: {}'.format(split))
    return df[df['folder'].isin(folders)]


def _get_ffpp_split(df: pd.DataFrame, dataset: str, split: str) -> pd.DataFrame:
    """Split FF++ on original YouTube videos (720 train / 140 val / 140 test),
    optionally subsampling a fixed number of rows per video ('-Nfpv' suffix)."""
    # Save the global RNG state and seed deterministically for this selection only.
    st0 = np.random.get_state()
    np.random.seed(41)
    try:
        crf = dataset.split('-')[1]  # e.g. 'c23'
        random_youtube_videos = np.random.permutation(
            df[(df['source'] == 'youtube') & (df['quality'] == crf)]['video'].unique())
        if split == 'train':
            orig = random_youtube_videos[:720]
        elif split == 'val':
            orig = random_youtube_videos[720:720 + 140]
        elif split == 'test':
            orig = random_youtube_videos[720 + 140:]
        else:
            raise NotImplementedError('Unknown split: {}'.format(split))
        # Keep both the rows derived from the selected originals and the
        # originals themselves.
        split_df = pd.concat((df[df['original'].isin(orig)], df[df['video'].isin(orig)]), axis=0)
        if dataset.endswith('fpv'):
            # '-Nfpv' -> sample exactly N rows per video, without replacement.
            fpv = int(dataset.rsplit('-', 1)[1][:-3])
            idxs = [np.random.choice(split_df[split_df['video'] == video].index, fpv, replace=False)
                    for video in split_df['video'].unique()]
            split_df = split_df.loc[np.concatenate(idxs)]
    finally:
        # Restore the global RNG state even if the selection fails
        # (the original code leaked the seeded state on the error paths).
        np.random.set_state(st0)
    return split_df


def _get_celebdf_split(df: pd.DataFrame, split: str) -> pd.DataFrame:
    """Split CelebDF: the official 'test' flag for test; 600 randomly chosen real
    (label == False) non-test videos for train, the remaining ones for val."""
    seed = 41
    num_real_train = 600
    # Save the global RNG state and seed deterministically for this selection only.
    st0 = np.random.get_state()
    np.random.seed(seed)
    try:
        random_train_val_real_videos = np.random.permutation(
            df[(df['label'] == False) & (df['test'] == False)]['video'].unique())
        if split == 'train':
            orig = random_train_val_real_videos[:num_real_train]
        elif split == 'val':
            orig = random_train_val_real_videos[num_real_train:]
        elif split == 'test':
            return df[df['test'] == True]
        else:
            raise NotImplementedError('Unknown split: {}'.format(split))
        split_df = pd.concat((df[df['original'].isin(orig)], df[df['video'].isin(orig)]), axis=0)
    finally:
        # Restore the global RNG state even if the selection fails.
        np.random.set_state(st0)
    return split_df
def make_splits(dfdc_df: str, ffpp_df: str, dfdc_dir: str, ffpp_dir: str,
                dbs: Dict[str, List[str]]) -> Dict[str, Dict[str, Tuple[pd.DataFrame, str]]]:
    """
    Build the per-split DataFrames and face root directories.

    :param dfdc_df: path to the DataFrame with info on the faces extracted from
        the DFDC dataset with extract_faces.py
    :param ffpp_df: path to the DataFrame with info on the faces extracted from
        the FF++ dataset with extract_faces.py
    :param dfdc_dir: path to the directory containing the faces extracted from
        the DFDC dataset with extract_faces.py
    :param ffpp_dir: path to the directory containing the faces extracted from
        the FF++ dataset with extract_faces.py
    :param dbs: {split_name: [dataset_name, ...]}
        Example: {'train': ['dfdc-35-5-10'], 'val': ['dfdc-35-5-10']}
    :return: dictionary {split_name: {dataset_name: (split DataFrame, faces root)}}
        Example: {'train': {'dfdc-35-5-10': (dfdc_train_df, 'path/to/dir/of/DFDC/faces')}}
    """
    split_dict = {}
    # Cache: dataset_name -> (full DataFrame, faces root), so each dataset is
    # loaded from disk only once even when it appears in several splits.
    full_dfs = {}
    for split_name, split_dbs in dbs.items():
        split_dict[split_name] = dict()
        for split_db in split_dbs:
            if split_db not in full_dfs:
                full_dfs[split_db] = load_df(dfdc_df, ffpp_df, dfdc_dir, ffpp_dir, split_db)
            full_df, root = full_dfs[split_db]
            split_df = get_split_df(df=full_df, dataset=split_db, split=split_name)
            split_dict[split_name][split_db] = (split_df, root)
    return split_dict