# BPPV / app.py
# (Hugging Face Space page residue: uploaded by samir1120, commit da0cdbf "Update app.py")
import gradio as gr
import cv2
import mediapipe as mp
import numpy as np
import logging
import time
from filterpy.kalman import KalmanFilter
import os
# Suppress MediaPipe warnings
logging.getLogger('mediapipe').setLevel(logging.ERROR)
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
# Single shared Pose estimator used for every frame. static_image_mode=False
# enables cross-frame tracking; model_complexity=2 is the heaviest model.
pose = mp_pose.Pose(
    static_image_mode=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.6,
    model_complexity=2,
    smooth_landmarks=True
)
# Define scaling factor for angles
# Multiplier applied to the Kalman-smoothed angles before they are returned.
ANGLE_SCALE = 1
# Initialize Kalman Filter for smoothing angles
def initialize_kalman_filter():
    """Build a constant-velocity Kalman filter over (yaw, pitch, roll).

    State vector is [yaw, pitch, roll, yaw_rate, pitch_rate, roll_rate];
    only the three angles are measured.
    """
    kf = KalmanFilter(dim_x=6, dim_z=3)
    kf.x = np.zeros(6)
    # Transition: angle_{k+1} = angle_k + rate_k, rates held constant.
    kf.F = np.eye(6)
    kf.F[:3, 3:] = np.eye(3)
    # Measurement picks out the three angle components of the state.
    kf.H = np.hstack([np.eye(3), np.zeros((3, 3))])
    kf.P *= 10.
    kf.R = np.diag([1.0, 1.0, 1.0])
    kf.Q = np.eye(6) * 0.05
    return kf
# Shared Kalman filter instance reused by get_head_angles() across frames.
kf = initialize_kalman_filter()
# Load target pose (same as original)
# Recorded reference skeleton: the user must line up their head with the
# five face keypoints of this pose before the maneuver begins.
target_pose = [
    {
        "person_id": 0,
        "bbox": [
            260.447998046875,
            434.9598693847656,
            263.357177734375,
            439.172119140625
        ],
        "keypoints": [
            {"name": "Nose", "x": 240.35791015625, "y": 135.41705322265625, "score": 0.9791688919067383},
            {"name": "L_Eye", "x": 265.16717529296875, "y": 110.43780517578125, "score": 0.9833072428857386},
            {"name": "R_Eye", "x": 210.517822265625, "y": 114.45855712890625, "score": 0.9687361121177673},
            {"name": "L_Ear", "x": 301.84814453125, "y": 135.83111572265625, "score": 0.9493670302238464},
            {"name": "R_Ear", "x": 175.035888671875, "y": 143.1534423828125, "score": 0.9537781476974487},
            {"name": "L_Shoulder", "x": 367.36688232421875, "y": 277.89508056640625, "score": 0.9714463949203491},
            {"name": "R_Shoulder", "x": 132.6015625, "y": 287.1273193359375, "score": 0.9208009243011475},
            {"name": "L_Elbow", "x": 404.8804931640625, "y": 457.8016357421875, "score": 1.0068358182907104},
            {"name": "R_Elbow", "x": 121.6767578125, "y": 466.985595703125, "score": 0.9445005059242249},
            {"name": "L_Wrist", "x": 316.5948486328125, "y": 564.1590576171875, "score": 0.9202994108200073},
            {"name": "R_Wrist", "x": 218.354248046875, "y": 578.4954833984375, "score": 0.9106894731521606},
            {"name": "L_Hip", "x": 343.258056640625, "y": 562.5377197265625, "score": 0.8454821705818176},
            {"name": "R_Hip", "x": 191.992431640625, "y": 569.1612548828125, "score": 0.856957733631134},
            {"name": "L_Knee", "x": 394.12591552734375, "y": 672.401611328125, "score": 0.8698152899742126},
            {"name": "R_Knee", "x": 143.781005859375, "y": 696.0062255859375, "score": 0.8501293659210205},
            {"name": "L_Ankle", "x": 353.07330322265625, "y": 853.671142578125, "score": 0.9136713147163391},
            {"name": "R_Ankle", "x": 211.80206298828125, "y": 850.3348388671875, "score": 0.8354711532592773}
        ]
    }
]
# Extract and center target keypoints
frame_width = 1280
frame_height = 720
target_keypoints = [(kp["x"], kp["y"]) for kp in target_pose[0]["keypoints"]]
# Indices 0-4 of the keypoint list are the face points: nose, eyes, ears.
head_keypoint_indices = [0, 1, 2, 3, 4]
head_keypoints = [target_keypoints[i] for i in head_keypoint_indices]
target_head_center_x = sum(x for x, y in head_keypoints) / len(head_keypoints)
target_head_center_y = sum(y for x, y in head_keypoints) / len(head_keypoints)
# Translate the recorded pose so its head center sits horizontally centered,
# 20% down from the top of the display frame.
display_center_x = frame_width / 2
display_center_y = frame_height * 0.2
translate_x = display_center_x - target_head_center_x
translate_y = display_center_y - target_head_center_y
centered_target_keypoints = [(x + translate_x, y + translate_y) for x, y in target_keypoints]
# Guide box drawn around the centered head keypoints: 20 px padding on every
# side, clamped to the frame bounds.
head_keypoints_centered = [centered_target_keypoints[i] for i in head_keypoint_indices]
x_coords = [x for x, y in head_keypoints_centered]
y_coords = [y for x, y in head_keypoints_centered]
bbox_min_x = max(0, min(x_coords) - 20)
bbox_max_x = min(frame_width, max(x_coords) + 20)
bbox_min_y = max(0, min(y_coords) - 20)
bbox_max_y = min(frame_height, max(y_coords) + 20)
# Helper functions
def euclidean_distance(p1, p2):
    """Return the 2-D Euclidean distance between (x, y) points p1 and p2."""
    dx = p1[0] - p2[0]
    dy = p1[1] - p2[1]
    return np.sqrt(dx * dx + dy * dy)
