Spaces:
Sleeping
Sleeping
import os
import sys
import warnings

# --- Suppress TensorFlow & MTCNN warnings ---
# This just quiets down the console output.
# It has to run BEFORE the heavy imports below: the mtcnn import is expected
# to pull in TensorFlow, the log-level variable is only honoured if it is set
# before TensorFlow loads, and a warnings filter only affects warnings raised
# after it has been installed.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
warnings.filterwarnings('ignore', category=FutureWarning, module='tensorflow')
# --- End Warning Suppression ---

# These imports intentionally come after the suppression block above.
import cv2  # noqa: E402
from mtcnn.mtcnn import MTCNN  # noqa: E402
from tqdm import tqdm  # noqa: E402  # Our progress bar library!

#
# 1. IMPORT FROM OUR CONFIG FILE
#
# This is the "best practice" part. We import all our paths
# and settings from the single config.py file.
#
try:
    import config
except ImportError:
    # Without the config module none of the paths/settings used below exist,
    # so stop immediately with a non-zero exit status.
    print("Error: Could not import config.py.")
    print("Make sure it's in the 'src/' directory.")
    sys.exit(1)
def load_test_list(filepath):
    """
    Loads the official test file list into a set for fast lookup.

    The file format is one entry per line, for example:
        1/id0_0002.mp4
        1/id0_0006.mp4
        ...
        0/id2_0001.mp4

    Only the text after the last '/' (the video filename, e.g.
    "id0_0002.mp4") is kept; everything before it is discarded by this
    function. Blank lines and entries with nothing after the final '/' are
    skipped so that an empty string never ends up in the set (which would
    also inflate the reported count).

    Args:
        filepath: Path to the text file containing the test list.

    Returns:
        A set of video filenames found in the file.

    If the file does not exist, an error is printed and the process exits
    with status 1.
    """
    test_videos = set()
    try:
        with open(filepath, 'r') as f:
            for line in f:
                # Get the part after the slash (e.g., "1/id0_0002.mp4" -> "id0_0002.mp4")
                filename = line.strip().split('/')[-1]
                if filename:
                    test_videos.add(filename)
    except FileNotFoundError:
        print(f"Error: Test list file not found at: {filepath}")
        sys.exit(1)
    print(f"Loaded {len(test_videos)} videos into the test set.")
    return test_videos
def create_directories():
    """
    Make sure the four output folders named in the config exist.

    Folders are created in the order train/real, train/fake, test/real,
    test/fake. Passing exist_ok=True means a folder that is already present
    is left alone instead of raising an error.
    """
    print("Creating output directories...")
    # Look each path up right before it is needed, in a fixed order.
    for setting_name in (
        "TRAIN_REAL_DIR",
        "TRAIN_FAKE_DIR",
        "TEST_REAL_DIR",
        "TEST_FAKE_DIR",
    ):
        os.makedirs(getattr(config, setting_name), exist_ok=True)
    print("Directories created/verified.")
def extract_faces(video_path, output_dir, video_filename, detector):
    """
    Extracts, crops, and resizes faces from a single video file
    and saves them to the specified output directory.

    Frames are sampled at an even stride across the video. For every sampled
    frame the first face returned by ``detector.detect_faces`` is cropped,
    resized to config.IMAGE_SIZE x config.IMAGE_SIZE and written to
    ``output_dir`` as "<video_filename>_frame_<frame_num>.jpg". Sampling
    stops once config.FRAMES_PER_VIDEO faces have been saved.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory the face images are written into.
        video_filename: Name used in log messages and as the prefix of the
            saved image names.
        detector: Object exposing ``detect_faces(rgb_frame)`` that returns a
            list of dicts, each with a 'box' entry of (x, y, w, h).

    Returns:
        The number of face images saved. This is 0 when the video cannot be
        opened, reports no frames, or processing fails; errors are printed
        rather than raised so one bad video does not stop a batch.
    """
    # Both names are bound up front so the cleanup and the final return are
    # safe even if the very first call inside the try block raises.
    faces_saved = 0
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f" [Warning] Could not open video: {video_filename}")
            return faces_saved
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            print(f" [Warning] Video has 0 frames: {video_filename}")
            return faces_saved
        # Calculate a step to pick frames evenly, ensuring we don't just
        # get the first N frames.
        step = max(1, total_frames // config.FRAMES_PER_VIDEO)
        for frame_num in range(0, total_frames, step):
            if faces_saved >= config.FRAMES_PER_VIDEO:
                break
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            # Convert frame to RGB for MTCNN
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Detect faces
            # This is the most time-consuming part
            faces = detector.detect_faces(frame_rgb)
            if not faces:
                continue
            # Get the first face
            x, y, w, h = faces[0]['box']
            # The box may start at a negative coordinate. Clamp it to the
            # frame edge; taking abs() would instead move the crop window
            # away from where the face was detected.
            left, top = max(0, x), max(0, y)
            right, bottom = max(0, x + w), max(0, y + h)
            # Crop the face (slicing clips at the far edges of the frame)
            face_crop = frame[top:bottom, left:right]
            if face_crop.size == 0:
                continue
            # Resize to our standard size
            resized_face = cv2.resize(face_crop, (config.IMAGE_SIZE, config.IMAGE_SIZE))
            # Save the image
            save_name = f"{video_filename}_frame_{frame_num}.jpg"
            save_path = os.path.join(output_dir, save_name)
            # Only count the face when the file was actually written.
            if cv2.imwrite(save_path, resized_face):
                faces_saved += 1
            else:
                print(f" [Warning] Could not write image: {save_path}")
    except Exception as e:
        print(f" [Error] Processing {video_filename}: {e}")
    finally:
        if cap is not None:
            cap.release()
    return faces_saved
def _list_mp4_videos(directory):
    """Return the full paths of the '.mp4' files directly inside *directory*, sorted by name."""
    # os.listdir gives no ordering guarantee; sorting keeps runs reproducible.
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith('.mp4')
    ]


