Spaces:
Sleeping
Sleeping
File size: 5,728 Bytes
d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f ac0baac d5b4f5f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | import pathlib
import time
from typing import Union
import cv2
import numpy as np
import torch
import torch.nn as nn
from dataclasses import dataclass
from face_detection import RetinaFace
from .utils import prep_input_numpy, getArch
from .results import GazeResultContainer
class Pipeline:
    """Gaze-estimation pipeline: optional face detection followed by L2CS inference.

    The pipeline builds a model with ``getArch``, loads its weights, and
    (when ``include_detector`` is true) creates a ``RetinaFace`` detector.
    ``step`` processes one frame and returns a ``GazeResultContainer``;
    ``predict_gaze`` runs the model alone and returns pitch/yaw in radians.
    """

    # Number of classification bins per model head (also passed to getArch).
    _NUM_BINS = 90
    # Expected bin index -> degrees: ``index * _BIN_WIDTH_DEG - _ANGLE_OFFSET_DEG``.
    _BIN_WIDTH_DEG = 4
    _ANGLE_OFFSET_DEG = 180
    # Side length, in pixels, of the square face crop handed to the model.
    _FACE_SIZE = 224
    # The timing line is printed on steps 1, 1 + _LOG_EVERY, 1 + 2 * _LOG_EVERY, ...
    _LOG_EVERY = 30

    def __init__(
        self,
        weights: pathlib.Path,
        arch: str,
        device: Union[str, torch.device] = 'cpu',
        include_detector: bool = True,
        confidence_threshold: float = 0.5
    ):
        """Load the model (and optionally the detector) and run a warmup pass.

        Args:
            weights: Path of the checkpoint passed to ``torch.load``.
            arch: Architecture name forwarded to ``getArch``.
            device: Target device, either a ``torch.device`` or a string such
                as ``'cpu'`` or ``'cuda:0'``.
            include_detector: When true, ``step`` detects faces with
                ``RetinaFace`` before estimating gaze.
            confidence_threshold: Detections scoring below this are ignored.
        """
        # Normalise first: the code below reads ``device.type`` and
        # ``device.index``, which a plain string (including the default
        # 'cpu') does not provide.
        device = torch.device(device)

        # Save input parameters
        self.weights = weights
        self.include_detector = include_detector
        self.device = device
        self.confidence_threshold = confidence_threshold
        # Counts inference steps so timing is only printed occasionally.
        self._step_count = 0

        # Create L2CS model
        self.model = getArch(arch, self._NUM_BINS)
        # PyTorch 2.6+ defaults weights_only=True; these checkpoints need full unpickle
        # SECURITY: weights_only=False unpickles arbitrary Python objects, so
        # only load checkpoint files that come from a trusted source.
        self.model.load_state_dict(
            torch.load(self.weights, map_location=device, weights_only=False)
        )
        self.model.to(self.device)
        self.model.eval()

        # Half precision on GPU for ~2x speedup
        self._use_half = (device.type != 'cpu')
        if self._use_half:
            self.model.half()

        # Create RetinaFace if requested
        if self.include_detector:
            if device.type == 'cpu':
                self.detector = RetinaFace()
            else:
                # NOTE(review): ``device.index`` is None for an un-indexed
                # device such as 'cuda' -- confirm RetinaFace accepts that.
                self.detector = RetinaFace(gpu_id=device.index)

        self.softmax = nn.Softmax(dim=1)
        # Bin indices 0.._NUM_BINS-1, used to take the expectation over bins.
        self.idx_tensor = torch.arange(
            self._NUM_BINS, dtype=torch.float32, device=self.device
        )

        # Warmup: dummy forward pass to avoid cold-start latency
        self._warmup()

    def _warmup(self):
        """Run a dummy forward pass to warm up the model and CUDA kernels."""
        dummy = np.zeros((self._FACE_SIZE, self._FACE_SIZE, 3), dtype=np.uint8)
        try:
            with torch.no_grad():
                self.predict_gaze(dummy)
            print("[L2CS] Model warmup complete")
        except Exception as e:
            # Deliberately best-effort: a failed warmup must not prevent
            # the pipeline from being constructed.
            print(f"[L2CS] Warmup failed (non-fatal): {e}")

    @staticmethod
    def _stack_or_empty(items) -> np.ndarray:
        """Stack ``items`` along a new first axis, or return an empty array.

        ``np.stack`` raises ``ValueError`` on an empty sequence, so the
        no-detection case is mapped to a zero-length 1-D array instead.
        """
        if len(items) == 0:
            return np.empty((0,))
        return np.stack(items)

    @staticmethod
    def _crop_face(frame: np.ndarray, box, size: int = 224):
        """Cut ``box`` out of ``frame`` and return it as a ``size`` x ``size`` RGB image.

        ``box`` is indexed as ``(x_min, y_min, x_max, y_max)``. Coordinates
        are clamped at zero so a negative value cannot wrap around the slice.
        Returns ``None`` when the clamped box covers no pixels.
        """
        x_min = max(int(box[0]), 0)
        y_min = max(int(box[1]), 0)
        x_max = max(int(box[2]), 0)
        y_max = max(int(box[3]), 0)

        crop = frame[y_min:y_max, x_min:x_max]
        if crop.size == 0:
            # cv2.cvtColor / cv2.resize cannot handle an empty image.
            return None

        crop = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
        return cv2.resize(crop, (size, size))

    def step(self, frame: np.ndarray) -> "GazeResultContainer":
        """Estimate gaze for one frame.

        With the detector enabled, every detection scoring at least
        ``confidence_threshold`` is cropped, resized and passed to the model
        as one batch. Without it, ``frame`` is passed to ``predict_gaze``
        as-is.

        Args:
            frame: Input image; it is converted with ``COLOR_BGR2RGB`` when
                faces are cropped from it.

        Returns:
            A ``GazeResultContainer`` holding pitch, yaw, bboxes, landmarks
            and scores. When nothing was detected (or the detector is
            disabled) bboxes, landmarks and scores are zero-length arrays.
        """
        face_imgs = []
        bboxes = []
        landmarks = []
        scores = []

        if self.include_detector:
            t0 = time.perf_counter()
            faces = self.detector(frame)
            t_detect = (time.perf_counter() - t0) * 1000

            t0 = time.perf_counter()
            for box, landmark, score in (faces if faces is not None else ()):
                # Apply threshold
                if score < self.confidence_threshold:
                    continue

                img = self._crop_face(frame, box, self._FACE_SIZE)
                if img is None:
                    # Degenerate or fully off-frame box: nothing to analyse.
                    continue
                face_imgs.append(img)

                # Save data
                bboxes.append(box)
                landmarks.append(landmark)
                scores.append(score)
            t_preprocess = (time.perf_counter() - t0) * 1000

            if face_imgs:
                # Predict gaze
                t0 = time.perf_counter()
                with torch.no_grad():
                    pitch, yaw = self.predict_gaze(np.stack(face_imgs))
                t_inference = (time.perf_counter() - t0) * 1000

                # Log timing every 30 frames (avoid spamming)
                self._step_count = getattr(self, '_step_count', 0) + 1
                if self._step_count % self._LOG_EVERY == 1:
                    print(f"[L2CS timing] detect={t_detect:.1f}ms preprocess={t_preprocess:.1f}ms inference={t_inference:.1f}ms total={t_detect+t_preprocess+t_inference:.1f}ms")
            else:
                # No usable face: skip the model (np.stack would raise on []).
                pitch = np.empty((0, 1))
                yaw = np.empty((0, 1))
        else:
            with torch.no_grad():
                pitch, yaw = self.predict_gaze(frame)

        # Save data
        return GazeResultContainer(
            pitch=pitch,
            yaw=yaw,
            bboxes=self._stack_or_empty(bboxes),
            landmarks=self._stack_or_empty(landmarks),
            scores=self._stack_or_empty(scores)
        )

    def _bins_to_radians(self, logits: torch.Tensor) -> np.ndarray:
        """Turn per-bin logits into one continuous angle per row, in radians.

        Softmax is applied over ``dim=1``, the expected bin index is mapped
        to degrees, and the result is converted to radians on the CPU.
        """
        probabilities = self.softmax(logits.float())
        expected_bin = torch.sum(probabilities * self.idx_tensor, dim=1)
        degrees = expected_bin * self._BIN_WIDTH_DEG - self._ANGLE_OFFSET_DEG
        return degrees.cpu().detach().numpy() * np.pi / 180.0

    def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):
        """Run the gaze model and return ``(pitch, yaw)`` in radians.

        Args:
            frame: A NumPy array, which is prepared with
                ``prep_input_numpy``, or an already prepared tensor, which
                is moved to the pipeline device.

        Returns:
            Two NumPy arrays with one pitch and one yaw value per input row.

        Raises:
            RuntimeError: If ``frame`` is neither an ndarray nor a tensor.
        """
        # Prepare input
        if isinstance(frame, np.ndarray):
            img = prep_input_numpy(frame, self.device)
        elif isinstance(frame, torch.Tensor):
            # No-op when the tensor already lives on the model's device.
            img = frame.to(self.device)
        else:
            raise RuntimeError("Invalid dtype for input")

        # Half precision on GPU
        if self._use_half:
            img = img.half()

        # Inference only: never build an autograd graph, even when the
        # caller did not wrap the call in torch.no_grad().
        with torch.no_grad():
            gaze_pitch, gaze_yaw = self.model(img)
            pitch_predicted = self._bins_to_radians(gaze_pitch)
            yaw_predicted = self._bins_to_radians(gaze_yaw)

        return pitch_predicted, yaw_predicted
|