Spaces:
Running
Running
| """ | |
| Video Face Manipulation Detection Through Ensemble of CNNs | |
| Image and Sound Processing Lab - Politecnico di Milano | |
| Nicolò Bonettini | |
| Edoardo Daniele Cannas | |
| Sara Mandelli | |
| Luca Bondi | |
| Paolo Bestagini | |
| """ | |
| import os | |
| from pathlib import Path | |
| from typing import List | |
| import albumentations as A | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| from PIL import Image | |
| from albumentations.pytorch import ToTensorV2 | |
| from torch.utils.data import Dataset, IterableDataset | |
| from .utils import extract_bb | |
def load_face(record: pd.Series, root: str, size: int, scale: str, transformer: A.BasicTransform) -> torch.Tensor:
    """
    Load a face crop for a dataset record, transparently using a JPEG autocache.

    :param record: DataFrame row; ``record.name`` is the frame path relative to *root*,
                   and the row carries 'left', 'top', 'right', 'bottom' bounding-box columns
    :param root: root folder of the frames cache
    :param size: target face size in pixels
    :param scale: extraction policy passed to ``extract_bb`` ('crop', 'scale' or 'tight')
    :param transformer: albumentations transform applied to the HWC uint8 face array
    :return: the transformed face (a torch.Tensor when the transform ends with ToTensorV2)
    :raises RuntimeError: if a decoded image is not a 3-dimensional (H, W, C) array
    """
    path = os.path.join(str(root), str(record.name))
    # Only cache small crops or 'tight' extractions; larger 'crop'/'scale' outputs are
    # cheap enough to recompute and would bloat the cache.
    autocache = size < 256 or scale == 'tight'
    if scale in ['crop', 'scale', ]:
        cached_path = str(Path(root).joinpath('autocache', scale, str(size), str(record.name)).with_suffix('.jpg'))
    else:
        # when scale == 'tight' the extracted face is not dependent on size
        cached_path = str(Path(root).joinpath('autocache', scale, str(record.name)).with_suffix('.jpg'))

    # Black fallback returned when both the cache and the source frame are unreadable.
    face = np.zeros((size, size, 3), dtype=np.uint8)

    if os.path.exists(cached_path):
        try:
            # Context manager closes the file handle even on decode errors;
            # np.array() forces PIL's lazy load before the file is closed.
            with Image.open(cached_path) as cached_img:
                face = np.array(cached_img)
            if face.ndim != 3:
                # Fix: report the cached file being validated, not the source frame.
                raise RuntimeError('Incorrect format: {}'.format(cached_path))
        except KeyboardInterrupt:
            # We want keyboard interrupts to be propagated
            raise
        except (OSError, IOError) as e:
            print('Deleting corrupted cache file: {}'.format(cached_path))
            print(e)
            os.unlink(cached_path)
            face = np.zeros((size, size, 3), dtype=np.uint8)

    if not os.path.exists(cached_path):
        try:
            with Image.open(path) as frame:
                bb = record['left'], record['top'], record['right'], record['bottom']
                face_img = extract_bb(frame, bb=bb, size=size, scale=scale)
                if autocache:
                    os.makedirs(os.path.dirname(cached_path), exist_ok=True)
                    face_img.save(cached_path, quality=95, subsampling='4:4:4')
                face = np.array(face_img)
            if face.ndim != 3:
                raise RuntimeError('Incorrect format: {}'.format(path))
        except KeyboardInterrupt:
            # We want keyboard interrupts to be propagated
            raise
        except (OSError, IOError) as e:
            print('Error while reading: {}'.format(path))
            print(e)
            face = np.zeros((size, size, 3), dtype=np.uint8)

    face = transformer(image=face)['image']
    return face
class FrameFaceIterableDataset(IterableDataset):

    def __init__(self,
                 roots: List[str],
                 dfs: List[pd.DataFrame],
                 size: int, scale: str,
                 num_samples: int = -1,
                 transformer: A.BasicTransform = ToTensorV2(),
                 output_index: bool = False,
                 labels_map: dict = None,
                 seed: int = None):
        """
        Iterable dataset that yields a balanced, alternating stream of fake/real faces.

        :param roots: list of root folders for the frames caches, parallel to dfs
        :param dfs: list of DataFrames of cached frames with 'left','top','right','bottom'
                    bounding-box columns and a 'label' column (0 = real, 1 = fake)
        :param size: face size in pixels
        :param scale: face extraction policy ('crop', 'scale' or 'tight')
        :param num_samples: cap on the number of yielded samples (<= 0 means no cap)
        :param transformer: albumentations transform applied to every face
        :param output_index: if True, also yield the record's index/name
        :param labels_map: map from record labels to output label arrays
        :param seed: base RNG seed; a random one is drawn when None
        """
        self.dfs = dfs
        self.size = int(size)
        self.seed0 = int(seed) if seed is not None else np.random.choice(2 ** 32)

        # Prefix every per-source index with its position in dfs so rows from
        # different sources can live in one concatenated frame without clashing.
        adapted = []
        for df_idx, source_df in enumerate(self.dfs):
            reindexed = source_df.copy()
            reindexed.index = pd.MultiIndex.from_tuples(
                [(df_idx, key) for key in reindexed.index],
                names=['df_idx', 'df_key'])
            adapted.append(reindexed)
        self.df = pd.concat(adapted, axis=0, join='inner')

        self.df_real = self.df[self.df['label'] == 0]
        self.df_fake = self.df[self.df['label'] == 1]
        self.longer_set = 'fake' if len(self.df_fake) >= len(self.df_real) else 'real'

        # Balanced epoch length: twice the majority class, optionally capped.
        balanced = 2 * max(len(self.df_real), len(self.df_fake))
        self.num_samples = min(balanced, num_samples) if num_samples > 0 else balanced

        self.output_idx = bool(output_index)
        self.scale = str(scale)
        self.roots = [str(r) for r in roots]
        self.transformer = transformer
        if labels_map is None:
            self.labels_map = {False: np.array([0., ]), True: np.array([1., ])}
        else:
            self.labels_map = dict(labels_map)

    def _get_face(self, item: pd.Index) -> (torch.Tensor, torch.Tensor) or (torch.Tensor, torch.Tensor, str):
        # item is a (df_idx, df_key) pair produced by the MultiIndex adaptation above.
        df_idx, df_key = item[0], item[1]
        record = self.dfs[df_idx].loc[df_key]
        face = load_face(record=record,
                         root=self.roots[df_idx],
                         size=self.size,
                         scale=self.scale,
                         transformer=self.transformer)
        label = self.labels_map[record.label]
        return (face, label, record.name) if self.output_idx else (face, label)

    def __len__(self):
        return self.num_samples

    def __iter__(self):
        fake_pool, real_pool = get_iterative_real_fake_idxs(
            df_real=self.df_real,
            df_fake=self.df_fake,
            num_samples=self.num_samples,
            seed0=self.seed0
        )
        # Alternate fake/real so every consecutive pair is class-balanced.
        while fake_pool and real_pool:
            yield self._get_face(fake_pool.pop())
            yield self._get_face(real_pool.pop())
