|
|
import cv2 |
|
|
import torch |
|
|
import numpy as np |
|
|
import os |
|
|
from PIL import Image |
|
|
|
|
|
def process_video(video_path, model, transform, device, frames_per_second=1): |
|
|
""" |
|
|
Process a video file frame-by-frame using the deepfake detection model. |
|
|
|
|
|
Args: |
|
|
video_path (str): Path to the video file. |
|
|
model (torch.nn.Module): Loaded PyTorch model. |
|
|
transform (callable): Albumentations transform pipeline. |
|
|
device (torch.device): Device to run inference on. |
|
|
frames_per_second (int): Number of frames to sample per second of video. |
|
|
Default is 1 to keep processing fast. |
|
|
|
|
|
Returns: |
|
|
dict: Aggregated results including verdict, average confidence, and frame-level details. |
|
|
""" |
|
|
if model is None: |
|
|
return {"error": "Model not loaded"} |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
return {"error": "Could not open video file"} |
|
|
|
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
if fps <= 0: fps = 30 |
|
|
|
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
duration = total_frames / fps |
|
|
|
|
|
|
|
|
|
|
|
step = int(fps / frames_per_second) |
|
|
if step < 1: step = 1 |
|
|
|
|
|
frame_indices = [] |
|
|
probs = [] |
|
|
|
|
|
print(f"Processing video: {video_path}") |
|
|
print(f"Duration: {duration:.2f}s, FPS: {fps}, Total Frames: {total_frames}") |
|
|
print(f"Sampling every {step} frames...") |
|
|
|
|
|
count = 0 |
|
|
processed_count = 0 |
|
|
|
|
|
suspicious_frames = [] |
|
|
|
|
|
while cap.isOpened(): |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
if count % step == 0: |
|
|
|
|
|
try: |
|
|
|
|
|
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
|
|
|
if not hasattr(process_video, "face_cascade"): |
|
|
try: |
|
|
cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' |
|
|
process_video.face_cascade = cv2.CascadeClassifier(cascade_path) |
|
|
except: |
|
|
process_video.face_cascade = None |
|
|
|
|
|
face_crop = None |
|
|
if process_video.face_cascade: |
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
|
|
faces = process_video.face_cascade.detectMultiScale( |
|
|
gray, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60) |
|
|
) |
|
|
|
|
|
if len(faces) > 0: |
|
|
|
|
|
largest_face = max(faces, key=lambda rect: rect[2] * rect[3]) |
|
|
x, y, w, h = largest_face |
|
|
|
|
|
|
|
|
margin = int(max(w, h) * 0.2) |
|
|
x_start = max(x - margin, 0) |
|
|
y_start = max(y - margin, 0) |
|
|
x_end = min(x + w + margin, frame.shape[1]) |
|
|
y_end = min(y + h + margin, frame.shape[0]) |
|
|
|
|
|
face_crop = image[y_start:y_end, x_start:x_end] |
|
|
|
|
|
|
|
|
input_image = face_crop if face_crop is not None else image |
|
|
|
|
|
|
|
|
augmented = transform(image=input_image) |
|
|
image_tensor = augmented['image'].unsqueeze(0).to(device) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
logits = model(image_tensor) |
|
|
prob = torch.sigmoid(logits).item() |
|
|
|
|
|
probs.append(prob) |
|
|
frame_indices.append(count) |
|
|
processed_count += 1 |
|
|
|
|
|
|
|
|
if prob > 0.5: |
|
|
timestamp = count / fps |
|
|
suspicious_frames.append({ |
|
|
"timestamp": round(timestamp, 2), |
|
|
"frame_index": count, |
|
|
"fake_prob": round(prob, 4) |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error processing frame {count}: {e}") |
|
|
|
|
|
count += 1 |
|
|
|
|
|
cap.release() |
|
|
|
|
|
if processed_count == 0: |
|
|
return {"error": "No frames processed"} |
|
|
|
|
|
|
|
|
avg_prob = sum(probs) / len(probs) |
|
|
max_prob = max(probs) |
|
|
fake_frame_count = len([p for p in probs if p > 0.6]) |
|
|
fake_ratio = fake_frame_count / processed_count |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cond1 = avg_prob > 0.65 |
|
|
|
|
|
|
|
|
|
|
|
cond2 = fake_ratio > 0.15 and max_prob > 0.7 |
|
|
|
|
|
|
|
|
cond3 = max_prob > 0.95 |
|
|
|
|
|
is_fake = cond1 or cond2 or cond3 |
|
|
|
|
|
verdict = "FAKE" if is_fake else "REAL" |
|
|
|
|
|
|
|
|
if is_fake: |
|
|
confidence = max(max_prob, 0.6) |
|
|
else: |
|
|
confidence = 1 - avg_prob |
|
|
|
|
|
return { |
|
|
"type": "video", |
|
|
"prediction": verdict, |
|
|
"confidence": float(confidence), |
|
|
"avg_fake_prob": float(avg_prob), |
|
|
"max_fake_prob": float(max_prob), |
|
|
"fake_frame_ratio": float(fake_ratio), |
|
|
"processed_frames": processed_count, |
|
|
"duration": float(duration), |
|
|
"timeline": [ |
|
|
{"time": round(i / fps, 2), "prob": round(p, 3)} |
|
|
for i, p in zip(frame_indices, probs) |
|
|
], |
|
|
"suspicious_frames": suspicious_frames[:10] |
|
|
} |
|
|
|