"""Webcam-guided Epley maneuver (BPPV) assistant.

Each webcam frame is run through MediaPipe Pose; a small per-session state
machine (stored in a plain dict) walks the user through visibility check,
head calibration, three timed head poses, a head shake and a final neutral
hold.  The annotated frame and a text instruction are returned to Gradio.
"""

import logging
import os
import time

import cv2
import gradio as gr
import mediapipe as mp
import numpy as np
from filterpy.kalman import KalmanFilter

# Suppress MediaPipe warnings
logging.getLogger('mediapipe').setLevel(logging.ERROR)

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
pose = mp_pose.Pose(
    static_image_mode=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.6,
    model_complexity=2,
    smooth_landmarks=True,
)

# Define scaling factor for angles
ANGLE_SCALE = 1

# Overlay drawing constants.  Colours are BGR tuples because all drawing is
# done on a BGR canvas inside process_frame.
_FONT = cv2.FONT_HERSHEY_SIMPLEX
_GREEN = (0, 255, 0)
_RED = (0, 0, 255)
_BLUE = (255, 0, 0)
_BLACK = (0, 0, 0)
_AXES = ("yaw", "pitch", "roll")


# Initialize Kalman Filter for smoothing angles
def initialize_kalman_filter():
    """Build the filter used to smooth (yaw, pitch, roll).

    The state vector has six entries: the three angles followed by three
    per-step rates (each angle row of ``F`` adds the matching rate).  Only
    the three angles are observed (``H``).
    """
    kf = KalmanFilter(dim_x=6, dim_z=3)
    kf.x = np.zeros(6)
    kf.F = np.array([
        [1, 0, 0, 1, 0, 0],
        [0, 1, 0, 0, 1, 0],
        [0, 0, 1, 0, 0, 1],
        [0, 0, 0, 1, 0, 0],
        [0, 0, 0, 0, 1, 0],
        [0, 0, 0, 0, 0, 1],
    ])
    kf.H = np.array([
        [1, 0, 0, 0, 0, 0],
        [0, 1, 0, 0, 0, 0],
        [0, 0, 1, 0, 0, 0],
    ])
    kf.P *= 10.
    kf.R = np.diag([1.0, 1.0, 1.0])
    kf.Q = np.eye(6) * 0.05
    return kf


# NOTE: a single module-level filter is shared by every Gradio session.
kf = initialize_kalman_filter()

# Load target pose (same as original)
target_pose = [
    {
        "person_id": 0,
        "bbox": [
            260.447998046875,
            434.9598693847656,
            263.357177734375,
            439.172119140625,
        ],
        "keypoints": [
            {"name": "Nose", "x": 240.35791015625, "y": 135.41705322265625, "score": 0.9791688919067383},
            {"name": "L_Eye", "x": 265.16717529296875, "y": 110.43780517578125, "score": 0.9833072428857386},
            {"name": "R_Eye", "x": 210.517822265625, "y": 114.45855712890625, "score": 0.9687361121177673},
            {"name": "L_Ear", "x": 301.84814453125, "y": 135.83111572265625, "score": 0.9493670302238464},
            {"name": "R_Ear", "x": 175.035888671875, "y": 143.1534423828125, "score": 0.9537781476974487},
            {"name": "L_Shoulder", "x": 367.36688232421875, "y": 277.89508056640625, "score": 0.9714463949203491},
            {"name": "R_Shoulder", "x": 132.6015625, "y": 287.1273193359375, "score": 0.9208009243011475},
            {"name": "L_Elbow", "x": 404.8804931640625, "y": 457.8016357421875, "score": 1.0068358182907104},
            {"name": "R_Elbow", "x": 121.6767578125, "y": 466.985595703125, "score": 0.9445005059242249},
            {"name": "L_Wrist", "x": 316.5948486328125, "y": 564.1590576171875, "score": 0.9202994108200073},
            {"name": "R_Wrist", "x": 218.354248046875, "y": 578.4954833984375, "score": 0.9106894731521606},
            {"name": "L_Hip", "x": 343.258056640625, "y": 562.5377197265625, "score": 0.8454821705818176},
            {"name": "R_Hip", "x": 191.992431640625, "y": 569.1612548828125, "score": 0.856957733631134},
            {"name": "L_Knee", "x": 394.12591552734375, "y": 672.401611328125, "score": 0.8698152899742126},
            {"name": "R_Knee", "x": 143.781005859375, "y": 696.0062255859375, "score": 0.8501293659210205},
            {"name": "L_Ankle", "x": 353.07330322265625, "y": 853.671142578125, "score": 0.9136713147163391},
            {"name": "R_Ankle", "x": 211.80206298828125, "y": 850.3348388671875, "score": 0.8354711532592773},
        ],
    }
]

