Spaces:
Sleeping
Sleeping
File size: 4,281 Bytes
from __future__ import annotations

import argparse
from pathlib import Path
from typing import List, Optional, Tuple

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
# Face-landmark indices used for each eye. All four entries feed the crop
# bounding box in eye_bbox(); entries [2] and [3] are additionally the pair
# whose y-distance process_frame() uses as the eye-opening measure.
# NOTE(review): presumably MediaPipe Face Mesh indices ordered as
# (corner, corner, upper lid, lower lid) -- confirm against the landmark map.
LEFT_EYE_IDX = [33, 133, 159, 145]
RIGHT_EYE_IDX = [362, 263, 386, 374]
def eye_bbox(landmarks, image_w: int, image_h: int, indices: List[int], pad: int = 8) -> Tuple[int, int, int, int]:
    """Return a padded pixel bounding box around the selected landmarks.

    Args:
        landmarks: Indexable collection whose items expose ``x`` and ``y``
            attributes expressed as fractions of the image width/height.
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        indices: Positions in *landmarks* to enclose.
        pad: Margin in pixels added on every side of the tight box.

    Returns:
        ``(x1, y1, x2, y2)`` as plain ints, every coordinate clamped to
        ``[0, image_w]`` / ``[0, image_h]``. A box that lies entirely outside
        the image collapses to zero width and/or height.

    Raises:
        ValueError: If *indices* is empty.
    """
    xs = [int(landmarks[i].x * image_w) for i in indices]
    ys = [int(landmarks[i].y * image_h) for i in indices]

    def clamp(value: int, upper: int) -> int:
        # Both bounds matter: a negative end coordinate would be interpreted
        # by NumPy slicing as "count from the end" and yield a bogus crop.
        return max(0, min(upper, value))

    return (
        clamp(min(xs) - pad, image_w),
        clamp(min(ys) - pad, image_h),
        clamp(max(xs) + pad, image_w),
        clamp(max(ys) + pad, image_h),
    )
def process_frame(frame: np.ndarray, face_mesh) -> Tuple[Optional[np.ndarray], float]:
    """Crop both eyes from one frame and measure how open they are.

    Args:
        frame: Image in OpenCV's BGR channel order; it is converted to RGB
            before being handed to *face_mesh*.
        face_mesh: Object exposing ``process(rgb)`` whose result carries
            ``multi_face_landmarks`` (a MediaPipe FaceMesh in this file).

    Returns:
        ``(eye_pair, ear_proxy)``. ``eye_pair`` is the left and right eye
        crops, each resized to 112x112 and joined side by side along the
        width axis. ``ear_proxy`` is the mean over both eyes of the absolute
        y-distance between landmark entries [2] and [3], in the landmarks'
        own unscaled coordinates. ``(None, 0.0)`` is returned when no face is
        detected or either eye box is unusable.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    res = face_mesh.process(rgb)
    if not res.multi_face_landmarks:
        return None, 0.0

    h, w = frame.shape[:2]
    lm = res.multi_face_landmarks[0].landmark

    crops = []
    for indices in (LEFT_EYE_IDX, RIGHT_EYE_IDX):
        x1, y1, x2, y2 = eye_bbox(lm, w, h, indices)
        # Reject negative or inverted boxes before slicing: a negative bound
        # would silently wrap around and crop an unrelated image region
        # instead of producing an empty array.
        if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1:
            return None, 0.0
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return None, 0.0
        crops.append(cv2.resize(crop, (112, 112)))
    eye_pair = np.concatenate(crops, axis=1)

    left_open = abs(lm[LEFT_EYE_IDX[2]].y - lm[LEFT_EYE_IDX[3]].y)
    right_open = abs(lm[RIGHT_EYE_IDX[2]].y - lm[RIGHT_EYE_IDX[3]].y)
    ear_proxy = float((left_open + right_open) / 2.0)
    return eye_pair, ear_proxy
def sample_sequences(frames: List[np.ndarray], ear: List[float], seq_len: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Split per-frame data into consecutive, non-overlapping windows.

    Windows start at 0, seq_len, 2*seq_len, ...; a trailing remainder shorter
    than *seq_len* is dropped.

    Args:
        frames: Per-frame arrays of identical shape.
        ear: Per-frame scalar values aligned with *frames*.
        seq_len: Number of frames per window.

    Returns:
        ``(clips, signals)``: each clip is the window's frames stacked along
        a new leading axis; each signal is the matching slice of *ear* as a
        float32 array.
    """
    window_starts = range(0, len(frames) - seq_len + 1, seq_len)
    clips = [np.stack(frames[start : start + seq_len], axis=0) for start in window_starts]
    signals = [np.array(ear[start : start + seq_len], dtype=np.float32) for start in window_starts]
    return clips, signals
