|
|
import gradio as gr |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from PIL import Image |
|
|
import io |
|
|
import os |
|
|
import json |
|
|
import time |
|
|
import argparse |
|
|
import tensorflow as tf |
|
|
from tensorflow import keras |
|
|
import math |
|
|
from collections import deque |
|
|
|
|
|
class SpeedDetector:
    """Estimate vehicle speed from video frames and score speed variability.

    Speed is estimated with sparse Lucas-Kanade optical flow over the lower
    half of each frame, and a 0..1 "speed change score" measures how
    erratically the recorded speed varies inside a rolling window.
    """

    def __init__(self, history_size=30):
        # Rolling window of the most recent speed samples (km/h).
        self.speed_history = deque(maxlen=history_size)
        # Timestamp of the most recent update_speed() call (None until first call).
        self.last_update_time = None
        self.current_speed = 0
        # A frame-to-frame change of >= 5 km/h counts as abnormal.
        self.speed_change_threshold = 5
        # Number of abnormal changes in the *current* window (recomputed per update).
        self.abnormal_speed_changes = 0
        self.speed_deviation_sum = 0
        self.speed_change_score = 0

        # Optical-flow tracking state.
        self.prev_gray = None
        self.prev_points = None
        self.frame_idx = 0
        # Running speed estimate (km/h); clamped to [40, 120] while tracking.
        self.speed_estimate = 60

    def update_speed(self, speed_km_h):
        """Record a speed sample (km/h) and return the updated change score (0..1)."""
        # BUGFIX: the timestamp used to be computed and discarded; store it so
        # last_update_time actually reflects the latest sample.
        self.last_update_time = time.time()

        self.speed_history.append(speed_km_h)
        self.current_speed = speed_km_h

        # Variability is meaningless with too few samples.
        if len(self.speed_history) < 5:
            return 0

        speed_arr = np.array(self.speed_history)
        speed_std = np.std(speed_arr)

        # Frame-to-frame changes inside the current window.
        changes = np.abs(np.diff(speed_arr))
        avg_change = np.mean(changes) if len(changes) > 0 else 0

        # BUGFIX: this counter was previously incremented for every change in
        # the whole window on *every* call, so it grew without bound and the
        # abnormal-change factor saturated at 1 forever. Recompute it from the
        # current window instead.
        self.abnormal_speed_changes = int(np.sum(changes >= self.speed_change_threshold))

        # Normalize each component into [0, 1].
        self.speed_deviation_sum = min(5, speed_std) / 5
        abnormal_change_factor = min(1, self.abnormal_speed_changes / 5)
        avg_change_factor = min(1, avg_change / self.speed_change_threshold)

        # Weighted combination of std-deviation, abnormal-change count and
        # mean change magnitude.
        self.speed_change_score = (
            0.4 * self.speed_deviation_sum +
            0.4 * abnormal_change_factor +
            0.2 * avg_change_factor
        )

        return self.speed_change_score

    def detect_speed_from_frame(self, frame):
        """Update the speed estimate from a BGR video frame using optical flow.

        Returns the detected speed in km/h (falls back to the last known speed
        when the frame is unusable).
        """
        if frame is None:
            return self.current_speed

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # (Re)seed feature points on the first frame and every 30 frames.
        if self.prev_gray is None or self.frame_idx % 30 == 0:
            # Track only the lower half of the frame (road region).
            mask = np.zeros_like(gray)
            h, w = gray.shape
            mask[h//2:, :] = 255

            corners = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.01, minDistance=10, mask=mask)
            if corners is not None and len(corners) > 0:
                self.prev_points = corners
                self.prev_gray = gray.copy()
            else:
                # No trackable features in this frame; keep the previous speed.
                self.frame_idx += 1
                return self.current_speed

        if self.prev_gray is not None and self.prev_points is not None:
            # Track the seeded points from the previous frame into this one.
            new_points, status, _ = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_points, None)

            if new_points is not None and status is not None:
                good_new = new_points[status == 1]
                good_old = self.prev_points[status == 1]

                if len(good_new) > 0 and len(good_old) > 0:
                    flow_magnitudes = np.sqrt(
                        np.sum((good_new - good_old)**2, axis=1)
                    )
                    avg_flow = np.mean(flow_magnitudes) if len(flow_magnitudes) > 0 else 0

                    # Map flow magnitude to a damped speed delta.
                    flow_threshold = 1.0
                    if avg_flow > flow_threshold:
                        # Clamp the raw delta to +/-5 km/h, then damp it.
                        speed_change = min(5, max(-5, (avg_flow - flow_threshold) * 2))
                        speed_change = speed_change * 0.3
                    else:
                        # Little visible motion: decay the estimate slowly.
                        speed_change = -0.1

                    self.speed_estimate += speed_change
                    # Keep the estimate in a plausible highway-speed range.
                    self.speed_estimate = max(40, min(120, self.speed_estimate))

                    # Carry the tracked points forward for the next frame.
                    self.prev_points = good_new.reshape(-1, 1, 2)

            self.prev_gray = gray.copy()

        self.frame_idx += 1

        detected_speed = self.speed_estimate
        # Feed the estimate into the variability scorer.
        self.update_speed(detected_speed)
        return detected_speed

    def get_speed_change_score(self):
        """Return a score from 0-1 indicating abnormal speed changes."""
        return self.speed_change_score

    def reset(self):
        """Reset all tracking and scoring state to initial values."""
        self.speed_history.clear()
        self.abnormal_speed_changes = 0
        self.speed_deviation_sum = 0
        self.speed_change_score = 0
        self.prev_gray = None
        self.prev_points = None
        self.frame_idx = 0
        self.speed_estimate = 60
