Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| from pathlib import Path | |
| from typing import List, Tuple | |
| import cv2 | |
| import mediapipe as mp | |
| import numpy as np | |
| import pandas as pd | |
| from tqdm import tqdm | |
# Landmark indices used for each eye. All four points of an eye feed the crop
# bounding box; elements [2] and [3] are the pair whose vertical distance is
# used as the eye-opening measure.
# NOTE(review): presumably MediaPipe Face Mesh indices (two eye corners, then
# upper and lower lid) -- confirm against the mesh topology, including which
# physical eye "left" and "right" refer to.
LEFT_EYE_IDX = [33, 133, 159, 145]
RIGHT_EYE_IDX = [362, 263, 386, 374]
def eye_bbox(landmarks, image_w: int, image_h: int, indices: List[int], pad: int = 8) -> Tuple[int, int, int, int]:
    """Return a padded pixel bounding box around the selected landmarks.

    Args:
        landmarks: Indexable collection whose items expose ``x`` and ``y``
            attributes; they are scaled by the image width and height here.
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        indices: Positions in ``landmarks`` that the box must enclose.
        pad: Margin in pixels added on every side before clamping.

    Returns:
        ``(x1, y1, x2, y2)`` as plain Python ints, every coordinate clamped to
        ``[0, image_w]`` / ``[0, image_h]``.
    """
    pts = np.array(
        [(int(landmarks[i].x * image_w), int(landmarks[i].y * image_h)) for i in indices]
    )
    x_min, y_min = pts.min(axis=0)
    x_max, y_max = pts.max(axis=0)

    def clamp(value, upper: int) -> int:
        # Clamp on both sides: a negative corner would otherwise be read as
        # "count from the end" when the box is used to slice an array, giving
        # a wrong crop instead of an empty one for off-frame landmarks.
        return int(min(max(value, 0), upper))

    return (
        clamp(x_min - pad, image_w),
        clamp(y_min - pad, image_h),
        clamp(x_max + pad, image_w),
        clamp(y_max + pad, image_h),
    )
