File size: 7,500 Bytes
45742a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import cv2
from mtcnn.mtcnn import MTCNN
import os
import sys
from tqdm import tqdm # Our progress bar library!
import warnings

# --- Suppress TensorFlow & MTCNN warnings ---
# This just quiets down the console output
# TF_CPP_MIN_LOG_LEVEL is read by TensorFlow's native (C++) logger.
# NOTE(review): this assignment runs after `from mtcnn.mtcnn import MTCNN`
# above; if that import already loads TensorFlow, the variable may be set
# too late to have any effect. Consider moving it above the imports - confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Hide Python-level UserWarning / FutureWarning messages that are raised from
# modules whose name matches 'tensorflow'.
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
warnings.filterwarnings('ignore', category=FutureWarning, module='tensorflow')
# --- End Warning Suppression ---

#
# 1. IMPORT FROM OUR CONFIG FILE
#
# This is the "best practice" part. We import all our paths 
# and settings from the single config.py file.
#
# The import is wrapped in try/except so that a missing config.py produces a
# short, readable message and exit status 1 instead of a raw traceback.
# Everything below reads its paths and settings as attributes of `config`.
try:
    import config
except ImportError:
    print("Error: Could not import config.py.")
    print("Make sure it's in the 'src/' directory.")
    sys.exit(1)

def load_test_list(filepath):
    """
    Loads the official test file list into a set for fast lookup.

    The file format is one entry per line:
    1/id0_0002.mp4
    1/id0_0006.mp4
    ...
    0/id2_0001.mp4

    We only care about the video filename (e.g., "id0_0002.mp4"), i.e. the
    text after the last '/' on each line. Blank lines (and entries that end
    in '/') are skipped so an empty string never ends up in the set.

    Args:
        filepath: Path to the text file that lists the test videos.

    Returns:
        A set of video filenames (str) that belong to the test split.

    Exits:
        Calls sys.exit(1) after printing an error if the file does not exist.
    """
    test_videos = set()
    try:
        with open(filepath, 'r') as f:
            for line in f:
                # Get the part after the slash (e.g., "1/id0_0002.mp4" -> "id0_0002.mp4")
                filename = line.strip().split('/')[-1]
                # An empty result means the line was blank or had no filename;
                # adding '' to the set would be a bogus entry.
                if filename:
                    test_videos.add(filename)
    except FileNotFoundError:
        print(f"Error: Test list file not found at: {filepath}")
        sys.exit(1)

    print(f"Loaded {len(test_videos)} videos into the test set.")
    return test_videos

def create_directories():
    """
    Makes sure the four output folders (train/test x real/fake) named in
    config exist.

    Folders that are already present are left untouched, because
    os.makedirs is called with exist_ok=True.
    """
    print("Creating output directories...")
    # Attribute names are looked up one at a time, in the same order the
    # folders are created.
    for setting_name in (
        "TRAIN_REAL_DIR",
        "TRAIN_FAKE_DIR",
        "TEST_REAL_DIR",
        "TEST_FAKE_DIR",
    ):
        os.makedirs(getattr(config, setting_name), exist_ok=True)
    print("Directories created/verified.")

