| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | import mediapipe as mp
|
| | from latentsync.utils.util import read_video
|
| | import os
|
| | import tqdm
|
| | import shutil
|
| | from multiprocessing import Pool
|
| |
|
| | paths = []
|
| |
|
| |
|
| | def gather_video_paths(input_dir, output_dir, resolution):
|
| | for video in sorted(os.listdir(input_dir)):
|
| | if video.endswith(".mp4"):
|
| | video_input = os.path.join(input_dir, video)
|
| | video_output = os.path.join(output_dir, video)
|
| | if os.path.isfile(video_output):
|
| | continue
|
| | paths.append([video_input, video_output, resolution])
|
| | elif os.path.isdir(os.path.join(input_dir, video)):
|
| | gather_video_paths(os.path.join(input_dir, video), os.path.join(output_dir, video), resolution)
|
| |
|
| |
|
| | class FaceDetector:
|
| | def __init__(self, resolution=256):
|
| | self.face_detection = mp.solutions.face_detection.FaceDetection(
|
| | model_selection=0, min_detection_confidence=0.5
|
| | )
|
| | self.resolution = resolution
|
| |
|
| | def detect_face(self, image):
|
| | height, width = image.shape[:2]
|
| |
|
| | results = self.face_detection.process(image)
|
| |
|
| | if not results.detections:
|
| | raise Exception("Face not detected")
|
| |
|
| | if len(results.detections) != 1:
|
| | return False
|
| | detection = results.detections[0]
|
| |
|
| | bounding_box = detection.location_data.relative_bounding_box
|
| | face_width = int(bounding_box.width * width)
|
| | face_height = int(bounding_box.height * height)
|
| | if face_width < self.resolution or face_height < self.resolution:
|
| | return False
|
| | return True
|
| |
|
| | def detect_video(self, video_path):
|
| | video_frames = read_video(video_path, change_fps=False)
|
| | if len(video_frames) == 0:
|
| | return False
|
| | for frame in video_frames:
|
| | if not self.detect_face(frame):
|
| | return False
|
| | return True
|
| |
|
| | def close(self):
|
| | self.face_detection.close()
|
| |
|
| |
|
| | def filter_video(video_input, video_out, resolution):
|
| | if os.path.isfile(video_out):
|
| | return
|
| | face_detector = FaceDetector(resolution)
|
| | try:
|
| | save = face_detector.detect_video(video_input)
|
| | except Exception as e:
|
| |
|
| | face_detector.close()
|
| | return
|
| | if save:
|
| | os.makedirs(os.path.dirname(video_out), exist_ok=True)
|
| | shutil.copy(video_input, video_out)
|
| | face_detector.close()
|
| |
|
| |
|
| | def multi_run_wrapper(args):
|
| | return filter_video(*args)
|
| |
|
| |
|
| | def filter_high_resolution_multiprocessing(input_dir, output_dir, resolution, num_workers):
|
| | print(f"Recursively gathering video paths of {input_dir} ...")
|
| | gather_video_paths(input_dir, output_dir, resolution)
|
| |
|
| | print(f"Filtering high resolution videos in {input_dir} ...")
|
| | with Pool(num_workers) as pool:
|
| | for _ in tqdm.tqdm(pool.imap_unordered(multi_run_wrapper, paths), total=len(paths)):
|
| | pass
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| | input_dir = "/mnt/bn/maliva-gen-ai/lichunyu/HDTF/original/train"
|
| | output_dir = "/mnt/bn/maliva-gen-ai/lichunyu/HDTF/detected/train"
|
| | resolution = 256
|
| | num_workers = 50
|
| |
|
| | filter_high_resolution_multiprocessing(input_dir, output_dir, resolution, num_workers)
|
| |
|