def is_head_pose_matched(user_landmarks, target_keypoints, distance_threshold=25):
    """Return True when every visible head landmark lies within
    distance_threshold pixels of its corresponding target keypoint.

    Pairs that fall outside either list's bounds are skipped, so short
    lists never fail the match.
    """
    # MediaPipe landmark index -> target keypoint index (nose, eyes, ears).
    head_indices_mapping = {0: 0, 2: 1, 5: 2, 7: 3, 8: 4}
    for mp_idx, target_idx in head_indices_mapping.items():
        if mp_idx >= len(user_landmarks) or target_idx >= len(target_keypoints):
            continue
        ux, uy = user_landmarks[mp_idx]
        tx, ty = target_keypoints[target_idx]
        if np.sqrt((ux - tx) ** 2 + (uy - ty) ** 2) > distance_threshold:
            return False
    return True
def is_full_body_visible(landmarks, frame_width, frame_height):
    """Return True when both shoulders and both hips are confidently detected
    and lie inside the frame with a 5% margin on every edge.

    frame_width / frame_height are accepted for interface compatibility but
    unused: landmark coordinates are already normalized to [0, 1].
    """
    core_joints = (
        mp_pose.PoseLandmark.LEFT_SHOULDER,
        mp_pose.PoseLandmark.RIGHT_SHOULDER,
        mp_pose.PoseLandmark.LEFT_HIP,
        mp_pose.PoseLandmark.RIGHT_HIP,
    )
    return all(
        landmarks[joint].visibility >= 0.6
        and 0.05 <= landmarks[joint].x <= 0.95
        and 0.05 <= landmarks[joint].y <= 0.95
        for joint in core_joints
    )
