deepfake-backend / preprocessing.py
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import cv2
import numpy as np
from typing import List, Generator, Tuple
import os
import base64
# Image preprocessing parameters
IM_SIZE = 112
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]
# Transform pipeline for video frames
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])
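
# A minimal sanity check (sketch): the pipeline maps any RGB frame to a
# normalized (3, 112, 112) float tensor, e.g.
#   dummy = np.zeros((480, 640, 3), dtype=np.uint8)
#   train_transforms(dummy).shape  # -> torch.Size([3, 112, 112])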
# OpenCV face detector (lightweight, no dlib needed)
# Uses OpenCV's bundled Haar Cascade classifier
_face_detector = None

def get_face_detector():
    """
    Get or initialize the OpenCV face detector.
    Uses OpenCV's bundled Haar Cascade model for face detection.
    """
    global _face_detector
    if _face_detector is None:
        # The Haar Cascade ships with OpenCV, so it is always available
        cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        _face_detector = cv2.CascadeClassifier(cascade_path)
    return _face_detector

def detect_faces_opencv(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """
    Detect faces using OpenCV's Haar Cascade detector.

    Args:
        frame: RGB image as a numpy array

    Returns:
        List of face locations as (top, right, bottom, left) tuples
        (same format as the face_recognition library, for compatibility)
    """
    detector = get_face_detector()
    # Convert to grayscale for the Haar cascade
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    # Detect faces
    faces = detector.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE
    )
    # Convert from (x, y, w, h) to (top, right, bottom, left) format
    face_locations = []
    for (x, y, w, h) in faces:
        face_locations.append((y, x + w, y + h, x))
    return face_locations
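
# Example usage (sketch; "sample.jpg" is a hypothetical local image):
#   bgr = cv2.imread("sample.jpg")
#   rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
#   for top, right, bottom, left in detect_faces_opencv(rgb):
#       face_crop = rgb[top:bottom, left:right]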

class ValidationDataset(Dataset):
    """
    Dataset for processing a single video file for validation/prediction.
    Extracts frames, detects faces, and applies transformations.
    """
    def __init__(self, video_path: str, sequence_length: int = 60, transform=None):
        self.video_path = video_path
        self.transform = transform if transform else train_transforms
        self.sequence_length = sequence_length

    def __len__(self):
        return 1  # Single video

    def __getitem__(self, idx):
        frames = []
        # Extract frames from the video
        for i, frame in enumerate(self.frame_extract(self.video_path)):
            # Convert BGR to RGB
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Detect a face in the frame using OpenCV
            faces = detect_faces_opencv(rgb_frame)
            try:
                top, right, bottom, left = faces[0]
                frame = rgb_frame[top:bottom, left:right, :]
            except (IndexError, ValueError):
                # No face detected, use the full frame
                frame = rgb_frame
            frames.append(self.transform(frame))
            if len(frames) == self.sequence_length:
                break
        # If there are not enough frames, repeat the last frame
        if len(frames) < self.sequence_length:
            last_frame = frames[-1] if frames else torch.zeros(3, IM_SIZE, IM_SIZE)
            while len(frames) < self.sequence_length:
                frames.append(last_frame)
        frames = torch.stack(frames)
        frames = frames[:self.sequence_length]
        return frames.unsqueeze(0)  # (1, sequence_length, 3, IM_SIZE, IM_SIZE)

    def frame_extract(self, path: str) -> Generator[np.ndarray, None, None]:
        """Extract frames from a video file."""
        vidObj = cv2.VideoCapture(path)
        success = True
        while success:
            success, image = vidObj.read()
            if success:
                yield image
        vidObj.release()
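
# Example usage (sketch; "video.mp4" is a hypothetical local file):
#   dataset = ValidationDataset("video.mp4", sequence_length=20)
#   clip = dataset[0]  # tensor of shape (1, 20, 3, 112, 112)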

def preprocess_video(
    video_path: str,
    sequence_length: int,
    save_preprocessed: bool = False,
    output_dir: str = "temp_frames"
) -> Tuple[torch.Tensor, List[str], List[str], int]:
    """
    Preprocess a video for model prediction.

    Args:
        video_path: Path to the video file
        sequence_length: Number of frames to extract
        save_preprocessed: Whether to save preprocessed images to disk
        output_dir: Directory to save preprocessed images

    Returns:
        Tuple of (preprocessed_tensor, preprocessed_images_list,
        face_cropped_images_list, faces_found). Note that when
        save_preprocessed is True, face_cropped_images_list contains saved
        file paths as well as base64 data URIs for frontend display.
    """
    preprocessed_images = []
    face_cropped_images = []
    # Create the output directory if saving images
    if save_preprocessed and not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Read the video
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            frames.append(frame)
        else:
            break
    cap.release()
    print(f"Total frames extracted: {len(frames)}")
    # Process frames
    padding = 40
    faces_found = 0
    processed_frames = []
    for i in range(min(sequence_length, len(frames))):
        frame = frames[i]
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Save the preprocessed image if requested
        if save_preprocessed:
            preprocessed_path = os.path.join(output_dir, f"frame_{i+1}.png")
            cv2.imwrite(preprocessed_path, cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
            preprocessed_images.append(preprocessed_path)
        # Face detection using OpenCV (much lighter than dlib/face_recognition).
        # Detect on a downscaled frame for speed.
        scale_factor = 0.5  # Modest scaling, since Haar is already fast
        small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
        # Detect faces on the smaller frame
        face_locations_small = detect_faces_opencv(small_frame)
        if len(face_locations_small) > 0:
            # Scale the bounding box back to the original resolution
            top_small, right_small, bottom_small, left_small = face_locations_small[0]
            top = int(top_small / scale_factor)
            right = int(right_small / scale_factor)
            bottom = int(bottom_small / scale_factor)
            left = int(left_small / scale_factor)
            # Apply padding (in original-resolution coordinates)
            top = max(0, top - padding)
            bottom = min(rgb_frame.shape[0], bottom + padding)
            left = max(0, left - padding)
            right = min(rgb_frame.shape[1], right + padding)
            # Crop the face from the ORIGINAL full-resolution frame
            frame_face = rgb_frame[top:bottom, left:right]
            # Save the cropped face if requested
            if save_preprocessed:
                face_path = os.path.join(output_dir, f"face_{i+1}.png")
                cv2.imwrite(face_path, cv2.cvtColor(frame_face, cv2.COLOR_RGB2BGR))
                face_cropped_images.append(face_path)
            # Create a base64-encoded image for frontend display,
            # resized to a reasonable display size (224x224)
            display_face = cv2.resize(frame_face, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_face, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_face = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_face}")
            faces_found += 1
            processed_frame = frame_face
        else:
            # No face detected, use the full frame
            processed_frame = rgb_frame
            # For display, resize the full frame to show what was used
            display_frame = cv2.resize(rgb_frame, (224, 224))
            _, buffer = cv2.imencode('.jpg', cv2.cvtColor(display_frame, cv2.COLOR_RGB2BGR), [cv2.IMWRITE_JPEG_QUALITY, 85])
            base64_frame = base64.b64encode(buffer).decode('utf-8')
            face_cropped_images.append(f"data:image/jpeg;base64,{base64_frame}")
        # Apply transforms
        transformed_frame = train_transforms(processed_frame)
        processed_frames.append(transformed_frame)
    print(f"Faces detected: {faces_found}/{sequence_length}")
    # Pad by repeating the last frame if the video is too short
    if len(processed_frames) < sequence_length:
        last_frame = processed_frames[-1] if processed_frames else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(processed_frames) < sequence_length:
            processed_frames.append(last_frame)
    # Stack frames into a tensor of shape (1, sequence_length, 3, IM_SIZE, IM_SIZE)
    frames_tensor = torch.stack(processed_frames[:sequence_length])
    frames_tensor = frames_tensor.unsqueeze(0)  # Add batch dimension
    return frames_tensor, preprocessed_images, face_cropped_images, faces_found
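
# Example usage (sketch; "video.mp4" is a hypothetical local file):
#   tensor, pre_paths, face_imgs, n_faces = preprocess_video(
#       "video.mp4", sequence_length=20, save_preprocessed=True)
#   tensor.shape  # -> torch.Size([1, 20, 3, 112, 112])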

def predict(model, img_tensor, device: str = "cpu"):
    """
    Make a prediction on a preprocessed video tensor.

    Args:
        model: Loaded PyTorch model
        img_tensor: Preprocessed video tensor
        device: 'cpu' or 'cuda'

    Returns:
        Tuple of (prediction, confidence):
        prediction: 0 for FAKE, 1 for REAL
        confidence: Confidence percentage (0-100)
    """
    sm = torch.nn.Softmax(dim=1)
    # Move the tensor to the requested device
    if device == "cuda":
        img_tensor = img_tensor.cuda()
    else:
        img_tensor = img_tensor.cpu()
    # Forward pass (the model returns feature maps and logits)
    with torch.no_grad():
        fmap, logits = model(img_tensor)
        logits = sm(logits)
        _, prediction = torch.max(logits, 1)
        confidence = logits[0, int(prediction.item())].item() * 100
    return int(prediction.item()), confidence
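
# End-to-end sketch. The model object is hypothetical: any PyTorch model
# whose forward pass returns (feature_map, logits) fits this module, and
# "video.mp4" stands in for a real input file.
#   model = ...  # load a compatible (fmap, logits) model, then model.eval()
#   tensor, _, _, faces = preprocess_video("video.mp4", sequence_length=20)
#   label, conf = predict(model, tensor, device="cpu")
#   print("REAL" if label == 1 else "FAKE", f"{conf:.1f}%")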