|
|
|
|
|
class DrowsinessDetector:
    """Multi-factor drowsiness detector.

    Combines a CNN face classifier with eye aspect ratio (EAR), gaze angle,
    head pose (when dlib landmarks are available) and a speed-variability
    score into one weighted drowsiness probability.
    """

    def __init__(self):
        self.model = None
        # CNN input size (H, W, C).
        self.input_shape = (224, 224, 3)
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.id2label = {0: "notdrowsy", 1: "drowsy"}
        self.label2id = {"notdrowsy": 0, "drowsy": 1}

        # Vehicle-speed variability contributes to the final score.
        self.speed_detector = SpeedDetector()
        self.SPEED_CHANGE_WEIGHT = 0.15

        # Facial-landmark detection is optional: it requires dlib, which may
        # not be installed. Fall back to a simpler heuristic when unavailable.
        self.landmark_detection_enabled = False
        try:
            import dlib
            self.detector = dlib.get_frontal_face_detector()
            predictor_path = "shape_predictor_68_face_landmarks.dat"
            if not os.path.exists(predictor_path):
                print(f"Warning: {predictor_path} not found. Downloading...")
                import urllib.request
                urllib.request.urlretrieve(
                    "https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat",
                    predictor_path
                )
            self.predictor = dlib.shape_predictor(predictor_path)
            self.landmark_detection_enabled = True
            print("Facial landmark detection enabled")
        except Exception as e:
            print(f"Warning: Facial landmark detection disabled: {e}")
            print("The system will use a simpler detection method. For better accuracy, install CMake and dlib.")

        # Thresholds for the individual cues.
        self.EAR_THRESHOLD = 0.25
        self.CONSECUTIVE_FRAMES = 20
        self.ear_counter = 0
        self.GAZE_THRESHOLD = 0.2
        self.HEAD_POSE_THRESHOLD = 0.3

        # Weights of the individual cues in the fused score
        # (together with SPEED_CHANGE_WEIGHT they sum to 1.0).
        self.MODEL_WEIGHT = 0.45
        self.EAR_WEIGHT = 0.2
        self.GAZE_WEIGHT = 0.1
        self.HEAD_POSE_WEIGHT = 0.1

        # Temporal smoothing state.
        self.prev_drowsy_count = 0
        self.drowsy_history = []
        self.current_speed = 0

    def update_speed(self, speed_km_h):
        """Update the current speed and return the speed-change score."""
        self.current_speed = speed_km_h
        return self.speed_detector.update_speed(speed_km_h)

    def reset_speed_detector(self):
        """Reset the speed detector state (e.g. at the start of a new video)."""
        self.speed_detector.reset()

    def load_model(self):
        """Load the CNN model from local files.

        Raises:
            Exception: re-raised if the config or model files cannot be read.
        """
        try:
            config_path = "huggingface_model/config.json"
            model_path = "drowsiness_model.h5"

            # Reading the config validates that it exists and parses; its
            # contents are not otherwise used here.
            with open(config_path, 'r') as f:
                config = json.load(f)

            self.model = keras.models.load_model(model_path)

            print("Model loaded successfully")
            print(f"Model input shape: {self.model.input_shape}")
            self.model.summary()

        except Exception as e:
            print(f"Error loading CNN model: {str(e)}")
            raise

    def eye_aspect_ratio(self, eye):
        """Calculate the eye aspect ratio (EAR) from 6 (x, y) landmark points.

        EAR = (|p1-p5| + |p2-p4|) / (2 * |p0-p3|); low values mean the eye is
        closing.
        """
        # BUGFIX: this previously called dist.euclidean, but scipy's distance
        # module was never imported in this scope, so every call raised
        # NameError (silently swallowed by predict's except). Use math.dist.
        # Vertical distances between the upper and lower eyelid landmarks.
        A = math.dist(eye[1], eye[5])
        B = math.dist(eye[2], eye[4])

        # Horizontal distance between the eye corners.
        C = math.dist(eye[0], eye[3])

        ear = (A + B) / (2.0 * C)
        return ear

    def calculate_gaze(self, eye_points, facial_landmarks):
        """Calculate a normalized gaze/eye-line tilt value in [0, 1].

        Note: eye_points is unused; kept for interface compatibility with
        existing callers.
        """
        # dlib 68-point convention: 36-41 left eye, 42-47 right eye.
        left_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(36, 42)])
        right_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(42, 48)])

        left_eye_center = left_eye_region.mean(axis=0).astype("int")
        right_eye_center = right_eye_region.mean(axis=0).astype("int")

        # Angle of the line joining both eye centers, normalized by 180 deg.
        dY = right_eye_center[1] - left_eye_center[1]
        dX = right_eye_center[0] - left_eye_center[0]
        angle = np.degrees(np.arctan2(dY, dX))

        return abs(angle) / 180.0

    def get_head_pose(self, shape):
        """Calculate a normalized head-pose value in [0, 1].

        Uses the angle between the nose tip (point 30) and the chin (point 8);
        a large deviation suggests the head is drooping.
        """
        image_points = np.array([
            (shape.part(30).x, shape.part(30).y),   # nose tip
            (shape.part(8).x, shape.part(8).y),     # chin
            (shape.part(36).x, shape.part(36).y),   # left eye corner
            (shape.part(45).x, shape.part(45).y),   # right eye corner
            (shape.part(48).x, shape.part(48).y),   # left mouth corner
            (shape.part(54).x, shape.part(54).y)    # right mouth corner
        ], dtype="double")

        # Only the nose-to-chin angle is used for the simplified pose measure.
        angle = 0
        if len(image_points) > 2:
            point1 = image_points[0]
            point2 = image_points[1]
            angle = abs(math.atan2(point2[1] - point1[1], point2[0] - point1[0]))

        # Normalize against a 90-degree deviation.
        normalized_pose = min(1.0, abs(angle) / (math.pi/2))
        return normalized_pose

    def detect_face(self, frame):
        """Detect the first face in the frame.

        Returns (face_crop, (x, y, w, h)) or (None, None) if no face is found.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.1, 4)
        if len(faces) > 0:
            (x, y, w, h) = faces[0]
            face = frame[y:y+h, x:x+w]
            return face, (x, y, w, h)
        return None, None

    def preprocess_image(self, image):
        """Preprocess a face crop for the CNN: RGB, resized, scaled, batched."""
        if image is None:
            return None
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (self.input_shape[0], self.input_shape[1]))
        image = image.astype(np.float32) / 255.0
        # Add the batch dimension expected by model.predict.
        image = np.expand_dims(image, axis=0)
        return image

    def predict(self, image):
        """Predict drowsiness for one frame by fusing multiple cues.

        Returns:
            (drowsy_prob, face_coords, error_message_or_None, metrics_dict)
        Raises:
            ValueError: if load_model() has not been called yet.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")

        # Defaults when individual cues cannot be computed.
        drowsy_prob = 0.0
        face_coords = None
        ear_value = 1.0
        gaze_value = 0.0
        head_pose_value = 0.0
        landmark_detection_success = False

        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        face, face_coords = self.detect_face(image)

        if face is None:
            return 0.0, None, "No face detected", {}

        inputs = self.preprocess_image(face)
        if inputs is None:
            return 0.0, face_coords, "Error processing image", {}

        outputs = self.model.predict(inputs)

        # Support both single-logit and two-class model heads.
        if outputs.shape[1] == 1:
            model_prob = outputs[0][0]
            # Apply a sigmoid when the head produced a raw logit.
            if model_prob < 0 or model_prob > 1:
                model_prob = 1 / (1 + np.exp(-model_prob))
        else:
            probs = tf.nn.softmax(outputs, axis=1).numpy()
            model_prob = probs[0, 1]

        speed_change_score = self.speed_detector.get_speed_change_score()

        metrics = {
            "model_prob": model_prob,
            "ear": 1.0,
            "gaze": 0.0,
            "head_pose": 0.0,
            "speed_change": speed_change_score
        }

        if self.landmark_detection_enabled:
            try:
                rects = self.detector(gray, 0)

                if len(rects) > 0:
                    shape = self.predictor(gray, rects[0])

                    # dlib 68-point convention: 36-41 left eye, 42-47 right eye.
                    left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)]
                    right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)]

                    left_ear = self.eye_aspect_ratio(left_eye)
                    right_ear = self.eye_aspect_ratio(right_eye)
                    ear_value = (left_ear + right_ear) / 2.0

                    gaze_value = self.calculate_gaze(None, shape)
                    head_pose_value = self.get_head_pose(shape)

                    metrics["ear"] = ear_value
                    metrics["gaze"] = gaze_value
                    metrics["head_pose"] = head_pose_value

                    landmark_detection_success = True
            except Exception as e:
                print(f"Error in landmark detection: {e}")
        else:
            # Simplified fallback: estimate eye openness from the brightness
            # of rough eye regions in the face crop.
            if face_coords is not None:
                try:
                    face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
                    face_height, face_width = face_gray.shape[:2]

                    # Approximate eye regions by fixed fractions of the face box.
                    left_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.2):int(face_width*0.4)]
                    right_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.6):int(face_width*0.8)]

                    if left_eye_region.size > 0 and right_eye_region.size > 0:
                        left_eye_avg = np.mean(left_eye_region) / 255.0
                        right_eye_avg = np.mean(right_eye_region) / 255.0

                        # Darker regions read as "more closed".
                        left_eye_closed = 1.0 - left_eye_avg
                        right_eye_closed = 1.0 - right_eye_avg

                        eye_closure = (left_eye_closed + right_eye_closed) / 2.0

                        # Map closure to a pseudo-EAR in roughly [0.15, 0.4].
                        estimated_ear = max(0.15, 0.4 - (eye_closure * 0.25))
                        ear_value = estimated_ear
                        metrics["ear"] = ear_value
                except Exception as e:
                    print(f"Error in simplified eye detection: {e}")

        if landmark_detection_success:
            # Convert EAR into a 0..1 "eyes closing" signal around the threshold.
            eye_state = max(0, min(1, (self.EAR_THRESHOLD - ear_value) * 5))

            weighted_avg = (
                self.MODEL_WEIGHT * model_prob +
                self.EAR_WEIGHT * eye_state +
                self.GAZE_WEIGHT * gaze_value +
                self.HEAD_POSE_WEIGHT * head_pose_value +
                self.SPEED_CHANGE_WEIGHT * speed_change_score
            )

            drowsy_prob = weighted_avg
        else:
            # No landmarks: fuse only the cues we have.
            if "ear" in metrics and metrics["ear"] < 1.0:
                eye_state = max(0, min(1, (self.EAR_THRESHOLD - metrics["ear"]) * 5))
                drowsy_prob = (self.MODEL_WEIGHT * model_prob) + ((1 - self.MODEL_WEIGHT - self.SPEED_CHANGE_WEIGHT) * eye_state) + (self.SPEED_CHANGE_WEIGHT * speed_change_score)
            else:
                drowsy_prob = (model_prob * 0.85) + (speed_change_score * 0.15)

        # Median smoothing over the last 10 predictions to suppress flicker.
        self.drowsy_history.append(drowsy_prob)
        if len(self.drowsy_history) > 10:
            self.drowsy_history.pop(0)

        drowsy_prob = np.median(self.drowsy_history)

        return drowsy_prob, face_coords, None, metrics
