| # Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. | |
| # | |
| # This work is licensed under the Creative Commons | |
| # Attribution-NonCommercial-ShareAlike 4.0 International License. | |
| # To view a copy of this license, visit | |
| # http://creativecommons.org/licenses/by-nc-sa/4.0/ or send a letter to | |
| # Creative Commons, PO Box 1866, Mountain View, CA 94042, USA. | |
| # | |
| # Includes modifications proposed by Jeremy Fix | |
| # from here: https://github.com/NVlabs/ffhq-dataset/pull/3 | |
| import os | |
| import sys | |
| import json | |
| import argparse | |
| import numpy as np | |
| import multiprocessing | |
| from tqdm import tqdm | |
| from scipy.ndimage import gaussian_filter1d | |
| # Image processing libraries | |
| import PIL | |
| from PIL import Image, ImageFile | |
| # Project-specific imports | |
| from lib.preprocess import align_img | |
| PIL.ImageFile.LOAD_TRUNCATED_IMAGES = True # avoid "Decompressed Data Too Large" error | |
def save_detection_as_txt(dst, lm5p):
    """Write 2-D landmark coordinates to a text file, one "x y" pair per line.

    Args:
        dst (str): Destination path of the output text file.
        lm5p (np.ndarray): Array of shape (N, 2) with landmark coordinates.

    Returns:
        None
    """
    # Context manager guarantees the file is closed even if a write fails
    # (the original left the handle open on error).
    with open(dst, "w") as out_land:
        for point in lm5p:
            out_land.write(f"{float(point[0])} {float(point[1])}\n")
def process_image(kwargs):
    """Align, crop and save a single image using its facial landmarks.

    Args:
        kwargs (dict): Parameter bundle with keys:
            - src_dir (str): Directory holding the source image.
            - dst_dir (str): Directory where the cropped image is written.
            - lm5p (np.ndarray): (N, 2) five-point facial landmarks.
            - lm3d (np.ndarray): (N, 3) 3-D facial landmarks.
            - im_name (str): File name of the image.
            - save_realign_dir (str or None): Optional output dir for the realigned image.
            - save_detection_dir (str or None): Optional output dir for landmark txt files.
            - save_align3d_dir (str): Output dir for aligned 3-D landmarks (.npy).
            - output_size (int): Side length of the final cropped image.

    Returns:
        None
    """
    src_dir = kwargs['src_dir']
    dst_dir = kwargs['dst_dir']
    lm5p = kwargs['lm5p']
    lm3d = kwargs['lm3d']
    im_name = kwargs['im_name']
    realign_dir = kwargs.get('save_realign_dir', None)
    detection_dir = kwargs.get('save_detection_dir', None)
    align3d_dir = kwargs['save_align3d_dir']

    os.makedirs(dst_dir, exist_ok=True)

    src_file = os.path.join(src_dir, im_name)
    assert os.path.isfile(src_file), f"Source file not found: {src_file}"

    img = Image.open(src_file)
    height = img.size[1]

    # Alignment metadata (kept for parity with the original pipeline).
    params = {'name': src_file, 'lm': lm5p.tolist()}

    # Flip Y so landmarks follow the image coordinate system (origin top-left).
    flipped_lm5p = lm5p.copy()
    flipped_lm5p[:, -1] = height - 1 - flipped_lm5p[:, -1]
    flipped_lm3d = lm3d.copy()
    flipped_lm3d[:, 1] = height - 1 - flipped_lm3d[:, 1]

    # All outputs are written as PNG regardless of the source format.
    im_name = im_name.rsplit('.', 1)[0] + '.png'
    dst_file = os.path.join(dst_dir, im_name)

    if realign_dir:
        img.save(os.path.join(realign_dir, im_name))
    if detection_dir:
        txt_path = os.path.join(detection_dir, im_name.replace('.png', '.txt'))
        save_detection_as_txt(txt_path, flipped_lm5p)

    img_cropped, crop_param, lm3d_aligned = crop_image(
        img, flipped_lm5p.copy(), flipped_lm3d.copy(),
        output_size=kwargs['output_size']
    )
    params['crop'] = crop_param

    # Flip Y back inside a 512-sized canvas before persisting the 3-D landmarks.
    lm3d_aligned = np.concatenate(
        [lm3d_aligned[:, 0:1], 512 - lm3d_aligned[:, 1:2]], 1
    )
    npy_name = im_name.replace(".png", ".npy").replace(".jpg", ".npy").replace(".jpeg", ".npy")
    np.save(os.path.join(align3d_dir, npy_name), lm3d_aligned)

    img_cropped.save(dst_file)