def extract_faces(video_path, output_dir, video_filename, detector):
    """
    Extracts, crops, and resizes faces from a single video file
    and saves them to the specified output directory.

    Frames are sampled at a fixed step across the whole video so that the
    saved faces are spread over its length. For each sampled frame, the
    first face reported by the detector is cropped, resized to
    config.IMAGE_SIZE x config.IMAGE_SIZE and written as a JPEG named
    "<video_filename>_frame_<frame_num>.jpg".

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory the face images are written to.
        video_filename: Name used in log messages and as the prefix of the
            saved image files.
        detector: Object with a detect_faces(rgb_frame) method that returns a
            sequence of dicts, each with a 'box' entry of (x, y, w, h).

    Returns:
        The number of face images successfully written (0 if the video could
        not be opened, has no frames, or processing failed before any face
        was saved).
    """
    # Both names are bound before the try block so the cleanup code and the
    # final return are safe even when an exception is raised very early.
    cap = None
    faces_saved = 0

    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"  [Warning] Could not open video: {video_filename}")
            return faces_saved

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            print(f"  [Warning] Video has 0 frames: {video_filename}")
            return faces_saved

        # Calculate a step to pick frames evenly, ensuring we don't just
        # get the first N frames.
        step = max(1, total_frames // config.FRAMES_PER_VIDEO)

        for frame_num in range(0, total_frames, step):
            if faces_saved >= config.FRAMES_PER_VIDEO:
                break

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            # Convert frame to RGB for MTCNN (OpenCV decodes frames as BGR)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Detect faces
            # This is the most time-consuming part
            faces = detector.detect_faces(frame_rgb)
            if not faces:
                continue

            # Get the first face
            x, y, w, h = faces[0]['box']

            # The box may start outside the frame (negative x or y). Clamp
            # the corners to the frame edge instead of taking abs(), which
            # would move the crop away from the detected face.
            left, top = max(0, x), max(0, y)
            right, bottom = max(0, x + w), max(0, y + h)

            # Crop the face
            face_crop = frame[top:bottom, left:right]
            if face_crop.size == 0:
                continue

            # Resize to our standard size
            resized_face = cv2.resize(face_crop, (config.IMAGE_SIZE, config.IMAGE_SIZE))

            # Save the image; only count it when the write actually succeeded
            save_name = f"{video_filename}_frame_{frame_num}.jpg"
            save_path = os.path.join(output_dir, save_name)
            if cv2.imwrite(save_path, resized_face):
                faces_saved += 1
            else:
                print(f"  [Warning] Could not write image: {save_path}")

    except Exception as e:
        # Best effort: one broken video must not stop the whole run.
        print(f"  [Error] Processing {video_filename}: {e}")
    finally:
        if cap is not None:
            cap.release()

    return faces_saved

def _list_mp4_paths(directories):
    """
    Returns the full paths of every '.mp4' file found directly inside the
    given directories, in directory order and sorted by name within each.
    """
    video_paths = []
    for directory in directories:
        # os.listdir gives no ordering guarantee; sorting keeps runs reproducible.
        video_paths.extend(
            os.path.join(directory, filename)
            for filename in sorted(os.listdir(directory))
            if filename.endswith('.mp4')
        )
    return video_paths


def _process_video_group(video_paths, test_set, test_dir, train_dir, detector, desc):
    """
    Runs face extraction on each video, sending it to test_dir when its
    filename is in test_set and to train_dir otherwise.
    """
    # Use tqdm for a nice progress bar
    for video_path in tqdm(video_paths, desc=desc):
        filename = os.path.basename(video_path)

        # Decide if it's train or test
        output_dir = test_dir if filename in test_set else train_dir

        # Extract faces (saved files are prefixed with the name minus extension)
        extract_faces(video_path, output_dir, os.path.splitext(filename)[0], detector)


def process_all_videos(detector):
    """
    Orchestrates the entire preprocessing pipeline.

    Loads the test split list, then processes all real videos followed by
    all fake videos, writing the extracted faces into the train or test
    output folder for the matching label.

    Args:
        detector: Face detector passed through unchanged to extract_faces.
    """

    # Load the set of videos that belong in the "test" set
    test_set = load_test_list(config.TEST_LIST_FILE)

    # 1. --- Process REAL Videos ---
    # We combine 'Celeb-real' and 'Youtube-real' into one list
    real_video_paths = _list_mp4_paths([config.CELEB_REAL_DIR, config.YOUTUBE_REAL_DIR])
    print(f"\nFound {len(real_video_paths)} real videos. Processing...")
    _process_video_group(
        real_video_paths,
        test_set,
        config.TEST_REAL_DIR,
        config.TRAIN_REAL_DIR,
        detector,
        "Processing Real Videos",
    )

    # 2. --- Process FAKE Videos ---
    fake_video_paths = _list_mp4_paths([config.CELEB_FAKE_DIR])
    print(f"\nFound {len(fake_video_paths)} fake videos. Processing...")
    _process_video_group(
        fake_video_paths,
        test_set,
        config.TEST_FAKE_DIR,
        config.TRAIN_FAKE_DIR,
        detector,
        "Processing Fake Videos",
    )

#
# This is the "entry point" of our script
#
if __name__ == "__main__":
    print("--- DeepFake Detector: Data Preprocessing ---")

    # Step 1: make sure every output folder exists before any frame is written.
    create_directories()

    # Step 2: build a single MTCNN detector up front and hand it to the
    # processing code, rather than constructing a new one for every video.
    print("Initializing MTCNN face detector (this may take a moment)...")
    try:
        mtcnn_detector = MTCNN()
        print("MTCNN detector initialized.")
    except Exception as e:
        # Without a detector nothing else can run: report and stop.
        failure_lines = (
            "Fatal Error: Could not initialize MTCNN.",
            "Please ensure TensorFlow is installed correctly.",
            f"Error details: {e}",
        )
        for text in failure_lines:
            print(text)
        sys.exit(1)

    # Step 3: walk through all real and fake videos and extract the faces.
    process_all_videos(mtcnn_detector)

    print("\n--- Preprocessing Complete! ---")
    print(f"Your processed frames are now in: {config.PROCESSED_DATA_DIR}")