# Extract and center target keypoints
frame_width = 1280
frame_height = 720
target_keypoints = [(kp["x"], kp["y"]) for kp in target_pose[0]["keypoints"]]
head_keypoint_indices = [0, 1, 2, 3, 4]
head_keypoints = [target_keypoints[i] for i in head_keypoint_indices]
target_head_center_x = sum(x for x, y in head_keypoints) / len(head_keypoints)
target_head_center_y = sum(y for x, y in head_keypoints) / len(head_keypoints)
# The target head is placed horizontally centred, 20% down from the top.
display_center_x = frame_width / 2
display_center_y = frame_height * 0.2
translate_x = display_center_x - target_head_center_x
translate_y = display_center_y - target_head_center_y
centered_target_keypoints = [(x + translate_x, y + translate_y) for x, y in target_keypoints]
head_keypoints_centered = [centered_target_keypoints[i] for i in head_keypoint_indices]
x_coords = [x for x, y in head_keypoints_centered]
y_coords = [y for x, y in head_keypoints_centered]
# Head bounding box: 20 px margin around the head keypoints, clamped to the frame.
bbox_min_x = max(0, min(x_coords) - 20)
bbox_max_x = min(frame_width, max(x_coords) + 20)
bbox_min_y = max(0, min(y_coords) - 20)
bbox_max_y = min(frame_height, max(y_coords) + 20)


# Helper functions
def euclidean_distance(p1, p2):
    """Return the 2-D Euclidean distance between points ``p1`` and ``p2``."""
    return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)


def is_head_pose_matched(user_landmarks, target_keypoints, distance_threshold=25):
    """Return True when every mapped head landmark is close to its target.

    ``user_landmarks`` and ``target_keypoints`` are sequences of ``(x, y)``
    pixel positions.  A pair whose index is missing from either sequence is
    skipped; any compared pair farther apart than ``distance_threshold``
    fails the match.
    """
    # MediaPipe pose landmark index -> index into the target keypoint list
    # (nose, left eye, right eye, left ear, right ear).
    head_indices_mapping = {0: 0, 2: 1, 5: 2, 7: 3, 8: 4}
    for mp_idx, target_idx in head_indices_mapping.items():
        if mp_idx < len(user_landmarks) and target_idx < len(target_keypoints):
            distance = euclidean_distance(user_landmarks[mp_idx], target_keypoints[target_idx])
            if distance > distance_threshold:
                return False
    return True


def is_full_body_visible(landmarks, frame_width, frame_height):
    """Return True when both shoulders and both hips are confidently in frame.

    Each of the four landmarks must have visibility >= 0.6 and normalized
    x/y within [0.05, 0.95].  ``frame_width`` and ``frame_height`` are
    accepted for interface compatibility but are not used.
    """
    key_landmarks = [
        mp_pose.PoseLandmark.LEFT_SHOULDER,
        mp_pose.PoseLandmark.RIGHT_SHOULDER,
        mp_pose.PoseLandmark.LEFT_HIP,
        mp_pose.PoseLandmark.RIGHT_HIP,
    ]
    for landmark in key_landmarks:
        lm = landmarks[landmark]
        if (lm.visibility < 0.6 or
                lm.x < 0.05 or lm.x > 0.95 or
                lm.y < 0.05 or lm.y > 0.95):
            return False
    return True


