|
|
import os |
|
|
import cv2 |
|
|
import mediapipe as mp |
|
|
import numpy as np |
|
|
import logging |
|
|
import time |
|
|
from filterpy.kalman import KalmanFilter |
|
|
import pyttsx3 |
|
|
import threading |
|
|
import streamlit as st |
|
|
from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration |
|
|
|
|
|
|
|
|
logging.getLogger('mediapipe').setLevel(logging.ERROR) |
|
|
|
|
|
|
|
|
mp_pose = mp.solutions.pose |
|
|
mp_drawing = mp.solutions.drawing_utils |
|
|
|
|
|
|
|
|
os.makedirs("/app/mediapipe_models", exist_ok=True) |
|
|
|
|
|
pose = mp_pose.Pose( |
|
|
static_image_mode=False, |
|
|
min_detection_confidence=0.5, |
|
|
min_tracking_confidence=0.6, |
|
|
model_complexity=2, |
|
|
smooth_landmarks=True |
|
|
) |
|
|
|
|
|
|
|
|
engine = pyttsx3.init() |
|
|
engine.setProperty('rate', 150) |
|
|
engine.setProperty('volume', 0.9) |
|
|
|
|
|
|
|
|
def speak(text, force=False): |
|
|
if not hasattr(speak, 'last_text') or speak.last_text != text or force: |
|
|
speak.last_text = text |
|
|
st.write(f"Instruction: {text}") |
|
|
try: |
|
|
def run_speech(): |
|
|
engine.say(text) |
|
|
engine.runAndWait() |
|
|
threading.Thread(target=run_speech, daemon=True).start() |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ANGLE_SCALE = 1 |
|
|
|
|
|
|
|
|
def initialize_kalman_filter(): |
|
|
kf = KalmanFilter(dim_x=6, dim_z=3) |
|
|
kf.x = np.zeros(6) |
|
|
kf.F = np.array([ |
|
|
[1, 0, 0, 1, 0, 0], |
|
|
[0, 1, 0, 0, 1, 0], |
|
|
[0, 0, 1, 0, 0, 1], |
|
|
[0, 0, 0, 1, 0, 0], |
|
|
[0, 0, 0, 0, 1, 0], |
|
|
[0, 0, 0, 0, 0, 1] |
|
|
]) |
|
|
kf.H = np.array([ |
|
|
[1, 0, 0, 0, 0, 0], |
|
|
[0, 1, 0, 0, 0, 0], |
|
|
[0, 0, 1, 0, 0, 0] |
|
|
]) |
|
|
kf.P *= 10. |
|
|
kf.R = np.diag([1.0, 1.0, 1.0]) |
|
|
kf.Q = np.eye(6) * 0.05 |
|
|
return kf |
|
|
|
|
|
kf = initialize_kalman_filter() |
|
|
|
|
|
|
|
|
target_pose = [ |
|
|
{ |
|
|
"person_id": 0, |
|
|
"bbox": [ |
|
|
260.447998046875, |
|
|
434.9598693847656, |
|
|
263.357177734375, |
|
|
439.172119140625 |
|
|
], |
|
|
"keypoints": [ |
|
|
{"name": "Nose", "x": 240.35791015625, "y": 135.41705322265625, "score": 0.9791688919067383}, |
|
|
{"name": "L_Eye", "x": 265.16717529296875, "y": 110.43780517578125, "score": 0.9833072428857386}, |
|
|
{"name": "R_Eye", "x": 210.517822265625, "y": 114.45855712890625, "score": 0.9687361121177673}, |
|
|
{"name": "L_Ear", "x": 301.84814453125, "y": 135.83111572265625, "score": 0.9493670302238464}, |
|
|
{"name": "R_Ear", "x": 175.035888671875, "y": 143.1534423828125, "score": 0.9537781476974487}, |
|
|
{"name": "L_Shoulder", "x": 367.36688232421875, "y": 277.89508056640625, "score": 0.9714463949203491}, |
|
|
{"name": "R_Shoulder", "x": 132.6015625, "y": 287.1273193359375, "score": 0.9208009243011475}, |
|
|
{"name": "L_Elbow", "x": 404.8804931640625, "y": 457.8016357421875, "score": 1.0068358182907104}, |
|
|
{"name": "R_Elbow", "x": 121.6767578125, "y": 466.985595703125, "score": 0.9445005059242249}, |
|
|
{"name": "L_Wrist", "x": 316.5948486328125, "y": 564.1590576171875, "score": 0.9202994108200073}, |
|
|
{"name": "R_Wrist", "x": 218.354248046875, "y": 578.4954833984375, "score": 0.9106894731521606}, |
|
|
{"name": "L_Hip", "x": 343.258056640625, "y": 562.5377197265625, "score": 0.8454821705818176}, |
|
|
{"name": "R_Hip", "x": 191.992431640625, "y": 569.1612548828125, "score": 0.856957733631134}, |
|
|
{"name": "L_Knee", "x": 394.12591552734375, "y": 672.401611328125, "score": 0.8698152899742126}, |
|
|
{"name": "R_Knee", "x": 143.781005859375, "y": 696.0062255859375, "score": 0.8501293659210205}, |
|
|
{"name": "L_Ankle", "x": 353.07330322265625, "y": 853.671142578125, "score": 0.9136713147163391}, |
|
|
{"name": "R_Ankle", "x": 211.80206298828125, "y": 850.3348388671875, "score": 0.8354711532592773} |
|
|
] |
|
|
} |
|
|
] |
|
|
|
|
|
|
|
|
frame_width = 1280 |
|
|
frame_height = 720 |
|
|
target_keypoints = [(kp["x"], kp["y"]) for kp in target_pose[0]["keypoints"]] |
|
|
head_keypoint_indices = [0, 1, 2, 3, 4] |
|
|
head_keypoints = [target_keypoints[i] for i in head_keypoint_indices] |
|
|
target_head_center_x = sum(x for x, y in head_keypoints) / len(head_keypoints) |
|
|
target_head_center_y = sum(y for x, y in head_keypoints) / len(head_keypoints) |
|
|
display_center_x = frame_width / 2 |
|
|
display_center_y = frame_height * 0.2 |
|
|
translate_x = display_center_x - target_head_center_x |
|
|
translate_y = display_center_y - target_head_center_y |
|
|
centered_target_keypoints = [(x + translate_x, y + translate_y) for x, y in target_keypoints] |
|
|
head_keypoints_centered = [centered_target_keypoints[i] for i in head_keypoint_indices] |
|
|
x_coords = [x for x, y in head_keypoints_centered] |
|
|
y_coords = [y for x, y in head_keypoints_centered] |
|
|
bbox_min_x = max(0, min(x_coords) - 20) |
|
|
bbox_max_x = min(frame_width, max(x_coords) + 20) |
|
|
bbox_min_y = max(0, min(y_coords) - 20) |
|
|
bbox_max_y = min(frame_height, max(y_coords) + 20) |
|
|
|
|
|
|
|
|
def euclidean_distance(p1, p2): |
|
|
return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) |
|
|
|
|
|
def is_head_pose_matched(user_landmarks, target_keypoints, distance_threshold=25): |
|
|
head_indices_mapping = {0: 0, 2: 1, 5: 2, 7: 3, 8: 4} |
|
|
for mp_idx, target_idx in head_indices_mapping.items(): |
|
|
if mp_idx < len(user_landmarks) and target_idx < len(target_keypoints): |
|
|
distance = euclidean_distance(user_landmarks[mp_idx], target_keypoints[target_idx]) |
|
|
if distance > distance_threshold: |
|
|
return False |
|
|
return True |
|
|
|
|
|
def is_full_body_visible(landmarks, frame_width, frame_height): |
|
|
key_landmarks = [ |
|
|
mp_pose.PoseLandmark.LEFT_SHOULDER, |
|
|
mp_pose.PoseLandmark.RIGHT_SHOULDER, |
|
|
mp_pose.PoseLandmark.LEFT_HIP, |
|
|
mp_pose.PoseLandmark.RIGHT_HIP, |
|
|
] |
|
|
for landmark in key_landmarks: |
|
|
lm = landmarks[landmark] |
|
|
if (lm.visibility < 0.6 or |
|
|
lm.x < 0.05 or lm.x > 0.95 or |
|
|
lm.y < 0.05 or lm.y > 0.95): |
|
|
return False |
|
|
return True |
|
|
|
|
|
def _calculate_raw_head_angles_user_method(landmark_list): |
|
|
required_indices = [mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.LEFT_EAR, mp_pose.PoseLandmark.RIGHT_EAR, |
|
|
mp_pose.PoseLandmark.LEFT_EYE, mp_pose.PoseLandmark.RIGHT_EYE] |
|
|
if landmark_list is None or len(landmark_list) <= max(idx.value for idx in required_indices): |
|
|
return None |
|
|
for l_idx_enum in required_indices: |
|
|
if landmark_list[l_idx_enum.value].visibility < 0.5: |
|
|
return None |
|
|
nose = landmark_list[mp_pose.PoseLandmark.NOSE.value] |
|
|
left_ear = landmark_list[mp_pose.PoseLandmark.LEFT_EAR.value] |
|
|
right_ear = landmark_list[mp_pose.PoseLandmark.RIGHT_EAR.value] |
|
|
left_eye = landmark_list[mp_pose.PoseLandmark.LEFT_EYE.value] |
|
|
right_eye = landmark_list[mp_pose.PoseLandmark.RIGHT_EYE.value] |
|
|
mid_ear = np.array([(left_ear.x + right_ear.x) / 2, |
|
|
(left_ear.y + right_ear.y) / 2, |
|
|
(left_ear.z + right_ear.z) / 2]) |
|
|
nose_vec = mid_ear - np.array([nose.x, nose.y, nose.z]) |
|
|
yaw = -np.degrees(np.arctan2(nose_vec[0], nose_vec[2] + 1e-6)) |
|
|
eye_mid = np.array([(left_eye.x + right_eye.x) / 2, |
|
|
(left_eye.y + right_eye.y) / 2, |
|
|
(left_eye.z + right_eye.z) / 2]) |
|
|
nose_to_eye = np.array([nose.x, nose.y, nose.z]) - eye_mid |
|
|
pitch = np.degrees(np.arctan2(nose_to_eye[1], np.sqrt(nose_to_eye[0]**2 + nose_to_eye[2]**2 + 1e-6))) |
|
|
ear_vec_2d = np.array([left_ear.x - right_ear.x, left_ear.y - right_ear.y]) |
|
|
roll = np.degrees(np.arctan2(ear_vec_2d[1], ear_vec_2d[0] + 1e-6)) |
|
|
return yaw, -(pitch - 50), roll |
|
|
|
|
|
def get_head_angles(pose_results): |
|
|
raw_yaw, raw_pitch, raw_roll = 0.0, 0.0, 0.0 |
|
|
if pose_results and pose_results.pose_landmarks: |
|
|
try: |
|
|
angles = _calculate_raw_head_angles_user_method( |
|
|
pose_results.pose_landmarks.landmark |
|
|
) |
|
|
if angles is not None: |
|
|
raw_yaw, raw_pitch, raw_roll = angles |
|
|
except Exception as e: |
|
|
logging.error(f"Error in get_head_angles: {e}") |
|
|
kf.predict() |
|
|
kf.update(np.array([raw_yaw, raw_pitch, raw_roll])) |
|
|
smoothed_yaw, smoothed_pitch, smoothed_roll = kf.x[:3] |
|
|
return smoothed_yaw * ANGLE_SCALE * 3, smoothed_pitch * ANGLE_SCALE, smoothed_roll * ANGLE_SCALE |
|
|
|
|
|
def wrap_angle_180(angle): |
|
|
wrapped_angle = np.fmod(angle + 180, 360) |
|
|
if wrapped_angle < 0: |
|
|
wrapped_angle += 360 |
|
|
return wrapped_angle - 180 |
|
|
|
|
|
|
|
|
class VideoProcessor(VideoProcessorBase): |
|
|
def __init__(self): |
|
|
self.visibility_confirmed = False |
|
|
self.match_start_time = None |
|
|
self.match_duration_threshold = 5 |
|
|
self.pose_held = False |
|
|
self.bppv_step_1 = False |
|
|
self.bppv_step_2 = False |
|
|
self.bppv_step_3 = False |
|
|
self.bppv_step_4 = False |
|
|
self.bppv_start_time = None |
|
|
self.bppv_duration_threshold = 30 |
|
|
self.neutral_hold_threshold = 5 |
|
|
self.bppv_pose_held_time = 0 |
|
|
self.mission_complete = False |
|
|
self.step_3_complete = False |
|
|
self.all_missions_complete = False |
|
|
self.last_speech_time = 0 |
|
|
self.speech_interval = 3 |
|
|
self.in_correct_pose_step_1 = False |
|
|
self.in_correct_pose_step_2 = False |
|
|
self.in_correct_pose_step_3 = False |
|
|
self.in_correct_pose_step_4 = False |
|
|
self.head_shake_count = 0 |
|
|
self.head_shake_complete = False |
|
|
self.last_yaw = 0 |
|
|
self.yaw_direction = 0 |
|
|
self.yaw_threshold = 15 |
|
|
self.target_yaw_min_step_1 = 25 |
|
|
self.target_yaw_max_step_1 = 65 |
|
|
self.target_yaw_min_step_2 = -20 |
|
|
self.target_yaw_max_step_2 = 20 |
|
|
self.target_pitch_min_step_2 = 70 |
|
|
self.target_pitch_max_step_2 = 110 |
|
|
self.target_roll_min_step_2 = -120 |
|
|
self.target_roll_max_step_2 = -80 |
|
|
self.target_yaw_min_step_3 = 153 |
|
|
self.target_yaw_max_step_3 = 193 |
|
|
self.target_pitch_min_step_3 = 17 |
|
|
self.target_pitch_max_step_3 = 57 |
|
|
self.target_roll_min_step_3 = 77 |
|
|
self.target_roll_max_step_3 = 117 |
|
|
self.target_yaw_min_step_4 = -160 |
|
|
self.target_yaw_max_step_4 = 160 |
|
|
self.target_pitch_min_step_4 = 0 |
|
|
self.target_pitch_max_step_4 = 30 |
|
|
self.target_roll_min_step_4 = -160 |
|
|
self.target_roll_max_step_4 = 160 |
|
|
|
|
|
def recv(self, frame): |
|
|
img = frame.to_ndarray(format="bgr24") |
|
|
img = cv2.flip(img, 1) |
|
|
img = cv2.resize(img, (frame_width, frame_height)) |
|
|
image_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
|
|
pose_results = pose.process(image_rgb) |
|
|
img = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
current_time = time.time() |
|
|
|
|
|
if pose_results.pose_landmarks: |
|
|
landmarks = pose_results.pose_landmarks.landmark |
|
|
user_landmarks = [(lm.x * frame_width, lm.y * frame_height) for lm in landmarks] |
|
|
|
|
|
|
|
|
if not self.visibility_confirmed: |
|
|
if is_full_body_visible(landmarks, frame_width, frame_height): |
|
|
self.visibility_confirmed = True |
|
|
cv2.putText(img, "Visibility Confirmed!", (frame_width // 4, frame_height // 2 - 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Full body visibility confirmed. Please adjust your head to match the position that your eye and nose point are fully inside the box and box should be green", force=True) |
|
|
self.last_speech_time = current_time |
|
|
else: |
|
|
cv2.putText(img, "Please move back for full body visibility", (frame_width // 4 - 50, frame_height // 2), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3, cv2.LINE_AA) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Please move back to ensure your full body is visible in the frame.") |
|
|
self.last_speech_time = current_time |
|
|
self.match_start_time = None |
|
|
self.pose_held = False |
|
|
self.bppv_step_1 = False |
|
|
self.bppv_step_2 = False |
|
|
self.bppv_step_3 = False |
|
|
self.bppv_step_4 = False |
|
|
self.mission_complete = False |
|
|
self.step_3_complete = False |
|
|
self.all_missions_complete = False |
|
|
self.in_correct_pose_step_1 = False |
|
|
self.in_correct_pose_step_2 = False |
|
|
self.in_correct_pose_step_3 = False |
|
|
self.in_correct_pose_step_4 = False |
|
|
self.head_shake_count = 0 |
|
|
self.head_shake_complete = False |
|
|
self.yaw_direction = 0 |
|
|
|
|
|
|
|
|
elif self.visibility_confirmed and not self.pose_held: |
|
|
head_pose_matched = is_head_pose_matched(user_landmarks, centered_target_keypoints) |
|
|
bbox_color = (0, 255, 0) if head_pose_matched else (0, 0, 255) |
|
|
cv2.rectangle(img, (int(bbox_min_x), int(bbox_min_y)), (int(bbox_max_x), int(bbox_max_y)), |
|
|
bbox_color, 2) |
|
|
|
|
|
if head_pose_matched: |
|
|
if self.match_start_time is None: |
|
|
self.match_start_time = current_time |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Hold your head in this position.") |
|
|
self.last_speech_time = current_time |
|
|
else: |
|
|
elapsed_time = current_time - self.match_start_time |
|
|
if elapsed_time >= self.match_duration_threshold: |
|
|
self.pose_held = True |
|
|
self.bppv_step_1 = True |
|
|
speak("Calibration complete. Now turn your head 45 degrees to the right and hold for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.bppv_start_time = current_time |
|
|
else: |
|
|
remaining_time = max(0, self.match_duration_threshold - elapsed_time) |
|
|
cv2.putText(img, f"Hold Head Pose for {remaining_time:.1f}s", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
else: |
|
|
self.match_start_time = None |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Adjust your head to make the box green for 5 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
cv2.putText(img, "Adjust eye and nose in the centre of box", (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
|
|
|
|
|
|
elif self.pose_held and self.bppv_step_1 and not self.mission_complete: |
|
|
current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results) |
|
|
display_yaw = wrap_angle_180(current_head_yaw) |
|
|
display_pitch = wrap_angle_180(current_head_pitch) |
|
|
display_roll = wrap_angle_180(current_head_roll) |
|
|
|
|
|
yaw_correct = self.target_yaw_min_step_1 <= display_yaw <= self.target_yaw_max_step_1 |
|
|
|
|
|
if yaw_correct: |
|
|
if not self.in_correct_pose_step_1: |
|
|
speak("Hold this position for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.in_correct_pose_step_1 = True |
|
|
if self.bppv_start_time is None: |
|
|
self.bppv_start_time = current_time |
|
|
self.bppv_pose_held_time = current_time - self.bppv_start_time |
|
|
remaining_time = max(0, self.bppv_duration_threshold - self.bppv_pose_held_time) |
|
|
cv2.putText(img, f"Hold Head at this position for {remaining_time:.1f}s", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak(f"Hold your head at this position {remaining_time:.1f} seconds remaining.") |
|
|
self.last_speech_time = current_time |
|
|
if self.bppv_pose_held_time >= self.bppv_duration_threshold: |
|
|
self.mission_complete = True |
|
|
self.bppv_step_2 = True |
|
|
speak("Step 1 complete. Now, slowly lie down on your left side, so that your right ear rests on the bed.Keep your head aligned—same position as before.Hold this pose for 30 seconds and stay relaxed.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.bppv_start_time = None |
|
|
self.bppv_pose_held_time = 0 |
|
|
self.in_correct_pose_step_1 = False |
|
|
else: |
|
|
self.bppv_start_time = None |
|
|
self.in_correct_pose_step_1 = False |
|
|
if display_yaw < self.target_yaw_min_step_1: |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head further to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
cv2.putText(img, "Turn head further right", (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
elif display_yaw > self.target_yaw_max_step_1: |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head back to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
cv2.putText(img, "Turn head back left", (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
|
|
|
cv2.putText(img, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
elif self.mission_complete and self.bppv_step_2 and not self.step_3_complete: |
|
|
current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results) |
|
|
display_yaw = wrap_angle_180(current_head_yaw) |
|
|
display_pitch = wrap_angle_180(current_head_pitch) |
|
|
display_roll = wrap_angle_180(current_head_roll) |
|
|
|
|
|
yaw_correct = self.target_yaw_min_step_2 <= display_yaw <= self.target_yaw_max_step_2 |
|
|
pitch_correct = self.target_pitch_min_step_2 <= display_pitch <= self.target_pitch_max_step_2 |
|
|
roll_correct = self.target_roll_min_step_2 <= display_roll <= self.target_roll_max_step_2 |
|
|
pose_correct = yaw_correct and pitch_correct and roll_correct |
|
|
|
|
|
if pose_correct: |
|
|
if not self.in_correct_pose_step_2: |
|
|
speak("Hold this position for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.in_correct_pose_step_2 = True |
|
|
if self.bppv_start_time is None: |
|
|
self.bppv_start_time = current_time |
|
|
self.bppv_pose_held_time = current_time - self.bppv_start_time |
|
|
remaining_time = max(0, self.bppv_duration_threshold - self.bppv_pose_held_time) |
|
|
cv2.putText(img, f"Hold Head at this position for {remaining_time:.1f}s", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak(f"Hold your head in this position. {remaining_time:.1f} seconds remaining.") |
|
|
self.last_speech_time = current_time |
|
|
if self.bppv_pose_held_time >= self.bppv_duration_threshold: |
|
|
self.step_3_complete = True |
|
|
self.bppv_step_3 = True |
|
|
speak("Step 2 complete. stay you head at the same angle, and roll your body to right and hold for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.bppv_start_time = None |
|
|
self.bppv_pose_held_time = 0 |
|
|
self.in_correct_pose_step_2 = False |
|
|
else: |
|
|
self.bppv_start_time = None |
|
|
self.in_correct_pose_step_2 = False |
|
|
error_messages = [] |
|
|
if not yaw_correct: |
|
|
if display_yaw < self.target_yaw_min_step_2: |
|
|
error_messages.append("Turn your head to the left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_yaw > self.target_yaw_max_step_2: |
|
|
error_messages.append("Turn your head to the right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not pitch_correct: |
|
|
if display_pitch < self.target_pitch_min_step_2: |
|
|
error_messages.append("Tilt your head further up.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head further up.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_pitch > self.target_pitch_max_step_2: |
|
|
error_messages.append("Tilt your head down.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head down.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not roll_correct: |
|
|
if display_roll < self.target_roll_min_step_2: |
|
|
error_messages.append("bend your head more to the left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head more to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_roll > self.target_roll_max_step_2: |
|
|
error_messages.append("bend your head to the right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
error_text = " ".join(error_messages) if error_messages else "Adjust head to target pose." |
|
|
cv2.putText(img, error_text, (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
|
|
|
cv2.putText(img, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
elif self.step_3_complete and self.bppv_step_3 and not self.bppv_step_4: |
|
|
current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results) |
|
|
display_yaw = wrap_angle_180(current_head_yaw) |
|
|
display_pitch = wrap_angle_180(current_head_pitch) |
|
|
display_roll = wrap_angle_180(current_head_roll) |
|
|
|
|
|
yaw_correct = self.target_yaw_min_step_3 <= display_yaw <= self.target_yaw_max_step_3 |
|
|
pitch_correct = self.target_pitch_min_step_3 <= display_pitch <= self.target_pitch_max_step_3 |
|
|
roll_correct = self.target_roll_min_step_3 <= display_roll <= self.target_roll_max_step_3 |
|
|
pose_correct = yaw_correct and pitch_correct and roll_correct |
|
|
|
|
|
if pose_correct: |
|
|
if not self.in_correct_pose_step_3: |
|
|
speak("Hold this position for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.in_correct_pose_step_3 = True |
|
|
if self.bppv_start_time is None: |
|
|
self.bppv_start_time = current_time |
|
|
self.bppv_pose_held_time = current_time - self.bppv_start_time |
|
|
remaining_time = max(0, self.bppv_duration_threshold - self.bppv_pose_held_time) |
|
|
cv2.putText(img, f"Hold Head at this position for {remaining_time:.1f}s", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak(f"Hold your head in this position. {remaining_time:.1f} seconds remaining.") |
|
|
self.last_speech_time = current_time |
|
|
if self.bppv_pose_held_time >= self.bppv_duration_threshold: |
|
|
self.bppv_step_4 = True |
|
|
speak("Step 3 complete. Now shake your head side to side 2 to 3 times, then sit on the opposite side of the bed in a neutral position.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.bppv_start_time = None |
|
|
self.bppv_pose_held_time = 0 |
|
|
self.in_correct_pose_step_3 = False |
|
|
self.last_yaw = display_yaw |
|
|
else: |
|
|
self.bppv_start_time = None |
|
|
self.in_correct_pose_step_3 = False |
|
|
error_messages = [] |
|
|
if not yaw_correct: |
|
|
if display_yaw < self.target_yaw_min_step_3: |
|
|
error_messages.append("Turn your head further to the left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head further to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_yaw > self.target_yaw_max_step_3: |
|
|
error_messages.append("Turn your head back to the right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head back to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not pitch_correct: |
|
|
if display_pitch < self.target_pitch_min_step_3: |
|
|
error_messages.append("Tilt your head further up.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head further up.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_pitch > self.target_pitch_max_step_3: |
|
|
error_messages.append("Tilt your head down.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head down.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not roll_correct: |
|
|
if display_roll < self.target_roll_min_step_3: |
|
|
error_messages.append("bend your head more to the right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head more to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_roll > self.target_roll_max_step_3: |
|
|
error_messages.append("bend your head to the left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
error_text = " ".join(error_messages) if error_messages else "Adjust head to target pose." |
|
|
cv2.putText(img, error_text, (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
|
|
|
cv2.putText(img, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
elif self.bppv_step_4 and not self.all_missions_complete: |
|
|
current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results) |
|
|
display_yaw = wrap_angle_180(current_head_yaw) |
|
|
display_pitch = wrap_angle_180(current_head_pitch) |
|
|
display_roll = wrap_angle_180(current_head_roll) |
|
|
|
|
|
if not self.head_shake_complete: |
|
|
yaw_change = display_yaw - self.last_yaw |
|
|
if yaw_change > self.yaw_threshold and self.yaw_direction != 1: |
|
|
self.yaw_direction = 1 |
|
|
self.head_shake_count += 0.5 |
|
|
elif yaw_change < -self.yaw_threshold and self.yaw_direction != -1: |
|
|
self.yaw_direction = -1 |
|
|
self.head_shake_count += 0.5 |
|
|
|
|
|
self.last_yaw = display_yaw |
|
|
|
|
|
if self.head_shake_count < 2: |
|
|
cv2.putText(img, f"Shake head side to side ({int(self.head_shake_count*2)}/2-3 shakes)", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Keep shaking your head side to side.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
else: |
|
|
self.head_shake_complete = True |
|
|
speak("Now sit on the opposite side of the bed in a neutral position.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.bppv_start_time = None |
|
|
self.in_correct_pose_step_4 = False |
|
|
|
|
|
else: |
|
|
yaw_correct = (display_yaw < self.target_yaw_min_step_4) or (display_yaw > self.target_yaw_max_step_4) |
|
|
pitch_correct = (self.target_pitch_min_step_4 <= display_pitch <= self.target_pitch_max_step_4) |
|
|
roll_correct = (display_roll < self.target_roll_min_step_4) or (display_roll > self.target_roll_max_step_4) |
|
|
pose_correct = yaw_correct and pitch_correct and roll_correct |
|
|
|
|
|
if pose_correct: |
|
|
if not self.in_correct_pose_step_4: |
|
|
speak("Hold this neutral position for 30 seconds.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
self.in_correct_pose_step_4 = True |
|
|
if self.bppv_start_time is None: |
|
|
self.bppv_start_time = current_time |
|
|
self.bppv_pose_held_time = current_time - self.bppv_start_time |
|
|
remaining_time = max(0, self.neutral_hold_threshold - self.bppv_pose_held_time) |
|
|
cv2.putText(img, f"Hold Neutral Position for {remaining_time:.1f}s", |
|
|
(frame_width // 4, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak(f"Hold this neutral position. {remaining_time:.1f} seconds remaining.") |
|
|
self.last_speech_time = current_time |
|
|
if self.bppv_pose_held_time >= self.neutral_hold_threshold: |
|
|
self.all_missions_complete = True |
|
|
speak("You have successfully completed the maneuver.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
else: |
|
|
self.bppv_start_time = None |
|
|
self.in_correct_pose_step_4 = False |
|
|
error_messages = [] |
|
|
if not yaw_correct: |
|
|
if display_yaw >= 0: |
|
|
error_messages.append("Turn your head further right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head further right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
else: |
|
|
error_messages.append("Turn your head further left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Turn your head further left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not pitch_correct: |
|
|
if display_pitch < self.target_pitch_min_step_4: |
|
|
error_messages.append("Tilt your head further up.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head further up.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_pitch > self.target_pitch_max_step_4: |
|
|
error_messages.append("Tilt your head down.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("Tilt your head down.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
if not roll_correct: |
|
|
if display_roll < self.target_roll_min_step_4: |
|
|
error_messages.append("bend your head to the right.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head to the right.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
elif display_roll > self.target_roll_max_step_4: |
|
|
error_messages.append("bend your head to the left.") |
|
|
if current_time - self.last_speech_time > self.speech_interval: |
|
|
speak("bend your head to the left.", force=True) |
|
|
self.last_speech_time = current_time |
|
|
error_text = " ".join(error_messages) if error_messages else "Adjust to neutral position." |
|
|
cv2.putText(img, error_text, (frame_width // 4, 50), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) |
|
|
|
|
|
cv2.putText(img, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
|
|
|
elif self.all_missions_complete: |
|
|
cv2.putText(img, "Epley Maneuver Guider Complete!", (frame_width // 4, frame_height // 2), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3, cv2.LINE_AA) |
|
|
current_head_yaw, current_head_pitch, current_head_roll = get_head_angles(pose_results) |
|
|
display_yaw = wrap_angle_180(current_head_yaw) |
|
|
display_pitch = wrap_angle_180(current_head_pitch) |
|
|
display_roll = wrap_angle_180(current_head_roll) |
|
|
cv2.putText(img, f"Yaw: {int(display_yaw)}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Pitch: {int(display_pitch)}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
cv2.putText(img, f"Roll: {int(display_roll)}", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
|
|
|
mp_drawing.draw_landmarks(img, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS, |
|
|
landmark_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=4), |
|
|
connection_drawing_spec=mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2)) |
|
|
|
|
|
return img |
|
|
|
|
|
def main(): |
|
|
st.title("AI Based BPPV Maneuver Guider") |
|
|
st.write("Ensure your webcam is enabled and follow the instructions to perform the Epley Maneuver.") |
|
|
|
|
|
|
|
|
webrtc_streamer( |
|
|
key="bppv-guider", |
|
|
video_processor_factory=VideoProcessor, |
|
|
rtc_configuration=RTCConfiguration({"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]}), |
|
|
media_stream_constraints={"video": True, "audio": False}, |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |