# Page metadata captured alongside this file: Spaces status "Sleeping";
# file size 6,823 bytes; identifier 45742a7.
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import pandas as pd
import random
# Import our settings (TRAIN_SEQ_DIR, SEQUENCE_LENGTH, TARGET_IMAGE_SIZE).
# The module is required, so a failed import stops the program with exit
# status 1 after telling the user where config.py is expected to live.
try:
    import config
except ImportError:
    print("Error: Could not import config.py. Make sure it's in the src/ directory.")
    # Raise SystemExit directly instead of calling exit(): the exit() helper
    # is injected by the `site` module for interactive sessions and is not
    # guaranteed to exist (e.g. `python -S` or frozen/embedded interpreters).
    raise SystemExit(1)
class VideoDataGenerator(tf.keras.utils.Sequence):
    """
    Keras data generator that serves batches of video frame sequences.

    The data directory is expected to contain a ``real`` and/or a ``fake``
    sub-directory. Every folder inside those is treated as one video whose
    frames are stored as ``.jpg`` files. Each batch is a tuple ``(X, y)``:

    * ``X`` -- float32 array of shape
      ``(batch, sequence_length, img_size, img_size, 3)`` holding the pixel
      values divided by 255.
    * ``y`` -- integer array of shape ``(batch,)`` with 1 for ``real`` and
      0 for ``fake``.
    """

    def __init__(self, data_dir, batch_size, sequence_length,
                 img_size, shuffle=True):
        """
        Initialization

        :param data_dir: Path to the directory (e.g., .../train)
        :param batch_size: Size of each batch
        :param sequence_length: Number of frames per video
        :param img_size: Target image size (e.g., 299)
        :param shuffle: Whether to shuffle data at each epoch
        """
        # Let the Keras base class set up its own state; newer Keras
        # versions warn when a subclass skips this call.
        super().__init__()

        self.data_dir = data_dir
        self.batch_size = batch_size
        self.sequence_length = sequence_length
        self.img_size = img_size
        self.shuffle = shuffle

        # Build the dataframe that maps video folders to labels
        self.df = self.__create_video_dataframe()

        # Shuffle once up front (when enabled) so the very first epoch is
        # randomised as well; Keras calls on_epoch_end() after each epoch.
        self.on_epoch_end()

    def __create_video_dataframe(self):
        """
        Scans the data_dir and creates a DataFrame of
        [video_folder_path, label].

        :return: DataFrame with a ``filepath`` and a ``label`` column, one
                 row per video folder found under ``real`` / ``fake``.
        """
        print(f"Creating video dataframe from: {self.data_dir}")
        video_paths = []
        labels = []

        # Walk through the data directory (train or test)
        for label in ['real', 'fake']:
            class_dir = os.path.join(self.data_dir, label)
            if not os.path.exists(class_dir):
                print(f"Warning: Directory not found: {class_dir}")
                continue

            # Each folder inside is a video. os.listdir() returns entries in
            # arbitrary order, so sort them to get a reproducible ordering
            # when shuffle is disabled.
            for video_folder in sorted(os.listdir(class_dir)):
                video_folder_path = os.path.join(class_dir, video_folder)
                if os.path.isdir(video_folder_path):
                    video_paths.append(video_folder_path)
                    labels.append(label)

        print(f"Found {len(video_paths)} videos.")
        return pd.DataFrame({'filepath': video_paths, 'label': labels})

    def __len__(self):
        """
        Returns the number of batches per epoch.

        Only complete batches are counted; leftover videos that do not fill
        a whole batch are not served in that epoch.
        """
        return int(np.floor(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        """
        Generates one batch of data.

        :param index: Zero-based batch index.
        :return: Tuple ``(X, y)`` as described in the class docstring.
        """
        # Select the rows (video folders) belonging to this batch
        start_idx = index * self.batch_size
        end_idx = (index + 1) * self.batch_size
        batch_df = self.df.iloc[start_idx:end_idx]

        # Size the arrays from the rows actually selected and zero-fill
        # them, so a short slice can never expose uninitialised memory as
        # frames or labels. For every index below len(self) this equals
        # batch_size.
        rows_in_batch = len(batch_df)

        # X is our video data: (batch, seq_len, height, width, channels)
        X = np.zeros(
            (rows_in_batch, self.sequence_length, self.img_size, self.img_size, 3),
            dtype=np.float32,
        )
        # y is our labels
        y = np.zeros((rows_in_batch,), dtype=int)

        # enumerate() provides the position inside the batch (0, 1, 2, ...),
        # independent of the DataFrame's own index values.
        for i, (video_folder_path, label) in enumerate(
                zip(batch_df['filepath'], batch_df['label'])):
            # Load the sequence of frames
            X[i,] = self.__load_video_frames(video_folder_path)
            # Store the label (0 for fake, 1 for real)
            y[i] = 1 if label == 'real' else 0

        return X, y

    def on_epoch_end(self):
        """Shuffles the DataFrame at the end of each epoch (if enabled)."""
        if self.shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)

    def __load_video_frames(self, video_folder_path):
        """
        Loads, preprocesses, and returns a sequence of frames
        from a single video folder.

        :param video_folder_path: Folder containing the video's .jpg frames.
        :return: float32 array of shape
                 ``(sequence_length, img_size, img_size, 3)``. If the folder
                 holds fewer than ``sequence_length`` frames, the missing
                 trailing frames are all zeros.
        """
        # Zero-initialise (not np.empty) so that frames which are never
        # written below really are zero padding rather than leftover memory.
        video_frames = np.zeros(
            (self.sequence_length, self.img_size, self.img_size, 3),
            dtype=np.float32,
        )

        # Get all frame paths in the folder and sort them
        all_frames = sorted(
            f for f in os.listdir(video_folder_path) if f.endswith('.jpg')
        )

        # Select 'sequence_length' frames
        if len(all_frames) >= self.sequence_length:
            # If we have enough frames, take the first 'n'
            frames_to_load = all_frames[:self.sequence_length]
        else:
            # If video is shorter (shouldn't happen with our pre-processing)
            # we use the frames we have; the rest stays zero padding.
            frames_to_load = all_frames
            print(f"Warning: Video folder {video_folder_path} has < {self.sequence_length} frames.")

        for i, frame_name in enumerate(frames_to_load):
            frame_path = os.path.join(video_folder_path, frame_name)

            # The frames were already resized during pre-processing,
            # but passing target_size is a good safety check.
            img = image.load_img(frame_path, target_size=(self.img_size, self.img_size))
            img_array = image.img_to_array(img)

            # Normalize (rescale)
            video_frames[i] = img_array / 255.0

        return video_frames
if __name__ == "__main__":
    # Manual smoke test: build a generator over the training sequences
    # configured in config.py and try to pull the first batch from it.
    print("Running a quick test of the VideoDataGenerator...")

    generator_settings = {
        "data_dir": config.TRAIN_SEQ_DIR,
        "batch_size": 4,  # deliberately small so the check stays quick
        "sequence_length": config.SEQUENCE_LENGTH,
        "img_size": config.TARGET_IMAGE_SIZE,
    }
    smoke_gen = VideoDataGenerator(**generator_settings)

    # Fetch a single batch and report its shape, dtype and labels; any
    # failure is reported together with a hint about the expected layout.
    try:
        frames, targets = smoke_gen[0]
        print("\n--- Generator Test SUCCESS ---")
        print(f"X batch shape: {frames.shape}")
        print(f"y batch shape: {targets.shape}")
        print(f"X batch data type: {frames.dtype}")
        print(f"Labels in batch: {targets}")
    except Exception as err:
        print("\n--- Generator Test FAILED ---")
        print(f"Error: {err}")
        print("Please check that your 'processed_sequences/train' directory is not empty and has the correct structure.")