File size: 5,728 Bytes
d5b4f5f
ac0baac
d5b4f5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac0baac
 
d5b4f5f
ac0baac
d5b4f5f
 
 
 
 
 
 
 
 
 
 
 
ac0baac
 
 
 
d5b4f5f
 
 
ac0baac
 
 
 
 
d5b4f5f
 
 
 
 
 
 
 
 
 
 
 
ac0baac
 
 
 
 
 
 
 
 
 
 
 
 
d5b4f5f
 
 
 
 
 
 
 
 
ac0baac
d5b4f5f
ac0baac
d5b4f5f
ac0baac
 
d5b4f5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac0baac
d5b4f5f
 
 
 
 
 
 
 
 
 
 
ac0baac
 
d5b4f5f
ac0baac
 
 
 
 
 
 
 
 
 
 
d5b4f5f
 
 
 
 
 
 
ac0baac
 
d5b4f5f
 
 
 
 
 
 
 
 
 
 
 
 
ac0baac
d5b4f5f
 
 
 
 
 
 
ac0baac
 
 
 
 
 
d5b4f5f
ac0baac
 
 
d5b4f5f
 
 
ac0baac
 
 
d5b4f5f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import pathlib
import time
from typing import Union

import cv2
import numpy as np
import torch
import torch.nn as nn
from dataclasses import dataclass
from face_detection import RetinaFace

from .utils import prep_input_numpy, getArch
from .results import GazeResultContainer


