import cv2
import numpy as np
import mediapipe as mp
from aura.gaze_tracking import GazeTracking
import time
import threading


class ImageEnhancer:
    """Image pre-processing helpers applied before gaze / head-pose analysis."""

    @staticmethod
    def enhance_image(frame):
        """Return an enhanced copy of a BGR uint8 ``frame``.

        Pipeline:
          1. CLAHE on the LAB lightness channel (local contrast boost).
          2. Gamma correction with gamma=1.8 (darkens mid-tones).
          3. Faint Canny edge overlay (80% image / 20% edges).
        """
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=4, tileGridSize=(8, 8))
        l_clahe = clahe.apply(l)
        lab_clahe = cv2.merge((l_clahe, a, b))
        enhanced_image = cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

        gamma = 1.8
        look_up_table = np.array(
            [((i / 255.0) ** gamma) * 255 for i in range(256)]
        ).astype("uint8")
        enhanced_image2 = cv2.LUT(enhanced_image, look_up_table)

        gray = cv2.cvtColor(enhanced_image2, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 250, 500)  # high thresholds: keep only strong edges
        edges_colored = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.addWeighted(enhanced_image2, 0.8, edges_colored, 0.2, 0)


class KalmanFilter:
    """Constant-velocity Kalman filter smoothing 2-D pupil coordinates.

    State vector is (x, y, dx, dy); measurements are (x, y).
    """

    def __init__(self):
        self.kalman = cv2.KalmanFilter(4, 2)
        self.kalman.measurementMatrix = np.array(
            [[1, 0, 0, 0],
             [0, 1, 0, 0]], np.float32)
        self.kalman.transitionMatrix = np.array(
            [[1, 0, 1, 0],
             [0, 1, 0, 1],
             [0, 0, 1, 0],
             [0, 0, 0, 1]], np.float32)
        self.kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.5

    def correct(self, coord):
        """Fold a measured (x, y) pair into the filter state."""
        return self.kalman.correct(np.array([coord[0], coord[1]], np.float32))

    def predict(self):
        """Return the predicted state vector (x, y, dx, dy) as a 4x1 array."""
        return self.kalman.predict()


