import os import cv2 import numpy as np import pandas as pd from tqdm import tqdm import shutil from mtcnn.mtcnn import MTCNN # <-- NEW IMPORT import tensorflow as tf # --- Fix for MTCNN in Colab --- # MTCNN can have issues with TF2.x, this helps tf.get_logger().setLevel('ERROR') try: import config except ImportError: print("Error: Could not import config.py. Make sure it's in the src/ directory.") exit(1) def load_test_list(): """Loads the list of test videos from the text file.""" try: with open(config.TEST_LIST_FILE, 'r') as f: test_videos = [line.strip().split('/')[-1] for line in f] return set(test_videos) except FileNotFoundError: print(f"Error: Test list file not found at {config.TEST_LIST_FILE}") return set() def extract_frames(video_path, output_folder, sequence_length, detector): """ Extracts, face-detects, and crops 'sequence_length' evenly spaced frames from a video. """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f" > Warning: Could not open video {video_path}") return False total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if total_frames < sequence_length: print(f" > Warning: Video {video_path} is too short ({total_frames} frames). Skipping.") return False frame_indices = np.linspace(0, total_frames - 1, sequence_length, dtype=int) os.makedirs(output_folder, exist_ok=True) frames_saved = 0 for i, frame_index in enumerate(frame_indices): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index) ret, frame = cap.read() if not ret: continue # --- NEW FACE DETECTION STEP --- try: # Convert from BGR (OpenCV) to RGB (MTCNN) frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Detect faces results = detector.detect_faces(frame_rgb) if results: # Get the first and most confident face x1, y1, width, height = results[0]['box'] # Add a 20% margin for safety x1, y1 = max(0, x1 - int(width*0.1)), max(0, y1 - int(height*0.1)) x2, y2 = min(frame.shape[1], x1 + int(width*1.2)), min(frame.shape[0], y1 + int(height*1.2)) # Crop the face from the *original* BGR frame face_crop = frame[y1:y2, x1:x2] # Resize the cropped face to our model's input size face_resized = cv2.resize(face_crop, (config.TARGET_IMAGE_SIZE, config.TARGET_IMAGE_SIZE)) # Save the frame frame_filename = f"frame_{frames_saved:02d}.jpg" save_path = os.path.join(output_folder, frame_filename) cv2.imwrite(save_path, face_resized) frames_saved += 1 except Exception as e: # Sometimes MTCNN fails on a weird frame print(f" > Warning: Face detection failed for a frame in {video_path}. Error: {e}") pass cap.release() # Check if we actually saved enough frames if frames_saved < sequence_length * 0.8: # Allow for a few failures print(f" > Warning: Only saved {frames_saved} frames for {video_path}. Skipping.") shutil.rmtree(output_folder) # Clean up partial folder return False return True def main(): """ Main function to process all raw videos into sequence folders. """ print("--- Starting Video Sequence Preprocessing (with Face Detection) ---") # 0. Clean old directory if os.path.exists(config.PROCESSED_SEQ_DIR): print(f"Removing old directory: {config.PROCESSED_SEQ_DIR}") shutil.rmtree(config.PROCESSED_SEQ_DIR) # 1. Load the list of test videos test_video_set = load_test_list() print(f"Loaded {len(test_video_set)} videos in the test set.") # 2. --- NEW: Initialize the Face Detector --- print("Initializing MTCNN face detector...") detector = MTCNN() print("Detector initialized.") data_sources = [ ('real', config.CELEB_REAL_DIR), ('real', config.YOUTUBE_REAL_DIR), ('fake', config.CELEB_FAKE_DIR) ] video_count = 0 for label, source_dir in data_sources: print(f"\nProcessing directory: {source_dir} (Label: {label})") if not os.path.exists(source_dir): print(f" > Warning: Directory not found. Skipping.") continue # Use tqdm for a nice progress bar for video_file in tqdm(os.listdir(source_dir)): if not video_file.endswith(('.mp4', '.avi', '.mov')): continue video_path = os.path.join(source_dir, video_file) video_name = os.path.splitext(video_file)[0] if video_file in test_video_set: output_dir_base = config.TEST_SEQ_DIR else: output_dir_base = config.TRAIN_SEQ_DIR output_folder = os.path.join(output_dir_base, label, video_name) # Pass the detector to the function success = extract_frames(video_path, output_folder, config.SEQUENCE_LENGTH, detector) if success: video_count += 1 print("\n--- Preprocessing Complete ---") print(f"Successfully processed {video_count} videos with face cropping.") print(f"New data is located in: {config.PROCESSED_SEQ_DIR}") if __name__ == "__main__": main()