| |
| from scipy.spatial import distance as dist |
| from imutils import face_utils |
| from threading import Thread |
| import numpy as np |
| import cv2 as cv |
| import imutils |
| import dlib |
| import pygame |
| import argparse |
| import os |
|
|
| |
|
|
| |
# Haar-cascade face detector (fast, grayscale-based) used to locate the face
# box that is then handed to the dlib landmark predictor.
haar_cascade_face_detector = "dependencies/haarcascade_frontalface_default.xml"
face_detector = cv.CascadeClassifier(haar_cascade_face_detector)


# dlib 68-point facial landmark model; predicts eye/mouth/jaw coordinates
# inside a detected face rectangle.
dlib_facial_landmark_predictor = "dependencies/shape_predictor_68_face_landmarks.dat"
landmark_predictor = dlib.shape_predictor(dlib_facial_landmark_predictor)


# Font used for all on-frame overlay text.
font = cv.FONT_HERSHEY_SIMPLEX

# Drowsiness (eye closure) detection state.
# EAR below the threshold for EYE_CLOSED_THRESHOLD consecutive frames
# counts as one drowsiness event.
EYE_ASPECT_RATIO_THRESHOLD = 0.25
EYE_CLOSED_THRESHOLD = 20
EYE_THRESH_COUNTER = 0      # consecutive frames with eyes closed
DROWSY_COUNTER = 0          # total drowsiness events detected
drowsy_alert = False        # True while a drowsiness alert is active


# Yawn (mouth opening) detection state.
# MAR above the threshold for MOUTH_OPEN_THRESHOLD consecutive frames
# counts as one yawn event.
MOUTH_ASPECT_RATIO_THRESHOLD = 0.5
MOUTH_OPEN_THRESHOLD = 15
YAWN_THRESH_COUNTER = 0     # consecutive frames with mouth open
YAWN_COUNTER = 0            # total yawn events detected
yawn_alert = False          # True while a yawn alert is active


# Head-down / face-not-visible detection state: counts consecutive frames
# with no detected face.
FACE_LOST_THRESHOLD = 25
FACE_LOST_COUNTER = 0
HEAD_DOWN_COUNTER = 0
head_down_alert = False


# Alarm sounds played (on a background thread) when an alert fires.
pygame.mixer.init()
drowsiness_sound = pygame.mixer.Sound("dependencies/audio/drowsiness-detected.mp3")
yawn_sound = pygame.mixer.Sound("dependencies/audio/yawning-detected.mp3")
| |
|
|
| |
def play_alarm(sound_to_play):
    """Play *sound_to_play* unless the mixer is already playing something."""
    if pygame.mixer.get_busy():
        return
    sound_to_play.play()
|
|
def generate_alert(final_eye_ratio, final_mouth_ratio):
    """Update drowsiness/yawn counters from one frame's EAR and MAR values.

    Increments the consecutive-frame counters while the ratios are past
    their thresholds; once a counter reaches its frame threshold, records
    one event and starts the matching alarm sound on a background thread.
    The alert flag stays set so a single episode fires only one alarm.
    """
    global EYE_THRESH_COUNTER, YAWN_THRESH_COUNTER
    global drowsy_alert, yawn_alert
    global DROWSY_COUNTER, YAWN_COUNTER

    # --- Eyes: EAR below threshold means the eyes look closed. ---
    if final_eye_ratio >= EYE_ASPECT_RATIO_THRESHOLD:
        EYE_THRESH_COUNTER = 0
        drowsy_alert = False
    else:
        EYE_THRESH_COUNTER += 1
        if EYE_THRESH_COUNTER >= EYE_CLOSED_THRESHOLD and not drowsy_alert:
            DROWSY_COUNTER += 1
            drowsy_alert = True
            Thread(target=play_alarm, args=(drowsiness_sound,)).start()

    # --- Mouth: MAR above threshold means the mouth looks open (yawn). ---
    if final_mouth_ratio <= MOUTH_ASPECT_RATIO_THRESHOLD:
        YAWN_THRESH_COUNTER = 0
        yawn_alert = False
    else:
        YAWN_THRESH_COUNTER += 1
        if YAWN_THRESH_COUNTER >= MOUTH_OPEN_THRESHOLD and not yawn_alert:
            YAWN_COUNTER += 1
            yawn_alert = True
            Thread(target=play_alarm, args=(yawn_sound,)).start()
|
|
def detect_facial_landmarks(x, y, w, h, gray_frame):
    """Run the dlib 68-point landmark predictor on one face box.

    The (x, y, w, h) Haar-cascade box is converted to a dlib rectangle;
    the result is returned as a NumPy array of (x, y) landmark points.
    """
    rect = dlib.rectangle(int(x), int(y), int(x + w), int(y + h))
    return face_utils.shape_to_np(landmark_predictor(gray_frame, rect))
