deepfake-server / src /data /eye_extract.py
DevQueen's picture
Sync from GitHub via hub-sync
1dc2504 verified
Raw
History Blame Contribute Delete
3.65 kB
"""Eye sequence extraction for inference (no Hugging Face dependencies)."""
from __future__ import annotations
import os
import tempfile
from pathlib import Path
import cv2
import mediapipe as mp
import numpy as np
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]
def compute_ear(landmarks, eye_indices: list[int]) -> float:
p = [landmarks[i] for i in eye_indices]
a = np.linalg.norm(np.array([p[1].x, p[1].y]) - np.array([p[5].x, p[5].y]))
b = np.linalg.norm(np.array([p[2].x, p[2].y]) - np.array([p[4].x, p[4].y]))
c = np.linalg.norm(np.array([p[0].x, p[0].y]) - np.array([p[3].x, p[3].y]))
return float((a + b) / (2.0 * c + 1e-6))
def _extract_sequences_from_capture(
cap: cv2.VideoCapture,
label: int,
video_id: str,
seq_len: int,
face_mesh: mp.solutions.face_mesh.FaceMesh,
) -> list[dict]:
fps = cap.get(cv2.CAP_PROP_FPS) or 30
frame_interval = max(1, int(fps / 10))
all_frames: list[np.ndarray] = []
all_ears: list[float] = []
frame_idx = 0
while True:
ok, frame = cap.read()
if not ok:
break
if frame_idx % frame_interval == 0:
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
result = face_mesh.process(rgb)
if result.multi_face_landmarks:
lm = result.multi_face_landmarks[0].landmark
h, w = frame.shape[:2]
left_ear = compute_ear(lm, LEFT_EYE)
right_ear = compute_ear(lm, RIGHT_EYE)
ear = (left_ear + right_ear) / 2.0
eye_pts = [lm[i] for i in LEFT_EYE + RIGHT_EYE]
xs = [int(p.x * w) for p in eye_pts]
ys = [int(p.y * h) for p in eye_pts]
x1, x2 = max(0, min(xs) - 20), min(w, max(xs) + 20)
y1, y2 = max(0, min(ys) - 20), min(h, max(ys) + 20)
crop = rgb[y1:y2, x1:x2]
if crop.size > 0:
crop = cv2.resize(crop, (224, 224))
all_frames.append(crop.astype(np.uint8))
all_ears.append(ear)
frame_idx += 1
sequences: list[dict] = []
for i in range(0, len(all_frames) - seq_len + 1, seq_len):
frames = np.stack(all_frames[i : i + seq_len]).astype(np.uint8)
ears = np.array(all_ears[i : i + seq_len], dtype=np.float32)
sequences.append(
{
"frames": frames,
"ear": ears,
"label": label,
"video_id": f"{video_id}_seq{i // seq_len:03d}",
}
)
return sequences
def extract_sequences_from_video_path(
video_path: str | Path,
label: int,
video_id: str,
seq_len: int = 16,
) -> list[dict]:
face_mesh = mp.solutions.face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
)
cap = cv2.VideoCapture(str(video_path))
try:
return _extract_sequences_from_capture(cap, label, video_id, seq_len, face_mesh)
finally:
cap.release()
face_mesh.close()
def extract_sequences_from_video_bytes(
video_bytes: bytes,
label: int,
video_id: str,
seq_len: int = 16,
) -> list[dict]:
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
tmp.write(video_bytes)
tmp_path = tmp.name
try:
return extract_sequences_from_video_path(tmp_path, label, video_id, seq_len=seq_len)
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)