"""Video preprocessing and inference utilities for a face/deepfake classifier.

Pipeline: extract frames from a video, detect and crop faces with OpenCV's
Haar cascade (no dlib / face_recognition dependency), normalize crops for a
PyTorch model, and run a softmax prediction.
"""

import torch
from torch.utils.data import Dataset
from torchvision import transforms
import cv2
import numpy as np
from typing import List, Generator, Tuple
import os
import base64

# Image preprocessing parameters (ImageNet normalization statistics).
IM_SIZE = 112
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

# Transform pipeline applied to every extracted video frame.
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])

# Lazily-initialized OpenCV face detector (lightweight, no dlib needed).
# NOTE: despite earlier comments mentioning a DNN/Caffe model, the actual
# implementation uses OpenCV's bundled Haar cascade, which is always available.
_face_detector = None


def get_face_detector():
    """Get or lazily initialize the OpenCV Haar cascade face detector.

    Returns:
        cv2.CascadeClassifier loaded with the bundled frontal-face cascade.
        The instance is cached in the module-level ``_face_detector``.
    """
    global _face_detector
    if _face_detector is None:
        # Haar cascade ships with every OpenCV build, so no model download
        # or external weight files are required.
        cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        _face_detector = cv2.CascadeClassifier(cascade_path)
    return _face_detector


def detect_faces_opencv(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """Detect faces using OpenCV's Haar cascade detector.

    Args:
        frame: RGB image as a numpy array.

    Returns:
        List of face locations as ``(top, right, bottom, left)`` tuples —
        the same format the ``face_recognition`` library uses, kept for
        drop-in compatibility.
    """
    detector = get_face_detector()
    # Haar cascades operate on single-channel images.
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    faces = detector.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE,
    )
    # Convert OpenCV's (x, y, w, h) boxes to (top, right, bottom, left).
    return [(y, x + w, y + h, x) for (x, y, w, h) in faces]


class ValidationDataset(Dataset):
    """Dataset wrapping a single video file for validation/prediction.

    Extracts frames, crops the first detected face per frame (falling back
    to the full frame), applies the transform pipeline, and returns a
    ``(1, sequence_length, 3, IM_SIZE, IM_SIZE)`` tensor.
    """

    def __init__(self, video_path: str, sequence_length: int = 60, transform=None):
        """
        Args:
            video_path: Path to the video file.
            sequence_length: Number of frames to collect.
            transform: Per-frame transform; defaults to ``train_transforms``.
        """
        self.video_path = video_path
        self.transform = transform if transform else train_transforms
        self.sequence_length = sequence_length

    def __len__(self):
        # A single video per dataset instance.
        return 1

    def __getitem__(self, idx):
        frames = []
        for i, frame in enumerate(self.frame_extract(self.video_path)):
            # OpenCV decodes BGR; the transforms expect RGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            faces = detect_faces_opencv(rgb_frame)
            try:
                top, right, bottom, left = faces[0]
                frame = rgb_frame[top:bottom, left:right, :]
            except (IndexError, ValueError):
                # No face detected — fall back to the full frame.
                frame = rgb_frame
            frames.append(self.transform(frame))
            if len(frames) == self.sequence_length:
                break
        # Pad short videos by repeating the last frame (zeros if no frames
        # could be read at all).
        if len(frames) < self.sequence_length:
            last_frame = frames[-1] if frames else torch.zeros(3, IM_SIZE, IM_SIZE)
            while len(frames) < self.sequence_length:
                frames.append(last_frame)
        frames = torch.stack(frames)
        frames = frames[:self.sequence_length]
        # Add a leading batch dimension for the model.
        return frames.unsqueeze(0)

    def frame_extract(self, path: str) -> Generator[np.ndarray, None, None]:
        """Yield successive BGR frames from the video at ``path``."""
        vidObj = cv2.VideoCapture(path)
        success = True
        while success:
            success, image = vidObj.read()
            if success:
                yield image
        vidObj.release()