def _process_video_group(video_paths, test_set, test_dir, train_dir, desc, detector):
    """Extract faces from each video into *test_dir* or *train_dir*, chosen by membership in *test_set*."""
    # Use tqdm for a nice progress bar
    for video_path in tqdm(video_paths, desc=desc):
        filename = os.path.basename(video_path)
        # Decide if it's train or test
        output_dir = test_dir if filename in test_set else train_dir
        # Extract faces; the extension is dropped for the saved image prefix
        extract_faces(video_path, output_dir, os.path.splitext(filename)[0], detector)


def process_all_videos(detector):
    """
    Orchestrates the entire preprocessing pipeline.

    Loads the test list from config.TEST_LIST_FILE, then runs face extraction
    over every '.mp4' in the real-video directories (config.CELEB_REAL_DIR and
    config.YOUTUBE_REAL_DIR) followed by every '.mp4' in
    config.CELEB_FAKE_DIR. A video whose filename appears in the test list is
    written to the matching TEST_* directory, otherwise to the TRAIN_* one.

    Args:
        detector: Face detector instance passed through to extract_faces.
    """
    # Load the set of videos that belong in the "test" set
    test_set = load_test_list(config.TEST_LIST_FILE)

    # 1. --- Process REAL Videos ---
    # We combine 'Celeb-real' and 'Youtube-real' into one list
    real_video_paths = []
    for source_dir in (config.CELEB_REAL_DIR, config.YOUTUBE_REAL_DIR):
        real_video_paths.extend(_list_mp4_videos(source_dir))
    print(f"\nFound {len(real_video_paths)} real videos. Processing...")
    _process_video_group(
        real_video_paths,
        test_set,
        config.TEST_REAL_DIR,
        config.TRAIN_REAL_DIR,
        "Processing Real Videos",
        detector,
    )

    # 2. --- Process FAKE Videos ---
    fake_video_paths = _list_mp4_videos(config.CELEB_FAKE_DIR)
    print(f"\nFound {len(fake_video_paths)} fake videos. Processing...")
    _process_video_group(
        fake_video_paths,
        test_set,
        config.TEST_FAKE_DIR,
        config.TRAIN_FAKE_DIR,
        "Processing Fake Videos",
        detector,
    )
| # | |
| # This is the "entry point" of our script | |
| # | |
if __name__ == "__main__":
    print("--- DeepFake Detector: Data Preprocessing ---")

    # Step 1: make sure the output folders exist before anything is written.
    create_directories()

    # Step 2: build the MTCNN detector a single time. The same instance is
    # handed to the processing function instead of being rebuilt per video.
    print("Initializing MTCNN face detector (this may take a moment)...")
    try:
        mtcnn_detector = MTCNN()
        print("MTCNN detector initialized.")
    except Exception as init_error:
        # Nothing can be processed without a detector: report and stop.
        print("Fatal Error: Could not initialize MTCNN.")
        print("Please ensure TensorFlow is installed correctly.")
        print(f"Error details: {init_error}")
        sys.exit(1)

    # Step 3: run the main processing loop over all videos.
    process_all_videos(mtcnn_detector)

    print("\n--- Preprocessing Complete! ---")
    print(f"Your processed frames are now in: {config.PROCESSED_DATA_DIR}")