def _calculate_raw_head_angles_user_method(landmark_list):
    """Estimate ``(yaw, pitch, roll)`` in degrees from nose/eye/ear landmarks.

    Returns None when the list is missing, too short, or any of the five
    required landmarks has visibility below 0.5.
    """
    required_indices = [
        mp_pose.PoseLandmark.NOSE,
        mp_pose.PoseLandmark.LEFT_EAR,
        mp_pose.PoseLandmark.RIGHT_EAR,
        mp_pose.PoseLandmark.LEFT_EYE,
        mp_pose.PoseLandmark.RIGHT_EYE,
    ]
    if landmark_list is None or len(landmark_list) <= max(idx.value for idx in required_indices):
        return None
    for l_idx_enum in required_indices:
        if landmark_list[l_idx_enum.value].visibility < 0.5:
            return None

    nose = landmark_list[mp_pose.PoseLandmark.NOSE.value]
    left_ear = landmark_list[mp_pose.PoseLandmark.LEFT_EAR.value]
    right_ear = landmark_list[mp_pose.PoseLandmark.RIGHT_EAR.value]
    left_eye = landmark_list[mp_pose.PoseLandmark.LEFT_EYE.value]
    right_eye = landmark_list[mp_pose.PoseLandmark.RIGHT_EYE.value]

    # Yaw: direction of the ear-midpoint -> nose vector in the x/z plane.
    mid_ear = np.array([(left_ear.x + right_ear.x) / 2,
                        (left_ear.y + right_ear.y) / 2,
                        (left_ear.z + right_ear.z) / 2])
    nose_vec = mid_ear - np.array([nose.x, nose.y, nose.z])
    yaw = -np.degrees(np.arctan2(nose_vec[0], nose_vec[2] + 1e-6))

    # Pitch: elevation of the eye-midpoint -> nose vector.
    eye_mid = np.array([(left_eye.x + right_eye.x) / 2,
                        (left_eye.y + right_eye.y) / 2,
                        (left_eye.z + right_eye.z) / 2])
    nose_to_eye = np.array([nose.x, nose.y, nose.z]) - eye_mid
    pitch = np.degrees(np.arctan2(nose_to_eye[1],
                                  np.sqrt(nose_to_eye[0]**2 + nose_to_eye[2]**2 + 1e-6)))

    # Roll: in-image tilt of the right-ear -> left-ear line.
    ear_vec_2d = np.array([left_ear.x - right_ear.x, left_ear.y - right_ear.y])
    roll = np.degrees(np.arctan2(ear_vec_2d[1], ear_vec_2d[0] + 1e-6))

    # NOTE(review): the 50-degree pitch offset looks like an empirical
    # calibration constant; its origin is not visible in this file.
    return yaw, -(pitch - 50), roll


def get_head_angles(pose_results):
    """Return Kalman-smoothed ``(yaw, pitch, roll)`` for one pose result.

    The global filter ``kf`` is advanced by one predict/update step on every
    call.  Errors while computing raw angles are logged, not raised.
    """
    raw_yaw, raw_pitch, raw_roll = 0.0, 0.0, 0.0
    if pose_results and pose_results.pose_landmarks:
        try:
            angles = _calculate_raw_head_angles_user_method(
                pose_results.pose_landmarks.landmark
            )
            if angles is not None:
                raw_yaw, raw_pitch, raw_roll = angles
        except Exception as e:  # best-effort: keep the stream alive
            logging.error("Error in get_head_angles: %s", e)
    # NOTE(review): when no angles could be computed the filter is still
    # updated with a (0, 0, 0) measurement, which pulls the estimate toward
    # zero.  Behavior kept as-is; confirm whether skipping the update is
    # preferable.
    kf.predict()
    kf.update(np.array([raw_yaw, raw_pitch, raw_roll]))
    smoothed_yaw, smoothed_pitch, smoothed_roll = kf.x[:3]
    # NOTE(review): yaw is additionally multiplied by 3; presumably an
    # empirical gain -- confirm.
    return (smoothed_yaw * ANGLE_SCALE * 3,
            smoothed_pitch * ANGLE_SCALE,
            smoothed_roll * ANGLE_SCALE)