|
|
|
|
|
|
|
|
detector = DrowsinessDetector() |
|
|
|
|
|
def process_image(image):
    """Process a single image: run drowsiness prediction and draw annotations.

    Args:
        image: BGR/RGB numpy array (H, W, 3) from Gradio or a video frame.

    Returns:
        (annotated_image_or_None, status_message) tuple. On any error the
        image slot is None and the message describes the failure.
    """
    if image is None:
        return None, "No image provided"

    try:
        # Reject degenerate arrays before OpenCV sees them.
        if image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0:
            return None, "Invalid image dimensions"

        # Work on a copy so the caller's array is not mutated.
        processed_image = image.copy()

        drowsy_prob, face_coords, error, metrics = detector.predict(processed_image)

        if error:
            return None, error

        # NOTE(review): predict() returns an error string whenever no face is
        # found, so this branch appears unreachable — the `if error` return
        # above fires first. Kept for safety.
        if face_coords is None:
            cv2.putText(processed_image, "No face detected", (30, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            return processed_image, "No face detected"

        x, y, w, h = face_coords

        # 0.7 is the drowsy/alert decision threshold used below.
        is_drowsy = drowsy_prob >= 0.7

        # Map probability to an alert level and a BGR color.
        if drowsy_prob >= 0.85:
            alert_level = "High Risk"
            color = (0, 0, 255)
        elif drowsy_prob >= 0.7:
            alert_level = "Medium Risk"
            color = (0, 165, 255)
        else:
            alert_level = "Alert"
            color = (0, 255, 0)

        # Face bounding box in the alert color.
        cv2.rectangle(processed_image, (x, y), (x+w, y+h), color, 2)

        # Label above the face box.
        y_offset = 25
        cv2.putText(processed_image, f"{'Drowsy' if is_drowsy else 'Alert'} ({drowsy_prob:.2f})",
                    (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        cv2.putText(processed_image, alert_level, (x, y-35),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        # Per-cue metrics stacked at the bottom-left of the frame.
        cv2.putText(processed_image, f"Model: {metrics['model_prob']:.2f}", (10, processed_image.shape[0]-10-y_offset*3),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        cv2.putText(processed_image, f"Eye Ratio: {metrics['ear']:.2f}", (10, processed_image.shape[0]-10-y_offset*2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        cv2.putText(processed_image, f"Head Pose: {metrics['head_pose']:.2f}", (10, processed_image.shape[0]-10-y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        # Warn when the CNN says "drowsy" but the eyes appear open.
        if metrics['model_prob'] > 0.9 and metrics['ear'] > 0.25:
            cv2.putText(processed_image, "Model conflict - verify manually",
                        (10, processed_image.shape[0]-10-y_offset*4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 1)

        # NOTE: process_video() parses this exact message format
        # ("Drowsiness: <float>,") — keep it in sync if changed.
        return processed_image, f"Processed successfully. Drowsiness: {drowsy_prob:.2f}, Alert level: {alert_level}"

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing image: {str(e)}\n{error_details}")
        return None, f"Error processing image: {str(e)}"
|
|
|
|
|
def process_video(video, initial_speed=60):
    """Process a video: annotate every frame and summarize drowsiness stats.

    Args:
        video: a filesystem path (str) or raw uploaded bytes.
        initial_speed: starting speed estimate in km/h for the speed detector.

    Returns:
        (output_video_path_or_None, status_message) tuple.
    """
    if video is None:
        return None, "No video provided"

    try:
        # Holds the temp file wrapping uploaded bytes (cleaned up in finally).
        temp_input = None

        if isinstance(video, str):
            print(f"Processing video from path: {video}")
            cap = cv2.VideoCapture(video)
        else:
            print(f"Processing video from uploaded data")
            # Write the uploaded bytes to a temp file so OpenCV can open it.
            import tempfile
            temp_input = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            temp_input_path = temp_input.name
            with open(temp_input_path, "wb") as f:
                f.write(video)
            cap = cv2.VideoCapture(temp_input_path)

        if not cap.isOpened():
            return None, "Error: Could not open video"

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Some containers report 0 fps; fall back to a sane default.
            fps = 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {width}x{height} at {fps}fps, total frames: {total_frames}")

        # NOTE(review): io/base64 are imported here but unused below.
        import io
        import base64

        # Output video goes to a temp file handed back to Gradio.
        import tempfile
        temp_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
        temp_output_path = temp_output.name

        # NOTE(review): both branches pick the same codec; kept as a hook for
        # platform-specific codecs.
        if os.name == 'nt':
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        else:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')

        out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            return None, "Error: Could not create output video file"

        # Start speed tracking fresh for this video.
        detector.reset_speed_detector()

        current_speed = initial_speed
        detector.speed_detector.speed_estimate = initial_speed

        # Per-video statistics.
        frame_count = 0
        processed_count = 0
        face_detected_count = 0
        drowsy_count = 0
        high_risk_count = 0
        ear_sum = 0
        model_prob_sum = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"End of video or error reading frame at frame {frame_count}")
                break

            frame_count += 1

            # Update the optical-flow speed estimate from this frame.
            current_speed = detector.speed_detector.detect_speed_from_frame(frame)

            try:
                # Reuse the single-image pipeline per frame.
                processed_frame, message = process_image(frame)

                if processed_frame is not None:
                    # Overlay the current speed and variability score.
                    speed_text = f"Speed: {current_speed:.1f} km/h"
                    cv2.putText(processed_frame, speed_text, (10, processed_frame.shape[0]-45),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

                    speed_change_score = detector.speed_detector.get_speed_change_score()
                    cv2.putText(processed_frame, f"Speed Variation: {speed_change_score:.2f}",
                                (10, processed_frame.shape[0]-70),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

                if processed_frame is not None:
                    out.write(processed_frame)
                    processed_count += 1
                    if "No face detected" not in message:
                        face_detected_count += 1
                        if "Drowsiness" in message:
                            # Parse the probability back out of the status
                            # message produced by process_image.
                            try:
                                drowsy_text = message.split("Drowsiness: ")[1].split(",")[0]
                                drowsy_prob = float(drowsy_text)

                                if drowsy_prob >= 0.7:
                                    drowsy_count += 1
                                    if drowsy_prob >= 0.85:
                                        high_risk_count += 1

                                # NOTE(review): this runs the full model a
                                # second time per frame just to read metrics —
                                # expensive; consider returning metrics from
                                # process_image instead.
                                _, _, _, metrics = detector.predict(frame)
                                if 'ear' in metrics:
                                    ear_sum += metrics['ear']
                                if 'model_prob' in metrics:
                                    model_prob_sum += metrics['model_prob']
                            # NOTE(review): bare except silently drops parse
                            # and prediction errors for this frame's stats.
                            except:
                                pass
                else:
                    # Frame-level failure: stamp the raw frame and keep going.
                    cv2.putText(frame, "Processing failed", (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                    out.write(frame)
                    processed_count += 1
                    print(f"Frame {frame_count}: Processing failed - {message}")
            except Exception as e:
                # Unexpected per-frame error: stamp it and continue.
                cv2.putText(frame, f"Error: {str(e)[:30]}", (30, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                out.write(frame)
                processed_count += 1
                print(f"Frame {frame_count}: Exception - {str(e)}")

            # Progress log every 10 frames.
            if frame_count % 10 == 0:
                print(f"Processed {frame_count}/{total_frames} frames")

        cap.release()
        out.release()

        # Aggregate statistics (guard against zero detected faces).
        drowsy_percentage = (drowsy_count / face_detected_count * 100) if face_detected_count > 0 else 0
        high_risk_percentage = (high_risk_count / face_detected_count * 100) if face_detected_count > 0 else 0
        avg_ear = ear_sum / face_detected_count if face_detected_count > 0 else 0
        avg_model_prob = model_prob_sum / face_detected_count if face_detected_count > 0 else 0
        speed_score = detector.speed_detector.get_speed_change_score()

        # Only report success if the writer actually produced bytes.
        if os.path.exists(temp_output_path) and os.path.getsize(temp_output_path) > 0:
            print(f"Video processed successfully with {processed_count} frames")
            print(f"Drowsy frames: {drowsy_count} ({drowsy_percentage:.1f}%), High risk frames: {high_risk_count} ({high_risk_percentage:.1f}%)")
            print(f"Average eye ratio: {avg_ear:.2f}, Average model probability: {avg_model_prob:.2f}")
            print(f"Speed change score: {speed_score:.2f}")

            # Flag likely false positives (model says drowsy, eyes look open).
            false_positive_warning = ""
            if avg_model_prob > 0.8 and avg_ear > 0.25:
                false_positive_warning = " ⚠️ Possible false positive (eyes open but model detects drowsiness)"

            result_message = (f"Video processed successfully. Frames: {frame_count}, faces detected: {face_detected_count}, "
                              f"drowsy: {drowsy_count} ({drowsy_percentage:.1f}%), high risk: {high_risk_count} ({high_risk_percentage:.1f}%)."
                              f" Avg eye ratio: {avg_ear:.2f}, Speed score: {speed_score:.2f}{false_positive_warning}")

            video_result = temp_output_path

            return video_result, result_message
        else:
            print(f"Failed to create output video. Frames read: {frame_count}, processed: {processed_count}")
            return None, f"Error: Failed to create output video. Frames read: {frame_count}, processed: {processed_count}"

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing video: {str(e)}\n{error_details}")
        return None, f"Error processing video: {str(e)}"
    finally:
        # Release handles even on error paths (double release is harmless).
        if 'out' in locals() and out is not None:
            out.release()
        if 'cap' in locals() and cap is not None:
            cap.release()

        # Remove the temp file created for uploaded bytes; best-effort only.
        if temp_input is not None:
            try:
                os.unlink(temp_input.name)
            except:
                pass
|
|
|
|
|
def process_webcam(image):
    """Handle one webcam frame.

    Thin adapter around process_image; yields the same
    (processed image, status message) pair.
    """
    result = process_image(image)
    return result
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line options for the Gradio server.
    parser = argparse.ArgumentParser(description="Driver Drowsiness Detection App")
    parser.add_argument("--share", action="store_true", help="Create a public link (may trigger security warnings)")
    parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
    args = parser.parse_args()

    if args.share:
        print("WARNING: Running with --share may trigger security warnings on some systems.")
        print("The app will be accessible from the internet through a temporary URL.")

    import atexit
    import glob
    import shutil

    def cleanup_temp_files():
        """Clean up all temporary files created while processing videos."""
        try:
            # NOTE(review): this deletes any "tmp*" / *.mp4 / *.avi file in
            # the system temp dir, including files created by other programs.
            import tempfile
            temp_dir = tempfile.gettempdir()
            pattern = os.path.join(temp_dir, "tmp*")
            for file in glob.glob(pattern):
                try:
                    if os.path.isfile(file):
                        os.remove(file)
                except Exception as e:
                    print(f"Failed to delete {file}: {e}")

            # Also sweep leftover video outputs.
            for ext in [".mp4", ".avi"]:
                pattern = os.path.join(temp_dir, f"*{ext}")
                for file in glob.glob(pattern):
                    try:
                        os.remove(file)
                    except Exception as e:
                        print(f"Failed to delete {file}: {e}")

            print("Cleaned up temporary files")
        except Exception as e:
            print(f"Error during cleanup: {e}")

    # Run the sweep when the interpreter exits.
    atexit.register(cleanup_temp_files)

    # Load the CNN before serving any requests.
    detector.load_model()

    # Build the Gradio UI: one tab each for image, video and webcam input.
    with gr.Blocks(title="Driver Drowsiness Detection") as demo:
        gr.Markdown("""
        # 🚗 Driver Drowsiness Detection System

        This system detects driver drowsiness using computer vision and deep learning.

        ## Features:
        - Image analysis
        - Video processing with speed monitoring
        - Webcam detection (PC and mobile)
        - Multi-factor drowsiness prediction (face, eyes, head pose, speed changes)
        """)

        with gr.Tabs():
            with gr.Tab("Image"):
                gr.Markdown("Upload an image for drowsiness detection")
                with gr.Row():
                    image_input = gr.Image(label="Input Image", type="numpy")
                    image_output = gr.Image(label="Processed Image")
                with gr.Row():
                    status_output = gr.Textbox(label="Status")
                # Re-run detection whenever a new image is provided.
                image_input.change(
                    fn=process_image,
                    inputs=[image_input],
                    outputs=[image_output, status_output]
                )

            with gr.Tab("Video"):
                gr.Markdown("""
                ### 上傳駕駛視頻進行困倦檢測

                系統將自動從視頻中檢測以下內容:
                - 駕駛員面部表情和眼睛狀態
                - 車輛速度變化 (通過視頻中的光流分析)
                - 當車速變化超過 ±5 km/h 時將被視為異常駕駛行為

                **注意:** 處理後的視頻不會保存到本地文件夾,請使用界面右上角的下載按鈕保存結果。
                """)
                with gr.Row():
                    video_input = gr.Video(label="輸入視頻")
                    video_output = gr.Video(label="處理後視頻 (點擊右上角下載)")
                with gr.Row():
                    initial_speed = gr.Slider(minimum=10, maximum=120, value=60, label="初始車速估計值 (km/h)",
                                              info="僅作為初始估計值,系統會自動從視頻中檢測實際速度變化")
                with gr.Row():
                    video_status = gr.Textbox(label="處理狀態")
                with gr.Row():
                    process_btn = gr.Button("處理視頻")
                    clear_btn = gr.Button("清除")

                # Video processing is triggered explicitly by the button.
                process_btn.click(
                    fn=process_video,
                    inputs=[video_input, initial_speed],
                    outputs=[video_output, video_status]
                )

                # Clear the outputs without touching the uploaded video.
                clear_btn.click(
                    fn=lambda: (None, "已清除結果"),
                    inputs=[],
                    outputs=[video_output, video_status]
                )

            with gr.Tab("Webcam"):
                gr.Markdown("Use your webcam or mobile camera for real-time drowsiness detection")
                with gr.Row():
                    webcam_input = gr.Image(label="Camera Feed", type="numpy", streaming=True)
                    webcam_output = gr.Image(label="Processed Feed")
                with gr.Row():
                    speed_input = gr.Slider(minimum=0, maximum=150, value=60, label="Current Speed (km/h)")
                    update_speed_btn = gr.Button("Update Speed")
                with gr.Row():
                    webcam_status = gr.Textbox(label="Status")

                # Feed the manually-entered speed into the detector alongside
                # each webcam frame.
                def process_webcam_with_speed(image, speed):
                    detector.update_speed(speed)
                    return process_image(image)

                update_speed_btn.click(
                    fn=lambda speed: f"Speed updated to {speed} km/h",
                    inputs=[speed_input],
                    outputs=[webcam_status]
                )

                webcam_input.change(
                    fn=process_webcam_with_speed,
                    inputs=[webcam_input, speed_input],
                    outputs=[webcam_output, webcam_status]
                )

        gr.Markdown("""
        ## How It Works
        This system detects drowsiness using multiple factors:
        1. **Facial features** - Using a trained CNN model
        2. **Eye openness** - Measuring eye aspect ratio (EAR)
        3. **Head position** - Detecting head drooping
        4. **Automatic speed detection** - Using optical flow analysis to track vehicle movement and detect irregular speed changes

        The system automatically detects speed changes from the video frames using computer vision techniques:
        - **Optical flow** is used to track movement between frames
        - **Irregular speed changes** (±5 km/h) are detected as potential signs of drowsy driving
        - **No external speed data required** - everything is analyzed directly from the video content

        Combining these factors provides more reliable drowsiness detection than using facial features alone.
        """)

    # Start the web server (blocking).
    demo.launch(share=args.share, server_port=args.port)