def get_iterative_real_fake_idxs(df_real: pd.DataFrame, df_fake: pd.DataFrame,
                                 num_samples: int, seed0: int):
    """
    Build two equally-long, shuffled index lists (fake and real) for one iteration pass.

    The minority class is oversampled with replacement so both lists have the same
    length. Inside a DataLoader worker, the majority class is partitioned across
    workers by contiguous slices while each worker reseeds with seed0 + worker_id.

    :param df_real: DataFrame of real-face records
    :param df_fake: DataFrame of fake-face records
    :param num_samples: total number of samples for the pass (half per class)
    :param seed0: base seed for numpy's global RNG
    :return: (shuffled fake index list, shuffled real index list), equal lengths
    """
    majority = 'real' if len(df_real) > len(df_fake) else 'fake'
    worker_info = torch.utils.data.get_worker_info()
    if worker_info is None:
        # Single-process loading: draw both portions from the full index sets.
        np.random.seed(seed0)
        per_worker = num_samples // 2
        fake_portion = np.random.choice(df_fake.index, per_worker,
                                        replace=majority == 'real')
        real_portion = np.random.choice(df_real.index, per_worker,
                                        replace=majority == 'fake')
    else:
        worker_id = worker_info.id
        np.random.seed(seed0 + worker_id)
        per_worker = (num_samples // 2) // worker_info.num_workers
        lo = worker_id * per_worker
        hi = lo + per_worker
        # The majority class is split disjointly across workers; the minority
        # class is resampled with replacement to match.
        if majority == 'fake':
            fake_portion = df_fake.index[lo:hi]
            real_portion = np.random.choice(df_real.index, per_worker, replace=True)
        else:
            real_portion = df_real.index[lo:hi]
            fake_portion = np.random.choice(df_fake.index, per_worker, replace=True)
    random_fake_idxs = list(np.random.permutation(fake_portion))
    random_real_idxs = list(np.random.permutation(real_portion))
    assert (len(random_fake_idxs) == len(random_real_idxs))
    return random_fake_idxs, random_real_idxs
class FrameFaceDatasetTest(Dataset):

    def __init__(self, root: str, df: pd.DataFrame,
                 size: int, scale: str,
                 transformer: A.BasicTransform = ToTensorV2(),
                 labels_map: dict = None,
                 aug_transformers: List[A.BasicTransform] = None):
        """
        Map-style dataset over a single frames cache, for evaluation.

        :param root: root folder of the frames cache
        :param df: DataFrame of cached frames with 'left','top','right','bottom'
                   bounding-box columns and a 'label' column
        :param size: face size in pixels
        :param scale: face extraction policy ('crop', 'scale' or 'tight')
        :param transformer: albumentations transform applied to every face
        :param labels_map: dict mapping df labels to output label arrays
        :param aug_transformers: if not None, each sample is returned as a stack of
               copies, one per provided augmentation (each composed with transformer)
        """
        self.df = df
        self.size = int(size)
        self.scale = str(scale)
        self.root = str(root)
        self.transformer = transformer
        self.aug_transformers = aug_transformers
        if labels_map is None:
            self.labels_map = {False: np.array([0., ]), True: np.array([1., ])}
        else:
            self.labels_map = dict(labels_map)

    def _get_face(self, item: pd.Index) -> (torch.Tensor, torch.Tensor) or (torch.Tensor, torch.Tensor, str):
        record = self.df.loc[item]
        label = self.labels_map[record.label]
        if self.aug_transformers is None:
            face = load_face(record=record,
                             root=self.root,
                             size=self.size,
                             scale=self.scale,
                             transformer=self.transformer)
            return face, label
        # One augmented copy per transform, stacked along a new leading dimension.
        faces = torch.stack([
            load_face(record=record,
                      root=self.root,
                      size=self.size,
                      scale=self.scale,
                      transformer=A.Compose([aug_transf, self.transformer]))
            for aug_transf in self.aug_transformers
        ])
        return faces, label

    def __len__(self):
        return len(self.df)

    def __getitem__(self, item):
        return self._get_face(self.df.index[item])