import pathlib
from typing import Union

import cv2
import numpy as np
import torch
import torch.nn as nn
from face_detection import RetinaFace

from .utils import prep_input_numpy, getArch
from .results import GazeResultContainer
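

# Gaze-estimation pipeline: optionally detect faces with RetinaFace, crop and
# resize each face to 224x224, then run the L2CS model to regress per-face
# pitch/yaw angles (returned in radians).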
class Pipeline:

    def __init__(
        self,
        weights: pathlib.Path,
        arch: str,
        # A torch.device is required here: .type and .index are used below.
        device: torch.device = torch.device('cpu'),
        include_detector: bool = True,
        confidence_threshold: float = 0.5
    ):
        # Save input parameters
        self.weights = weights
        self.include_detector = include_detector
        self.device = device
        self.confidence_threshold = confidence_threshold

        # Create L2CS model
        self.model = getArch(arch, 90)
        self.model.load_state_dict(torch.load(self.weights, map_location=device))
        self.model.to(self.device)
        self.model.eval()

        # Create RetinaFace if requested
        if self.include_detector:
            if device.type == 'cpu':
                self.detector = RetinaFace()
            else:
                self.detector = RetinaFace(gpu_id=device.index)

        # Needed by predict_gaze() whether or not the detector is enabled:
        # softmax over the 90 angle bins, plus the bin indices 0..89.
        self.softmax = nn.Softmax(dim=1)
        self.idx_tensor = torch.arange(90, dtype=torch.float32, device=self.device)
    def step(self, frame: np.ndarray) -> GazeResultContainer:

        # Create containers
        face_imgs = []
        bboxes = []
        landmarks = []
        scores = []

        if self.include_detector:
            faces = self.detector(frame)

            if faces is not None:
                for box, landmark, score in faces:

                    # Apply threshold
                    if score < self.confidence_threshold:
                        continue

                    # Extract safe min and max of x, y (clamp to the frame)
                    x_min = max(int(box[0]), 0)
                    y_min = max(int(box[1]), 0)
                    x_max = int(box[2])
                    y_max = int(box[3])

                    # Crop image
                    img = frame[y_min:y_max, x_min:x_max]
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, (224, 224))
                    face_imgs.append(img)

                    # Save data
                    bboxes.append(box)
                    landmarks.append(landmark)
                    scores.append(score)

            # Predict gaze; guard against no detections surviving the
            # confidence threshold (np.stack raises on an empty list)
            if face_imgs:
                pitch, yaw = self.predict_gaze(np.stack(face_imgs))
            else:
                pitch = np.empty((0, 1))
                yaw = np.empty((0, 1))
        else:
            pitch, yaw = self.predict_gaze(frame)

        # Save data (np.array, unlike np.stack, tolerates empty lists)
        results = GazeResultContainer(
            pitch=pitch,
            yaw=yaw,
            bboxes=np.array(bboxes),
            landmarks=np.array(landmarks),
            scores=np.array(scores)
        )
        return results
    def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):

        # Prepare input
        if isinstance(frame, np.ndarray):
            img = prep_input_numpy(frame, self.device)
        elif isinstance(frame, torch.Tensor):
            img = frame
        else:
            raise RuntimeError("Invalid dtype for input")

        # Predict
        gaze_pitch, gaze_yaw = self.model(img)
        pitch_predicted = self.softmax(gaze_pitch)
        yaw_predicted = self.softmax(gaze_yaw)

        # Get continuous predictions in degrees: the expectation over the
        # 90 bins, each 4 degrees wide, spanning [-180, 180)
        pitch_predicted = torch.sum(pitch_predicted.data * self.idx_tensor, dim=1) * 4 - 180
        yaw_predicted = torch.sum(yaw_predicted.data * self.idx_tensor, dim=1) * 4 - 180

        # Convert degrees to radians
        pitch_predicted = pitch_predicted.cpu().detach().numpy() * np.pi / 180.0
        yaw_predicted = yaw_predicted.cpu().detach().numpy() * np.pi / 180.0

        return pitch_predicted, yaw_predicted
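

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original file). The checkpoint path,
# the "ResNet50" arch name, and "face.jpg" are illustrative assumptions;
# substitute your own weights and input. Because this module uses relative
# imports, run it as a module, e.g. `python -m <package>.pipeline`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    gaze_pipeline = Pipeline(
        weights=pathlib.Path("models/L2CSNet_gaze360.pkl"),  # hypothetical path
        arch="ResNet50",                 # assumed name accepted by getArch
        device=torch.device("cpu"),
    )

    frame = cv2.imread("face.jpg")       # hypothetical BGR test image
    results = gaze_pipeline.step(frame)
    print("pitch (rad):", results.pitch)
    print("yaw   (rad):", results.yaw)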