def _collect_eye_frames(frame_dir: Path, face_mesh) -> Tuple[List[np.ndarray], List[float]]:
    """Run process_frame over every ``*.jpg`` in *frame_dir*, keeping usable frames only."""
    eye_frames: List[np.ndarray] = []
    ear_values: List[float] = []
    # Paths are ordered lexicographically by sorted().
    for frame_path in sorted(frame_dir.glob("*.jpg")):
        frame = cv2.imread(str(frame_path))
        if frame is None:
            # cv2.imread returns None for unreadable or corrupt images.
            continue
        eye_img, ear = process_frame(frame, face_mesh)
        if eye_img is None:
            continue
        eye_frames.append(eye_img)
        ear_values.append(ear)
    return eye_frames, ear_values


def main() -> None:
    """Extract fixed-length eye-crop sequences for every video in a metadata CSV.

    Command-line arguments:
        --metadata: CSV with at least the columns ``frame_dir``,
            ``video_path``, ``dataset``, ``split``, ``label`` and ``identity``.
        --out-root: Directory that receives
            ``sequences/<dataset>/<split>/<video stem>/NNN.npz`` files and
            ``metadata_sequences.csv``.
        --sequence-length: Frames per saved sequence (default 32).

    Each ``.npz`` holds ``frames``, ``blink`` and ``label``. Videos that
    yield no complete sequence are omitted from the output CSV.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--metadata", type=Path, required=True)
    parser.add_argument("--out-root", type=Path, required=True)
    parser.add_argument("--sequence-length", type=int, default=32)
    args = parser.parse_args()
    if args.sequence_length < 1:
        # Zero would crash inside range(); a negative value would silently
        # produce no sequences at all.
        parser.error("--sequence-length must be a positive integer")

    df = pd.read_csv(args.metadata)
    sequences_root = args.out_root / "sequences"
    record_columns = [
        "dataset",
        "video_path",
        "identity",
        "split",
        "label",
        "sequence_count",
        "sequence_dir",
    ]
    records = []

    # The context manager releases the FaceMesh graph even if a video fails.
    # NOTE(review): a single instance with static_image_mode=False is shared
    # by all videos, so tracking state may carry over from one video to the
    # next -- confirm this is acceptable.
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as mesh:
        for row in tqdm(df.to_dict(orient="records"), desc="Extracting eye sequences"):
            eye_frames, ear_values = _collect_eye_frames(Path(row["frame_dir"]), mesh)
            sequences, blink = sample_sequences(eye_frames, ear_values, args.sequence_length)
            if not sequences:
                continue

            video_id = Path(row["video_path"]).stem
            out_dir = sequences_root / row["dataset"] / row["split"] / video_id
            out_dir.mkdir(parents=True, exist_ok=True)
            for idx, (seq, blink_seq) in enumerate(zip(sequences, blink)):
                out_path = out_dir / f"{idx:03d}.npz"
                np.savez_compressed(out_path, frames=seq, blink=blink_seq, label=row["label"])

            records.append(
                {
                    "dataset": row["dataset"],
                    "video_path": row["video_path"],
                    "identity": row["identity"],
                    "split": row["split"],
                    "label": row["label"],
                    "sequence_count": len(sequences),
                    "sequence_dir": str(out_dir.resolve()),
                }
            )

    # out_root is otherwise only created as a side effect of saving a
    # sequence, so create it explicitly before writing the CSV.
    args.out_root.mkdir(parents=True, exist_ok=True)
    out_csv = args.out_root / "metadata_sequences.csv"
    # Explicit columns keep the header present even when nothing was saved.
    pd.DataFrame(records, columns=record_columns).to_csv(out_csv, index=False)
    print(f"Saved sequence metadata: {out_csv}")
# Run the extraction pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|