Spaces:
Sleeping
Sleeping
File size: 4,516 Bytes
7a77962 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | import os
import glob
import random
import shutil
import yaml
import cv2
import numpy as np
from tqdm import tqdm
from preprocessing.face_detect import FaceDetector
def load_config(config_path="config.yaml"):
with open(config_path, "r") as f:
config = yaml.safe_load(f)
return config
def collect_videos(base_dir):
video_exts = ["*.mp4", "*.avi", "*.mov", "*.mkv"]
orig_root = os.path.join(base_dir, "DFD_original sequences")
manip_root = os.path.join(base_dir, "DFD_manipulated_sequences")
orig_videos = []
manip_videos = []
for ext in video_exts:
orig_videos.extend(
glob.glob(os.path.join(orig_root, "**", ext), recursive=True)
)
manip_videos.extend(
glob.glob(os.path.join(manip_root, "**", ext), recursive=True)
)
return orig_videos, manip_videos
def ensure_clean_dir(path):
os.makedirs(path, exist_ok=True)
for f in glob.glob(os.path.join(path, "*")):
if os.path.isfile(f):
os.remove(f)
def extract_faces_from_videos(video_paths, label_name, out_dir, face_detector, fps, max_faces=None):
os.makedirs(out_dir, exist_ok=True)
saved_paths = []
for video_path in tqdm(video_paths, desc=f"Processing {label_name} videos"):
faces = []
try:
faces = face_detector.process_video(video_path, fps=fps)
except Exception as e:
print(f"Error processing {video_path}: {e}")
continue
base = os.path.splitext(os.path.basename(video_path))[0]
for idx, face_img in enumerate(faces):
out_name = f"{base}_f{idx:05d}.png"
out_path = os.path.join(out_dir, out_name)
bgr = cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR)
ok = cv2.imwrite(out_path, bgr)
if ok:
saved_paths.append(out_path)
if max_faces is not None and len(saved_paths) >= max_faces:
return saved_paths
return saved_paths
def build_balanced_dataset():
config = load_config()
fps = config["data"]["frame_fps"]
orig_videos, manip_videos = collect_videos("data")
print(f"Found {len(orig_videos)} original videos")
print(f"Found {len(manip_videos)} manipulated videos")
if len(orig_videos) == 0 and len(manip_videos) == 0:
print("No DFD videos found under data/.")
return
face_detector = FaceDetector()
raw_real_dir = os.path.join("data", "dfd_faces_real_all")
raw_fake_dir = os.path.join("data", "dfd_faces_fake_all")
os.makedirs(raw_real_dir, exist_ok=True)
os.makedirs(raw_fake_dir, exist_ok=True)
existing_real = glob.glob(os.path.join(raw_real_dir, "*.png"))
existing_fake = glob.glob(os.path.join(raw_fake_dir, "*.png"))
if existing_real and existing_fake:
real_paths = existing_real
fake_paths = existing_fake
print(f"Using existing face crops: {len(real_paths)} real, {len(fake_paths)} fake")
else:
ensure_clean_dir(raw_real_dir)
ensure_clean_dir(raw_fake_dir)
real_paths = extract_faces_from_videos(
orig_videos, "real", raw_real_dir, face_detector, fps
)
fake_paths = extract_faces_from_videos(
manip_videos, "fake", raw_fake_dir, face_detector, fps, max_faces=len(real_paths)
)
print(f"Extracted {len(real_paths)} real face crops")
print(f"Extracted {len(fake_paths)} fake face crops")
if len(real_paths) == 0 or len(fake_paths) == 0:
print("Not enough faces in one of the classes to build a balanced dataset.")
return
min_count = min(len(real_paths), len(fake_paths))
print(f"Balancing to {min_count} samples per class")
random.shuffle(real_paths)
random.shuffle(fake_paths)
real_paths = real_paths[:min_count]
fake_paths = fake_paths[:min_count]
real_target = os.path.join("data", "real")
fake_target = os.path.join("data", "fake")
ensure_clean_dir(real_target)
ensure_clean_dir(fake_target)
for src in real_paths:
dst = os.path.join(real_target, os.path.basename(src))
shutil.copy2(src, dst)
for src in fake_paths:
dst = os.path.join(fake_target, os.path.basename(src))
shutil.copy2(src, dst)
print(f"Final balanced dataset:")
print(f" Real: {len(real_paths)} images -> {real_target}")
print(f" Fake: {len(fake_paths)} images -> {fake_target}")
if __name__ == "__main__":
build_balanced_dataset()
|