def _calculate_raw_head_angles_user_method(landmark_list):
    """Estimate raw head (yaw, pitch, roll) in degrees from face landmarks.

    Returns None when the landmark list is missing, too short, or any of the
    nose/ear/eye landmarks has visibility below 0.5.
    """
    required_indices = [mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.LEFT_EAR, mp_pose.PoseLandmark.RIGHT_EAR,
                        mp_pose.PoseLandmark.LEFT_EYE, mp_pose.PoseLandmark.RIGHT_EYE]
    if landmark_list is None or len(landmark_list) <= max(idx.value for idx in required_indices):
        return None
    for l_idx_enum in required_indices:
        if landmark_list[l_idx_enum.value].visibility < 0.5:
            return None
    nose = landmark_list[mp_pose.PoseLandmark.NOSE.value]
    left_ear = landmark_list[mp_pose.PoseLandmark.LEFT_EAR.value]
    right_ear = landmark_list[mp_pose.PoseLandmark.RIGHT_EAR.value]
    left_eye = landmark_list[mp_pose.PoseLandmark.LEFT_EYE.value]
    right_eye = landmark_list[mp_pose.PoseLandmark.RIGHT_EYE.value]
    # Yaw: horizontal angle of the nose relative to the ear midpoint.
    mid_ear = np.array([(left_ear.x + right_ear.x) / 2,
                        (left_ear.y + right_ear.y) / 2,
                        (left_ear.z + right_ear.z) / 2])
    nose_vec = mid_ear - np.array([nose.x, nose.y, nose.z])
    yaw = -np.degrees(np.arctan2(nose_vec[0], nose_vec[2] + 1e-6))  # 1e-6 guards atan2(0, 0)
    # Pitch: elevation of the nose relative to the eye midpoint.
    eye_mid = np.array([(left_eye.x + right_eye.x) / 2,
                        (left_eye.y + right_eye.y) / 2,
                        (left_eye.z + right_eye.z) / 2])
    nose_to_eye = np.array([nose.x, nose.y, nose.z]) - eye_mid
    pitch = np.degrees(np.arctan2(nose_to_eye[1], np.sqrt(nose_to_eye[0]**2 + nose_to_eye[2]**2 + 1e-6)))
    # Roll: 2-D tilt of the ear-to-ear line in the image plane.
    ear_vec_2d = np.array([left_ear.x - right_ear.x, left_ear.y - right_ear.y])
    roll = np.degrees(np.arctan2(ear_vec_2d[1], ear_vec_2d[0] + 1e-6))
    # Pitch is negated and offset by 50 — presumably an empirical calibration
    # so a neutral forward-facing head reads near 0; TODO confirm on device.
    return yaw, -(pitch - 50), roll
def get_head_angles(pose_results):
    """Return Kalman-smoothed (yaw, pitch, roll) in degrees for this frame.

    Falls back to a (0, 0, 0) raw measurement when landmarks are unavailable
    or the raw-angle computation fails, so the shared filter is stepped on
    every frame regardless. Mutates the module-level filter `kf`.
    """
    raw_yaw, raw_pitch, raw_roll = 0.0, 0.0, 0.0
    if pose_results and pose_results.pose_landmarks:
        try:
            angles = _calculate_raw_head_angles_user_method(
                pose_results.pose_landmarks.landmark
            )
            if angles is not None:
                raw_yaw, raw_pitch, raw_roll = angles
        except Exception as e:
            logging.error(f"Error in get_head_angles: {e}")
    # Predict-then-update with the (possibly zero) measurement.
    kf.predict()
    kf.update(np.array([raw_yaw, raw_pitch, raw_roll]))
    smoothed_yaw, smoothed_pitch, smoothed_roll = kf.x[:3]
    # Yaw carries an extra x3 gain; pitch/roll are scaled by ANGLE_SCALE only.
    return smoothed_yaw * ANGLE_SCALE * 3, smoothed_pitch * ANGLE_SCALE, smoothed_roll * ANGLE_SCALE
def wrap_angle_180(angle):
    """Normalize an angle in degrees to the half-open range [-180, 180)."""
    return ((angle + 180.0) % 360.0) - 180.0