def crop_image(im, lm, ldmk_3d, center_crop_size=700, rescale_factor=300,
               target_size=1024., output_size=512):
    """Align an image to a canonical face pose, center-crop and resize it.

    Args:
        im (PIL.Image.Image): Input image.
        lm (np.ndarray): (5, 2) five-point facial landmarks in image coordinates.
        ldmk_3d (np.ndarray): (N, 3) 3-D facial landmarks to align alongside.
        center_crop_size (int, optional): Side of the centered crop. Defaults to 700.
        rescale_factor (int, optional): Scale used by the 5-point alignment. Defaults to 300.
        target_size (float, optional): Canvas size used during alignment. Defaults to 1024.
        output_size (int, optional): Side of the final resized image. Defaults to 512.

    Returns:
        tuple:
            - PIL.Image.Image: cropped and resized image.
            - list: crop parameters
              [left, upper, crop_size, crop_left, crop_up, H * s, target_size].
            - np.ndarray: 3-D landmarks after alignment.
    """
    height = im.size[1]

    # Canonical 3-D positions of the five landmarks used as the alignment target.
    std_landmarks = np.array([
        [-0.31148657, 0.09036078, 0.13377953],  # Left eye corner
        [ 0.30979887, 0.08972035, 0.13179526],  # Right eye corner
        [ 0.0032535, -0.24617933, 0.55244243],  # Nose tip
        [-0.25216928, -0.5813392, 0.22405732],  # Left mouth corner
        [ 0.2484662, -0.5812824, 0.22235769],   # Right mouth corner
    ])
    # Empirical offsets that improve the fit of the standard landmark set.
    std_landmarks[:, 2] += 0.4  # depth (Z-axis)
    std_landmarks[:, 1] += 0.1  # vertical position (Y-axis)

    # Warp the image so its landmarks match the canonical set.
    _, im_high, _, _, crop_left, crop_up, s, ldmk_3d_align = align_img(
        im, lm, std_landmarks, ldmk_3d,
        target_size=target_size, rescale_factor=rescale_factor, rescale_factor_3D=218
    )

    # Centered square crop of the aligned image, then resize to the output size.
    left = int(im_high.size[0] / 2 - center_crop_size / 2)
    upper = int(im_high.size[1] / 2 - center_crop_size / 2)
    box = (left, upper, left + center_crop_size, upper + center_crop_size)
    im_cropped = im_high.crop(box).resize(
        (output_size, output_size), resample=Image.LANCZOS
    )

    # Parameters needed to reproduce / invert this crop later.
    crop_param = [
        int(left), int(upper), int(center_crop_size),
        int(crop_left), int(crop_up), float(height * s), int(target_size)
    ]
    return im_cropped, crop_param, ldmk_3d_align
def process_video(kwargs):
    """Align and crop every frame of one video sequence.

    Args:
        kwargs (dict): Parameter bundle with keys:
            - src_dir (str): Directory holding the video frames.
            - dst_dir (str): Directory where processed frames are written.
            - lm5p (dict): Maps frame file name -> 5-point landmarks.
            - lm3d (dict): Maps frame file name -> 3-D landmarks.
            - output_size (int): Side length of the final cropped frames.
            - enable_padding / enable_warping: Flags forwarded to process_image.
            - save_realign_dir (str or None): Optional output dir for realigned frames.
            - save_detection_dir (str or None): Optional output dir for landmark txt files.
            - save_align3d_dir (str): Output dir for aligned 3-D landmarks.
            - apply_GF (int): Sigma for temporal Gaussian smoothing of the
              5-point landmark sequence; 0 disables smoothing.

    Returns:
        None
    """
    video_dir = kwargs['src_dir']
    dst_dir = kwargs['dst_dir']
    lm5p_dict = kwargs['lm5p']
    lm3d_dict = kwargs['lm3d']
    output_size = kwargs['output_size']
    enable_padding = kwargs['enable_padding']
    enable_warping = kwargs['enable_warping']
    realign_dir = kwargs['save_realign_dir']
    detection_dir = kwargs['save_detection_dir']
    align3d_dir = kwargs['save_align3d_dir']
    sigma = kwargs['apply_GF']

    # Frame names come from the landmark dictionary itself.
    frame_names = list(lm5p_dict.keys())

    if sigma > 0:
        # Temporal smoothing requires frames ordered by numeric index.
        frame_names.sort(key=lambda name: int(name.split('.')[0]))
        kps_sequence = np.asarray([lm5p_dict[name] for name in frame_names], dtype=np.float32)
        kps_sequence = gaussian_filter1d(kps_sequence, sigma=sigma, axis=0)
    else:
        kps_sequence = np.asarray([lm5p_dict[name] for name in frame_names], dtype=np.float32)

    assert len(frame_names) == kps_sequence.shape[0], "Mismatch between image count and keypoint data."

    if realign_dir:
        os.makedirs(realign_dir, exist_ok=True)
    if detection_dir:
        os.makedirs(detection_dir, exist_ok=True)

    kps_sequence_3d = np.asarray([lm3d_dict[name] for name in frame_names], dtype=np.float32)

    # Align/crop each frame individually.
    for idx, frame_name in enumerate(frame_names):
        process_image({
            'src_dir': video_dir,
            'dst_dir': dst_dir,
            'im_name': frame_name,
            'lm5p': kps_sequence[idx].reshape([-1, 2]),
            'lm3d': kps_sequence_3d[idx].reshape([-1, 3]),
            'save_realign_dir': realign_dir,
            'save_detection_dir': detection_dir,
            'save_align3d_dir': align3d_dir,
            'output_size': output_size,
            'enable_padding': enable_padding,
            'enable_warping': enable_warping,
        })

    # Empty marker file signalling the whole sequence was processed.
    with open(os.path.join(dst_dir, 'finish'), "w"):
        pass
