deepfake-api / src /data_preprocessing.py
piyushnaula's picture
Initial commit - Deepfake Detector API
45742a7
import os
import sys
import warnings

# --- Suppress TensorFlow & MTCNN warnings ---
# This just quiets down the console output.
# NOTE: this section must run BEFORE anything that imports TensorFlow
# (the mtcnn package does so on import). TF_CPP_MIN_LOG_LEVEL is only
# honoured if it is set before TensorFlow is loaded, and the warning
# filters cannot hide warnings that were already emitted during import.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
warnings.filterwarnings('ignore', category=FutureWarning, module='tensorflow')
# --- End Warning Suppression ---

# Third-party imports are deliberately placed after the suppression above.
import cv2  # noqa: E402
from mtcnn.mtcnn import MTCNN  # noqa: E402
from tqdm import tqdm  # noqa: E402  # Our progress bar library!
#
# 1. LOAD THE SHARED CONFIGURATION
#
# Every path and tunable setting used below lives in config.py, so this
# script never hard-codes them. If the module cannot be imported we tell
# the user where it is expected and stop with a non-zero exit status.
#
try:
    import config
except ImportError:
    print(
        "Error: Could not import config.py.",
        "Make sure it's in the 'src/' directory.",
        sep="\n",
    )
    raise SystemExit(1)
def load_test_list(filepath):
    """
    Loads the official test file list into a set for fast lookup.

    The file is expected to contain one entry per line, e.g.:
        1/id0_0002.mp4
        1/id0_0006.mp4
        ...
        0/id2_0001.mp4

    Only the part after the last '/' (e.g. "id0_0002.mp4") is kept; the
    leading label prefix is discarded. Blank or whitespace-only lines, and
    entries with nothing after the last '/', are skipped so that an empty
    string never ends up in the set and the reported count is accurate.

    Args:
        filepath: Path to the text file listing the test videos.

    Returns:
        A set of video filenames (str) that belong to the test split.

    Exits:
        Prints an error and calls sys.exit(1) if the file does not exist.
    """
    test_videos = set()
    try:
        with open(filepath, 'r') as f:
            for line in f:
                entry = line.strip()
                if not entry:
                    # Skip blank lines (commonly a trailing newline at EOF).
                    continue
                # "1/id0_0002.mp4" -> "id0_0002.mp4"
                filename = entry.split('/')[-1]
                if filename:
                    test_videos.add(filename)
    except FileNotFoundError:
        print(f"Error: Test list file not found at: {filepath}")
        sys.exit(1)
    print(f"Loaded {len(test_videos)} videos into the test set.")
    return test_videos
def create_directories():
    """
    Ensures every output folder named in the config exists.

    The four train/test x real/fake directories are created in turn;
    'exist_ok=True' makes the call a no-op for folders that are already
    present, so the function is safe to run repeatedly.
    """
    print("Creating output directories...")
    # Look each attribute up right before creating it, in a fixed order.
    for setting_name in (
        "TRAIN_REAL_DIR",
        "TRAIN_FAKE_DIR",
        "TEST_REAL_DIR",
        "TEST_FAKE_DIR",
    ):
        os.makedirs(getattr(config, setting_name), exist_ok=True)
    print("Directories created/verified.")
