|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import mediapipe as mp |
|
|
from latentsync.utils.util import read_video |
|
|
import os |
|
|
import tqdm |
|
|
import shutil |
|
|
from multiprocessing import Pool |
|
|
|
|
|
# Module-level work list. gather_video_paths() appends one
# [video_input, video_output, resolution] entry per video that still needs
# processing, and filter_high_resolution_multiprocessing() hands those entries
# to the worker pool.
paths = []
|
|
|
|
|
|
|
|
def gather_video_paths(input_dir, output_dir, resolution):
    """Recursively queue every ``.mp4`` under ``input_dir`` that has no output yet.

    Entries are visited in sorted name order. For each ``.mp4`` entry whose
    mirrored path under ``output_dir`` is not already a file, a
    ``[video_input, video_output, resolution]`` list is appended to the
    module-level ``paths``. Entries that are directories are descended into,
    with ``output_dir`` extended by the same directory name.

    Args:
        input_dir: Directory to scan.
        output_dir: Directory that mirrors ``input_dir`` for results.
        resolution: Value stored unchanged in every queued entry.
    """
    for entry in sorted(os.listdir(input_dir)):
        source = os.path.join(input_dir, entry)
        target = os.path.join(output_dir, entry)
        if entry.endswith(".mp4"):
            # An existing output file means this video was handled earlier.
            if not os.path.isfile(target):
                paths.append([source, target, resolution])
        elif os.path.isdir(source):
            gather_video_paths(source, target, resolution)
|
|
|
|
|
|
|
|
class FaceDetector:
    """Checks that frames contain exactly one face of at least a given pixel size."""

    def __init__(self, resolution=256):
        """Create the MediaPipe face detector.

        Args:
            resolution: Minimum face width and height, in pixels, that a
                frame must contain to be accepted.
        """
        self.resolution = resolution
        self.face_detection = mp.solutions.face_detection.FaceDetection(
            model_selection=0,
            min_detection_confidence=0.5,
        )

    def detect_face(self, image):
        """Tell whether ``image`` holds a single, large enough face.

        Args:
            image: Frame whose ``shape[:2]`` is ``(height, width)``; it is
                passed unchanged to the MediaPipe detector.

        Returns:
            ``True`` when exactly one face is detected and its bounding box
            is at least ``self.resolution`` pixels in both width and height,
            ``False`` when several faces are detected or the face is smaller.

        Raises:
            Exception: If the detector reports no face at all.
        """
        frame_height, frame_width = image.shape[:2]

        faces = self.face_detection.process(image).detections
        if not faces:
            raise Exception("Face not detected")
        if len(faces) != 1:
            return False

        # The box is expressed relative to the frame; scale it to pixels.
        box = faces[0].location_data.relative_bounding_box
        pixel_width = int(box.width * frame_width)
        pixel_height = int(box.height * frame_height)
        too_small = pixel_width < self.resolution or pixel_height < self.resolution
        return not too_small

    def detect_video(self, video_path):
        """Tell whether every frame of the video passes ``detect_face``.

        Args:
            video_path: Path handed to ``read_video`` (with ``change_fps=False``).

        Returns:
            ``False`` if the video has no frames or any frame is rejected,
            ``True`` otherwise. Checking stops at the first rejected frame.

        Raises:
            Exception: Propagated from ``detect_face`` when a frame has no face.
        """
        frames = read_video(video_path, change_fps=False)
        if len(frames) == 0:
            return False
        return all(self.detect_face(frame) for frame in frames)

    def close(self):
        """Close the underlying MediaPipe detector."""
        self.face_detection.close()
|
|
|
|
|
|
|
|
def filter_video(video_input, video_out, resolution):
    """Copy ``video_input`` to ``video_out`` if it passes the face check.

    Nothing is done when ``video_out`` already exists. Otherwise a
    ``FaceDetector`` is created for ``resolution`` and run over the video;
    the file is copied only when ``detect_video`` returns a truthy value.

    Args:
        video_input: Path of the video to examine.
        video_out: Destination path for an accepted video.
        resolution: Minimum face size passed to ``FaceDetector``.

    Returns:
        None.
    """
    if os.path.isfile(video_out):
        return

    face_detector = FaceDetector(resolution)
    try:
        try:
            save = face_detector.detect_video(video_input)
        except Exception:
            # Deliberately best-effort: detect_face raises when a frame has no
            # face, and an unreadable video should not stop the whole batch.
            # Either way the video is simply not copied.
            return

        if save:
            out_dir = os.path.dirname(video_out)
            # dirname is "" for a bare file name; os.makedirs("") would raise.
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            shutil.copy(video_input, video_out)
    finally:
        # Release the detector on every path, including a failed copy.
        face_detector.close()
|
|
|
|
|
|
|
|
def multi_run_wrapper(args):
    """Call ``filter_video`` with a packed argument sequence.

    The pool's ``imap_unordered`` passes each work item as one argument, so
    the item is unpacked here.

    Args:
        args: Sequence of positional arguments for ``filter_video``.

    Returns:
        Whatever ``filter_video`` returns.
    """
    outcome = filter_video(*args)
    return outcome
|
|
|
|
|
|
|
|
def filter_high_resolution_multiprocessing(input_dir, output_dir, resolution, num_workers):
    """Filter every video under ``input_dir`` using a pool of worker processes.

    The pending videos are collected into the module-level ``paths`` list by
    ``gather_video_paths`` and then handed, one entry per task, to
    ``multi_run_wrapper`` in a ``multiprocessing.Pool``.

    Args:
        input_dir: Root directory scanned recursively for ``.mp4`` files.
        output_dir: Root directory that receives the accepted videos.
        resolution: Minimum face size forwarded to each task.
        num_workers: Number of worker processes in the pool.
    """
    print(f"Recursively gathering video paths of {input_dir} ...")
    # `paths` is a module-level accumulator that is never reset elsewhere.
    # Clear it so a second call in the same process does not re-queue the
    # entries gathered by an earlier call.
    paths.clear()
    gather_video_paths(input_dir, output_dir, resolution)

    print(f"Filtering high resolution videos in {input_dir} ...")
    with Pool(num_workers) as pool:
        # Results are discarded; the loop only drains the iterator so tqdm
        # can show progress as tasks complete.
        for _ in tqdm.tqdm(pool.imap_unordered(multi_run_wrapper, paths), total=len(paths)):
            pass
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Source tree to scan and destination tree for the accepted videos.
    input_dir = "/mnt/bn/maliva-gen-ai/lichunyu/HDTF/original/train"
    output_dir = "/mnt/bn/maliva-gen-ai/lichunyu/HDTF/detected/train"

    # Minimum face size in pixels, and the size of the worker pool.
    resolution = 256
    num_workers = 50

    filter_high_resolution_multiprocessing(
        input_dir=input_dir,
        output_dir=output_dir,
        resolution=resolution,
        num_workers=num_workers,
    )
|
|
|