def process_frame(frame, state):
    """Run one BGR webcam frame through the Epley-maneuver state machine.

    Draws guidance overlays on the frame and advances `state` (a plain dict,
    created on first call / after reset) through the stages:
    full-body visibility -> head-pose calibration -> BPPV steps 1-4 -> done.

    Returns (annotated_frame, instruction_text, state).
    """
    frame = cv2.resize(frame, (frame_width, frame_height))
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pose_results = pose.process(image_rgb)
    frame = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
    current_time = time.time()
    instruction = ""
    current_head_yaw, current_head_pitch, current_head_roll = 0, 0, 0
    # Initialize or update state
    if state is None:
        state = {
            # Stage-progress flags.
            "visibility_confirmed": False,
            "pose_held": False,
            "bppv_step_1": False,
            "bppv_step_2": False,
            "bppv_step_3": False,
            "bppv_step_4": False,
            "mission_complete": False,       # step 1 held long enough
            "step_3_complete": False,        # NOTE(review): actually marks step 2 done
            "all_missions_complete": False,
            # Timers.
            "match_start_time": None,
            "bppv_start_time": None,
            "bppv_pose_held_time": 0,
            "last_speech_time": current_time,
            "speech_interval": 3,
            # Per-step "currently holding the pose" flags.
            "in_correct_pose_step_1": False,
            "in_correct_pose_step_2": False,
            "in_correct_pose_step_3": False,
            "in_correct_pose_step_4": False,
            # Head-shake detection (step 4): each direction reversal counts 0.5.
            "head_shake_count": 0,
            "head_shake_complete": False,
            "last_yaw": 0,
            "yaw_direction": 0,
            "yaw_threshold": 15,
            # Hold durations (seconds).
            "match_duration_threshold": 5,
            "bppv_duration_threshold": 30,
            "neutral_hold_threshold": 5,
            # Per-step target angle windows (degrees, after wrap_angle_180).
            "target_yaw_min_step_1": 25,
            "target_yaw_max_step_1": 65,
            "target_yaw_min_step_2": -20,
            "target_yaw_max_step_2": 20,
            "target_pitch_min_step_2": 70,
            "target_pitch_max_step_2": 110,
            "target_roll_min_step_2": -120,
            "target_roll_max_step_2": -80,
            "target_yaw_min_step_3": 153,
            "target_yaw_max_step_3": 193,
            "target_pitch_min_step_3": 17,
            "target_pitch_max_step_3": 57,
            "target_roll_min_step_3": 77,
            "target_roll_max_step_3": 117,
            # Step 4 "neutral" windows: yaw/roll must fall OUTSIDE [min, max].
            "target_yaw_min_step_4": -160,
            "target_yaw_max_step_4": 160,
            "target_pitch_min_step_4": 0,
            "target_pitch_max_step_4": 30,
            "target_roll_min_step_4": -160,
            "target_roll_max_step_4": 160
        }
    if pose_results.pose_landmarks:
        landmarks = pose_results.pose_landmarks.landmark
        # Landmark positions converted from normalized coords to pixels.
        user_landmarks = [(lm.x * frame_width, lm.y * frame_height) for lm in landmarks]
        # Stage 1: Full-body visibility check
        if not state["visibility_confirmed"]:
            if is_full_body_visible(landmarks, frame_width, frame_height):
                state["visibility_confirmed"] = True
                instruction = "Full body visibility confirmed. Please adjust your head to match the position that your eye and nose points are fully inside the box and box should be green."
                cv2.putText(frame, "Visibility Confirmed!", (frame_width // 4, frame_height // 2 - 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            else:
                instruction = "Please move back to ensure your full body is visible in the frame."
                cv2.putText(frame, "Please move back for full body visibility", (frame_width // 4 - 50, frame_height // 2),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3, cv2.LINE_AA)
                # Losing full-body visibility before calibration resets the
                # entire maneuver progress.
                state["match_start_time"] = None
                state["pose_held"] = False
                state["bppv_step_1"] = False
                state["bppv_step_2"] = False
                state["bppv_step_3"] = False
                state["bppv_step_4"] = False
                state["mission_complete"] = False
                state["step_3_complete"] = False
                state["all_missions_complete"] = False
                state["in_correct_pose_step_1"] = False
                state["in_correct_pose_step_2"] = False
                state["in_correct_pose_step_3"] = False
                state["in_correct_pose_step_4"] = False
                state["head_shake_count"] = 0
                state["head_shake_complete"] = False
                state["yaw_direction"] = 0
        # Stage 2: Head pose matching and calibration
        elif state["visibility_confirmed"] and not state["pose_held"]:
            head_pose_matched = is_head_pose_matched(user_landmarks, centered_target_keypoints)
            # Guide box turns green while the head matches the target pose.
            bbox_color = (0, 255, 0) if head_pose_matched else (0, 0, 255)
            cv2.rectangle(frame, (int(bbox_min_x), int(bbox_min_y)), (int(bbox_max_x), int(bbox_max_y)),
                          bbox_color, 2)
            if head_pose_matched:
                if state["match_start_time"] is None:
                    state["match_start_time"] = current_time
                    instruction = "Hold your head in this position."
                else:
                    elapsed_time = current_time - state["match_start_time"]
                    if elapsed_time >= state["match_duration_threshold"]:
                        state["pose_held"] = True
                        state["bppv_step_1"] = True
                        instruction = "Calibration complete. Now turn your head 45 degrees to the right and hold for 30 seconds."
                        # Seed the step-1 hold timer.
                        state["bppv_start_time"] = current_time
                    else:
                        remaining_time = max(0, state["match_duration_threshold"] - elapsed_time)
                        instruction = f"Hold head pose for {remaining_time:.1f} seconds."
                        cv2.putText(frame, f"Hold Head Pose for {remaining_time:.1f}s",
                                    (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            else:
                state["match_start_time"] = None
                instruction = "Adjust your head to make the box green for 5 seconds."
                cv2.putText(frame, "Adjust eye and nose in the centre of box", (frame_width // 4, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        # Stage 3: BPPV Step 1 - Turn head 45 degrees left and hold
        elif state["pose_held"] and state["bppv_step_1"] and not state["mission_complete"]:
            current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results)
            display_yaw = wrap_angle_180(current_head_yaw)
            display_pitch = wrap_angle_180(current_head_pitch)
            display_roll = wrap_angle_180(current_head_roll)
            # Step 1 only constrains yaw (45 degrees +/- 20).
            yaw_correct = state["target_yaw_min_step_1"] <= display_yaw <= state["target_yaw_max_step_1"]
            if yaw_correct:
                if not state["in_correct_pose_step_1"]:
                    instruction = "Hold this position for 30 seconds."
                    state["in_correct_pose_step_1"] = True
                    if state["bppv_start_time"] is None:
                        state["bppv_start_time"] = current_time
                state["bppv_pose_held_time"] = current_time - state["bppv_start_time"]
                remaining_time = max(0, state["bppv_duration_threshold"] - state["bppv_pose_held_time"])
                cv2.putText(frame, f"Hold Head at this position for {remaining_time:.1f}s",
                            (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                instruction = f"Hold your head at this position. {remaining_time:.1f} seconds remaining."
                if state["bppv_pose_held_time"] >= state["bppv_duration_threshold"]:
                    state["mission_complete"] = True
                    state["bppv_step_2"] = True
                    instruction = "Step 1 complete. Now, slowly lie down on your left side, so that your right ear rests on the bed. Keep your head aligned—same position as before. Hold this pose for 30 seconds and stay relaxed."
                    state["bppv_start_time"] = None
                    state["bppv_pose_held_time"] = 0
                    state["in_correct_pose_step_1"] = False
            else:
                # Leaving the window restarts the 30 s hold.
                state["bppv_start_time"] = None
                state["in_correct_pose_step_1"] = False
                if display_yaw < state["target_yaw_min_step_1"]:
                    instruction = "Turn your head further to the right."
                    cv2.putText(frame, "Turn head further right", (frame_width // 4, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                elif display_yaw > state["target_yaw_max_step_1"]:
                    instruction = "Turn your head back to the left."
                    cv2.putText(frame, "Turn head back left", (frame_width // 4, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            # Debug angle readout (degrees).
            cv2.putText(frame, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Stage 4: BPPV Step 2 - Yaw 0°, Pitch 90°, Roll -100°
        elif state["mission_complete"] and state["bppv_step_2"] and not state["step_3_complete"]:
            current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results)
            display_yaw = wrap_angle_180(current_head_yaw)
            display_pitch = wrap_angle_180(current_head_pitch)
            display_roll = wrap_angle_180(current_head_roll)
            yaw_correct = state["target_yaw_min_step_2"] <= display_yaw <= state["target_yaw_max_step_2"]
            pitch_correct = state["target_pitch_min_step_2"] <= display_pitch <= state["target_pitch_max_step_2"]
            roll_correct = state["target_roll_min_step_2"] <= display_roll <= state["target_roll_max_step_2"]
            pose_correct = yaw_correct and pitch_correct and roll_correct
            if pose_correct:
                if not state["in_correct_pose_step_2"]:
                    instruction = "Hold this position for 30 seconds."
                    state["in_correct_pose_step_2"] = True
                    if state["bppv_start_time"] is None:
                        state["bppv_start_time"] = current_time
                state["bppv_pose_held_time"] = current_time - state["bppv_start_time"]
                remaining_time = max(0, state["bppv_duration_threshold"] - state["bppv_pose_held_time"])
                cv2.putText(frame, f"Hold Head at this position for {remaining_time:.1f}s",
                            (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                instruction = f"Hold your head in this position. {remaining_time:.1f} seconds remaining."
                if state["bppv_pose_held_time"] >= state["bppv_duration_threshold"]:
                    state["step_3_complete"] = True
                    state["bppv_step_3"] = True
                    instruction = "Step 2 complete. Stay with your head at the same angle, and roll your body to the right and hold for 30 seconds."
                    state["bppv_start_time"] = None
                    state["bppv_pose_held_time"] = 0
                    state["in_correct_pose_step_2"] = False
            else:
                state["bppv_start_time"] = None
                state["in_correct_pose_step_2"] = False
                # Build one corrective instruction per out-of-range axis.
                error_messages = []
                if not yaw_correct:
                    if display_yaw < state["target_yaw_min_step_2"]:
                        error_messages.append("Turn your head to the left.")
                    elif display_yaw > state["target_yaw_max_step_2"]:
                        error_messages.append("Turn your head to the right.")
                if not pitch_correct:
                    if display_pitch < state["target_pitch_min_step_2"]:
                        error_messages.append("Tilt your head further up.")
                    elif display_pitch > state["target_pitch_max_step_2"]:
                        error_messages.append("Tilt your head down.")
                if not roll_correct:
                    if display_roll < state["target_roll_min_step_2"]:
                        error_messages.append("Bend your head more to the left.")
                    elif display_roll > state["target_roll_max_step_2"]:
                        error_messages.append("Bend your head to the right.")
                instruction = " ".join(error_messages) if error_messages else "Adjust head to target pose."
                cv2.putText(frame, instruction, (frame_width // 4, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(frame, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Stage 5: BPPV Step 3 - Yaw 173°, Pitch 37°, Roll 97°
        elif state["step_3_complete"] and state["bppv_step_3"] and not state["bppv_step_4"]:
            current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results)
            display_yaw = wrap_angle_180(current_head_yaw)
            display_pitch = wrap_angle_180(current_head_pitch)
            display_roll = wrap_angle_180(current_head_roll)
            # NOTE(review): target_yaw_max_step_3 is 193, which wrap_angle_180
            # can never reach ([-180, 180)); the window is effectively open
            # above 153 — confirm intended.
            yaw_correct = state["target_yaw_min_step_3"] <= display_yaw <= state["target_yaw_max_step_3"]
            pitch_correct = state["target_pitch_min_step_3"] <= display_pitch <= state["target_pitch_max_step_3"]
            roll_correct = state["target_roll_min_step_3"] <= display_roll <= state["target_roll_max_step_3"]
            pose_correct = yaw_correct and pitch_correct and roll_correct
            if pose_correct:
                if not state["in_correct_pose_step_3"]:
                    instruction = "Hold this position for 30 seconds."
                    state["in_correct_pose_step_3"] = True
                    if state["bppv_start_time"] is None:
                        state["bppv_start_time"] = current_time
                state["bppv_pose_held_time"] = current_time - state["bppv_start_time"]
                remaining_time = max(0, state["bppv_duration_threshold"] - state["bppv_pose_held_time"])
                cv2.putText(frame, f"Hold Head at this position for {remaining_time:.1f}s",
                            (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                instruction = f"Hold your head in this position. {remaining_time:.1f} seconds remaining."
                if state["bppv_pose_held_time"] >= state["bppv_duration_threshold"]:
                    state["bppv_step_4"] = True
                    instruction = "Step 3 complete. Now shake your head side to side 2 to 3 times, then sit on the opposite side of the bed in a neutral position."
                    state["bppv_start_time"] = None
                    state["bppv_pose_held_time"] = 0
                    state["in_correct_pose_step_3"] = False
                    # Baseline for the head-shake detector in stage 6.
                    state["last_yaw"] = display_yaw
            else:
                state["bppv_start_time"] = None
                state["in_correct_pose_step_3"] = False
                error_messages = []
                if not yaw_correct:
                    if display_yaw < state["target_yaw_min_step_3"]:
                        error_messages.append("Turn your head further to the left.")
                    elif display_yaw > state["target_yaw_max_step_3"]:
                        error_messages.append("Turn your head back to the right.")
                if not pitch_correct:
                    if display_pitch < state["target_pitch_min_step_3"]:
                        error_messages.append("Tilt your head further up.")
                    elif display_pitch > state["target_pitch_max_step_3"]:
                        error_messages.append("Tilt your head down.")
                if not roll_correct:
                    if display_roll < state["target_roll_min_step_3"]:
                        error_messages.append("Bend your head more to the right.")
                    elif display_roll > state["target_roll_max_step_3"]:
                        error_messages.append("Bend your head to the left.")
                instruction = " ".join(error_messages) if error_messages else "Adjust head to target pose."
                cv2.putText(frame, instruction, (frame_width // 4, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(frame, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Stage 6: BPPV Step 4 - Shake head 2-3 times and sit in neutral position
        elif state["bppv_step_4"] and not state["all_missions_complete"]:
            current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results)
            display_yaw = wrap_angle_180(current_head_yaw)
            display_pitch = wrap_angle_180(current_head_pitch)
            display_roll = wrap_angle_180(current_head_roll)
            if not state["head_shake_complete"]:
                # Count direction reversals: each reversal past the yaw
                # threshold adds half a shake.
                yaw_change = display_yaw - state["last_yaw"]
                if yaw_change > state["yaw_threshold"] and state["yaw_direction"] != 1:
                    state["yaw_direction"] = 1
                    state["head_shake_count"] += 0.5
                elif yaw_change < -state["yaw_threshold"] and state["yaw_direction"] != -1:
                    state["yaw_direction"] = -1
                    state["head_shake_count"] += 0.5
                state["last_yaw"] = display_yaw
                if state["head_shake_count"] < 2:
                    instruction = f"Shake head side to side ({int(state['head_shake_count']*2)}/2-3 shakes)"
                    cv2.putText(frame, instruction, (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                else:
                    state["head_shake_complete"] = True
                    instruction = "Now sit on the opposite side of the bed in a neutral position."
                    state["bppv_start_time"] = None
                    state["in_correct_pose_step_4"] = False
            else:
                # "Neutral": yaw and roll must be OUTSIDE their [min, max]
                # windows (i.e. near +/-180), pitch inside [0, 30] —
                # NOTE(review): confirm this matches the intended seated pose.
                yaw_correct = (display_yaw < state["target_yaw_min_step_4"]) or (display_yaw > state["target_yaw_max_step_4"])
                pitch_correct = state["target_pitch_min_step_4"] <= display_pitch <= state["target_pitch_max_step_4"]
                roll_correct = (display_roll < state["target_roll_min_step_4"]) or (display_roll > state["target_roll_max_step_4"])
                pose_correct = yaw_correct and pitch_correct and roll_correct
                if pose_correct:
                    if not state["in_correct_pose_step_4"]:
                        instruction = "Hold this neutral position for 5 seconds."
                        state["in_correct_pose_step_4"] = True
                        if state["bppv_start_time"] is None:
                            state["bppv_start_time"] = current_time
                    state["bppv_pose_held_time"] = current_time - state["bppv_start_time"]
                    remaining_time = max(0, state["neutral_hold_threshold"] - state["bppv_pose_held_time"])
                    cv2.putText(frame, f"Hold Neutral Position for {remaining_time:.1f}s",
                                (frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                    instruction = f"Hold this neutral position. {remaining_time:.1f} seconds remaining."
                    if state["bppv_pose_held_time"] >= state["neutral_hold_threshold"]:
                        state["all_missions_complete"] = True
                        instruction = "You have successfully completed the maneuver."
                else:
                    state["bppv_start_time"] = None
                    state["in_correct_pose_step_4"] = False
                    error_messages = []
                    if not yaw_correct:
                        if display_yaw >= 0:
                            error_messages.append("Turn your head further right.")
                        else:
                            error_messages.append("Turn your head further left.")
                    if not pitch_correct:
                        if display_pitch < state["target_pitch_min_step_4"]:
                            error_messages.append("Tilt your head further up.")
                        elif display_pitch > state["target_pitch_max_step_4"]:
                            error_messages.append("Tilt your head down.")
                    if not roll_correct:
                        if display_roll < state["target_roll_min_step_4"]:
                            error_messages.append("Bend your head to the right.")
                        elif display_roll > state["target_roll_max_step_4"]:
                            error_messages.append("Bend your head to the left.")
                    instruction = " ".join(error_messages) if error_messages else "Adjust to neutral position."
                    cv2.putText(frame, instruction, (frame_width // 4, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(frame, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Stage 7: All Missions Complete
        elif state["all_missions_complete"]:
            instruction = "Epley Maneuver Guider Complete!"
            cv2.putText(frame, instruction, (frame_width // 4, frame_height // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3, cv2.LINE_AA)
            current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results)
            display_yaw = wrap_angle_180(current_head_yaw)
            display_pitch = wrap_angle_180(current_head_pitch)
            display_roll = wrap_angle_180(current_head_roll)
            cv2.putText(frame, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
            cv2.putText(frame, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
        # Skeleton overlay whenever landmarks were detected.
        mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                  landmark_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=4),
                                  connection_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2))
    return frame, instruction, state
def webcam_stream(frame, state):
    """Gradio streaming callback: push one webcam frame through the pipeline.

    Returns (processed_frame, instruction_text, updated_state); passes the
    state through untouched when no frame arrived.
    """
    if frame is None:
        return None, "No frame received.", state
    processed, message, new_state = process_frame(frame, state)
    return processed, message, new_state
# Gradio interface
with gr.Blocks() as demo:
gr.Markdown("# AI-Based BPPV Maneuver Guider")
gr.Markdown("Use your webcam to guide you through the Epley Maneuver for BPPV treatment. Follow the on-screen instructions. Note: Webcam streaming works best when running locally.")
with gr.Row():
with gr.Column():
webcam_input = gr.Video(label="Webcam Feed", source="webcam", live=True)
reset_button = gr.Button("Reset State")
with gr.Column():
output_image = gr.Image(label="Processed Frame", streaming=True)
instruction_output = gr.Textbox(label="Instructions")
state = gr.State()
# Stream webcam frames
webcam_input.stream(
fn=webcam_stream,
inputs=[webcam_input, state],
outputs=[output_image, instruction_output, state],
_js="""(inputs) => {
// Ensure webcam is accessed
return inputs;
}"""
)
# Reset state button
reset_button.click(
fn=lambda: None,
inputs=None,
outputs=state
)
if __name__ == "__main__":
demo.launch()