Spaces:
Build error
Build error
| import cv2 | |
| import numpy as np | |
| import mediapipe as mp | |
| from aura.gaze_tracking import GazeTracking | |
| import time | |
| import threading | |
class ImageEnhancer:
    """Stateless contrast/edge enhancement applied to frames before tracking."""

    # Exponent of the gamma curve applied after CLAHE. Since it is > 1,
    # (i / 255) ** gamma <= i / 255, i.e. the curve darkens mid-tones.
    _GAMMA = 1.8

    # Cached 256-entry lookup table for _GAMMA; built on first use so the
    # table is not recomputed for every frame.
    _gamma_lut = None

    @staticmethod
    def _build_gamma_lut(gamma):
        """Return a uint8 array of length 256 mapping i -> ((i / 255) ** gamma) * 255."""
        return np.array(
            [((i / 255.0) ** gamma) * 255 for i in range(256)]
        ).astype("uint8")

    @staticmethod
    def enhance_image(frame):
        """Return an enhanced copy of ``frame``.

        The pipeline is:
          1. CLAHE (clip limit 4, 8x8 tiles) on the L channel of the LAB image.
          2. Gamma mapping through a lookup table (gamma = 1.8).
          3. Canny edges (thresholds 250/500) blended in with weight 0.2.

        Args:
            frame: image converted with ``COLOR_BGR2LAB``, so it is treated as
                BGR. Presumably 8-bit, since it is passed through a 256-entry
                LUT -- confirm against the capture source.

        Returns:
            The enhanced BGR image produced by ``cv2.addWeighted``.
        """
        # Step 1: equalise local contrast on lightness only, leaving colour alone.
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=4, tileGridSize=(8, 8))
        lab_clahe = cv2.merge((clahe.apply(l), a, b))
        contrast_bgr = cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

        # Step 2: gamma curve via the cached lookup table.
        if ImageEnhancer._gamma_lut is None:
            ImageEnhancer._gamma_lut = ImageEnhancer._build_gamma_lut(
                ImageEnhancer._GAMMA
            )
        gamma_bgr = cv2.LUT(contrast_bgr, ImageEnhancer._gamma_lut)

        # Step 3: overlay detected edges to emphasise contours.
        gray = cv2.cvtColor(gamma_bgr, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 250, 500)
        edges_colored = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.addWeighted(gamma_bgr, 0.8, edges_colored, 0.2, 0)
class KalmanFilter:
    """Thin wrapper around ``cv2.KalmanFilter`` with a 4-value state and 2-value measurement.

    The measurement matrix selects the first two state components; the
    transition matrix adds state component 2 to component 0 and component 3
    to component 1 at each step (a constant-velocity style update).
    """

    def __init__(self):
        state_dim, measurement_dim = 4, 2
        self.kalman = cv2.KalmanFilter(state_dim, measurement_dim)

        # [[1, 0, 0, 0],
        #  [0, 1, 0, 0]]
        self.kalman.measurementMatrix = np.eye(
            measurement_dim, state_dim, dtype=np.float32
        )

        # Identity plus ones on the second super-diagonal:
        # [[1, 0, 1, 0],
        #  [0, 1, 0, 1],
        #  [0, 0, 1, 0],
        #  [0, 0, 0, 1]]
        self.kalman.transitionMatrix = np.eye(state_dim, dtype=np.float32) + np.eye(
            state_dim, k=2, dtype=np.float32
        )

        self.kalman.processNoiseCov = 0.5 * np.eye(state_dim, dtype=np.float32)

    def correct(self, coord):
        """Feed the measurement ``(coord[0], coord[1])`` to the filter and return its result."""
        first, second = coord[0], coord[1]
        measurement = np.array([first, second], np.float32)
        return self.kalman.correct(measurement)

    def predict(self):
        """Advance the filter one step and return what ``cv2.KalmanFilter.predict`` returns."""
        return self.kalman.predict()