def recreate_aligned_images(
    root_dir, lms_root_dir, dst_dir, valid_imgs_json,
    output_size=512, enable_padding=True, already_align=False
):
    """
    Recreates aligned images by applying facial landmark-based transformations.

    Side-output directories ('realign', 'realign_detections', 'align_3d_landmark')
    are created as siblings two levels above `dst_dir`.

    Args:
        root_dir (str): Directory containing original images (one subdir per video).
        lms_root_dir (str): Directory containing facial landmark JSON files
            ("<video>.json" for 5-point, "<video>3d.json" for 3-D landmarks).
        dst_dir (str): Directory to save aligned images.
        valid_imgs_json (str): JSON file containing (video_name, image_list) pairs.
        output_size (int, optional): Final output image resolution. Defaults to 512.
        enable_padding (bool, optional): Whether to apply padding. Defaults to True.
        already_align (bool, optional): If True, images are assumed pre-aligned:
            they are re-saved as PNG without cropping and only landmark side
            outputs are written. Defaults to False.
    Returns:
        None
    """
    print("Recreating aligned images...")
    # Load valid video names and corresponding image lists from JSON file
    with open(valid_imgs_json, 'r') as f:
        valid_idx = json.load(f)
    inputs = []  # List to store image processing parameters
    # Iterate over each valid video
    for video_name, img_names in valid_idx:
        video_dir = os.path.join(root_dir, video_name)  # Path to video images
        dst_save_dir = os.path.join(dst_dir, video_name)  # Destination folder for aligned images
        # Side outputs live two directory levels above dst_dir
        base_dir = os.path.dirname(os.path.dirname(dst_dir))
        save_realign_dir = os.path.join(base_dir, 'realign', video_name)
        save_detection_dir = os.path.join(base_dir, 'realign_detections', video_name)
        save_align3d_dir = os.path.join(base_dir, 'align_3d_landmark', video_name)
        os.makedirs(save_align3d_dir, exist_ok=True)
        # NOTE(review): save_realign_dir is always a non-empty path here, so this
        # branch is always taken and both directories are always created.
        if save_realign_dir:
            os.makedirs(save_realign_dir, exist_ok=True)
            os.makedirs( save_detection_dir, exist_ok=True)
        # Skip processing if video directory does not exist
        # if not os.path.isdir(video_dir):
        #     continue
        # Load facial landmark data for this video
        lm5p_path = os.path.join(lms_root_dir, f"{video_name}.json")
        lm3d_path = os.path.join(lms_root_dir, f"{video_name}3d.json")
        with open(lm5p_path, 'r') as f:
            lm5p_dict = json.load(f)
        with open(lm3d_path, 'r') as f:
            lm3d_dict = json.load(f)
        # Iterate over images in the video
        for im_name in img_names:
            if im_name not in lm5p_dict:
                continue  # Skip if landmarks for this image are missing
            if im_name not in lm3d_dict:
                continue
            # Convert and reshape landmark points
            lm5p = np.asarray(lm5p_dict[im_name], dtype=np.float32).reshape([-1, 2])
            lm3d = np.asarray(lm3d_dict[im_name], dtype=np.float32).reshape([-1, 3])
            # Prepare input dictionary for processing
            input_data = {
                'src_dir': video_dir,
                'dst_dir': dst_save_dir,
                'im_name': im_name,
                'lm5p': lm5p,
                'lm3d': lm3d,
                'save_realign_dir': save_realign_dir,
                'save_detection_dir': save_detection_dir,
                'save_align3d_dir':save_align3d_dir,
                'output_size': output_size,
                'enable_padding': enable_padding
            }
            inputs.append(input_data)
        # break  # Stops after processing the first video (Is this intentional?)
    # Parallel Processing using multiprocessing (commented out for now)
    # with multiprocessing.Pool(n_threads) as pool:
    #     results = list(tqdm(pool.imap(process_image, inputs), total=len(inputs), smoothing=0.1))
    # Sequential processing (useful for debugging)
    if already_align:
        # Pre-aligned path: mirror process_image's bookkeeping (PNG conversion,
        # realign copy, detection txt, 3-D landmark .npy) without crop/warp.
        for input_data in tqdm(inputs, desc="Processing images"):
            src_dir = input_data['src_dir']
            dst_dir = input_data['dst_dir']  # NOTE(review): shadows the outer dst_dir parameter
            im_name = input_data['im_name']
            lm5p = input_data['lm5p']
            save_realign_dir = input_data.get('save_realign_dir', None)
            save_detection_dir = input_data.get('save_detection_dir', None)
            save_align3d_dir = input_data['save_align3d_dir']
            # Ensure the destination directory exists
            os.makedirs(dst_dir, exist_ok=True)
            # Construct file paths
            src_file = os.path.join(src_dir, im_name)
            # Ensure the source file exists before proceeding
            assert os.path.isfile(src_file), f"Source file not found: {src_file}"
            # Open the image
            img = Image.open(src_file)
            _, H = img.size  # Image height, used to flip landmark Y-coordinates
            # Convert image name to PNG format
            im_name = im_name.rsplit('.', 1)[0] + '.png'
            dst_file = os.path.join(dst_dir, im_name)
            # Optionally save the realigned image
            if save_realign_dir:
                os.makedirs(save_realign_dir, exist_ok=True)
                img.save(os.path.join(save_realign_dir, im_name))
            aligned_lm5p = lm5p.copy()
            # Flip Y-coordinates to match the image coordinate system
            aligned_lm5p[:, -1] = H - 1 - aligned_lm5p[:, -1]
            # Optionally save detected landmarks as a text file
            if save_detection_dir:
                os.makedirs(save_detection_dir, exist_ok=True)
                save_detection_as_txt(
                    os.path.join(save_detection_dir, im_name.replace('.png', '.txt')), aligned_lm5p
                )
            # Save the image unchanged (no cropping in the pre-aligned path)
            img.save(dst_file)
            # Persist only the XY components of the 3-D landmarks
            lm3d = input_data['lm3d'][:, 0:2]
            np.save(os.path.join(save_align3d_dir,
                    im_name.replace(".png", ".npy").replace(".jpg", ".npy").replace(".jpeg", ".npy")),
                    lm3d)
    else:
        # Standard path: full align + crop via process_image
        for input_data in tqdm(inputs, desc="Processing images"):
            process_image(input_data)
