# Provenance (from the hosting page header): file size 4,281 bytes, revision id 1dc2504.
from __future__ import annotations

import argparse
from pathlib import Path
from typing import List, Tuple

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from tqdm import tqdm


# Face-mesh landmark indices used for each eye. All four entries of a list are
# passed to eye_bbox() to bound the eye crop; entries [2] and [3] are the pair
# whose vertical (y) distance process_frame() uses as the eye-openness proxy.
# NOTE(review): presumably ordered (corner, corner, upper lid, lower lid) in
# MediaPipe Face Mesh numbering, and "left"/"right" may refer to the image side
# rather than the subject's side — confirm against the landmark map.
LEFT_EYE_IDX = [33, 133, 159, 145]
RIGHT_EYE_IDX = [362, 263, 386, 374]


def eye_bbox(landmarks, image_w: int, image_h: int, indices: List[int], pad: int = 8) -> Tuple[int, int, int, int]:
    """Return a padded pixel bounding box around the selected landmarks.

    Each selected landmark's ``x``/``y`` attribute is multiplied by the image
    width/height and truncated to an integer pixel position. The tight box
    around those points is grown by ``pad`` pixels on every side and then
    clamped to the image on all four edges.

    Args:
        landmarks: Indexable collection whose items expose ``x`` and ``y``
            attributes (presumably normalized to [0, 1], since they are
            scaled by the image size here).
        image_w: Image width in pixels.
        image_h: Image height in pixels.
        indices: Positions in ``landmarks`` to enclose.
        pad: Margin in pixels added on every side before clamping.

    Returns:
        ``(x1, y1, x2, y2)`` as plain Python ints with ``0 <= x <= image_w``
        and ``0 <= y <= image_h``. The box may be empty (``x1 >= x2`` or
        ``y1 >= y2``) when the landmarks lie outside the image.

    Raises:
        ValueError: Raised by NumPy's reduction when ``indices`` is empty.
    """
    pts = np.array([(int(landmarks[i].x * image_w), int(landmarks[i].y * image_h)) for i in indices])
    x_min, y_min = pts.min(axis=0)
    x_max, y_max = pts.max(axis=0)

    def _clamp(value, upper: int) -> int:
        # Clamp on BOTH sides: a negative upper edge would otherwise be read
        # as a from-the-end index when the caller slices the frame, producing
        # a large bogus crop instead of an empty one.
        return int(min(max(value, 0), upper))

    return (
        _clamp(x_min - pad, image_w),
        _clamp(y_min - pad, image_h),
        _clamp(x_max + pad, image_w),
        _clamp(y_max + pad, image_h),
    )


def process_frame(frame: np.ndarray, face_mesh) -> Tuple[np.ndarray | None, float]:
    """Crop both eyes from a BGR frame and compute an eye-openness proxy.

    The frame is converted from BGR to RGB and passed to ``face_mesh.process``.
    For the first detected face, the regions around ``LEFT_EYE_IDX`` and
    ``RIGHT_EYE_IDX`` are cropped from the original frame, each resized to
    112x112, and concatenated side by side (left crop first).

    Args:
        frame: Image array in BGR channel order, as the BGR->RGB conversion
            implies (e.g. the output of ``cv2.imread``).
        face_mesh: Object with a ``process(rgb)`` method whose result exposes
            ``multi_face_landmarks`` (a MediaPipe FaceMesh instance in this
            script).

    Returns:
        ``(eye_pair, ear_proxy)``. ``eye_pair`` is the 112-row by 224-column
        eye strip, or ``None`` when no face was found or either crop is
        empty; in that case ``ear_proxy`` is ``0.0``. Otherwise ``ear_proxy``
        is the mean over both eyes of the absolute difference between the
        ``y`` values of landmarks [2] and [3] of each index list, in the
        landmarks' own coordinate units (not pixels).
    """
    # The annotation's ``X | None`` form relies on the module's
    # ``from __future__ import annotations`` (annotations are not evaluated).
    eye_size = (112, 112)

    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    res = face_mesh.process(rgb)
    if not res.multi_face_landmarks:
        return None, 0.0

    h, w = frame.shape[:2]
    # Only the first detected face is used.
    lm = res.multi_face_landmarks[0].landmark
    lx1, ly1, lx2, ly2 = eye_bbox(lm, w, h, LEFT_EYE_IDX)
    rx1, ry1, rx2, ry2 = eye_bbox(lm, w, h, RIGHT_EYE_IDX)

    # Crops come from the original (BGR) frame, not the RGB copy.
    left = frame[ly1:ly2, lx1:lx2]
    right = frame[ry1:ry2, rx1:rx2]
    if left.size == 0 or right.size == 0:
        # A degenerate box cannot be resized; report the frame as unusable.
        return None, 0.0

    left = cv2.resize(left, eye_size)
    right = cv2.resize(right, eye_size)
    eye_pair = np.concatenate([left, right], axis=1)

    left_open = abs(lm[LEFT_EYE_IDX[2]].y - lm[LEFT_EYE_IDX[3]].y)
    right_open = abs(lm[RIGHT_EYE_IDX[2]].y - lm[RIGHT_EYE_IDX[3]].y)
    ear_proxy = float((left_open + right_open) / 2.0)
    return eye_pair, ear_proxy


