"""Eye sequence extraction for inference (no Hugging Face dependencies).""" from __future__ import annotations import os import tempfile from pathlib import Path import cv2 import mediapipe as mp import numpy as np LEFT_EYE = [33, 160, 158, 133, 153, 144] RIGHT_EYE = [362, 385, 387, 263, 373, 380] def compute_ear(landmarks, eye_indices: list[int]) -> float: p = [landmarks[i] for i in eye_indices] a = np.linalg.norm(np.array([p[1].x, p[1].y]) - np.array([p[5].x, p[5].y])) b = np.linalg.norm(np.array([p[2].x, p[2].y]) - np.array([p[4].x, p[4].y])) c = np.linalg.norm(np.array([p[0].x, p[0].y]) - np.array([p[3].x, p[3].y])) return float((a + b) / (2.0 * c + 1e-6)) def _extract_sequences_from_capture( cap: cv2.VideoCapture, label: int, video_id: str, seq_len: int, face_mesh: mp.solutions.face_mesh.FaceMesh, ) -> list[dict]: fps = cap.get(cv2.CAP_PROP_FPS) or 30 frame_interval = max(1, int(fps / 10)) all_frames: list[np.ndarray] = [] all_ears: list[float] = [] frame_idx = 0 while True: ok, frame = cap.read() if not ok: break if frame_idx % frame_interval == 0: rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) result = face_mesh.process(rgb) if result.multi_face_landmarks: lm = result.multi_face_landmarks[0].landmark h, w = frame.shape[:2] left_ear = compute_ear(lm, LEFT_EYE) right_ear = compute_ear(lm, RIGHT_EYE) ear = (left_ear + right_ear) / 2.0 eye_pts = [lm[i] for i in LEFT_EYE + RIGHT_EYE] xs = [int(p.x * w) for p in eye_pts] ys = [int(p.y * h) for p in eye_pts] x1, x2 = max(0, min(xs) - 20), min(w, max(xs) + 20) y1, y2 = max(0, min(ys) - 20), min(h, max(ys) + 20) crop = rgb[y1:y2, x1:x2] if crop.size > 0: crop = cv2.resize(crop, (224, 224)) all_frames.append(crop.astype(np.uint8)) all_ears.append(ear) frame_idx += 1 sequences: list[dict] = [] for i in range(0, len(all_frames) - seq_len + 1, seq_len): frames = np.stack(all_frames[i : i + seq_len]).astype(np.uint8) ears = np.array(all_ears[i : i + seq_len], dtype=np.float32) sequences.append( { "frames": frames, "ear": ears, "label": label, "video_id": f"{video_id}_seq{i // seq_len:03d}", } ) return sequences def extract_sequences_from_video_path( video_path: str | Path, label: int, video_id: str, seq_len: int = 16, ) -> list[dict]: face_mesh = mp.solutions.face_mesh.FaceMesh( static_image_mode=False, max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5, ) cap = cv2.VideoCapture(str(video_path)) try: return _extract_sequences_from_capture(cap, label, video_id, seq_len, face_mesh) finally: cap.release() face_mesh.close() def extract_sequences_from_video_bytes( video_bytes: bytes, label: int, video_id: str, seq_len: int = 16, ) -> list[dict]: with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: tmp.write(video_bytes) tmp_path = tmp.name try: return extract_sequences_from_video_path(tmp_path, label, video_id, seq_len=seq_len) finally: if os.path.exists(tmp_path): os.unlink(tmp_path)