Spaces:
Sleeping
Sleeping
File size: 4,146 Bytes
45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 63bbcbd 45742a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import cv2 # (opencv-python)
from mtcnn.mtcnn import MTCNN
# Use relative import for use as a module
try:
from . import config
except ImportError:
import config
# Suppress TensorFlow logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.get_logger().setLevel('ERROR')
def process_video_for_prediction(video_path, detector):
"""
This is a complete pipeline to process one video for prediction.
1. Extracts 30 frames
2. Runs MTCNN to find/crop faces
3. Resizes faces to 299x299
4. Normalizes and stacks them into a (1, 30, 299, 299, 3) batch.
"""
print("Processing video... This may take a moment.")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video file {video_path}")
return None
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames < config.SEQUENCE_LENGTH:
print(f"Warning: Video is too short. Has {total_frames} frames, needs {config.SEQUENCE_LENGTH}.")
frame_indices = np.linspace(0, total_frames - 1, config.SEQUENCE_LENGTH, dtype=int)
video_sequence = np.zeros(
(config.SEQUENCE_LENGTH, config.TARGET_IMAGE_SIZE, config.TARGET_IMAGE_SIZE, 3),
dtype=np.float32
)
frames_processed = 0
for i, frame_index in enumerate(frame_indices):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
ret, frame = cap.read()
if not ret:
continue
try:
# --- MTCNN Face Detection ---
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = detector.detect_faces(frame_rgb)
if results:
x1, y1, width, height = results[0]['box']
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(frame.shape[1], x1 + width), min(frame.shape[0], y1 + height)
face_crop = frame[y1:y2, x1:x2]
# Resize to model's expected input
face_resized = cv2.resize(face_crop, (config.TARGET_IMAGE_SIZE, config.TARGET_IMAGE_SIZE))
# Normalize (just like in training)
normalized_face = face_resized / 255.0
# Add to our sequence
video_sequence[i] = normalized_face
frames_processed += 1
except Exception as e:
print(f"Warning: Error on frame {frame_index}: {e}")
pass
cap.release()
if frames_processed < (config.SEQUENCE_LENGTH * 0.5): # e.g., < 15 frames
print("Error: Could not detect faces in most of the video. Aborting.")
return None
print(f"Successfully processed {frames_processed} frames.")
# Add the "batch" dimension
# Shape becomes (1, 30, 299, 299, 3)
return np.expand_dims(video_sequence, axis=0)
def get_video_prediction(video_path, video_model, detector):
# 1. Process the video using the helper function
# We pass the pre-loaded detector to it.
video_batch = process_video_for_prediction(video_path, detector)
# 2. Handle processing failure
if video_batch is None:
print("Video processing failed, returning error.")
return {
"prediction": "Error",
"confidence": 0.0,
"detail": "Could not process video or detect faces."
}
# 3. Make a prediction
print("Model is making a prediction...")
prediction_prob = video_model.predict(video_batch)[0][0]
# 4. Interpret the result
# 'fake' = 0, 'real' = 1
if prediction_prob > 0.5:
label = 'REAL'
confidence = prediction_prob * 100
else:
label = 'FAKE'
confidence = (1 - prediction_prob) * 100
print("Prediction successful.")
# 5. Return the result dictionary
return {
"prediction": label,
"confidence": float(confidence),
"raw_score": float(prediction_prob)
}
|