Spaces:
Sleeping
Sleeping
File size: 7,500 Bytes
45742a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
import os
import sys
import warnings

# --- Suppress TensorFlow & MTCNN warnings ---
# This just quiets down the console output.
# The log-level variable and the warning filters are set BEFORE the MTCNN
# import below, because that import loads TensorFlow and settings applied
# afterwards come too late to silence its import-time output.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
warnings.filterwarnings('ignore', category=FutureWarning, module='tensorflow')
# --- End Warning Suppression ---

import cv2  # noqa: E402
from mtcnn.mtcnn import MTCNN  # noqa: E402
from tqdm import tqdm  # noqa: E402  # Our progress bar library!
#
# 1. IMPORT FROM OUR CONFIG FILE
#
# This is the "best practice" part. We import all our paths
# and settings from the single config.py file.
#
try:
    import config
except ImportError:
    # Nothing below can run without the shared settings module,
    # so explain the problem and stop with a failure exit code.
    for message in (
        "Error: Could not import config.py.",
        "Make sure it's in the 'src/' directory.",
    ):
        print(message)
    raise SystemExit(1)
def load_test_list(filepath):
    """
    Loads the official test file list into a set for fast lookup.

    The file format is one entry per line:
        1/id0_0002.mp4
        1/id0_0006.mp4
        ...
        0/id2_0001.mp4
    Only the part after the last slash (e.g., "id0_0002.mp4") is kept.
    Blank lines and entries with nothing after the slash are skipped so
    they do not add an empty name to the set or inflate the count.

    Args:
        filepath: Path of the text file holding the test list.

    Returns:
        A set of video filenames found in the list.

    Exits the program with status 1 if the file does not exist.
    """
    test_videos = set()
    try:
        with open(filepath, 'r') as f:
            for line in f:
                # "1/id0_0002.mp4" -> "id0_0002.mp4"
                filename = line.strip().split('/')[-1]
                if filename:
                    test_videos.add(filename)
    except FileNotFoundError:
        print(f"Error: Test list file not found at: {filepath}")
        sys.exit(1)
    print(f"Loaded {len(test_videos)} videos into the test set.")
    return test_videos
def create_directories():
    """
    Ensures every output folder named in the config exists.
    Folders that are already present are left as they are.
    """
    print("Creating output directories...")
    # Look each setting up in turn so the folders are created in a fixed order.
    for setting in ("TRAIN_REAL_DIR", "TRAIN_FAKE_DIR", "TEST_REAL_DIR", "TEST_FAKE_DIR"):
        os.makedirs(getattr(config, setting), exist_ok=True)
    print("Directories created/verified.")
def extract_faces(video_path, output_dir, video_filename, detector):
    """
    Extracts, crops, and resizes faces from a single video file
    and saves them to the specified output directory.

    Frames are sampled at an even stride across the video until
    config.FRAMES_PER_VIDEO faces have been saved or the video ends.
    For each sampled frame the first face reported by the detector is
    cropped, resized to config.IMAGE_SIZE x config.IMAGE_SIZE and written
    as "<video_filename>_frame_<frame_num>.jpg".

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives the cropped face images.
        video_filename: Name stem used in saved file names and in messages.
        detector: Object whose detect_faces(rgb_image) returns a list of
            dicts, each with a 'box' entry of (x, y, width, height).

    Returns:
        The number of face images successfully written. This is 0 when the
        video cannot be opened or reports no frames. Errors raised while
        processing are printed rather than propagated, and the count
        reached so far is returned.
    """
    # Bound before the try block so the cleanup and the final return are
    # always valid, even if opening the video raises.
    cap = None
    faces_saved = 0
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f" [Warning] Could not open video: {video_filename}")
            return faces_saved
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            print(f" [Warning] Video has 0 frames: {video_filename}")
            return faces_saved
        # Calculate a step to pick frames evenly, ensuring we don't just
        # get the first N frames.
        step = max(1, total_frames // config.FRAMES_PER_VIDEO)
        for frame_num in range(0, total_frames, step):
            if faces_saved >= config.FRAMES_PER_VIDEO:
                break
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            # Convert frame to RGB for MTCNN
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Detect faces (this is the most time-consuming part)
            faces = detector.detect_faces(frame_rgb)
            if not faces:
                continue
            # Get the first face
            x, y, w, h = faces[0]['box']
            # A box can start slightly outside the frame (negative x/y).
            # Clamp the corners to the frame edge; taking abs() would move
            # the crop window away from the detected face instead.
            left, top = max(0, x), max(0, y)
            right, bottom = max(0, x + w), max(0, y + h)
            face_crop = frame[top:bottom, left:right]
            if face_crop.size == 0:
                continue
            # Resize to our standard size
            resized_face = cv2.resize(face_crop, (config.IMAGE_SIZE, config.IMAGE_SIZE))
            # Save the image; only count it if the write actually succeeded.
            save_name = f"{video_filename}_frame_{frame_num}.jpg"
            save_path = os.path.join(output_dir, save_name)
            if cv2.imwrite(save_path, resized_face):
                faces_saved += 1
            else:
                print(f" [Warning] Could not write image: {save_path}")
    except Exception as e:
        # Best effort: one bad video must not abort the whole run.
        print(f" [Error] Processing {video_filename}: {e}")
    finally:
        if cap is not None:
            cap.release()
    return faces_saved
def _list_mp4_files(directory):
    """Returns the full paths of the '.mp4' files in a directory, sorted by name."""
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith('.mp4')
    ]


