File size: 10,010 Bytes
1d2e071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a97adf2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1d2e071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a97adf2
 
 
 
 
1d2e071
 
a97adf2
1d2e071
 
a97adf2
1d2e071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a97adf2
1d2e071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a97adf2
 
 
1d2e071
 
 
a97adf2
1d2e071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import cv2
import numpy as np
from typing import List, Generator, Tuple
import os
import base64


# Image preprocessing parameters.
# MEAN/STD are the standard ImageNet normalization constants used by
# torchvision models pretrained on ImageNet.
IM_SIZE = 112
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

# Transform pipeline for video frames: numpy RGB array -> PIL image ->
# resized to IM_SIZE x IM_SIZE -> normalized float tensor (3, H, W).
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])

# Lazily-initialized module-level face detector singleton.
# NOTE: despite the "DNN" wording elsewhere, the detector actually used is
# OpenCV's bundled Haar cascade (see get_face_detector) -- lightweight,
# no dlib or model download needed.
_face_detector = None

def get_face_detector():
    """
    Return the process-wide face detector, creating it on first use.

    Lazily builds an OpenCV Haar Cascade classifier (bundled with the
    OpenCV install, so it is always available without extra model files)
    and caches it in the module-level ``_face_detector`` singleton.
    """
    global _face_detector
    if _face_detector is not None:
        return _face_detector
    # Frontal-face Haar cascade shipped inside the cv2 package data.
    _face_detector = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    )
    return _face_detector


def detect_faces_opencv(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """
    Run Haar-cascade face detection on an RGB frame.

    Args:
        frame: RGB image as a numpy array.

    Returns:
        Face bounding boxes as (top, right, bottom, left) tuples -- the
        same ordering the ``face_recognition`` library uses, so callers
        can swap detectors without code changes.
    """
    # Haar cascades operate on single-channel images.
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

    detections = get_face_detector().detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE,
    )

    # OpenCV reports (x, y, w, h); convert to (top, right, bottom, left).
    return [(y, x + w, y + h, x) for (x, y, w, h) in detections]


class ValidationDataset(Dataset):
    """
    Single-video dataset used at validation/prediction time.

    Streams frames from one video file, crops the first detected face in
    each frame (falling back to the full frame when none is found),
    applies the image transform, and returns a fixed-length frame stack.
    """

    def __init__(self, video_path: str, sequence_length: int = 60, transform=None):
        self.video_path = video_path
        # Default to the module-level normalization pipeline.
        self.transform = transform if transform else train_transforms
        self.sequence_length = sequence_length

    def __len__(self):
        # Exactly one video per dataset instance.
        return 1

    def __getitem__(self, idx):
        frames = []

        for frame in self.frame_extract(self.video_path):
            # OpenCV decodes BGR; the transform pipeline expects RGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            detections = detect_faces_opencv(rgb_frame)
            if detections:
                top, right, bottom, left = detections[0]
                crop = rgb_frame[top:bottom, left:right, :]
            else:
                # No face detected -- fall back to the whole frame.
                crop = rgb_frame

            frames.append(self.transform(crop))
            if len(frames) == self.sequence_length:
                break

        # Pad short videos by repeating the last frame (zeros if empty).
        if len(frames) < self.sequence_length:
            filler = frames[-1] if frames else torch.zeros(3, IM_SIZE, IM_SIZE)
            frames.extend([filler] * (self.sequence_length - len(frames)))

        stacked = torch.stack(frames)[:self.sequence_length]
        # Shape: (1, sequence_length, 3, IM_SIZE, IM_SIZE).
        return stacked.unsqueeze(0)

    def frame_extract(self, path: str) -> Generator[np.ndarray, None, None]:
        """Yield successive decoded frames from the video at *path*."""
        capture = cv2.VideoCapture(path)
        while True:
            ok, image = capture.read()
            if not ok:
                break
            yield image
        capture.release()