|
|
def eye_aspect_ratio(eye):
    """Compute the eye aspect ratio (EAR) for a 6-point eye landmark set.

    EAR = (|p1-p5| + |p2-p4|) / (2 * |p0-p3|): the two vertical eyelid
    distances over the horizontal eye width. Drops toward 0 as the eye closes.
    """
    vertical = dist.euclidean(eye[1], eye[5]) + dist.euclidean(eye[2], eye[4])
    horizontal = dist.euclidean(eye[0], eye[3])
    return vertical / (2.0 * horizontal)
|
|
def final_eye_aspect_ratio(shape):
    """Average the EAR of both eyes from a 68-point landmark array.

    Returns (mean EAR, left-eye points, right-eye points); the point slices
    are returned so callers can draw the eye contours if desired.
    """
    left_lo, left_hi = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
    right_lo, right_hi = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
    left_eye = shape[left_lo:left_hi]
    right_eye = shape[right_lo:right_hi]
    mean_ear = (eye_aspect_ratio(left_eye) + eye_aspect_ratio(right_eye)) / 2.0
    return mean_ear, left_eye, right_eye
|
|
def mouth_aspect_ratio(mouth):
    """Compute the mouth aspect ratio (MAR) for the 12-point outer-lip set.

    MAR = (|p2-p10| + |p4-p8|) / (2 * |p0-p6|): two vertical lip distances
    over the mouth width. Rises as the mouth opens (yawning).
    """
    vertical = dist.euclidean(mouth[2], mouth[10]) + dist.euclidean(mouth[4], mouth[8])
    horizontal = dist.euclidean(mouth[0], mouth[6])
    return vertical / (2.0 * horizontal)
|
|
def final_mouth_aspect_ratio(shape):
    """Compute the MAR from a 68-point landmark array.

    Returns (MAR, mouth points); the points are returned so callers can
    draw the mouth contour if desired.
    """
    lo, hi = face_utils.FACIAL_LANDMARKS_IDXS["mouth"]
    mouth_points = shape[lo:hi]
    return mouth_aspect_ratio(mouth_points), mouth_points
|
|
def head_pose_ratio(shape):
    """Return nose-to-chin distance divided by face width.

    Uses landmarks 30 (nose tip), 8 (chin), and 0/16 (face side corners)
    from the 68-point model. Returns 0.0 when the face width is zero to
    avoid a division by zero.
    """
    nose_to_chin = dist.euclidean(shape[30], shape[8])
    face_width = dist.euclidean(shape[0], shape[16])
    return nose_to_chin / face_width if face_width != 0 else 0.0
|
|
def reset_counters():
    """Zero every frame/event counter and clear all active alert flags.

    Called before each detection session (webcam or video file) so stats
    from a previous run do not leak into the next one.
    """
    global EYE_THRESH_COUNTER, YAWN_THRESH_COUNTER, FACE_LOST_COUNTER
    global DROWSY_COUNTER, YAWN_COUNTER, HEAD_DOWN_COUNTER
    global drowsy_alert, yawn_alert, head_down_alert
    EYE_THRESH_COUNTER = 0
    YAWN_THRESH_COUNTER = 0
    FACE_LOST_COUNTER = 0
    DROWSY_COUNTER = 0
    YAWN_COUNTER = 0
    HEAD_DOWN_COUNTER = 0
    drowsy_alert = False
    yawn_alert = False
    head_down_alert = False
|
|
def process_frame(frame):
    """Run one BGR frame through the full detection pipeline.

    Resizes the frame to 640px wide, detects a face with the Haar cascade,
    computes EAR/MAR from the dlib landmarks, updates the global alert
    state via generate_alert(), and draws the stats overlay.

    Returns the annotated frame (width 640).
    """
    global FACE_LOST_COUNTER, head_down_alert, HEAD_DOWN_COUNTER
    frame = imutils.resize(frame, width=640)
    gray_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    faces = face_detector.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), flags=cv.CASCADE_SCALE_IMAGE)
    if len(faces) > 0:
        # Face visible again: clear the head-down tracking state.
        FACE_LOST_COUNTER = 0
        head_down_alert = False
        # Only the first detected face is analyzed.
        (x, y, w, h) = faces[0]
        face_landmarks = detect_facial_landmarks(x, y, w, h, gray_frame)
        final_ear, left_eye, right_eye = final_eye_aspect_ratio(face_landmarks)
        final_mar, mouth = final_mouth_aspect_ratio(face_landmarks)
        # Update drowsiness/yawn counters and fire alarms as needed.
        generate_alert(final_ear, final_mar)
        cv.putText(frame, f"EAR: {final_ear:.2f}", (10, 30), font, 0.7, (0, 0, 255), 2)
        cv.putText(frame, f"MAR: {final_mar:.2f}", (10, 60), font, 0.7, (0, 0, 255), 2)
    else:
        # No face: count consecutive lost frames; after the threshold,
        # record a single head-down event per episode.
        FACE_LOST_COUNTER += 1
        if FACE_LOST_COUNTER >= FACE_LOST_THRESHOLD and not head_down_alert:
            HEAD_DOWN_COUNTER += 1
            head_down_alert = True
    # Running event totals (right side) and any active alert banners.
    cv.putText(frame, f"Drowsy: {DROWSY_COUNTER}", (480, 30), font, 0.7, (255, 255, 0), 2)
    cv.putText(frame, f"Yawn: {YAWN_COUNTER}", (480, 60), font, 0.7, (255, 255, 0), 2)
    cv.putText(frame, f"Head Down: {HEAD_DOWN_COUNTER}", (480, 90), font, 0.7, (255, 255, 0), 2)
    if drowsy_alert: cv.putText(frame, "DROWSINESS ALERT!", (150, 30), font, 0.9, (0, 0, 255), 2)
    if yawn_alert: cv.putText(frame, "YAWN ALERT!", (200, 60), font, 0.9, (0, 0, 255), 2)
    if head_down_alert: cv.putText(frame, "HEAD NOT VISIBLE!", (180, 90), font, 0.9, (0, 0, 255), 2)
    return frame
