"""Video preprocessing and prediction utilities for deepfake detection.

Extracts frames from a video, crops detected faces with OpenCV's Haar
cascade, normalizes them for a PyTorch model, and runs inference.
"""
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import cv2
import numpy as np
from typing import List, Generator, Tuple
import os
import base64
# Image preprocessing parameters
IM_SIZE = 112  # side length (pixels) each frame is resized to before the model
MEAN = [0.485, 0.456, 0.406]  # ImageNet per-channel means (RGB) for normalization
STD = [0.229, 0.224, 0.225]   # ImageNet per-channel std-devs (RGB)
# Transform pipeline for video frames: numpy RGB frame -> normalized tensor
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IM_SIZE, IM_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD)
])
# Lazily-constructed, module-level face detector (OpenCV Haar cascade —
# lightweight, ships with OpenCV, no dlib required).
_face_detector = None


def get_face_detector():
    """Return the shared Haar-cascade face detector, building it on first use.

    Uses the frontal-face cascade bundled with OpenCV, so no external model
    files are needed.
    """
    global _face_detector
    if _face_detector is not None:
        return _face_detector
    _face_detector = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    )
    return _face_detector
def detect_faces_opencv(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """Detect faces in an RGB frame with OpenCV's Haar cascade.

    Args:
        frame: RGB image as a numpy array.

    Returns:
        Face boxes as (top, right, bottom, left) tuples — the same layout
        the face_recognition library produces, for drop-in compatibility.
    """
    # Haar cascades operate on single-channel images.
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    detections = get_face_detector().detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE,
    )
    # OpenCV reports (x, y, w, h); reorder to (top, right, bottom, left).
    return [(y, x + w, y + h, x) for (x, y, w, h) in detections]
class ValidationDataset(Dataset):
    """One-item dataset wrapping a single video file for validation/prediction.

    Each access extracts up to ``sequence_length`` frames, crops the first
    detected face per frame (falling back to the full frame when none is
    found), applies the transform, and returns a tensor with an extra
    leading batch dimension.
    """

    def __init__(self, video_path: str, sequence_length: int = 60, transform=None):
        self.video_path = video_path
        self.transform = transform or train_transforms
        self.sequence_length = sequence_length

    def __len__(self):
        # A dataset instance always wraps exactly one video.
        return 1

    def __getitem__(self, idx):
        clips = []
        for raw in self.frame_extract(self.video_path):
            # OpenCV decodes BGR; the transform pipeline expects RGB.
            rgb = cv2.cvtColor(raw, cv2.COLOR_BGR2RGB)
            boxes = detect_faces_opencv(rgb)
            try:
                top, right, bottom, left = boxes[0]
                crop = rgb[top:bottom, left:right, :]
            except (IndexError, ValueError):
                # No face detected: fall back to the full frame.
                crop = rgb
            clips.append(self.transform(crop))
            if len(clips) == self.sequence_length:
                break
        # Pad short videos by repeating the last frame (zeros if none decoded).
        filler = clips[-1] if clips else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(clips) < self.sequence_length:
            clips.append(filler)
        stacked = torch.stack(clips)[:self.sequence_length]
        return stacked.unsqueeze(0)

    def frame_extract(self, path: str) -> Generator[np.ndarray, None, None]:
        """Yield successive BGR frames decoded from the video at *path*."""
        capture = cv2.VideoCapture(path)
        ok = True
        while ok:
            ok, image = capture.read()
            if ok:
                yield image
        capture.release()
def _to_data_uri(rgb_image: np.ndarray, size: Tuple[int, int] = (224, 224),
                 quality: int = 85) -> str:
    """Encode an RGB image as a base64 JPEG data URI for frontend display."""
    resized = cv2.resize(rgb_image, size)
    _, buffer = cv2.imencode(
        '.jpg', cv2.cvtColor(resized, cv2.COLOR_RGB2BGR),
        [cv2.IMWRITE_JPEG_QUALITY, quality]
    )
    return f"data:image/jpeg;base64,{base64.b64encode(buffer).decode('utf-8')}"