def preprocess_video(
    video_path: str,
    sequence_length: int,
    save_preprocessed: bool = False,
    output_dir: str = "temp_frames"
) -> tuple:
    """
    Preprocess a video for model prediction.

    Reads frames, crops the first detected face in each of the first
    ``sequence_length`` frames (with padding around the box), and builds a
    normalized tensor batch. For every processed frame exactly one base64
    JPEG data URI is appended to ``face_cropped_images`` (the face crop
    when one was found, otherwise the full frame), so the list can be
    rendered directly by the frontend.

    Args:
        video_path: Path to the video file.
        sequence_length: Number of frames to extract.
        save_preprocessed: Whether to also save frames/face crops as PNGs.
        output_dir: Directory for saved images when ``save_preprocessed``.

    Returns:
        Tuple of (frames_tensor, preprocessed_images, face_cropped_images,
        faces_found):
        frames_tensor: shape (1, sequence_length, 3, IM_SIZE, IM_SIZE).
        preprocessed_images: saved full-frame PNG paths (empty unless
            ``save_preprocessed``).
        face_cropped_images: one data URI per processed frame.
        faces_found: number of frames in which a face was detected.
    """
    preprocessed_images = []
    face_cropped_images = []

    if save_preprocessed:
        # exist_ok avoids the check-then-create race of os.path.exists.
        os.makedirs(output_dir, exist_ok=True)

    # Decode all frames up front (total count is reported below).
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture even if decoding raises.
        cap.release()

    print(f"Total frames extracted: {len(frames)}")

    padding = 40  # pixels of context kept around the detected face box
    faces_found = 0
    processed_frames = []
    num_to_process = min(sequence_length, len(frames))

    for i in range(num_to_process):
        # OpenCV decodes BGR; everything downstream works in RGB.
        rgb_frame = cv2.cvtColor(frames[i], cv2.COLOR_BGR2RGB)

        if save_preprocessed:
            preprocessed_path = os.path.join(output_dir, f"frame_{i+1}.png")
            cv2.imwrite(preprocessed_path, cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
            preprocessed_images.append(preprocessed_path)

        # Detect on a downscaled copy for speed, then map the box back.
        scale_factor = 0.5
        small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
        face_locations_small = detect_faces_opencv(small_frame)

        if face_locations_small:
            top_small, right_small, bottom_small, left_small = face_locations_small[0]
            # Scale coordinates back to the original resolution.
            top = int(top_small / scale_factor)
            right = int(right_small / scale_factor)
            bottom = int(bottom_small / scale_factor)
            left = int(left_small / scale_factor)

            # Pad the box, clamped to the frame bounds.
            top = max(0, top - padding)
            bottom = min(rgb_frame.shape[0], bottom + padding)
            left = max(0, left - padding)
            right = min(rgb_frame.shape[1], right + padding)

            # Crop the face from the ORIGINAL full-resolution frame.
            processed_frame = rgb_frame[top:bottom, left:right]
            faces_found += 1

            if save_preprocessed:
                face_path = os.path.join(output_dir, f"face_{i+1}.png")
                cv2.imwrite(face_path, cv2.cvtColor(processed_frame, cv2.COLOR_RGB2BGR))
        else:
            # No face detected -- fall back to the whole frame.
            processed_frame = rgb_frame

        # BUGFIX: previously, when save_preprocessed was on, the saved
        # face-crop *file path* was ALSO appended here, interleaving paths
        # with data URIs. The list now holds exactly one data URI per
        # processed frame, matching the no-face branch.
        display = cv2.resize(processed_frame, (224, 224))
        _, buffer = cv2.imencode(
            '.jpg',
            cv2.cvtColor(display, cv2.COLOR_RGB2BGR),
            [cv2.IMWRITE_JPEG_QUALITY, 85],
        )
        base64_image = base64.b64encode(buffer).decode('utf-8')
        face_cropped_images.append(f"data:image/jpeg;base64,{base64_image}")

        processed_frames.append(train_transforms(processed_frame))

    # Denominator is the number of frames actually processed (the video
    # may be shorter than sequence_length).
    print(f"Faces detected: {faces_found}/{num_to_process}")

    # Pad short videos by repeating the last frame (zeros if empty).
    if len(processed_frames) < sequence_length:
        last_frame = processed_frames[-1] if processed_frames else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(processed_frames) < sequence_length:
            processed_frames.append(last_frame)

    # Stack into a batch of one: (1, sequence_length, 3, IM_SIZE, IM_SIZE).
    frames_tensor = torch.stack(processed_frames[:sequence_length]).unsqueeze(0)
    return frames_tensor, preprocessed_images, face_cropped_images, faces_found


def predict(model, img_tensor, device: str = "cpu"):
    """
    Make a prediction on a preprocessed video tensor.

    Args:
        model: Loaded PyTorch model; calling it must return a
            ``(feature_map, logits)`` pair.
        img_tensor: Preprocessed video tensor (batch dimension included).
        device: Target device string, e.g. "cpu" or "cuda".

    Returns:
        Tuple of (prediction, confidence):
        prediction: 0 for FAKE, 1 for REAL (argmax over class logits).
        confidence: Softmax probability of the predicted class, as a
            percentage in [0, 100].
    """
    softmax = torch.nn.Softmax(dim=1)

    # .to() is the idiomatic device move: it accepts any device string
    # ("cpu", "cuda", "cuda:1", ...) and is a no-op when the tensor is
    # already on the requested device.
    img_tensor = img_tensor.to(device)

    # Inference only -- no gradient tracking needed.
    with torch.no_grad():
        _, logits = model(img_tensor)
        probs = softmax(logits)
        _, prediction = torch.max(probs, 1)
        confidence = probs[0, int(prediction.item())].item() * 100

    return int(prediction.item()), confidence