def sample_sequences(frames: List[np.ndarray], ear: List[float], seq_len: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Split per-frame data into consecutive, non-overlapping windows.

    Windows start at 0, ``seq_len``, ``2 * seq_len``, ... and only full
    windows (as measured against ``len(frames)``) are produced; trailing
    frames that do not fill a window are dropped.

    Args:
        frames: Per-frame arrays; each window is stacked along a new axis 0.
        ear: Per-frame scalar values, sliced with the same window bounds.
        seq_len: Window length, also used as the stride.

    Returns:
        ``(frame_windows, ear_windows)``: a list of stacked frame arrays and
        a list of ``float32`` arrays holding the matching ``ear`` slices.
    """
    window_starts = range(0, len(frames) - seq_len + 1, seq_len)
    frame_windows = [
        np.stack(frames[start : start + seq_len], axis=0) for start in window_starts
    ]
    ear_windows = [
        np.array(ear[start : start + seq_len], dtype=np.float32) for start in window_starts
    ]
    return frame_windows, ear_windows


def main() -> None:
    """Extract fixed-length eye sequences for every video in a metadata CSV.

    Command-line arguments:
        --metadata: CSV with one row per video. The columns read here are
            ``frame_dir``, ``video_path``, ``dataset``, ``split``, ``label``
            and ``identity``.
        --out-root: Output root directory (created if missing).
        --sequence-length: Frames per saved sequence (default 32, must be >= 1).

    For each row, every ``*.jpg`` in ``frame_dir`` is read in sorted path
    order and passed through ``process_frame``; unreadable frames and frames
    without a usable eye crop are skipped. The surviving frames are cut into
    non-overlapping sequences and saved as
    ``<out-root>/sequences/<dataset>/<split>/<video stem>/<idx>.npz`` with
    arrays ``frames``, ``blink`` and ``label``. A summary of all videos that
    produced at least one sequence is written to
    ``<out-root>/metadata_sequences.csv``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--metadata", type=Path, required=True)
    parser.add_argument("--out-root", type=Path, required=True)
    parser.add_argument("--sequence-length", type=int, default=32)
    args = parser.parse_args()
    if args.sequence_length < 1:
        # A zero stride would crash inside sample_sequences and a negative one
        # would silently produce nothing; reject both up front.
        parser.error("--sequence-length must be a positive integer")

    df = pd.read_csv(args.metadata)

    # The summary CSV is written even when no sequence was saved, so the
    # output root must exist regardless of the per-video directories.
    args.out_root.mkdir(parents=True, exist_ok=True)

    # Fixed column order so the CSV keeps its header when `records` is empty.
    record_columns = [
        "dataset",
        "video_path",
        "identity",
        "split",
        "label",
        "sequence_count",
        "sequence_dir",
    ]
    records = []

    # The context manager releases the FaceMesh resources on exit, including
    # when an exception aborts the run.
    # NOTE(review): static_image_mode=False and the same instance is reused
    # for all videos, so tracking state may carry over from one video to the
    # next — confirm this is acceptable.
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as mesh:
        for row in tqdm(df.to_dict(orient="records"), desc="Extracting eye sequences"):
            frame_dir = Path(row["frame_dir"])
            # NOTE(review): lexicographic sort; assumes frame file names are
            # zero-padded so that path order equals temporal order.
            frame_paths = sorted(frame_dir.glob("*.jpg"))
            eye_frames: List[np.ndarray] = []
            ear_values: List[float] = []
            for fp in frame_paths:
                frame = cv2.imread(str(fp))
                if frame is None:
                    # Unreadable or corrupt image.
                    continue
                eye_img, ear = process_frame(frame, mesh)
                if eye_img is None:
                    # No face detected or degenerate eye crop.
                    continue
                eye_frames.append(eye_img)
                ear_values.append(ear)

            sequences, blink = sample_sequences(eye_frames, ear_values, args.sequence_length)
            if not sequences:
                # Too few usable frames: no directory and no summary row.
                continue

            # NOTE(review): videos sharing a stem within the same
            # dataset/split write to the same directory — confirm stems are
            # unique.
            video_id = Path(row["video_path"]).stem
            out_dir = args.out_root / "sequences" / row["dataset"] / row["split"] / video_id
            out_dir.mkdir(parents=True, exist_ok=True)
            for idx, (seq, blink_seq) in enumerate(zip(sequences, blink)):
                out_path = out_dir / f"{idx:03d}.npz"
                np.savez_compressed(out_path, frames=seq, blink=blink_seq, label=row["label"])

            records.append(
                {
                    "dataset": row["dataset"],
                    "video_path": row["video_path"],
                    "identity": row["identity"],
                    "split": row["split"],
                    "label": row["label"],
                    "sequence_count": len(sequences),
                    "sequence_dir": str(out_dir.resolve()),
                }
            )

    out_csv = args.out_root / "metadata_sequences.csv"
    pd.DataFrame(records, columns=record_columns).to_csv(out_csv, index=False)
    print(f"Saved sequence metadata: {out_csv}")


# Run the extraction pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()