| | import gradio as gr |
| | import cv2 |
| | import mediapipe as mp |
| | import numpy as np |
| | import logging |
| | import time |
| | from filterpy.kalman import KalmanFilter |
| | import os |
| |
|
| | |
# Only report errors from the "mediapipe" logger.
logging.getLogger("mediapipe").setLevel(logging.ERROR)

# Short aliases for the MediaPipe pose solution and its drawing utilities.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Options for the single pose estimator shared by every processed frame.
_POSE_OPTIONS = {
    "static_image_mode": False,
    "min_detection_confidence": 0.5,
    "min_tracking_confidence": 0.6,
    "model_complexity": 2,
    "smooth_landmarks": True,
}
pose = mp_pose.Pose(**_POSE_OPTIONS)

# Multiplier applied to the smoothed head angles in get_head_angles().
ANGLE_SCALE = 1
| |
|
| | |
def initialize_kalman_filter():
    """Build the Kalman filter used to smooth three measured values.

    The state has six components and the measurement has three. The
    transition matrix adds state components 3..5 to components 0..2 on every
    prediction step, and the measurement matrix observes components 0..2
    only.

    Returns:
        A configured ``KalmanFilter`` with a zero initial state.
    """
    state_dim, measurement_dim = 6, 3
    tracker = KalmanFilter(dim_x=state_dim, dim_z=measurement_dim)
    tracker.x = np.zeros(state_dim)

    # Identity plus an identity block in the upper-right corner.
    transition = np.eye(state_dim, dtype=int)
    transition[:measurement_dim, measurement_dim:] = np.eye(measurement_dim, dtype=int)
    tracker.F = transition

    # Ones on the leading diagonal: the measurement is state[0:3].
    tracker.H = np.eye(measurement_dim, state_dim, dtype=int)

    tracker.P *= 10.
    tracker.R = np.diag([1.0, 1.0, 1.0])
    tracker.Q = np.eye(state_dim) * 0.05
    return tracker


# Module-level filter instance read and updated by get_head_angles().
kf = initialize_kalman_filter()
| |
|
| | |
# Reference pose rows: (name, x, y, score) for each keypoint.
_TARGET_KEYPOINT_ROWS = (
    ("Nose", 240.35791015625, 135.41705322265625, 0.9791688919067383),
    ("L_Eye", 265.16717529296875, 110.43780517578125, 0.9833072428857386),
    ("R_Eye", 210.517822265625, 114.45855712890625, 0.9687361121177673),
    ("L_Ear", 301.84814453125, 135.83111572265625, 0.9493670302238464),
    ("R_Ear", 175.035888671875, 143.1534423828125, 0.9537781476974487),
    ("L_Shoulder", 367.36688232421875, 277.89508056640625, 0.9714463949203491),
    ("R_Shoulder", 132.6015625, 287.1273193359375, 0.9208009243011475),
    ("L_Elbow", 404.8804931640625, 457.8016357421875, 1.0068358182907104),
    ("R_Elbow", 121.6767578125, 466.985595703125, 0.9445005059242249),
    ("L_Wrist", 316.5948486328125, 564.1590576171875, 0.9202994108200073),
    ("R_Wrist", 218.354248046875, 578.4954833984375, 0.9106894731521606),
    ("L_Hip", 343.258056640625, 562.5377197265625, 0.8454821705818176),
    ("R_Hip", 191.992431640625, 569.1612548828125, 0.856957733631134),
    ("L_Knee", 394.12591552734375, 672.401611328125, 0.8698152899742126),
    ("R_Knee", 143.781005859375, 696.0062255859375, 0.8501293659210205),
    ("L_Ankle", 353.07330322265625, 853.671142578125, 0.9136713147163391),
    ("R_Ankle", 211.80206298828125, 850.3348388671875, 0.8354711532592773),
)

# Single-person reference pose; only the keypoint x/y values are read by the
# geometry set-up that follows in this module.
target_pose = [
    {
        "person_id": 0,
        "bbox": [
            260.447998046875,
            434.9598693847656,
            263.357177734375,
            439.172119140625,
        ],
        "keypoints": [
            {"name": name, "x": x, "y": y, "score": score}
            for name, x, y, score in _TARGET_KEYPOINT_ROWS
        ],
    }
]
| |
|
| | |
# Working resolution: every incoming frame is resized to this size.
frame_width, frame_height = 1280, 720