def wrap_angle_180(angle):
    """Wrap ``angle`` (degrees) into the half-open interval [-180, 180)."""
    wrapped_angle = np.fmod(angle + 180, 360)
    if wrapped_angle < 0:
        wrapped_angle += 360
    return wrapped_angle - 180


def angle_in_range(angle, lower, upper):
    """Return True when ``angle`` lies in ``[lower, upper]`` modulo 360.

    Unlike a plain ``lower <= angle <= upper`` this also accepts windows that
    cross the +/-180 seam (for example 153..193 matches a wrapped -170).
    For windows and angles inside [-180, 180] the result equals the plain
    comparison.
    """
    return (angle - lower) % 360 <= (upper - lower)


def _new_session_state(now):
    """Return the initial per-session state dict (flags, timers, targets)."""
    return {
        "visibility_confirmed": False,
        "pose_held": False,
        "bppv_step_1": False,
        "bppv_step_2": False,
        "bppv_step_3": False,
        "bppv_step_4": False,
        "mission_complete": False,
        "step_3_complete": False,
        "all_missions_complete": False,
        "match_start_time": None,
        "bppv_start_time": None,
        "bppv_pose_held_time": 0,
        "last_speech_time": now,
        "speech_interval": 3,
        "in_correct_pose_step_1": False,
        "in_correct_pose_step_2": False,
        "in_correct_pose_step_3": False,
        "in_correct_pose_step_4": False,
        "head_shake_count": 0,
        "head_shake_complete": False,
        "last_yaw": 0,
        "yaw_direction": 0,
        "yaw_threshold": 15,
        "match_duration_threshold": 5,
        "bppv_duration_threshold": 30,
        "neutral_hold_threshold": 5,
        "target_yaw_min_step_1": 25,
        "target_yaw_max_step_1": 65,
        "target_yaw_min_step_2": -20,
        "target_yaw_max_step_2": 20,
        "target_pitch_min_step_2": 70,
        "target_pitch_max_step_2": 110,
        "target_roll_min_step_2": -120,
        "target_roll_max_step_2": -80,
        "target_yaw_min_step_3": 153,
        "target_yaw_max_step_3": 193,
        "target_pitch_min_step_3": 17,
        "target_pitch_max_step_3": 57,
        "target_roll_min_step_3": 77,
        "target_roll_max_step_3": 117,
        "target_yaw_min_step_4": -160,
        "target_yaw_max_step_4": 160,
        "target_pitch_min_step_4": 0,
        "target_pitch_max_step_4": 30,
        "target_roll_min_step_4": -160,
        "target_roll_max_step_4": 160,
    }


# Keys cleared while the user is still not fully visible.
_PROGRESS_DEFAULTS = {
    "match_start_time": None,
    "pose_held": False,
    "bppv_step_1": False,
    "bppv_step_2": False,
    "bppv_step_3": False,
    "bppv_step_4": False,
    "mission_complete": False,
    "step_3_complete": False,
    "all_missions_complete": False,
    "in_correct_pose_step_1": False,
    "in_correct_pose_step_2": False,
    "in_correct_pose_step_3": False,
    "in_correct_pose_step_4": False,
    "head_shake_count": 0,
    "head_shake_complete": False,
    "yaw_direction": 0,
}

# Per-step corrective hints: axis -> (message when below min, message when above max).
_STEP_HINTS = {
    2: {
        "yaw": ("Turn your head to the left.", "Turn your head to the right."),
        "pitch": ("Tilt your head further up.", "Tilt your head down."),
        "roll": ("Bend your head more to the left.", "Bend your head to the right."),
    },
    3: {
        "yaw": ("Turn your head further to the left.", "Turn your head back to the right."),
        "pitch": ("Tilt your head further up.", "Tilt your head down."),
        "roll": ("Bend your head more to the right.", "Bend your head to the left."),
    },
}