def extract_faces(video_path, output_dir, video_filename, detector):
    """
    Extracts, crops, and resizes faces from a single video file
    and saves them to the specified output directory.

    Frames are sampled at an even stride across the video. For each sampled
    frame the first face reported by the detector is cropped, resized to
    config.IMAGE_SIZE x config.IMAGE_SIZE and written as a JPEG named
    "<video_filename>_frame_<frame_num>.jpg". Sampling stops once
    config.FRAMES_PER_VIDEO faces have been written.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory the cropped face images are written to.
        video_filename: Name used as the prefix of the saved images and in
            log messages.
        detector: Object exposing detect_faces(rgb_image), returning a list
            of dicts that each carry a 'box' entry of (x, y, w, h).

    Returns:
        The number of face images successfully written (0 if the video
        could not be opened, had no frames, or processing failed).
    """
    # Bind these before the try block so the except/finally/return paths
    # can never hit an unbound name, whatever fails inside.
    cap = None
    faces_saved = 0
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f" [Warning] Could not open video: {video_filename}")
            return faces_saved
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames == 0:
            print(f" [Warning] Video has 0 frames: {video_filename}")
            return faces_saved
        # Calculate a step to pick frames evenly, ensuring we don't just
        # get the first N frames.
        step = max(1, total_frames // config.FRAMES_PER_VIDEO)
        for frame_num in range(0, total_frames, step):
            if faces_saved >= config.FRAMES_PER_VIDEO:
                break
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            # Convert frame to RGB for MTCNN
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Detect faces (this is the most time-consuming part)
            faces = detector.detect_faces(frame_rgb)
            if not faces:
                continue
            # Use the first face the detector reports
            x, y, w, h = faces[0]['box']
            # The box may start outside the frame (negative x/y). Clamp the
            # top-left corner to the frame instead of mirroring it with
            # abs(), which would shift the crop away from the face. The
            # bottom-right corner is kept >= the top-left one so a negative
            # end index can never wrap around in the slice.
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = max(x1, x + w), max(y1, y + h)
            face_crop = frame[y1:y2, x1:x2]
            if face_crop.size == 0:
                continue
            # Resize to our standard size
            resized_face = cv2.resize(face_crop, (config.IMAGE_SIZE, config.IMAGE_SIZE))
            # Save the image; only count it if the write actually succeeded
            save_name = f"{video_filename}_frame_{frame_num}.jpg"
            save_path = os.path.join(output_dir, save_name)
            if cv2.imwrite(save_path, resized_face):
                faces_saved += 1
    except Exception as e:
        # Best effort: one bad video must not abort the whole batch.
        print(f" [Error] Processing {video_filename}: {e}")
    finally:
        if cap is not None:
            cap.release()
    return faces_saved
def _list_mp4_videos(directories):
    """Returns the paths of all '.mp4' files directly inside the given directories."""
    video_paths = []
    for directory in directories:
        # os.listdir returns entries in arbitrary order; sort them so every
        # run processes the videos in the same sequence.
        for filename in sorted(os.listdir(directory)):
            if filename.endswith('.mp4'):
                video_paths.append(os.path.join(directory, filename))
    return video_paths


def _process_video_group(video_paths, test_set, train_dir, test_dir, desc, detector):
    """Extracts faces from each video, routing output to the train or test directory."""
    # Use tqdm for a nice progress bar
    for video_path in tqdm(video_paths, desc=desc):
        filename = os.path.basename(video_path)
        # Videos named in the official test list go to the test split
        output_dir = test_dir if filename in test_set else train_dir
        extract_faces(video_path, output_dir, os.path.splitext(filename)[0], detector)


def process_all_videos(detector):
    """
    Orchestrates the entire preprocessing pipeline.

    Loads the test list from config.TEST_LIST_FILE, then extracts faces
    from every '.mp4' in the real-video directories (config.CELEB_REAL_DIR
    and config.YOUTUBE_REAL_DIR) followed by the fake-video directory
    (config.CELEB_FAKE_DIR). Each video's frames are written to the test
    or train output directory depending on whether its filename appears
    in the test list.

    Args:
        detector: Face detector instance that is passed through to
            extract_faces for every video.
    """
    # Load the set of videos that belong in the "test" set
    test_set = load_test_list(config.TEST_LIST_FILE)

    # 1. --- Process REAL Videos ---
    # We combine 'Celeb-real' and 'Youtube-real' into one list
    real_video_paths = _list_mp4_videos(
        [config.CELEB_REAL_DIR, config.YOUTUBE_REAL_DIR]
    )
    print(f"\nFound {len(real_video_paths)} real videos. Processing...")
    _process_video_group(
        real_video_paths,
        test_set,
        config.TRAIN_REAL_DIR,
        config.TEST_REAL_DIR,
        "Processing Real Videos",
        detector,
    )

    # 2. --- Process FAKE Videos ---
    fake_video_paths = _list_mp4_videos([config.CELEB_FAKE_DIR])
    print(f"\nFound {len(fake_video_paths)} fake videos. Processing...")
    _process_video_group(
        fake_video_paths,
        test_set,
        config.TRAIN_FAKE_DIR,
        config.TEST_FAKE_DIR,
        "Processing Fake Videos",
        detector,
    )
#
# Script entry point: the steps below only run when this file is executed
# directly, not when it is imported.
#
if __name__ == "__main__":
    print("--- DeepFake Detector: Data Preprocessing ---")

    # Step 1: make sure every output folder exists.
    create_directories()

    # Step 2: build the MTCNN detector a single time and hand that one
    # instance to the processing code, rather than creating a new
    # detector for every video.
    print("Initializing MTCNN face detector (this may take a moment)...")
    try:
        face_detector = MTCNN()
        print("MTCNN detector initialized.")
    except Exception as init_error:
        print(
            "Fatal Error: Could not initialize MTCNN.",
            "Please ensure TensorFlow is installed correctly.",
            f"Error details: {init_error}",
            sep="\n",
        )
        raise SystemExit(1)

    # Step 3: walk through all videos and extract the faces.
    process_all_videos(face_detector)

    print("\n--- Preprocessing Complete! ---")
    print(f"Your processed frames are now in: {config.PROCESSED_DATA_DIR}")