def _process_video_group(video_paths, test_set, test_dir, train_dir, desc, detector):
    """Extracts faces from each video, sending it to test_dir or train_dir."""
    # Use tqdm for a nice progress bar
    for video_path in tqdm(video_paths, desc=desc):
        filename = os.path.basename(video_path)
        # Decide if it's train or test
        output_dir = test_dir if filename in test_set else train_dir
        extract_faces(video_path, output_dir, os.path.splitext(filename)[0], detector)


def process_all_videos(detector):
    """
    Orchestrates the entire preprocessing pipeline.

    Loads the test list, then extracts faces from every real video
    followed by every fake video. A video whose filename appears in the
    test list goes to the matching TEST_* directory; all others go to the
    matching TRAIN_* directory.

    Args:
        detector: Face detector passed through to extract_faces.
    """
    # Load the set of videos that belong in the "test" set
    test_set = load_test_list(config.TEST_LIST_FILE)

    # 1. --- Process REAL Videos ---
    # We combine 'Celeb-real' and 'Youtube-real' into one list
    real_video_paths = []
    for real_dir in (config.CELEB_REAL_DIR, config.YOUTUBE_REAL_DIR):
        real_video_paths.extend(_list_mp4_files(real_dir))
    print(f"\nFound {len(real_video_paths)} real videos. Processing...")
    _process_video_group(
        real_video_paths,
        test_set,
        config.TEST_REAL_DIR,
        config.TRAIN_REAL_DIR,
        "Processing Real Videos",
        detector,
    )

    # 2. --- Process FAKE Videos ---
    fake_video_paths = _list_mp4_files(config.CELEB_FAKE_DIR)
    print(f"\nFound {len(fake_video_paths)} fake videos. Processing...")
    _process_video_group(
        fake_video_paths,
        test_set,
        config.TEST_FAKE_DIR,
        config.TRAIN_FAKE_DIR,
        "Processing Fake Videos",
        detector,
    )
#
# This is the "entry point" of our script
#
def main():
    """
    Runs the preprocessing script end to end: creates the output folders,
    builds the face detector once, and processes every video with it.
    """
    print("--- DeepFake Detector: Data Preprocessing ---")
    # 1. Create all output folders
    create_directories()
    # 2. Initialize the MTCNN detector
    # A single detector is built here and handed to the processing code,
    # rather than creating a new one for every video.
    print("Initializing MTCNN face detector (this may take a moment)...")
    try:
        face_detector = MTCNN()
        print("MTCNN detector initialized.")
    except Exception as init_error:
        print("Fatal Error: Could not initialize MTCNN.")
        print("Please ensure TensorFlow is installed correctly.")
        print(f"Error details: {init_error}")
        sys.exit(1)
    # 3. Run the main processing loop
    process_all_videos(face_detector)
    print("\n--- Preprocessing Complete! ---")
    print(f"Your processed frames are now in: {config.PROCESSED_DATA_DIR}")


if __name__ == "__main__":
    main()