# (x, y) of every reference keypoint; the first five entries are the head
# points (Nose, L_Eye, R_Eye, L_Ear, R_Ear in target_pose).
target_keypoints = [(point["x"], point["y"]) for point in target_pose[0]["keypoints"]]
head_keypoint_indices = [0, 1, 2, 3, 4]
head_keypoints = [target_keypoints[index] for index in head_keypoint_indices]

# Centroid of the reference head points.
target_head_center_x = sum(px for px, _ in head_keypoints) / len(head_keypoints)
target_head_center_y = sum(py for _, py in head_keypoints) / len(head_keypoints)

# Where the head centroid should sit on screen: horizontally centred,
# 20% of the frame height from the top.
display_center_x = frame_width / 2
display_center_y = frame_height * 0.2

# Shift the whole reference pose so its head centroid lands on that point.
translate_x = display_center_x - target_head_center_x
translate_y = display_center_y - target_head_center_y
centered_target_keypoints = [
    (px + translate_x, py + translate_y) for px, py in target_keypoints
]
head_keypoints_centered = [
    centered_target_keypoints[index] for index in head_keypoint_indices
]

# Head bounding box: extent of the shifted head points plus a 20 px margin,
# clamped to the frame.
x_coords = [px for px, _ in head_keypoints_centered]
y_coords = [py for _, py in head_keypoints_centered]
bbox_min_x = max(0, min(x_coords) - 20)
bbox_max_x = min(frame_width, max(x_coords) + 20)
bbox_min_y = max(0, min(y_coords) - 20)
bbox_max_y = min(frame_height, max(y_coords) + 20)
| |
|
| | |
def euclidean_distance(p1, p2):
    """Return the straight-line distance between two 2-D points.

    Only the first two components of each point are read.
    """
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    squared_length = delta_x**2 + delta_y**2
    return np.sqrt(squared_length)
| |
|
def is_head_pose_matched(user_landmarks, target_keypoints, distance_threshold=25):
    """Check whether the user's head points lie close to the target head points.

    Args:
        user_landmarks: Sequence of (x, y) points for the user, indexed as in
            the keys of the mapping below.
        target_keypoints: Sequence of (x, y) reference points, indexed as in
            the values of the mapping below.
        distance_threshold: Largest allowed distance between a paired user
            point and target point.

    Returns:
        True only when every paired point exists in both sequences and is
        within ``distance_threshold`` of its target. A missing point counts
        as a mismatch, so empty or truncated input can never report a match.
    """
    # user index -> target index. Target indices 0..4 are Nose, L_Eye, R_Eye,
    # L_Ear, R_Ear in target_pose; the user indices presumably address the
    # same points in the MediaPipe landmark list (verify against MediaPipe).
    head_indices_mapping = {0: 0, 2: 1, 5: 2, 7: 3, 8: 4}
    for mp_idx, target_idx in head_indices_mapping.items():
        if mp_idx >= len(user_landmarks) or target_idx >= len(target_keypoints):
            return False
        distance = euclidean_distance(user_landmarks[mp_idx], target_keypoints[target_idx])
        if distance > distance_threshold:
            return False
    return True
| |
|
def is_full_body_visible(landmarks, frame_width, frame_height):
    """Report whether both shoulders and both hips are clearly inside the frame.

    Args:
        landmarks: Landmark sequence indexable by ``mp_pose.PoseLandmark``;
            each entry exposes ``visibility``, ``x`` and ``y``.
        frame_width: Accepted for interface compatibility; not used.
        frame_height: Accepted for interface compatibility; not used.

    Returns:
        False as soon as one of the four landmarks has visibility below 0.6
        or an x/y coordinate outside the 0.05..0.95 band, True otherwise.
    """
    torso_points = (
        mp_pose.PoseLandmark.LEFT_SHOULDER,
        mp_pose.PoseLandmark.RIGHT_SHOULDER,
        mp_pose.PoseLandmark.LEFT_HIP,
        mp_pose.PoseLandmark.RIGHT_HIP,
    )

    def _fails_check(point):
        # Low confidence, or too close to (or beyond) a frame edge.
        return (point.visibility < 0.6
                or point.x < 0.05 or point.x > 0.95
                or point.y < 0.05 or point.y > 0.95)

    return not any(_fails_check(landmarks[index]) for index in torso_points)