class GazeProcessor:
    """Tally focused/distracted frames from pupil gaze and head pose.

    Each frame read from ``webcam`` is enhanced, analysed by two
    ``GazeTracking`` instances (one on the enhanced frame, one on its
    horizontal mirror) and by MediaPipe FaceMesh for head pose. The per-frame
    verdicts update four running counters exposed by
    ``get_focus_and_distractions``.
    """

    # Face-mesh landmark indices used as 2D/3D correspondences for solvePnP,
    # in ascending order.
    _POSE_LANDMARK_IDS = (1, 33, 61, 199, 263, 291)

    # A scaled head angle beyond +/- this value counts as looking away.
    _POSE_ANGLE_THRESHOLD = 13

    def __init__(self, webcam):
        """Store the capture device and create the trackers and zeroed counters.

        Args:
            webcam: object providing ``isOpened()`` and ``read()``
                (presumably a ``cv2.VideoCapture`` -- confirm with caller).
        """
        self.webcam = webcam
        # Serialises reads from the shared capture device.
        self.webcam_lock = threading.Lock()
        self.gaze_flip = GazeTracking()
        self.gaze_orig = GazeTracking()
        self.kalman_filter = KalmanFilter()
        self.total_distractions = 0
        self.focused = 0
        self.right_distractions = 0
        self.left_distractions = 0

    def _get_gaze_info(self, gaze):
        """Map a tracker's gaze direction to a ``(label, colour)`` pair.

        Labels: ``"Yes1"`` when looking right, ``"Yes"`` when looking left,
        ``"No"`` when centred, ``""`` when none of those applies.
        """
        if gaze.is_right():
            return "Yes1", (0, 0, 255)
        if gaze.is_left():
            return "Yes", (0, 0, 255)
        if gaze.is_center():
            return "No", (0, 255, 0)
        return "", (255, 0, 0)

    def _get_eye_position(self, gaze):
        """Return the Kalman-filtered mean pupil position, or None.

        Returns None when either pupil coordinate is unavailable (assumes the
        tracker reports a missing pupil as ``None`` -- TODO confirm).
        """
        left_pupil = gaze.pupil_left_coords()
        right_pupil = gaze.pupil_right_coords()
        if left_pupil is None or right_pupil is None:
            return None
        avg_pupil = np.mean([left_pupil, right_pupil], axis=0)
        # correct() is called only to update the filter state; the value
        # returned to the caller is the subsequent prediction.
        self.kalman_filter.correct(avg_pupil)
        return self.kalman_filter.predict()

    @staticmethod
    def _classify_head_pose(x, y, threshold=13):
        """Turn scaled head angles into a pose label; ``y`` is checked before ``x``."""
        if y < -threshold:
            return "Looking Right"
        if y > threshold:
            return "Looking Left"
        if x < -threshold:
            return "Looking Down"
        if x > threshold:
            return "Looking Up"
        return "Forward"

    def _estimate_head_pose(self, face_landmarks, img_w, img_h):
        """Estimate a head-pose label for one face via solvePnP on six landmarks."""
        # Fresh lists for every face: the point sets are converted to arrays
        # below, so they must not be shared between faces.
        face_2d = []
        face_3d = []
        for idx in self._POSE_LANDMARK_IDS:
            lm = face_landmarks.landmark[idx]
            px, py = int(lm.x * img_w), int(lm.y * img_h)
            face_2d.append([px, py])
            face_3d.append([px, py, lm.z])
        face_2d = np.array(face_2d, dtype=np.float64)
        face_3d = np.array(face_3d, dtype=np.float64)

        # Approximate camera intrinsics: focal length equal to the image width,
        # principal point at the image centre, no lens distortion.
        focal_length = 1 * img_w
        cam_matrix = np.array([[focal_length, 0, img_w / 2],
                               [0, focal_length, img_h / 2],
                               [0, 0, 1]])
        dist_matrix = np.zeros((4, 1), dtype=np.float64)

        _success, rot_vec, _trans_vec = cv2.solvePnP(
            face_3d, face_2d, cam_matrix, dist_matrix
        )
        rmat, _ = cv2.Rodrigues(rot_vec)
        angles = cv2.RQDecomp3x3(rmat)[0]
        # The decomposed angles are scaled by 360 before thresholding.
        return self._classify_head_pose(
            angles[0] * 360, angles[1] * 360, self._POSE_ANGLE_THRESHOLD
        )

    def _update_counters(self, text_pose, text_flip, text_orig):
        """Update the four running counters from one face's per-frame verdicts.

        ``text_flip`` / ``text_orig`` are the labels from ``_get_gaze_info``
        for the mirrored and the enhanced frame. A "right" verdict on the
        mirrored frame is counted as left, and vice versa.
        """
        gaze_labels = (text_flip, text_orig)
        if text_pose != "Forward" or "Yes1" in gaze_labels or "Yes" in gaze_labels:
            self.total_distractions += 1
        if text_pose == "Forward" and text_orig == "No" and text_flip == "No":
            self.focused += 1
        if text_flip == "Yes1" or text_orig == "Yes" or text_pose == "Looking Left":
            self.left_distractions += 1
        if text_flip == "Yes" or text_orig == "Yes1" or text_pose == "Looking Right":
            self.right_distractions += 1

    def process_combined(self):
        """Yield annotated frames as JPEG bytes while updating the counters.

        Generator: runs until the webcam closes or a read fails. Frames that
        fail JPEG encoding are skipped.

        Yields:
            bytes: the JPEG-encoded annotated frame.
        """
        mp_face_mesh = mp.solutions.face_mesh
        mp_drawing = mp.solutions.drawing_utils
        drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)

        # The context manager releases FaceMesh when the loop ends or the
        # consumer closes the generator early.
        with mp_face_mesh.FaceMesh(min_detection_confidence=0.5,
                                   min_tracking_confidence=0.5) as face_mesh:
            while self.webcam.isOpened():
                with self.webcam_lock:
                    ret, frame = self.webcam.read()
                if not ret:
                    break

                enhanced_frame = ImageEnhancer.enhance_image(frame)
                flipped_frame = cv2.flip(enhanced_frame, 1)
                self.gaze_flip.refresh(flipped_frame)
                self.gaze_orig.refresh(enhanced_frame)
                new_frame_orig = self.gaze_orig.annotated_frame()
                text_flip, _ = self._get_gaze_info(self.gaze_flip)
                text_orig, _ = self._get_gaze_info(self.gaze_orig)

                image_rgb = cv2.cvtColor(enhanced_frame, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(image_rgb)
                img_h, img_w = enhanced_frame.shape[:2]

                # NOTE(review): the original file's indentation was lost, so
                # this nesting is inferred. Pose, counters and mesh drawing
                # run once per detected face; confirm against the source.
                if results.multi_face_landmarks:
                    for face_landmarks in results.multi_face_landmarks:
                        text_pose = self._estimate_head_pose(
                            face_landmarks, img_w, img_h
                        )
                        self._update_counters(text_pose, text_flip, text_orig)
                        cv2.putText(new_frame_orig, text_pose, (10, 140),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                        mp_drawing.draw_landmarks(
                            image=new_frame_orig,
                            landmark_list=face_landmarks,
                            connections=mp_face_mesh.FACEMESH_TESSELATION,
                            landmark_drawing_spec=drawing_spec,
                            connection_drawing_spec=drawing_spec)

                # NOTE(review): the counter overlay is drawn on every frame;
                # confirm it was not meant to appear only when a face is found.
                frame_height = new_frame_orig.shape[0]
                cv2.putText(new_frame_orig,
                            f"Distractions: {self.total_distractions}",
                            (10, frame_height - 450),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(new_frame_orig, f"Focused: {self.focused}",
                            (10, frame_height - 400),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                encoded_ok, buffer = cv2.imencode('.jpg', new_frame_orig)
                if not encoded_ok:
                    # Skip frames that could not be encoded.
                    continue
                yield buffer.tobytes()

    def get_focus_and_distractions(self):
        """Return a snapshot dict of the four running counters."""
        return {
            'distractions': self.total_distractions,
            'focus': self.focused,
            'left_distractions': self.left_distractions,
            'right_distractions': self.right_distractions
        }