class GazeProcessor:
    """Combine GazeTracking (pupil direction) with MediaPipe FaceMesh head-pose
    estimation to count focused vs. distracted frames from a webcam stream.

    Counters are per-frame: each processed frame increments at most one of
    ``focused`` / ``total_distractions`` (plus a left/right bucket).
    """

    # FaceMesh landmark ids used for pose estimation:
    # eye corners (33, 263), nose tip (1), mouth corners (61, 291), chin (199).
    _POSE_LANDMARK_IDS = frozenset({33, 263, 1, 61, 291, 199})

    def __init__(self, webcam):
        self.webcam = webcam
        # Serializes webcam reads in case other threads also grab frames.
        self.webcam_lock = threading.Lock()
        self.gaze_flip = GazeTracking()   # runs on the mirrored frame
        self.gaze_orig = GazeTracking()   # runs on the original frame
        self.kalman_filter = KalmanFilter()
        self.total_distractions = 0
        self.focused = 0
        self.right_distractions = 0
        self.left_distractions = 0

    def _get_gaze_info(self, gaze):
        """Map a GazeTracking state to a (label, BGR color) pair.

        Labels (inherited from the original UI): "Yes1" = looking right,
        "Yes" = looking left, "No" = centered, "" = undetermined.
        """
        if gaze.is_right():
            return "Yes1", (0, 0, 255)
        elif gaze.is_left():
            return "Yes", (0, 0, 255)
        elif gaze.is_center():
            return "No", (0, 255, 0)
        return "", (255, 0, 0)

    def _get_eye_position(self, gaze):
        """Return the Kalman-smoothed predicted pupil state (4x1 array),
        or None when either pupil is not detected."""
        left_pupil = gaze.pupil_left_coords()
        right_pupil = gaze.pupil_right_coords()
        if left_pupil and right_pupil:
            avg_pupil = np.mean([left_pupil, right_pupil], axis=0)
            self.kalman_filter.correct(avg_pupil)
            return self.kalman_filter.predict()
        return None

    def process_combined(self):
        """Generator: yield JPEG-encoded annotated frames while updating counters.

        Runs until the webcam closes or a read fails. For each frame: enhance
        it, run gaze tracking on both the original and a mirrored copy,
        estimate head pose via FaceMesh + solvePnP, update the counters, draw
        the overlay, and yield the JPEG bytes.
        """
        mp_face_mesh = mp.solutions.face_mesh
        mp_drawing = mp.solutions.drawing_utils
        drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
        face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5,
                                          min_tracking_confidence=0.5)
        try:
            while self.webcam.isOpened():
                with self.webcam_lock:
                    ret, frame = self.webcam.read()
                if not ret:
                    break

                enhanced_frame = ImageEnhancer.enhance_image(frame)
                flipped_frame = cv2.flip(enhanced_frame, 1)
                self.gaze_flip.refresh(flipped_frame)
                self.gaze_orig.refresh(enhanced_frame)
                new_frame_orig = self.gaze_orig.annotated_frame()
                text_flip, _ = self._get_gaze_info(self.gaze_flip)
                text_orig, _ = self._get_gaze_info(self.gaze_orig)
                text_pose = ''

                image_rgb = cv2.cvtColor(enhanced_frame, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(image_rgb)
                img_h, img_w = enhanced_frame.shape[:2]

                if results.multi_face_landmarks:
                    for face_landmarks in results.multi_face_landmarks:
                        # BUGFIX: reset per face. Previously the lists were
                        # converted to ndarrays and then .append()ed again on
                        # a second detected face, which would raise
                        # AttributeError.
                        face_2d = []
                        face_3d = []
                        for idx, lm in enumerate(face_landmarks.landmark):
                            if idx in self._POSE_LANDMARK_IDS:
                                x, y = int(lm.x * img_w), int(lm.y * img_h)
                                face_2d.append([x, y])
                                face_3d.append([x, y, lm.z])
                        face_2d = np.array(face_2d, dtype=np.float64)
                        face_3d = np.array(face_3d, dtype=np.float64)

                        focal_length = 1 * img_w  # rough pinhole approximation
                        cam_matrix = np.array([[focal_length, 0, img_w / 2],
                                               [0, focal_length, img_h / 2],
                                               [0, 0, 1]])
                        dist_matrix = np.zeros((4, 1), dtype=np.float64)
                        success, rot_vec, _ = cv2.solvePnP(
                            face_3d, face_2d, cam_matrix, dist_matrix)
                        if not success:
                            # BUGFIX: the result flag was previously ignored;
                            # skip pose classification when solvePnP fails
                            # rather than feeding garbage to cv2.Rodrigues.
                            continue
                        rmat, _ = cv2.Rodrigues(rot_vec)
                        angles, *_ = cv2.RQDecomp3x3(rmat)
                        # Inherited tuning: RQDecomp3x3 angles scaled by 360
                        # before thresholding at +/-13 (empirical constants).
                        x = angles[0] * 360
                        y = angles[1] * 360

                        if y < -13:
                            text_pose = "Looking Right"
                        elif y > 13:
                            text_pose = "Looking Left"
                        elif x < -13:
                            text_pose = "Looking Down"
                        elif x > 13:
                            text_pose = "Looking Up"
                        else:
                            text_pose = "Forward"

                        # NOTE: the mirrored tracker's left/right labels are
                        # swapped relative to the original frame, hence the
                        # crossed "Yes"/"Yes1" checks below.
                        if (text_pose != "Forward" or text_flip == "Yes1"
                                or text_orig == "Yes1" or text_flip == 'Yes'
                                or text_orig == 'Yes'):
                            self.total_distractions += 1
                        if (text_pose == "Forward" and text_orig == "No"
                                and text_flip == "No"):
                            self.focused += 1
                        if (text_flip == "Yes1" or text_orig == "Yes"
                                or text_pose == "Looking Left"):
                            self.left_distractions += 1
                        if (text_flip == "Yes" or text_orig == "Yes1"
                                or text_pose == "Looking Right"):
                            self.right_distractions += 1

                        cv2.putText(new_frame_orig, text_pose, (10, 140),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                        mp_drawing.draw_landmarks(
                            image=new_frame_orig,
                            landmark_list=face_landmarks,
                            connections=mp_face_mesh.FACEMESH_TESSELATION,
                            landmark_drawing_spec=drawing_spec,
                            connection_drawing_spec=drawing_spec)

                frame_height = new_frame_orig.shape[0]
                cv2.putText(new_frame_orig,
                            f"Distractions: {self.total_distractions}",
                            (10, frame_height - 450),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(new_frame_orig, f"Focused: {self.focused}",
                            (10, frame_height - 400),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                _, buffer = cv2.imencode('.jpg', new_frame_orig)
                yield buffer.tobytes()
        finally:
            # BUGFIX: release the MediaPipe graph even when the loop exits
            # early (read failure or the caller abandoning the generator).
            face_mesh.close()

    def get_focus_and_distractions(self):
        """Return a snapshot of the per-frame focus/distraction counters."""
        return {
            'distractions': self.total_distractions,
            'focus': self.focused,
            'left_distractions': self.left_distractions,
            'right_distractions': self.right_distractions,
        }