def preprocess_video(
    video_path: str,
    sequence_length: int,
    save_preprocessed: bool = False,
    output_dir: str = "temp_frames"
) -> tuple:
    """
    Preprocess a video for model prediction.

    Reads up to ``sequence_length`` frames, crops the first detected face in
    each (with padding, on the full-resolution frame), and builds both the
    normalized model-input tensor and per-frame display images.

    Args:
        video_path: Path to the video file
        sequence_length: Number of frames to extract
        save_preprocessed: Whether to also save frames/face crops as PNGs
        output_dir: Directory to save preprocessed images

    Returns:
        Tuple of (frames_tensor, preprocessed_images, face_cropped_images,
        faces_found):
        - frames_tensor: (1, sequence_length, C, H, W) normalized tensor
        - preprocessed_images: saved full-frame PNG paths (empty unless
          save_preprocessed)
        - face_cropped_images: exactly ONE base64 data-URI string per
          processed frame (face crop, or full frame when no face was found)
        - faces_found: number of frames with a detected face
    """
    preprocessed_images = []
    face_cropped_images = []
    if save_preprocessed:
        os.makedirs(output_dir, exist_ok=True)
    # Decode every frame up front.
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    print(f"Total frames extracted: {len(frames)}")
    padding = 40  # pixels of context kept around each detected face box
    faces_found = 0
    processed_frames = []
    for i in range(min(sequence_length, len(frames))):
        rgb_frame = cv2.cvtColor(frames[i], cv2.COLOR_BGR2RGB)
        if save_preprocessed:
            preprocessed_path = os.path.join(output_dir, f"frame_{i+1}.png")
            cv2.imwrite(preprocessed_path, cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR))
            preprocessed_images.append(preprocessed_path)
        # Detect on a downscaled copy for speed (Haar is fast; 0.5x halves work).
        scale_factor = 0.5
        small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
        face_locations_small = detect_faces_opencv(small_frame)
        if face_locations_small:
            top_s, right_s, bottom_s, left_s = face_locations_small[0]
            # Scale the box back to full resolution, pad, and clamp to bounds.
            top = max(0, int(top_s / scale_factor) - padding)
            bottom = min(rgb_frame.shape[0], int(bottom_s / scale_factor) + padding)
            left = max(0, int(left_s / scale_factor) - padding)
            right = min(rgb_frame.shape[1], int(right_s / scale_factor) + padding)
            # Crop from the ORIGINAL full-resolution frame.
            frame_face = rgb_frame[top:bottom, left:right]
            if save_preprocessed:
                face_path = os.path.join(output_dir, f"face_{i+1}.png")
                cv2.imwrite(face_path, cv2.cvtColor(frame_face, cv2.COLOR_RGB2BGR))
            # BUGFIX: append exactly one data URI per frame. Previously, when
            # save_preprocessed was True, the saved file PATH was also appended
            # to face_cropped_images, so the list mixed paths with data URIs
            # and its length no longer matched the number of processed frames.
            face_cropped_images.append(_to_data_uri(frame_face))
            faces_found += 1
            processed_frame = frame_face
        else:
            # No face detected: use the full frame for both the model input
            # and the display image.
            processed_frame = rgb_frame
            face_cropped_images.append(_to_data_uri(rgb_frame))
        processed_frames.append(train_transforms(processed_frame))
    print(f"Faces detected: {faces_found}/{sequence_length}")
    # Pad short videos by repeating the last frame (zeros if nothing decoded).
    if len(processed_frames) < sequence_length:
        last_frame = processed_frames[-1] if processed_frames else torch.zeros(3, IM_SIZE, IM_SIZE)
        while len(processed_frames) < sequence_length:
            processed_frames.append(last_frame)
    # Stack and add the batch dimension.
    frames_tensor = torch.stack(processed_frames[:sequence_length]).unsqueeze(0)
    return frames_tensor, preprocessed_images, face_cropped_images, faces_found
def predict(model, img_tensor, device: str = "cpu"):
    """
    Make a prediction on a preprocessed video tensor.

    Args:
        model: Loaded PyTorch model; called as ``model(x)`` and expected to
            return a ``(feature_map, logits)`` pair.
        img_tensor: Preprocessed video tensor (batch dimension included).
        device: Target device, e.g. "cpu", "cuda", or "cuda:1".

    Returns:
        Tuple of (prediction, confidence):
        prediction: 0 for FAKE, 1 for REAL (argmax class index)
        confidence: softmax probability of the predicted class as a
        percentage (0-100)
    """
    sm = torch.nn.Softmax(dim=1)
    # .to(device) generalizes the previous cpu/cuda if-else: it accepts any
    # device string ("cuda:1") or torch.device object, and is a no-op when
    # the tensor is already on the requested device.
    img_tensor = img_tensor.to(device)
    # Inference only — no autograd bookkeeping needed.
    with torch.no_grad():
        fmap, logits = model(img_tensor)
        probs = sm(logits)
        _, prediction = torch.max(probs, 1)
        confidence = probs[0, int(prediction.item())].item() * 100
    return int(prediction.item()), confidence