class Pipeline:
    """Gaze-estimation pipeline: optional RetinaFace detection plus a gaze model.

    With ``include_detector=True`` each call to :meth:`step` runs the face
    detector on the frame, crops every face whose score reaches
    ``confidence_threshold`` and predicts one (pitch, yaw) pair per crop.
    With ``include_detector=False`` the frame is handed directly to
    :meth:`predict_gaze` and the detection fields of the result are empty.
    """

    # Number of classification bins passed to ``getArch`` for each gaze angle.
    _NUM_BINS = 90
    # Expected bin index -> degrees: ``index * _BIN_WIDTH_DEG - _ANGLE_OFFSET_DEG``.
    _BIN_WIDTH_DEG = 4
    _ANGLE_OFFSET_DEG = 180
    # Side length (pixels) of the square face crop fed to the model.
    _INPUT_SIZE = 224
    # Timing is printed once every this many timed frames.
    _LOG_EVERY = 30

    def __init__(
        self,
        weights: pathlib.Path,
        arch: str,
        device: Union[str, torch.device] = 'cpu',
        include_detector: bool = True,
        confidence_threshold: float = 0.5
    ):
        """Load the gaze model (and optionally the face detector), then warm up.

        Args:
            weights: Path to the checkpoint holding the model ``state_dict``.
            arch: Architecture name forwarded to ``getArch``.
            device: Target device, given either as a string such as ``'cpu'``
                or ``'cuda:0'`` or as a ``torch.device``.
            include_detector: Whether :meth:`step` should run RetinaFace first.
            confidence_threshold: Minimum detector score for a face to be kept.
        """
        # The code below reads ``device.type`` / ``device.index``, which a plain
        # string (including the default 'cpu') does not have; normalise once.
        device = torch.device(device)

        # Save input parameters
        self.weights = weights
        self.include_detector = include_detector
        self.device = device
        self.confidence_threshold = confidence_threshold

        # Counter driving the periodic timing log in ``step``.
        self._step_count = 0

        # Create L2CS model
        self.model = getArch(arch, self._NUM_BINS)
        # PyTorch 2.6+ defaults weights_only=True; these checkpoints need full unpickle.
        # NOTE(security): weights_only=False executes arbitrary pickled code, so
        # only load checkpoints that come from a trusted source.
        self.model.load_state_dict(
            torch.load(self.weights, map_location=device, weights_only=False)
        )
        self.model.to(self.device)
        self.model.eval()

        # Half precision on GPU for ~2x speedup
        self._use_half = (device.type != 'cpu')
        if self._use_half:
            self.model.half()

        # ``predict_gaze`` needs these whether or not the detector is enabled,
        # so they must not live inside the ``include_detector`` branch.
        self.softmax = nn.Softmax(dim=1)
        self.idx_tensor = torch.arange(
            self._NUM_BINS, dtype=torch.float32, device=self.device
        )

        # Create RetinaFace if requested
        if self.include_detector:
            if device.type == 'cpu':
                self.detector = RetinaFace()
            else:
                # torch.device('cuda') has index None; assume the first GPU in
                # that case -- TODO confirm RetinaFace's handling of gpu_id.
                gpu_id = 0 if device.index is None else device.index
                self.detector = RetinaFace(gpu_id=gpu_id)

        # Warmup: dummy forward pass to avoid cold-start latency
        self._warmup()

    def _warmup(self):
        """Run a dummy forward pass to warm up the model and CUDA kernels."""
        dummy = np.zeros(
            (self._INPUT_SIZE, self._INPUT_SIZE, 3), dtype=np.uint8
        )
        # Deliberately best-effort: a failed warmup must not prevent the
        # pipeline from being constructed, so the error is only reported.
        try:
            with torch.no_grad():
                self.predict_gaze(dummy)
            print("[L2CS] Model warmup complete")
        except Exception as e:
            print(f"[L2CS] Warmup failed (non-fatal): {e}")

    @staticmethod
    def _stack_or_empty(items, item_shape):
        """Stack ``items`` along a new first axis, or return an empty array.

        ``np.stack`` raises ``ValueError`` on an empty sequence, so when there
        is nothing to stack an array of shape ``(0, *item_shape)`` is returned.
        """
        if items:
            return np.stack(items)
        return np.empty((0, *item_shape))

    def _crop_face(self, frame, box):
        """Return the resized RGB crop of ``box`` from ``frame``.

        Returns ``None`` when the box has no area inside the frame.
        """
        # Clamp all four coordinates at zero: a negative *max* would otherwise
        # be interpreted by NumPy as an index counted from the end.
        x_min = max(int(box[0]), 0)
        y_min = max(int(box[1]), 0)
        x_max = max(int(box[2]), 0)
        y_max = max(int(box[3]), 0)

        crop = frame[y_min:y_max, x_min:x_max]
        if crop.size == 0:
            # cv2.cvtColor / cv2.resize fail on empty images.
            return None

        crop = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
        return cv2.resize(crop, (self._INPUT_SIZE, self._INPUT_SIZE))

    def _log_timing(self, t_detect, t_preprocess, t_inference):
        """Print per-stage timings (milliseconds) once every ``_LOG_EVERY`` calls."""
        self._step_count += 1
        if self._step_count % self._LOG_EVERY == 1:
            print(f"[L2CS timing] detect={t_detect:.1f}ms preprocess={t_preprocess:.1f}ms inference={t_inference:.1f}ms total={t_detect+t_preprocess+t_inference:.1f}ms")

    def step(self, frame: np.ndarray) -> "GazeResultContainer":
        """Process one frame and return gaze predictions with detection data.

        Args:
            frame: Input image. With the detector enabled it is cropped per
                face and converted BGR -> RGB; otherwise it is passed to
                :meth:`predict_gaze` unchanged.

        Returns:
            A ``GazeResultContainer`` with ``pitch`` and ``yaw`` in radians and
            the ``bboxes``, ``landmarks`` and ``scores`` of the kept faces.
            When no face is kept all fields are empty arrays; without the
            detector the three detection fields are always empty.
        """
        # Creating containers
        face_imgs = []
        bboxes = []
        landmarks = []
        scores = []

        if self.include_detector:
            t0 = time.perf_counter()
            faces = self.detector(frame)
            t_detect = (time.perf_counter() - t0) * 1000

            t0 = time.perf_counter()
            for box, landmark, score in (faces if faces is not None else ()):

                # Apply threshold
                if score < self.confidence_threshold:
                    continue

                # Crop image; skip boxes with no area inside the frame
                img = self._crop_face(frame, box)
                if img is None:
                    continue
                face_imgs.append(img)

                # Save data
                bboxes.append(box)
                landmarks.append(landmark)
                scores.append(score)
            t_preprocess = (time.perf_counter() - t0) * 1000

            if face_imgs:
                # Predict gaze
                t0 = time.perf_counter()
                pitch, yaw = self.predict_gaze(np.stack(face_imgs))
                t_inference = (time.perf_counter() - t0) * 1000

                # Log timing every 30 frames (avoid spamming)
                self._log_timing(t_detect, t_preprocess, t_inference)
            else:
                # No detection survived filtering: nothing to run the model on.
                pitch = np.empty((0, 1))
                yaw = np.empty((0, 1))

        else:
            pitch, yaw = self.predict_gaze(frame)

        # Save data. The empty-case shapes assume 4-value boxes (indices 0-3
        # are read when cropping) and 5 (x, y) landmark points per face --
        # TODO confirm the landmark layout against RetinaFace.
        results = GazeResultContainer(
            pitch=pitch,
            yaw=yaw,
            bboxes=self._stack_or_empty(bboxes, (4,)),
            landmarks=self._stack_or_empty(landmarks, (5, 2)),
            scores=self._stack_or_empty(scores, ())
        )

        return results

    def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):
        """Predict gaze angles for already-cropped face input.

        Args:
            frame: Either a NumPy array, which is converted with
                ``prep_input_numpy``, or a ``torch.Tensor`` that is already in
                the layout the model expects (it is only moved to the model's
                device).

        Returns:
            Tuple ``(pitch, yaw)`` of NumPy arrays in radians, one value per
            sample in the batch.

        Raises:
            RuntimeError: If ``frame`` is neither an ndarray nor a tensor.
        """
        # Prepare input
        if isinstance(frame, np.ndarray):
            img = prep_input_numpy(frame, self.device)
        elif isinstance(frame, torch.Tensor):
            img = frame.to(self.device)
        else:
            raise RuntimeError("Invalid dtype for input")

        # Half precision on GPU
        if self._use_half:
            img = img.half()

        # The outputs are converted to NumPy, so gradients are never needed.
        with torch.no_grad():
            gaze_pitch, gaze_yaw = self.model(img)

            # Softmax in float32 even when the model ran in half precision.
            pitch_probs = self.softmax(gaze_pitch.float())
            yaw_probs = self.softmax(gaze_yaw.float())

            # Get continuous predictions in degrees: expected bin index,
            # scaled by the bin width and shifted by the offset.
            pitch_deg = (
                torch.sum(pitch_probs * self.idx_tensor, dim=1)
                * self._BIN_WIDTH_DEG - self._ANGLE_OFFSET_DEG
            )
            yaw_deg = (
                torch.sum(yaw_probs * self.idx_tensor, dim=1)
                * self._BIN_WIDTH_DEG - self._ANGLE_OFFSET_DEG
            )

        # Degrees -> radians on the CPU side.
        pitch_predicted = pitch_deg.cpu().numpy() * np.pi / 180.0
        yaw_predicted = yaw_deg.cpu().numpy() * np.pi / 180.0

        return pitch_predicted, yaw_predicted