# Video preprocessing and inference utilities for a face-based video classifier
# (frame extraction, OpenCV face detection, transform pipeline, model prediction).
| import torch | |
| from torch.utils.data import Dataset | |
| from torchvision import transforms | |
| import cv2 | |
| import numpy as np | |
| from typing import List, Generator, Tuple | |
| import os | |
| import base64 | |
# Image preprocessing parameters
IM_SIZE = 112  # square side length (pixels) fed to the model
# ImageNet channel statistics used for input normalization
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

# Transform pipeline for video frames:
# numpy array -> PIL image -> resize to IM_SIZE x IM_SIZE -> tensor in [0,1] -> normalize
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])

# OpenCV face detector (lightweight, no dlib needed).
# NOTE: despite earlier plans for a DNN model, the code below uses OpenCV's
# built-in Haar cascade (see get_face_detector()).
_face_detector = None  # lazily initialized singleton; see get_face_detector()
def get_face_detector():
    """
    Return the process-wide Haar cascade face detector, creating it on first use.

    The detector is cached in the module-level ``_face_detector`` so the
    cascade XML is loaded only once per process.
    """
    global _face_detector
    if _face_detector is not None:
        return _face_detector
    # Haar cascade ships with OpenCV, so no extra model download is needed.
    cascade_file = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    _face_detector = cv2.CascadeClassifier(cascade_file)
    return _face_detector
def detect_faces_opencv(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """
    Detect faces using OpenCV's Haar cascade detector.

    Args:
        frame: RGB image as a numpy array.

    Returns:
        List of face locations as (top, right, bottom, left) tuples —
        the same ordering the face_recognition library uses, so callers
        written against that API keep working.
    """
    # Haar cascades operate on single-channel images.
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    detections = get_face_detector().detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE
    )
    # detectMultiScale yields (x, y, w, h); re-order to (top, right, bottom, left).
    return [(y, x + w, y + h, x) for (x, y, w, h) in detections]
class ValidationDataset(Dataset):
    """
    Dataset wrapping a single video file for validation/prediction.

    Extracts up to ``sequence_length`` frames, crops the first detected face
    in each (falling back to the full frame), applies the transform, and
    returns one tensor of shape (1, sequence_length, C, H, W).
    """

    def __init__(self, video_path: str, sequence_length: int = 60, transform=None):
        self.video_path = video_path
        # Fall back to the module-level pipeline when no transform is given.
        self.transform = transform if transform else train_transforms
        self.sequence_length = sequence_length

    def __len__(self):
        # One dataset instance == one video.
        return 1

    def __getitem__(self, idx):
        clip = []
        for frame_bgr in self.frame_extract(self.video_path):
            # OpenCV decodes BGR; the transform pipeline expects RGB.
            rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            detections = detect_faces_opencv(rgb)
            try:
                top, right, bottom, left = detections[0]
                crop = rgb[top:bottom, left:right, :]
            except (IndexError, ValueError):
                # No face detected — fall back to the whole frame.
                crop = rgb
            clip.append(self.transform(crop))
            if len(clip) == self.sequence_length:
                break
        # Pad short videos by repeating the final frame (zeros if no frames at all).
        if len(clip) < self.sequence_length:
            filler = clip[-1] if clip else torch.zeros(3, IM_SIZE, IM_SIZE)
            clip.extend([filler] * (self.sequence_length - len(clip)))
        stacked = torch.stack(clip)[:self.sequence_length]
        return stacked.unsqueeze(0)

    def frame_extract(self, path: str) -> Generator[np.ndarray, None, None]:
        """Yield successive BGR frames decoded from the video at ``path``."""
        capture = cv2.VideoCapture(path)
        while True:
            ok, image = capture.read()
            if not ok:
                break
            yield image
        capture.release()