def _reset_progress(state):
    """Put all maneuver progress flags back to their initial values."""
    state.update(_PROGRESS_DEFAULTS)


def _target_range(state, axis, step_no):
    """Return the ``(min, max)`` target stored in ``state`` for an axis/step."""
    return (state[f"target_{axis}_min_step_{step_no}"],
            state[f"target_{axis}_max_step_{step_no}"])


def _read_display_angles(pose_results):
    """Return smoothed head angles, each wrapped into [-180, 180)."""
    return tuple(wrap_angle_180(a) for a in get_head_angles(pose_results))


def _draw_banner(frame, text, color):
    """Draw the one-line status banner near the top of ``frame``."""
    cv2.putText(frame, text, (frame_width // 4, 50), _FONT, 1, color, 2)


def _draw_angles(frame, yaw, pitch, roll):
    """Draw the integer yaw/pitch/roll readout at the left edge of ``frame``."""
    for row, (label, value) in enumerate((("Yaw", yaw), ("Pitch", pitch), ("Roll", roll))):
        cv2.putText(frame, f"{label}: {int(value)}", (10, 100 + 50 * row), _FONT, 1, _BLACK, 2)


def _advance_hold(state, now, flag_key, required_seconds):
    """Advance the shared hold timer; return ``(remaining_seconds, done)``."""
    state[flag_key] = True
    if state["bppv_start_time"] is None:
        state["bppv_start_time"] = now
    held = now - state["bppv_start_time"]
    state["bppv_pose_held_time"] = held
    return max(0, required_seconds - held), held >= required_seconds


def _stop_hold(state, flag_key, clear_elapsed=False):
    """Stop the shared hold timer; optionally zero the recorded held time."""
    state["bppv_start_time"] = None
    state[flag_key] = False
    if clear_elapsed:
        state["bppv_pose_held_time"] = 0


def _stage_visibility(frame, state, landmarks):
    """Stage 1: wait until shoulders and hips are visible in the frame."""
    if is_full_body_visible(landmarks, frame_width, frame_height):
        state["visibility_confirmed"] = True
        cv2.putText(frame, "Visibility Confirmed!",
                    (frame_width // 4, frame_height // 2 - 50),
                    _FONT, 1, _GREEN, 2, cv2.LINE_AA)
        return ("Full body visibility confirmed. Please adjust your head to match "
                "the position that your eye and nose points are fully inside the "
                "box and box should be green.")
    cv2.putText(frame, "Please move back for full body visibility",
                (frame_width // 4 - 50, frame_height // 2),
                _FONT, 1, _RED, 3, cv2.LINE_AA)
    _reset_progress(state)
    return "Please move back to ensure your full body is visible in the frame."


def _stage_calibration(frame, state, now, user_landmarks):
    """Stage 2: head must stay inside the target box for the match duration."""
    matched = is_head_pose_matched(user_landmarks, centered_target_keypoints)
    cv2.rectangle(frame,
                  (int(bbox_min_x), int(bbox_min_y)),
                  (int(bbox_max_x), int(bbox_max_y)),
                  _GREEN if matched else _RED, 2)
    if not matched:
        state["match_start_time"] = None
        _draw_banner(frame, "Adjust eye and nose in the centre of box", _RED)
        return "Adjust your head to make the box green for 5 seconds."
    if state["match_start_time"] is None:
        state["match_start_time"] = now
        return "Hold your head in this position."
    elapsed = now - state["match_start_time"]
    if elapsed >= state["match_duration_threshold"]:
        state["pose_held"] = True
        state["bppv_step_1"] = True
        state["bppv_start_time"] = now
        return ("Calibration complete. Now turn your head 45 degrees to the right "
                "and hold for 30 seconds.")
    remaining = max(0, state["match_duration_threshold"] - elapsed)
    _draw_banner(frame, f"Hold Head Pose for {remaining:.1f}s", _GREEN)
    return f"Hold head pose for {remaining:.1f} seconds."


def _stage_step_1(frame, state, now, angles):
    """Stage 3 / BPPV step 1: turn the head to the right (yaw only) and hold."""
    yaw = angles[0]
    lower, upper = _target_range(state, "yaw", 1)
    instruction = ""
    if angle_in_range(yaw, lower, upper):
        remaining, done = _advance_hold(
            state, now, "in_correct_pose_step_1", state["bppv_duration_threshold"])
        _draw_banner(frame, f"Hold Head at this position for {remaining:.1f}s", _GREEN)
        instruction = f"Hold your head at this position. {remaining:.1f} seconds remaining."
        if done:
            state["mission_complete"] = True
            state["bppv_step_2"] = True
            _stop_hold(state, "in_correct_pose_step_1", clear_elapsed=True)
            instruction = ("Step 1 complete. Now, slowly lie down on your left side, "
                           "so that your right ear rests on the bed. Keep your head "
                           "aligned—same position as before. Hold this pose for 30 "
                           "seconds and stay relaxed.")
    else:
        _stop_hold(state, "in_correct_pose_step_1")
        if yaw < lower:
            instruction = "Turn your head further to the right."
            _draw_banner(frame, "Turn head further right", _RED)
        elif yaw > upper:
            instruction = "Turn your head back to the left."
            _draw_banner(frame, "Turn head back left", _RED)
    _draw_angles(frame, *angles)
    return instruction


def _guide_three_axis_hold(frame, state, now, angles, step_no):
    """Shared logic of BPPV steps 2 and 3; return ``(instruction, done)``.

    All three angles must sit inside the step's target windows for
    ``bppv_duration_threshold`` seconds; otherwise the timer is cleared and
    corrective hints are shown.
    """
    flag_key = f"in_correct_pose_step_{step_no}"
    pose_correct = True
    hints = []
    for axis, value in zip(_AXES, angles):
        lower, upper = _target_range(state, axis, step_no)
        if angle_in_range(value, lower, upper):
            continue
        pose_correct = False
        below_msg, above_msg = _STEP_HINTS[step_no][axis]
        if value < lower:
            hints.append(below_msg)
        elif value > upper:
            hints.append(above_msg)

    if pose_correct:
        remaining, done = _advance_hold(
            state, now, flag_key, state["bppv_duration_threshold"])
        _draw_banner(frame, f"Hold Head at this position for {remaining:.1f}s", _GREEN)
        return f"Hold your head in this position. {remaining:.1f} seconds remaining.", done

    _stop_hold(state, flag_key)
    instruction = " ".join(hints) if hints else "Adjust head to target pose."
    _draw_banner(frame, instruction, _RED)
    return instruction, False


def _stage_step_2(frame, state, now, angles):
    """Stage 4 / BPPV step 2: hold the step-2 yaw/pitch/roll targets."""
    instruction, done = _guide_three_axis_hold(frame, state, now, angles, 2)
    if done:
        state["step_3_complete"] = True
        state["bppv_step_3"] = True
        _stop_hold(state, "in_correct_pose_step_2", clear_elapsed=True)
        instruction = ("Step 2 complete. Stay with your head at the same angle, and "
                       "roll your body to the right and hold for 30 seconds.")
    _draw_angles(frame, *angles)
    return instruction


def _stage_step_3(frame, state, now, angles):
    """Stage 5 / BPPV step 3: hold the step-3 yaw/pitch/roll targets."""
    instruction, done = _guide_three_axis_hold(frame, state, now, angles, 3)
    if done:
        state["bppv_step_4"] = True
        _stop_hold(state, "in_correct_pose_step_3", clear_elapsed=True)
        # Seed the shake detector with the yaw at which step 3 ended.
        state["last_yaw"] = angles[0]
        instruction = ("Step 3 complete. Now shake your head side to side 2 to 3 "
                       "times, then sit on the opposite side of the bed in a "
                       "neutral position.")
    _draw_angles(frame, *angles)
    return instruction


def _track_head_shake(frame, state, yaw):
    """Count yaw direction reversals until the head shake is complete."""
    # Wrap the delta so crossing the +/-180 seam is not read as a ~360 degree
    # swing (step 3 ends close to that seam).
    yaw_change = wrap_angle_180(yaw - state["last_yaw"])
    threshold = state["yaw_threshold"]
    if yaw_change > threshold and state["yaw_direction"] != 1:
        state["yaw_direction"] = 1
        state["head_shake_count"] += 0.5
    elif yaw_change < -threshold and state["yaw_direction"] != -1:
        state["yaw_direction"] = -1
        state["head_shake_count"] += 0.5
    state["last_yaw"] = yaw

    if state["head_shake_count"] < 2:
        instruction = (f"Shake head side to side "
                       f"({int(state['head_shake_count'] * 2)}/2-3 shakes)")
        _draw_banner(frame, instruction, _RED)
        return instruction
    state["head_shake_complete"] = True
    state["bppv_start_time"] = None
    state["in_correct_pose_step_4"] = False
    return "Now sit on the opposite side of the bed in a neutral position."


def _guide_neutral_hold(frame, state, now, angles):
    """Final hold: yaw and roll outside their step-4 windows, pitch inside."""
    yaw, pitch, roll = angles
    yaw_min, yaw_max = _target_range(state, "yaw", 4)
    pitch_min, pitch_max = _target_range(state, "pitch", 4)
    roll_min, roll_max = _target_range(state, "roll", 4)
    yaw_correct = (yaw < yaw_min) or (yaw > yaw_max)
    pitch_correct = angle_in_range(pitch, pitch_min, pitch_max)
    roll_correct = (roll < roll_min) or (roll > roll_max)

    if yaw_correct and pitch_correct and roll_correct:
        remaining, done = _advance_hold(
            state, now, "in_correct_pose_step_4", state["neutral_hold_threshold"])
        _draw_banner(frame, f"Hold Neutral Position for {remaining:.1f}s", _GREEN)
        if done:
            state["all_missions_complete"] = True
            return "You have successfully completed the maneuver."
        return f"Hold this neutral position. {remaining:.1f} seconds remaining."

    _stop_hold(state, "in_correct_pose_step_4")
    hints = []
    if not yaw_correct:
        hints.append("Turn your head further right." if yaw >= 0
                     else "Turn your head further left.")
    if not pitch_correct:
        if pitch < pitch_min:
            hints.append("Tilt your head further up.")
        elif pitch > pitch_max:
            hints.append("Tilt your head down.")
    # NOTE(review): the original roll hints were guarded by
    # `roll < min` / `roll > max`, which can never hold when roll is inside
    # [min, max] (the only way roll_correct is False), so they never fired.
    # A roll-only failure therefore falls through to the generic message.
    instruction = " ".join(hints) if hints else "Adjust to neutral position."
    _draw_banner(frame, instruction, _RED)
    return instruction


def _stage_step_4(frame, state, now, angles):
    """Stage 6 / BPPV step 4: head shake, then neutral hold."""
    if not state["head_shake_complete"]:
        instruction = _track_head_shake(frame, state, angles[0])
    else:
        instruction = _guide_neutral_hold(frame, state, now, angles)
    _draw_angles(frame, *angles)
    return instruction


def _stage_complete(frame, angles):
    """Stage 7: show the completion banner and the live angle readout."""
    instruction = "Epley Maneuver Guider Complete!"
    cv2.putText(frame, instruction, (frame_width // 4, frame_height // 2),
                _FONT, 1.5, _GREEN, 3, cv2.LINE_AA)
    _draw_angles(frame, *angles)
    return instruction


def process_frame(frame, state):
    """Run one webcam frame through pose detection and the guidance state machine.

    Args:
        frame: RGB image array as delivered by a Gradio image component.
        state: Session state dict from a previous call, or None to start a
            fresh session.

    Returns:
        ``(annotated_frame, instruction, state)`` where ``annotated_frame``
        is an RGB array resized to ``frame_width`` x ``frame_height``,
        ``instruction`` is the text for the user (empty when no pose was
        detected) and ``state`` is the updated session dict.
    """
    rgb = cv2.resize(frame, (frame_width, frame_height))
    # The incoming frame is already RGB, which is what MediaPipe expects.
    pose_results = pose.process(rgb)
    # Draw on a BGR canvas so the BGR colour constants render as intended;
    # it is converted back to RGB before returning.
    canvas = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    now = time.time()
    instruction = ""

    # Initialize or update state
    if state is None:
        state = _new_session_state(now)

    if pose_results.pose_landmarks:
        landmarks = pose_results.pose_landmarks.landmark

        if not state["visibility_confirmed"]:
            instruction = _stage_visibility(canvas, state, landmarks)
        elif state["visibility_confirmed"] and not state["pose_held"]:
            user_landmarks = [(lm.x * frame_width, lm.y * frame_height) for lm in landmarks]
            instruction = _stage_calibration(canvas, state, now, user_landmarks)
        elif state["pose_held"] and state["bppv_step_1"] and not state["mission_complete"]:
            instruction = _stage_step_1(canvas, state, now, _read_display_angles(pose_results))
        elif state["mission_complete"] and state["bppv_step_2"] and not state["step_3_complete"]:
            instruction = _stage_step_2(canvas, state, now, _read_display_angles(pose_results))
        elif state["step_3_complete"] and state["bppv_step_3"] and not state["bppv_step_4"]:
            instruction = _stage_step_3(canvas, state, now, _read_display_angles(pose_results))
        elif state["bppv_step_4"] and not state["all_missions_complete"]:
            instruction = _stage_step_4(canvas, state, now, _read_display_angles(pose_results))
        elif state["all_missions_complete"]:
            instruction = _stage_complete(canvas, _read_display_angles(pose_results))

        mp_drawing.draw_landmarks(
            canvas,
            pose_results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp_drawing.DrawingSpec(color=_BLUE, thickness=2, circle_radius=4),
            connection_drawing_spec=mp_drawing.DrawingSpec(color=_BLUE, thickness=2),
        )

    return cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB), instruction, state


def webcam_stream(frame, state):
    """
    Process webcam frames in real-time for Gradio streaming.
    """
    if frame is None:
        return None, "No frame received.", state
    frame, instruction, state = process_frame(frame, state)
    return frame, instruction, state


def reset_session():
    """Clear the session: reinitialise the angle filter and drop the state.

    Returning None makes the next ``process_frame`` call build a fresh state.
    The Kalman filter is module-global, so it is rebuilt here as well;
    otherwise the new run would start from the previous run's angles.
    """
    global kf
    kf = initialize_kalman_filter()
    return None


# Gradio interface (written against the Gradio 3.x API the file already
# uses, i.e. the `source=` keyword).
with gr.Blocks() as demo:
    gr.Markdown("# AI-Based BPPV Maneuver Guider")
    gr.Markdown(
        "Use your webcam to guide you through the Epley Maneuver for BPPV treatment. "
        "Follow the on-screen instructions. "
        "Note: Webcam streaming works best when running locally."
    )

    with gr.Row():
        with gr.Column():
            # A streaming webcam Image delivers individual RGB frames to the
            # handler; a Video component would hand over a recorded file.
            webcam_input = gr.Image(label="Webcam Feed", source="webcam", streaming=True)
            reset_button = gr.Button("Reset State")
        with gr.Column():
            output_image = gr.Image(label="Processed Frame")
            instruction_output = gr.Textbox(label="Instructions")

    state = gr.State()

    # Stream webcam frames
    webcam_input.stream(
        fn=webcam_stream,
        inputs=[webcam_input, state],
        outputs=[output_image, instruction_output, state],
    )

    # Reset state button
    reset_button.click(
        fn=reset_session,
        inputs=None,
        outputs=state,
    )

if __name__ == "__main__":
    demo.launch()