| |
|
def _calculate_raw_head_angles_user_method(landmark_list):
    """Estimate raw head yaw, pitch and roll (degrees) from face landmarks.

    Args:
        landmark_list: Landmark sequence addressed by the ``.value`` of
            ``mp_pose.PoseLandmark`` members; entries expose ``x``, ``y``,
            ``z`` and ``visibility``.

    Returns:
        ``(yaw, pitch, roll)``, or None when the list is missing, too short,
        or any of nose / ears / eyes has visibility below 0.5.
    """
    needed = (
        mp_pose.PoseLandmark.NOSE,
        mp_pose.PoseLandmark.LEFT_EAR,
        mp_pose.PoseLandmark.RIGHT_EAR,
        mp_pose.PoseLandmark.LEFT_EYE,
        mp_pose.PoseLandmark.RIGHT_EYE,
    )
    highest_index = max(member.value for member in needed)
    if landmark_list is None or len(landmark_list) <= highest_index:
        return None

    nose, left_ear, right_ear, left_eye, right_eye = (
        landmark_list[member.value] for member in needed
    )
    if any(point.visibility < 0.5
           for point in (nose, left_ear, right_ear, left_eye, right_eye)):
        return None

    def _midpoint(first, second):
        return np.array([(first.x + second.x) / 2,
                         (first.y + second.y) / 2,
                         (first.z + second.z) / 2])

    nose_xyz = np.array([nose.x, nose.y, nose.z])

    # Yaw: direction of the nose -> ear-midpoint vector in the x/z plane.
    to_ears = _midpoint(left_ear, right_ear) - nose_xyz
    yaw = -np.degrees(np.arctan2(to_ears[0], to_ears[2] + 1e-6))

    # Pitch: elevation of the eye-midpoint -> nose vector.
    from_eyes = nose_xyz - _midpoint(left_eye, right_eye)
    horizontal = np.sqrt(from_eyes[0]**2 + from_eyes[2]**2 + 1e-6)
    pitch = np.degrees(np.arctan2(from_eyes[1], horizontal))

    # Roll: in-image angle of the right-ear -> left-ear line.
    ear_line = np.array([left_ear.x - right_ear.x, left_ear.y - right_ear.y])
    roll = np.degrees(np.arctan2(ear_line[1], ear_line[0] + 1e-6))

    # Pitch is reported negated after subtracting a fixed 50 degree offset.
    return yaw, -(pitch - 50), roll
| |
|
def get_head_angles(pose_results):
    """Return Kalman-smoothed head (yaw, pitch, roll) for one frame.

    The module-level filter ``kf`` is advanced by one predict/update cycle on
    every call. Errors raised while computing the raw angles are logged and
    do not propagate.

    Args:
        pose_results: Pose result object exposing ``pose_landmarks``; may be
            falsy.

    Returns:
        ``(yaw, pitch, roll)`` taken from the first three filter state
        components and multiplied by ``ANGLE_SCALE``; yaw is additionally
        multiplied by 3.
    """
    # NOTE(review): when no raw angles are available the filter is still
    # updated with a (0, 0, 0) measurement -- confirm this is intended.
    measurement = (0.0, 0.0, 0.0)
    if pose_results and pose_results.pose_landmarks:
        try:
            raw_angles = _calculate_raw_head_angles_user_method(
                pose_results.pose_landmarks.landmark
            )
        except Exception as e:
            logging.error(f"Error in get_head_angles: {e}")
        else:
            if raw_angles is not None:
                measurement = raw_angles

    kf.predict()
    kf.update(np.array(measurement))

    yaw, pitch, roll = kf.x[:3]
    return yaw * ANGLE_SCALE * 3, pitch * ANGLE_SCALE, roll * ANGLE_SCALE
| |
|
def wrap_angle_180(angle):
    """Wrap an angle in degrees into the half-open interval [-180, 180).

    Accepts a scalar or a NumPy array (wrapped element-wise). ``np.mod``
    already returns a remainder in [0, 360) for a positive divisor, so no
    sign correction branch is needed.

    Args:
        angle: Angle(s) in degrees.

    Returns:
        The equivalent angle(s) in [-180, 180); +180 maps to -180.
    """
    return np.mod(angle + 180, 360) - 180