def recreate_aligned_videos_multiprocessing(
    root_dir, lms_root_dir, dst_dir, valid_video_json, save_realign=True, skip=True,
    enable_warping=False, output_size=512,
    enable_padding='zero_padding', n_threads=12, apply_GF=0
):
    """
    Recreates aligned videos by processing frames with landmark-based transformations,
    one worker process per video.

    Args:
        root_dir (str): Directory containing original video frames (one subdir per video).
        lms_root_dir (str): Directory with facial landmark JSON files
            ("<video>.json" for 5-point, "<video>3d.json" for 3-D landmarks).
        dst_dir (str): Directory to save aligned images.
        valid_video_json (str): JSON file containing (video_name, frame_list) pairs.
        save_realign (bool, optional): Whether to save realigned images. Defaults to True.
        skip (bool, optional): Skip videos whose 'finish' marker file exists. Defaults to True.
        enable_warping (bool, optional): Apply warping transformation. Defaults to False.
        output_size (int, optional): Desired output image resolution. Defaults to 512.
        enable_padding (str or None, optional): Padding mode ('zero_padding',
            'blur_padding', 'reflect_padding', or None). Defaults to 'zero_padding'.
        n_threads (int, optional): Number of worker processes. Defaults to 12.
        apply_GF (int, optional): Gaussian filtering sigma for temporal landmark
            smoothing; 0 disables it. Defaults to 0.
    Returns:
        None
    """
    print("Recreating aligned images...")
    # Validate `enable_padding` argument
    assert enable_padding in [None, 'zero_padding', 'blur_padding', 'reflect_padding'], \
        f"Invalid enable_padding value: {enable_padding}"
    # Load valid video indices from JSON
    with open(valid_video_json, 'r') as f:
        valid_idx = json.load(f)
    inputs = []  # List to store parameters for multiprocessing
    # Iterate through each valid video and prepare processing inputs
    for video_name, im_names in valid_idx:
        video_dir = os.path.join(root_dir, video_name)  # Path to video frames
        dst_save_dir = os.path.join(dst_dir, video_name)  # Destination path for aligned images
        # Side outputs live two directory levels above dst_dir
        base_dir = os.path.dirname(os.path.dirname(dst_dir))
        save_align3d_dir = os.path.join(base_dir, 'align_3d_landmark', video_name)
        os.makedirs(save_align3d_dir, exist_ok=True)
        # Paths for saving realigned images and detections (if enabled)
        save_realign_dir = save_detection_dir = None
        if save_realign:
            save_realign_dir = os.path.join(base_dir, 'realign', video_name)
            save_detection_dir = os.path.join(base_dir, 'realign_detections', video_name)
        # Skip processing if video directory or landmark JSON does not exist
        if not os.path.isdir(video_dir):
            continue
        if not os.path.exists(os.path.join(lms_root_dir, f"{video_name}.json")):
            continue
        # Skip if already processed and `skip=True`
        if skip and os.path.exists(os.path.join(dst_save_dir, 'finish')):
            continue
        # Load facial landmark data
        with open(os.path.join(lms_root_dir, f"{video_name}.json"), 'r') as f:
            lm5p_dict = json.load(f)
        with open(os.path.join(lms_root_dir, f"{video_name}3d.json"), 'r') as f:
            lm3d_dict = json.load(f)
        # Prepare input dictionary for processing
        # NOTE(review): 'im_names' is passed along but process_video derives its
        # frame list from the lm5p dict keys instead — confirm this is intended.
        input_data = {
            'src_dir': video_dir,
            'dst_dir': dst_save_dir,
            'lm5p': lm5p_dict,
            'lm3d': lm3d_dict,
            'im_names': im_names,
            'save_realign_dir': save_realign_dir,
            'save_detection_dir': save_detection_dir,
            'save_align3d_dir':save_align3d_dir,
            'output_size': output_size,
            'enable_padding': enable_padding,
            'apply_GF': apply_GF,
            'enable_warping': enable_warping
        }
        inputs.append(input_data)
    # Process videos in parallel using multiprocessing
    with multiprocessing.Pool(n_threads) as pool:
        results = list(tqdm(pool.imap(process_video, inputs), total=len(inputs), smoothing=0.1))
    # Alternative: Process sequentially (useful for debugging)
    # for input_data in tqdm(inputs):
    #     process_video(input_data)