def preprocess_video(
    video_path: str,
    sequence_length: int,
    save_preprocessed: bool = False,
    output_dir: str = "temp_frames"
) -> tuple:
    """
    Preprocess a video for model prediction.

    Reads every frame, then for the first ``sequence_length`` frames: detects
    the first face (on a half-resolution copy for speed), crops it with padding
    from the full-resolution frame, and applies ``train_transforms``. Frames
    with no detected face use the full frame. Also builds base64 JPEG previews
    for frontend display, and optionally writes PNGs to ``output_dir``.

    Args:
        video_path: Path to the video file.
        sequence_length: Number of frames to extract.
        save_preprocessed: Whether to save preprocessed images to disk.
        output_dir: Directory to save preprocessed images into.

    Returns:
        Tuple of (frames_tensor, preprocessed_images, face_cropped_images,
        faces_found):
            frames_tensor: (1, sequence_length, C, H, W) float tensor.
            preprocessed_images: saved full-frame PNG paths (empty unless
                save_preprocessed is True).
            face_cropped_images: per-frame display entries — when
                save_preprocessed is True, a saved face PNG path precedes
                the base64 data URI for face frames; otherwise one
                "data:image/jpeg;base64,..." string per frame.
            faces_found: number of frames in which a face was detected.
    """
    preprocessed_images = []
    face_cropped_images = []
    # Create output directory if saving images
    if save_preprocessed and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Read the whole video into memory as a list of BGR frames.
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            frames.append(frame)
        else:
            break
    cap.release()
    print(f"Total frames extracted: {len(frames)}")
    # Process frames
    padding = 40  # pixels of context kept around each detected face box
    faces_found = 0
    processed_frames = []
    for i in range(min(sequence_length, len(frames))):
        frame = frames[i]
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Save preprocessed image if requested
        if save_preprocessed:
            preprocessed_path = os.path.join(output_dir, f"frame_{i+1}.png")
            # imwrite expects BGR, so convert back before saving.
            cv2.imwrite(preprocessed_path, cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
            preprocessed_images.append(preprocessed_path)
        # Face detection using OpenCV (much lighter than dlib/face_recognition).
        # Detection runs on a downscaled copy for speed.
        scale_factor = 0.5  # Less aggressive scaling since Haar is already fast
        small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
        # Detect faces on the smaller frame
        face_locations_small = detect_faces_opencv(small_frame)
        if len(face_locations_small) > 0:
            # Scale the first bounding box back to original resolution.
            top_small, right_small, bottom_small, left_small = face_locations_small[0]
            top = int(top_small / scale_factor)
            right = int(right_small / scale_factor)
            bottom = int(bottom_small / scale_factor)
            left = int(left_small / scale_factor)
            # Apply padding (on original-resolution coordinates), clamped to the frame.
            top = max(0, top - padding)
            bottom = min(rgb_frame.shape[0], bottom + padding)
            left = max(0, left - padding)
            right = min(rgb_frame.shape[1], right + padding)
            # Crop face from the ORIGINAL full-resolution frame
            frame_face = rgb_frame[top:bottom, left:right]
            # Save cropped face if requested
            if save_preprocessed:
                face_path = os.path.join(output_dir, f"face_{i+1}.png")
                cv2.imwrite(face_path, cv2.cvtColor(frame_face, cv2.COLOR_RGB2BGR))
                face_cropped_images.append(face_path)
            # Create a base64-encoded JPEG for frontend display (224x224).
            display_face = cv2.resize(frame_face, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_face, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_face = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_face}")
            faces_found += 1
            processed_frame = frame_face
        else:
            # No face detected, use full frame
            processed_frame = rgb_frame
            # For display, resize the full frame to show what was used.
            display_frame = cv2.resize(rgb_frame, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_frame, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_frame = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_frame}")
        # Apply transforms
        transformed_frame = train_transforms(processed_frame)
        processed_frames.append(transformed_frame)
    print(f"Faces detected: {faces_found}/{sequence_length}")
    # Handle the case where the video had fewer frames than sequence_length:
    # pad by repeating the last processed frame (zeros if the video was empty).
    if len(processed_frames) < sequence_length:
        last_frame = processed_frames[-1] if processed_frames else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(processed_frames) < sequence_length:
            processed_frames.append(last_frame)
    # Stack frames into tensor
    frames_tensor = torch.stack(processed_frames[:sequence_length])
    frames_tensor = frames_tensor.unsqueeze(0)  # Add batch dimension
    return frames_tensor, preprocessed_images, face_cropped_images, faces_found
def predict(model, img_tensor, device: str = "cpu"):
    """
    Make a prediction on a preprocessed video tensor.

    Args:
        model: Loaded PyTorch model, called as ``model(x)`` and expected to
            return a ``(feature_map, logits)`` tuple; the feature map is ignored.
        img_tensor: Preprocessed video tensor (batch dimension included).
        device: Torch device string, e.g. "cpu", "cuda", or "cuda:1".

    Returns:
        Tuple of (prediction, confidence):
            prediction: 0 for FAKE, 1 for REAL (argmax of class probabilities)
            confidence: softmax probability of the predicted class as a
                percentage in [0, 100]
    """
    softmax = torch.nn.Softmax(dim=1)
    # .to() handles any device string ("cpu", "cuda", "cuda:1", "mps", ...)
    # and is a no-op when the tensor is already on the target device.
    # The previous cuda/cpu if-else silently ran on CPU for any device
    # string other than the exact literal "cuda".
    img_tensor = img_tensor.to(device)
    # Inference only — disable autograd bookkeeping.
    with torch.no_grad():
        _, logits = model(img_tensor)
        probs = softmax(logits)
        # max over the class dimension yields both the winning probability
        # and its index in one call.
        confidence_t, prediction_t = torch.max(probs, dim=1)
    return int(prediction_t.item()), confidence_t.item() * 100