def preprocess_video(
    video_path: str,
    sequence_length: int,
    save_preprocessed: bool = False,
    output_dir: str = "temp_frames"
) -> tuple:
    """Preprocess a video for model prediction.

    Args:
        video_path: Path to the video file.
        sequence_length: Number of frames to extract.
        save_preprocessed: Whether to save preprocessed images to disk.
        output_dir: Directory to save preprocessed images.

    Returns:
        Tuple of ``(preprocessed_tensor, preprocessed_images_list,
        face_cropped_images_list, faces_found)``.

        NOTE(review): when ``save_preprocessed`` is True,
        ``face_cropped_images_list`` contains both the saved file path AND a
        base64 data URI for each face frame — confirm downstream consumers
        expect this mixed content before changing it.
    """
    preprocessed_images = []
    face_cropped_images = []

    if save_preprocessed:
        # exist_ok avoids the check-then-create race of the previous
        # os.path.exists() guard.
        os.makedirs(output_dir, exist_ok=True)

    # Read all frames up front.
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            frames.append(frame)
        else:
            break
    cap.release()

    print(f"Total frames extracted: {len(frames)}")

    padding = 40  # pixels of context kept around each detected face box
    faces_found = 0
    processed_frames = []

    for i in range(min(sequence_length, len(frames))):
        frame = frames[i]
        # OpenCV decodes BGR; all downstream processing assumes RGB.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        if save_preprocessed:
            preprocessed_path = os.path.join(output_dir, f"frame_{i+1}.png")
            cv2.imwrite(preprocessed_path, cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
            preprocessed_images.append(preprocessed_path)

        # Detect on a downscaled copy for speed; 0.5 is a mild reduction
        # since the Haar cascade is already fast.
        scale_factor = 0.5
        small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
        face_locations_small = detect_faces_opencv(small_frame)

        if len(face_locations_small) > 0:
            # Scale the first detection back to original resolution.
            top_small, right_small, bottom_small, left_small = face_locations_small[0]
            top = int(top_small / scale_factor)
            right = int(right_small / scale_factor)
            bottom = int(bottom_small / scale_factor)
            left = int(left_small / scale_factor)

            # Pad the box (in original-resolution coordinates), clamped to
            # the frame bounds.
            top = max(0, top - padding)
            bottom = min(rgb_frame.shape[0], bottom + padding)
            left = max(0, left - padding)
            right = min(rgb_frame.shape[1], right + padding)

            # Crop from the ORIGINAL full-resolution frame, not the
            # downscaled detection copy.
            frame_face = rgb_frame[top:bottom, left:right]

            if save_preprocessed:
                face_path = os.path.join(output_dir, f"face_{i+1}.png")
                cv2.imwrite(face_path, cv2.cvtColor(frame_face, cv2.COLOR_RGB2BGR))
                face_cropped_images.append(face_path)

            # Base64-encoded JPEG for frontend display, resized to 224x224.
            display_face = cv2.resize(frame_face, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_face, cv2.COLOR_RGB2BGR),
                                     [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_face = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_face}")

            faces_found += 1
            processed_frame = frame_face
        else:
            # No face detected — use the full frame for the model input.
            processed_frame = rgb_frame
            # For display, show what was actually used.
            display_frame = cv2.resize(rgb_frame, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_frame, cv2.COLOR_RGB2BGR),
                                     [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_frame = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_frame}")

        processed_frames.append(train_transforms(processed_frame))

    print(f"Faces detected: {faces_found}/{sequence_length}")

    # Pad short videos by repeating the last processed frame (zeros if the
    # video produced no frames at all).
    if len(processed_frames) < sequence_length:
        last_frame = processed_frames[-1] if processed_frames else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(processed_frames) < sequence_length:
            processed_frames.append(last_frame)

    # Stack into a (1, sequence_length, 3, IM_SIZE, IM_SIZE) tensor.
    frames_tensor = torch.stack(processed_frames[:sequence_length])
    frames_tensor = frames_tensor.unsqueeze(0)

    return frames_tensor, preprocessed_images, face_cropped_images, faces_found


def predict(model, img_tensor, device: str = "cpu"):
    """Run the model on a preprocessed video tensor.

    Args:
        model: Loaded PyTorch model; assumed to already be on ``device`` and
            to return a ``(feature_map, logits)`` tuple.
        img_tensor: Preprocessed video tensor from ``preprocess_video``.
        device: ``'cpu'`` or ``'cuda'``.

    Returns:
        Tuple of ``(prediction, confidence)`` where prediction is 0 for FAKE
        and 1 for REAL, and confidence is a percentage in [0, 100].
    """
    sm = torch.nn.Softmax(dim=1)

    # Move the input to the requested device; anything other than "cuda"
    # falls back to CPU, matching the original behavior.
    img_tensor = img_tensor.cuda() if device == "cuda" else img_tensor.cpu()

    # Inference only — no gradients needed.
    with torch.no_grad():
        fmap, logits = model(img_tensor)

    logits = sm(logits)
    _, prediction = torch.max(logits, 1)
    # Probability of the winning class, as a percentage.
    confidence = logits[0, int(prediction.item())].item() * 100

    return int(prediction.item()), confidence