|
|
def process_video(input_path, output_path=None):
    """Run the detection pipeline over a video file.

    Args:
        input_path: path to the input video.
        output_path: optional path; when given, the annotated frames are
            written out as an mp4v-encoded video resized to width 640.

    Returns:
        True on success, False if the input could not be opened.
    """
    reset_counters()
    video_stream = cv.VideoCapture(input_path)
    if not video_stream.isOpened():
        print(f"Error: Could not open video file {input_path}")
        return False

    fps = int(video_stream.get(cv.CAP_PROP_FPS))
    # Some containers/streams report 0 (or negative) FPS; fall back to a
    # sane default so the VideoWriter is not created with an invalid rate.
    if fps <= 0:
        fps = 30
    width = int(video_stream.get(cv.CAP_PROP_FRAME_WIDTH))
    height = int(video_stream.get(cv.CAP_PROP_FRAME_HEIGHT))

    print(f"Processing video: {input_path}")
    print(f"Original Res: {width}x{height}, FPS: {fps}")

    video_writer = None
    if output_path:
        fourcc = cv.VideoWriter_fourcc(*'mp4v')
        # process_frame() resizes to width 640 keeping the aspect ratio,
        # so the writer's dimensions must match that output size exactly.
        output_width = 640
        output_height = int(height * (output_width / float(width)))
        output_dims = (output_width, output_height)
        video_writer = cv.VideoWriter(output_path, fourcc, fps, output_dims)
        print(f"Outputting video with Res: {output_dims[0]}x{output_dims[1]}")

    while True:
        ret, frame = video_stream.read()
        if not ret:
            break
        processed_frame = process_frame(frame)
        if video_writer:
            video_writer.write(processed_frame)

    video_stream.release()
    if video_writer:
        video_writer.release()

    print("Video processing complete!")
    print(f"Final Stats - Drowsy: {DROWSY_COUNTER}, Yawn: {YAWN_COUNTER}, Head Down: {HEAD_DOWN_COUNTER}")
    return True
|
|
def run_webcam():
    """Run live detection on the default webcam until 'q' is pressed.

    Returns True on a normal exit, False if the webcam cannot be opened.
    """
    reset_counters()
    stream = cv.VideoCapture(0)
    if not stream.isOpened():
        print("Error: Could not open webcam")
        return False
    while True:
        grabbed, frame = stream.read()
        if not grabbed:
            print("Failed to grab frame")
            break
        cv.imshow("Live Drowsiness and Yawn Detection", process_frame(frame))
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
    stream.release()
    cv.destroyAllWindows()
    return True
|
|
| |
if __name__ == "__main__":
    # CLI entry point: webcam mode (default) or offline video-file mode.
    arg_parser = argparse.ArgumentParser(description='Drowsiness Detection System')
    arg_parser.add_argument('--mode', choices=['webcam', 'video'], default='webcam', help='Mode of operation')
    arg_parser.add_argument('--input', type=str, help='Input video file path for video mode')
    arg_parser.add_argument('--output', type=str, help='Output video file path for video mode')
    cli_args = arg_parser.parse_args()

    if cli_args.mode == 'webcam':
        print("Starting webcam detection...")
        run_webcam()
    elif cli_args.mode == 'video':
        # Validate the input path before starting any processing.
        if not cli_args.input:
            print("Error: --input argument is required for video mode.")
        elif not os.path.exists(cli_args.input):
            print(f"Error: Input file not found at {cli_args.input}")
        else:
            process_video(cli_args.input, cli_args.output)