| |
|
# Overlay colours as passed to OpenCV drawing calls.
_OK_COLOR = (0, 255, 0)
_ALERT_COLOR = (0, 0, 255)

# Flags cleared while the torso is not yet fully visible.
_PROGRESS_FLAGS = (
    "pose_held",
    "bppv_step_1", "bppv_step_2", "bppv_step_3", "bppv_step_4",
    "mission_complete", "step_3_complete", "all_missions_complete",
    "in_correct_pose_step_1", "in_correct_pose_step_2",
    "in_correct_pose_step_3", "in_correct_pose_step_4",
    "head_shake_complete",
)

# Accepted (min, max) window per axis for each step, in wrapped degrees.
_TARGET_RANGES = {
    1: {"yaw": (25, 65)},
    2: {"yaw": (-20, 20), "pitch": (70, 110), "roll": (-120, -80)},
    3: {"yaw": (153, 193), "pitch": (17, 57), "roll": (77, 117)},
    4: {"yaw": (-160, 160), "pitch": (0, 30), "roll": (-160, 160)},
}

_PITCH_HINTS = ("Tilt your head further up.", "Tilt your head down.")

# Per-step wording for the three timed hold steps. "hints" maps an axis to
# the (below-window, above-window) correction; "overlay" optionally maps an
# instruction to a shorter on-frame text.
_HOLD_STEPS = {
    1: {
        "hints": {
            "yaw": ("Turn your head further to the right.",
                    "Turn your head back to the left."),
        },
        "overlay": {
            "Turn your head further to the right.": "Turn head further right",
            "Turn your head back to the left.": "Turn head back left",
        },
        "hold_text": "Hold your head at this position. {:.1f} seconds remaining.",
        "done_flags": ("mission_complete", "bppv_step_2"),
        "done_text": "Step 1 complete. Now, slowly lie down on your left side, so that your right ear rests on the bed. Keep your head aligned—same position as before. Hold this pose for 30 seconds and stay relaxed.",
    },
    2: {
        "hints": {
            "yaw": ("Turn your head to the left.", "Turn your head to the right."),
            "pitch": _PITCH_HINTS,
            "roll": ("Bend your head more to the left.", "Bend your head to the right."),
        },
        "hold_text": "Hold your head in this position. {:.1f} seconds remaining.",
        "done_flags": ("step_3_complete", "bppv_step_3"),
        "done_text": "Step 2 complete. Stay with your head at the same angle, and roll your body to the right and hold for 30 seconds.",
    },
    3: {
        "hints": {
            "yaw": ("Turn your head further to the left.",
                    "Turn your head back to the right."),
            "pitch": _PITCH_HINTS,
            "roll": ("Bend your head more to the right.", "Bend your head to the left."),
        },
        "hold_text": "Hold your head in this position. {:.1f} seconds remaining.",
        "done_flags": ("bppv_step_4",),
        "done_text": "Step 3 complete. Now shake your head side to side 2 to 3 times, then sit on the opposite side of the bed in a neutral position.",
    },
}


def _initial_state(now):
    """Return a fresh session state dict; ``now`` seeds ``last_speech_time``."""
    state = {"visibility_confirmed": False}
    state.update(dict.fromkeys(_PROGRESS_FLAGS, False))
    state.update({
        "match_start_time": None,
        "bppv_start_time": None,
        "bppv_pose_held_time": 0,
        "last_speech_time": now,
        "speech_interval": 3,
        "head_shake_count": 0,
        "last_yaw": 0,
        "yaw_direction": 0,
        "yaw_threshold": 15,
        "match_duration_threshold": 5,
        "bppv_duration_threshold": 30,
        "neutral_hold_threshold": 5,
    })
    for step, axes in _TARGET_RANGES.items():
        for axis, (low, high) in axes.items():
            state[f"target_{axis}_min_step_{step}"] = low
            state[f"target_{axis}_max_step_{step}"] = high
    return state


def _put_banner(frame, text, color):
    """Draw a one-line status message near the top of the frame."""
    cv2.putText(frame, text, (frame_width // 4, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)


def _draw_angle_readout(frame, angles):
    """Draw the integer yaw/pitch/roll values down the left edge."""
    for row, (label, axis) in enumerate((("Yaw", "yaw"), ("Pitch", "pitch"), ("Roll", "roll"))):
        cv2.putText(frame, f"{label}: {int(angles[axis])}", (10, 100 + 50 * row),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)


def _in_wrapped_range(angle, low, high):
    """Return True if ``angle`` or a full-turn equivalent lies in [low, high].

    Angles reaching this check are wrapped to [-180, 180), so a window such
    as 153..193 would otherwise be unreachable above 180.
    """
    return any(low <= angle + turn <= high for turn in (0, 360, -360))


def _advance_hold_timer(state, now, threshold):
    """Start the hold timer if needed, update held time, return seconds left."""
    if state["bppv_start_time"] is None:
        state["bppv_start_time"] = now
    state["bppv_pose_held_time"] = now - state["bppv_start_time"]
    return max(0, threshold - state["bppv_pose_held_time"])


def _current_phase(state):
    """Map the progress flags to the phase that handles this frame."""
    if not state["visibility_confirmed"]:
        return "visibility"
    if not state["pose_held"]:
        return "calibration"
    if state["bppv_step_1"] and not state["mission_complete"]:
        return 1
    if state["mission_complete"] and state["bppv_step_2"] and not state["step_3_complete"]:
        return 2
    if state["step_3_complete"] and state["bppv_step_3"] and not state["bppv_step_4"]:
        return 3
    if state["bppv_step_4"] and not state["all_missions_complete"]:
        return 4
    if state["all_missions_complete"]:
        return "complete"
    return None


def _guide_visibility(frame, state, landmarks):
    """Wait until shoulders and hips are in view; return the instruction."""
    if is_full_body_visible(landmarks, frame_width, frame_height):
        state["visibility_confirmed"] = True
        cv2.putText(frame, "Visibility Confirmed!", (frame_width // 4, frame_height // 2 - 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, _OK_COLOR, 2, cv2.LINE_AA)
        return "Full body visibility confirmed. Please adjust your head to match the position that your eye and nose points are fully inside the box and box should be green."

    cv2.putText(frame, "Please move back for full body visibility",
                (frame_width // 4 - 50, frame_height // 2),
                cv2.FONT_HERSHEY_SIMPLEX, 1, _ALERT_COLOR, 3, cv2.LINE_AA)
    state["match_start_time"] = None
    for flag in _PROGRESS_FLAGS:
        state[flag] = False
    state["head_shake_count"] = 0
    state["yaw_direction"] = 0
    return "Please move back to ensure your full body is visible in the frame."


def _guide_calibration(frame, state, user_landmarks, now):
    """Require the head to stay inside the target box; return the instruction."""
    matched = is_head_pose_matched(user_landmarks, centered_target_keypoints)
    cv2.rectangle(frame, (int(bbox_min_x), int(bbox_min_y)), (int(bbox_max_x), int(bbox_max_y)),
                  _OK_COLOR if matched else _ALERT_COLOR, 2)

    if not matched:
        state["match_start_time"] = None
        _put_banner(frame, "Adjust eye and nose in the centre of box", _ALERT_COLOR)
        return "Adjust your head to make the box green for 5 seconds."

    if state["match_start_time"] is None:
        state["match_start_time"] = now
        return "Hold your head in this position."

    elapsed = now - state["match_start_time"]
    if elapsed >= state["match_duration_threshold"]:
        state["pose_held"] = True
        state["bppv_step_1"] = True
        state["bppv_start_time"] = now
        return "Calibration complete. Now turn your head 45 degrees to the right and hold for 30 seconds."

    remaining = max(0, state["match_duration_threshold"] - elapsed)
    _put_banner(frame, f"Hold Head Pose for {remaining:.1f}s", _OK_COLOR)
    return f"Hold head pose for {remaining:.1f} seconds."


def _guide_hold_step(frame, state, step, angles, now):
    """Run one timed hold step (1, 2 or 3); return the instruction."""
    config = _HOLD_STEPS[step]
    pose_flag = f"in_correct_pose_step_{step}"
    bounds = {
        axis: (state[f"target_{axis}_min_step_{step}"], state[f"target_{axis}_max_step_{step}"])
        for axis in config["hints"]
    }
    in_window = {
        axis: _in_wrapped_range(angles[axis], low, high)
        for axis, (low, high) in bounds.items()
    }

    if all(in_window.values()):
        state[pose_flag] = True
        remaining = _advance_hold_timer(state, now, state["bppv_duration_threshold"])
        _put_banner(frame, f"Hold Head at this position for {remaining:.1f}s", _OK_COLOR)
        if state["bppv_pose_held_time"] < state["bppv_duration_threshold"]:
            return config["hold_text"].format(remaining)
        # Hold finished: unlock the next step and clear the shared timer.
        for flag in config["done_flags"]:
            state[flag] = True
        state["bppv_start_time"] = None
        state["bppv_pose_held_time"] = 0
        state[pose_flag] = False
        if step == 3:
            # Baseline for the head-shake detection that follows.
            state["last_yaw"] = angles["yaw"]
        return config["done_text"]

    # Leaving the window restarts the hold from zero.
    state["bppv_start_time"] = None
    state[pose_flag] = False
    messages = []
    for axis, (too_low, too_high) in config["hints"].items():
        if in_window[axis]:
            continue
        low, high = bounds[axis]
        if angles[axis] < low:
            messages.append(too_low)
        elif angles[axis] > high:
            messages.append(too_high)
    instruction = " ".join(messages) if messages else "Adjust head to target pose."
    overlay = config.get("overlay", {}).get(instruction, instruction)
    _put_banner(frame, overlay, _ALERT_COLOR)
    return instruction


def _guide_head_shake(frame, state, yaw):
    """Count side-to-side direction changes; return the instruction."""
    # Shortest signed difference, so crossing the +/-180 seam is not read as
    # a swing of roughly 360 degrees.
    swing = wrap_angle_180(yaw - state["last_yaw"])
    if swing > state["yaw_threshold"] and state["yaw_direction"] != 1:
        state["yaw_direction"] = 1
        state["head_shake_count"] += 0.5
    elif swing < -state["yaw_threshold"] and state["yaw_direction"] != -1:
        state["yaw_direction"] = -1
        state["head_shake_count"] += 0.5
    state["last_yaw"] = yaw

    if state["head_shake_count"] < 2:
        instruction = f"Shake head side to side ({int(state['head_shake_count']*2)}/2-3 shakes)"
        _put_banner(frame, instruction, _ALERT_COLOR)
        return instruction

    state["head_shake_complete"] = True
    state["bppv_start_time"] = None
    state["in_correct_pose_step_4"] = False
    return "Now sit on the opposite side of the bed in a neutral position."


def _guide_neutral_hold(frame, state, angles, now):
    """Run the final short hold; return the instruction."""
    yaw, pitch, roll = angles["yaw"], angles["pitch"], angles["roll"]
    # For this step yaw and roll must lie OUTSIDE their configured window.
    yaw_ok = yaw < state["target_yaw_min_step_4"] or yaw > state["target_yaw_max_step_4"]
    pitch_ok = state["target_pitch_min_step_4"] <= pitch <= state["target_pitch_max_step_4"]
    roll_ok = roll < state["target_roll_min_step_4"] or roll > state["target_roll_max_step_4"]

    if yaw_ok and pitch_ok and roll_ok:
        state["in_correct_pose_step_4"] = True
        remaining = _advance_hold_timer(state, now, state["neutral_hold_threshold"])
        _put_banner(frame, f"Hold Neutral Position for {remaining:.1f}s", _OK_COLOR)
        if state["bppv_pose_held_time"] >= state["neutral_hold_threshold"]:
            state["all_missions_complete"] = True
            return "You have successfully completed the maneuver."
        return f"Hold this neutral position. {remaining:.1f} seconds remaining."

    state["bppv_start_time"] = None
    state["in_correct_pose_step_4"] = False
    messages = []
    if not yaw_ok:
        messages.append("Turn your head further right." if yaw >= 0
                        else "Turn your head further left.")
    if not pitch_ok:
        if pitch < state["target_pitch_min_step_4"]:
            messages.append(_PITCH_HINTS[0])
        elif pitch > state["target_pitch_max_step_4"]:
            messages.append(_PITCH_HINTS[1])
    # A roll-only failure has no dedicated hint and falls back to the
    # generic message below.
    instruction = " ".join(messages) if messages else "Adjust to neutral position."
    _put_banner(frame, instruction, _ALERT_COLOR)
    return instruction


def process_frame(frame, state):
    """Run pose detection on one frame and advance the guidance state machine.

    Phases, in order: torso visibility check, head calibration inside the
    target box, three timed hold steps, a head-shake count followed by a
    short neutral hold, then a completion screen. Guidance text and the
    detected landmarks are drawn onto the returned frame.

    Args:
        frame: Image array for the current frame; it is resized to
            ``frame_width`` x ``frame_height``.
        state: Session state dict from the previous call, or None to start a
            new session.

    Returns:
        ``(annotated_frame, instruction, state)``. ``instruction`` is an
        empty string when no pose landmarks were detected.
    """
    # NOTE(review): the input is treated as BGR here and the annotated frame
    # is returned in that same channel order -- confirm what the caller
    # delivers and expects.
    frame = cv2.resize(frame, (frame_width, frame_height))
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pose_results = pose.process(image_rgb)
    frame = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)

    now = time.time()
    if state is None:
        state = _initial_state(now)

    if not pose_results.pose_landmarks:
        return frame, "", state

    landmarks = pose_results.pose_landmarks.landmark
    instruction = ""
    phase = _current_phase(state)

    if phase == "visibility":
        instruction = _guide_visibility(frame, state, landmarks)
    elif phase == "calibration":
        user_landmarks = [(lm.x * frame_width, lm.y * frame_height) for lm in landmarks]
        instruction = _guide_calibration(frame, state, user_landmarks, now)
    elif phase is not None:
        # Exactly one filter update per frame once calibration is done.
        yaw, pitch, roll = (wrap_angle_180(value) for value in get_head_angles(pose_results))
        angles = {"yaw": yaw, "pitch": pitch, "roll": roll}
        if phase == "complete":
            instruction = "Epley Maneuver Guider Complete!"
            cv2.putText(frame, instruction, (frame_width // 4, frame_height // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.5, _OK_COLOR, 3, cv2.LINE_AA)
        elif phase == 4:
            if state["head_shake_complete"]:
                instruction = _guide_neutral_hold(frame, state, angles, now)
            else:
                instruction = _guide_head_shake(frame, state, yaw)
        else:
            instruction = _guide_hold_step(frame, state, phase, angles, now)
        _draw_angle_readout(frame, angles)

    mp_drawing.draw_landmarks(
        frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=4),
        connection_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2))

    return frame, instruction, state
| |
|
def webcam_stream(frame, state):
    """Handle one streamed webcam frame.

    Args:
        frame: The current frame, or None when nothing was received.
        state: Session state passed through to ``process_frame``.

    Returns:
        ``(frame, instruction, state)`` from ``process_frame``; when
        ``frame`` is None, ``(None, "No frame received.", state)`` with the
        state unchanged.
    """
    if frame is not None:
        annotated, message, updated_state = process_frame(frame, state)
        return annotated, message, updated_state
    return None, "No frame received.", state
| |
|
| | |
def _reset_session():
    """Clear the session state and restart the shared angle filter.

    Returning None resets the ``gr.State`` value, so the next frame starts a
    new session in ``process_frame``. The module-level Kalman filter is
    rebuilt so smoothed angles from the previous run do not carry over.
    """
    global kf
    kf = initialize_kalman_filter()
    return None


with gr.Blocks() as demo:
    gr.Markdown("# AI-Based BPPV Maneuver Guider")
    gr.Markdown("Use your webcam to guide you through the Epley Maneuver for BPPV treatment. Follow the on-screen instructions. Note: Webcam streaming works best when running locally.")

    with gr.Row():
        with gr.Column():
            # A streaming webcam Image provides the per-frame `stream` event
            # used below; the Video component does not offer one.
            webcam_input = gr.Image(label="Webcam Feed", source="webcam", streaming=True)
            reset_button = gr.Button("Reset State")
        with gr.Column():
            # Output only: `streaming` applies to webcam sources, so it is
            # not set here.
            output_image = gr.Image(label="Processed Frame")
            instruction_output = gr.Textbox(label="Instructions")

    # Per-session state dict; None until the first processed frame.
    state = gr.State()

    # No `_js` hook: a pass-through taking a single argument would drop the
    # state input before it reaches the Python callback.
    webcam_input.stream(
        fn=webcam_stream,
        inputs=[webcam_input, state],
        outputs=[output_image, instruction_output, state],
    )

    reset_button.click(
        fn=_reset_session,
        inputs=None,
        outputs=state,
    )

if __name__ == "__main__":
    demo.launch()