def process_frame(frame: np.ndarray, face_mesh) -> Tuple[np.ndarray | None, float]:
    """Crop both eyes from a BGR frame and measure how open they are.

    Args:
        frame: Image in BGR channel order (it is converted to RGB before being
            handed to ``face_mesh``).
        face_mesh: Object whose ``process(rgb)`` result exposes
            ``multi_face_landmarks``; only the first face is used.

    Returns:
        ``(eye_pair, ear_proxy)``. ``eye_pair`` is the two eye crops, each
        resized to 112x112 and joined side by side (112 rows, 224 columns).
        ``ear_proxy`` is the mean, over both eyes, of the absolute ``y``
        difference between the eye's third and fourth index landmarks, in
        landmark coordinate units (not scaled to pixels).
        ``(None, 0.0)`` is returned when no face is found or when either eye
        crop is empty.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    result = face_mesh.process(rgb)
    if not result.multi_face_landmarks:
        return None, 0.0

    height, width = frame.shape[:2]
    lm = result.multi_face_landmarks[0].landmark

    crops = []
    for eye_indices in (LEFT_EYE_IDX, RIGHT_EYE_IDX):
        x1, y1, x2, y2 = eye_bbox(lm, width, height, eye_indices)
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # A degenerate box (eye outside the frame) cannot be resized.
            return None, 0.0
        crops.append(crop)

    # Left eye first, then right, joined along the width axis.
    eye_pair = np.concatenate([cv2.resize(crop, (112, 112)) for crop in crops], axis=1)

    left_open = abs(lm[LEFT_EYE_IDX[2]].y - lm[LEFT_EYE_IDX[3]].y)
    right_open = abs(lm[RIGHT_EYE_IDX[2]].y - lm[RIGHT_EYE_IDX[3]].y)
    ear_proxy = float((left_open + right_open) / 2.0)
    return eye_pair, ear_proxy
def sample_sequences(frames: List[np.ndarray], ear: List[float], seq_len: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Split per-frame data into consecutive, non-overlapping windows.

    Args:
        frames: Per-frame arrays of identical shape.
        ear: Per-frame scalar values aligned index-for-index with ``frames``.
        seq_len: Number of frames per window; must be at least 1.

    Returns:
        ``(x, b)`` where ``x[k]`` stacks ``seq_len`` frames along a new leading
        axis and ``b[k]`` is the matching ``float32`` array of ``ear`` values.
        Trailing frames that do not fill a whole window are dropped.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        # Without this check, 0 fails inside range() with an unrelated message
        # and negative values silently produce no windows at all.
        raise ValueError(f"seq_len must be a positive integer, got {seq_len}")

    starts = range(0, len(frames) - seq_len + 1, seq_len)
    x = [np.stack(frames[s : s + seq_len], axis=0) for s in starts]
    b = [np.array(ear[s : s + seq_len], dtype=np.float32) for s in starts]
    return x, b
def main() -> None:
    """Extract fixed-length eye sequences for every video listed in a CSV.

    Command-line arguments:
        --metadata: CSV with one row per video; the columns ``frame_dir``,
            ``video_path``, ``dataset``, ``split``, ``label`` and ``identity``
            are read.
        --out-root: Directory that receives the ``sequences`` tree and
            ``metadata_sequences.csv``.
        --sequence-length: Frames per saved sequence (default 32).

    For each row, the ``*.jpg`` files in ``frame_dir`` are processed in sorted
    order; frames that cannot be read or yield no eye crop are skipped. Each
    full window is saved as a compressed ``.npz`` holding ``frames``, ``blink``
    and ``label``. Videos that produced at least one sequence are listed in the
    output CSV.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--metadata", type=Path, required=True)
    parser.add_argument("--out-root", type=Path, required=True)
    parser.add_argument("--sequence-length", type=int, default=32)
    args = parser.parse_args()
    if args.sequence_length < 1:
        # Fail up front instead of after the first video has been processed.
        parser.error("--sequence-length must be a positive integer")

    df = pd.read_csv(args.metadata)
    record_columns = [
        "dataset",
        "video_path",
        "identity",
        "split",
        "label",
        "sequence_count",
        "sequence_dir",
    ]
    records = []

    # The context manager releases the face-mesh resources even if a video
    # fails part-way through.
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as mesh:
        for row in tqdm(df.to_dict(orient="records"), desc="Extracting eye sequences"):
            frame_dir = Path(row["frame_dir"])
            frame_paths = sorted(frame_dir.glob("*.jpg"))

            eye_frames: List[np.ndarray] = []
            ear_values: List[float] = []
            for fp in frame_paths:
                frame = cv2.imread(str(fp))
                if frame is None:
                    # Unreadable or corrupt image file.
                    continue
                eye_img, ear = process_frame(frame, mesh)
                if eye_img is None:
                    continue
                eye_frames.append(eye_img)
                ear_values.append(ear)

            sequences, blink = sample_sequences(eye_frames, ear_values, args.sequence_length)
            if not sequences:
                # Nothing to save: no directory is created and no record written.
                continue

            video_id = Path(row["video_path"]).stem
            out_dir = args.out_root / "sequences" / row["dataset"] / row["split"] / video_id
            out_dir.mkdir(parents=True, exist_ok=True)

            saved = 0
            for idx, (seq, blink_seq) in enumerate(zip(sequences, blink)):
                out_path = out_dir / f"{idx:03d}.npz"
                np.savez_compressed(out_path, frames=seq, blink=blink_seq, label=row["label"])
                saved += 1

            records.append(
                {
                    "dataset": row["dataset"],
                    "video_path": row["video_path"],
                    "identity": row["identity"],
                    "split": row["split"],
                    "label": row["label"],
                    "sequence_count": saved,
                    "sequence_dir": str(out_dir.resolve()),
                }
            )

    # The output root may not exist yet when no video produced a sequence.
    args.out_root.mkdir(parents=True, exist_ok=True)
    out_csv = args.out_root / "metadata_sequences.csv"
    # Explicit columns keep the header present even when ``records`` is empty.
    pd.DataFrame(records, columns=record_columns).to_csv(out_csv, index=False)
    print(f"Saved sequence metadata: {out_csv}")
# Run the extraction only when this file is executed as a script, so that
# importing it does not trigger any processing.
if __name__ == "__main__":
    main()