| # # ---------------------------------------------------------------------------- | |
| # | |
| # if __name__ == "__main__": | |
| # parser = argparse.ArgumentParser() | |
| # parser.add_argument('--source', type=str, default='.') | |
| # parser.add_argument('--lm_source', type=str, default='') | |
| # parser.add_argument('--dest', type=str, default='realign1500') | |
| # parser.add_argument('--valid_video_json', type=str, default=None) | |
| # parser.add_argument('--threads', type=int, default=12) | |
| # parser.add_argument('--output_size', type=int, default=768) | |
| # parser.add_argument('--transform_size', type=int, default=768) | |
| # parser.add_argument('--apply_GF', type=float, default=0) | |
| # parser.add_argument('--save_realign', action='store_true') | |
| # parser.add_argument('--skip', action='store_true') | |
| # parser.add_argument('--disable_warping', action='store_true') | |
| # parser.add_argument('--padding_mode', type=str, default=None) | |
| # args = parser.parse_args() | |
| # | |
| # # recreate_aligned_images_fast(args.source, args.lm_source, args.dest, args.save_realign_dir, args.valid_video_json, | |
| # # output_size=args.output_size, transform_size=args.transform_size, n_threads=args.threads) | |
# recreate_aligned_videos_multiprocessing(args.source, args.lm_source, args.dest, args.valid_video_json,
| # save_realign=args.save_realign, skip=args.skip, | |
| # output_size=args.output_size, transform_size=args.transform_size, | |
| # n_threads=args.threads, apply_GF=args.apply_GF, | |
| # enable_padding=args.padding_mode, enable_warping=False) | |
| # | |
| # # run_cmdline(sys.argv) | |
| # ---------------------------------------------------------------------------- | |