# final_test/models/L2CS-Net/l2cs/pipeline.py
# Author: Abdelrahman Almatrooshi
# Deploy snapshot from main b7a59b11809483dfc959f196f1930240f2662c49 (22a6915)
import pathlib
import time
from typing import Union
import cv2
import numpy as np
import torch
import torch.nn as nn
from dataclasses import dataclass
from face_detection import RetinaFace
from .utils import prep_input_numpy, getArch
from .results import GazeResultContainer
class Pipeline:
def __init__(
self,
weights: pathlib.Path,
arch: str,
device: str = 'cpu',
include_detector:bool = True,
confidence_threshold:float = 0.5
):
# Save input parameters
self.weights = weights
self.include_detector = include_detector
self.device = device
self.confidence_threshold = confidence_threshold
# Create L2CS model
self.model = getArch(arch, 90)
# PyTorch 2.6+ defaults weights_only=True; these checkpoints need full unpickle
self.model.load_state_dict(
torch.load(self.weights, map_location=device, weights_only=False)
)
self.model.to(self.device)
self.model.eval()
# Half precision on GPU for ~2x speedup
self._use_half = (device.type != 'cpu')
if self._use_half:
self.model.half()
# Create RetinaFace if requested
if self.include_detector:
if device.type == 'cpu':
self.detector = RetinaFace()
else:
self.detector = RetinaFace(gpu_id=device.index)
self.softmax = nn.Softmax(dim=1)
self.idx_tensor = [idx for idx in range(90)]
self.idx_tensor = torch.FloatTensor(self.idx_tensor).to(self.device)
# Warmup: dummy forward pass to avoid cold-start latency
self._warmup()
def _warmup(self):
"""Run a dummy forward pass to warm up the model and CUDA kernels."""
dummy = np.zeros((224, 224, 3), dtype=np.uint8)
try:
with torch.no_grad():
self.predict_gaze(dummy)
print("[L2CS] Model warmup complete")
except Exception as e:
print(f"[L2CS] Warmup failed (non-fatal): {e}")
def step(self, frame: np.ndarray) -> GazeResultContainer:
# Creating containers
face_imgs = []
bboxes = []
landmarks = []
scores = []
if self.include_detector:
t0 = time.perf_counter()
faces = self.detector(frame)
t_detect = (time.perf_counter() - t0) * 1000
if faces is not None:
t0 = time.perf_counter()
for box, landmark, score in faces:
# Apply threshold
if score < self.confidence_threshold:
continue
# Extract safe min and max of x,y
x_min=int(box[0])
if x_min < 0:
x_min = 0
y_min=int(box[1])
if y_min < 0:
y_min = 0
x_max=int(box[2])
y_max=int(box[3])
# Crop image
img = frame[y_min:y_max, x_min:x_max]
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (224, 224))
face_imgs.append(img)
# Save data
bboxes.append(box)
landmarks.append(landmark)
scores.append(score)
t_preprocess = (time.perf_counter() - t0) * 1000
# Predict gaze
t0 = time.perf_counter()
with torch.no_grad():
pitch, yaw = self.predict_gaze(np.stack(face_imgs))
t_inference = (time.perf_counter() - t0) * 1000
# Log timing every 30 frames (avoid spamming)
if not hasattr(self, '_step_count'):
self._step_count = 0
self._step_count += 1
if self._step_count % 30 == 1:
print(f"[L2CS timing] detect={t_detect:.1f}ms preprocess={t_preprocess:.1f}ms inference={t_inference:.1f}ms total={t_detect+t_preprocess+t_inference:.1f}ms")
else:
pitch = np.empty((0,1))
yaw = np.empty((0,1))
else:
with torch.no_grad():
pitch, yaw = self.predict_gaze(frame)
# Save data
results = GazeResultContainer(
pitch=pitch,
yaw=yaw,
bboxes=np.stack(bboxes),
landmarks=np.stack(landmarks),
scores=np.stack(scores)
)
return results
def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):
# Prepare input
if isinstance(frame, np.ndarray):
img = prep_input_numpy(frame, self.device)
elif isinstance(frame, torch.Tensor):
img = frame
else:
raise RuntimeError("Invalid dtype for input")
# Half precision on GPU
if self._use_half:
img = img.half()
# Forward pass (caller should wrap in torch.no_grad())
gaze_pitch, gaze_yaw = self.model(img)
pitch_predicted = self.softmax(gaze_pitch.float())
yaw_predicted = self.softmax(gaze_yaw.float())
# Get continuous predictions in degrees.
pitch_predicted = torch.sum(pitch_predicted.data * self.idx_tensor, dim=1) * 4 - 180
yaw_predicted = torch.sum(yaw_predicted.data * self.idx_tensor, dim=1) * 4 - 180
pitch_predicted = pitch_predicted.cpu().detach().numpy() * np.pi / 180.0
yaw_predicted = yaw_predicted.cpu().detach().numpy() * np.pi / 180.0
return pitch_predicted, yaw_predicted