# Driver Drowsiness Detection — Gradio application
# (extraction residue removed: file-size banner, git-blame hashes, line-number dump)
import gradio as gr
import numpy as np
import cv2
from PIL import Image
import io
import os
import json
import time
import argparse
import tensorflow as tf
from tensorflow import keras
import math
from collections import deque
class SpeedDetector:
    """Track vehicle speed over a sliding window and score abnormal changes.

    Speed samples can be fed in directly (``update_speed``) or estimated
    from dashcam frames via sparse optical flow (``detect_speed_from_frame``).
    The resulting score in [0, 1] is consumed by ``DrowsinessDetector`` as
    one of its drowsiness indicators.
    """

    def __init__(self, history_size=30):
        # Sliding window of the most recent speed samples (km/h)
        self.speed_history = deque(maxlen=history_size)
        self.last_update_time = None  # wall-clock time of the last sample
        self.current_speed = 0
        self.speed_change_threshold = 5  # km/h jump considered abnormal
        self.abnormal_speed_changes = 0  # abnormal jumps in the current window
        self.speed_deviation_sum = 0     # normalized std-dev component (0-1)
        self.speed_change_score = 0      # combined score (0-1)
        # State for optical-flow speed estimation
        self.prev_gray = None    # previous grayscale frame
        self.prev_points = None  # feature points tracked from the previous frame
        self.frame_idx = 0
        self.speed_estimate = 60  # km/h; initial guess before any frames are seen

    def update_speed(self, speed_km_h):
        """Record a speed sample (km/h) and return the abnormal-change score.

        Returns 0 until at least 5 samples have been collected; afterwards
        returns a score in [0, 1] combining speed spread, abrupt-jump count,
        and average rate of change over the window.
        """
        self.last_update_time = time.time()
        self.speed_history.append(speed_km_h)
        self.current_speed = speed_km_h
        # Not enough data yet
        if len(self.speed_history) < 5:
            return 0
        speed_arr = np.array(self.speed_history)
        # 1. Spread of speeds within the window
        speed_std = np.std(speed_arr)
        # 2. Abrupt jumps between consecutive samples.
        # BUG FIX: the previous implementation re-scanned the entire window on
        # every call and accumulated counts into a running total, so the same
        # jump was counted again on every subsequent call and the factor
        # saturated almost immediately. Count jumps once per window instead.
        changes = np.abs(np.diff(speed_arr))
        self.abnormal_speed_changes = int(np.sum(changes >= self.speed_change_threshold))
        # 3. Average rate of change across the window
        avg_change = np.mean(changes) if len(changes) > 0 else 0
        # Normalize each component to the 0-1 range
        self.speed_deviation_sum = min(5, speed_std) / 5
        abnormal_change_factor = min(1, self.abnormal_speed_changes / 5)
        avg_change_factor = min(1, avg_change / self.speed_change_threshold)
        # Weighted combination of the three components
        self.speed_change_score = (
            0.4 * self.speed_deviation_sum +
            0.4 * abnormal_change_factor +
            0.2 * avg_change_factor
        )
        return self.speed_change_score

    def detect_speed_from_frame(self, frame):
        """Estimate speed (km/h) from a video frame using optical flow.

        Tracks good features on the lower half of the frame (assumed road
        region) with Lucas-Kanade optical flow; the mean flow magnitude is
        mapped heuristically to a speed delta. This is an uncalibrated
        approximation, clamped to the 40-120 km/h range.
        """
        if frame is None:
            return self.current_speed
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # (Re)initialize tracking points on the first frame and every 30 frames
        if self.prev_gray is None or self.frame_idx % 30 == 0:
            mask = np.zeros_like(gray)
            # Focus on the lower portion of the frame (road)
            h, w = gray.shape
            mask[h//2:, :] = 255
            corners = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.01, minDistance=10, mask=mask)
            if corners is not None and len(corners) > 0:
                self.prev_points = corners
                self.prev_gray = gray.copy()
            else:
                # No good points to track; keep the last speed
                self.frame_idx += 1
                return self.current_speed
        # Calculate optical flow if we have previous points
        if self.prev_gray is not None and self.prev_points is not None:
            new_points, status, _ = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_points, None)
            # Keep only points that were successfully tracked
            if new_points is not None and status is not None:
                good_new = new_points[status == 1]
                good_old = self.prev_points[status == 1]
                if len(good_new) > 0 and len(good_old) > 0:
                    flow_magnitudes = np.sqrt(
                        np.sum((good_new - good_old)**2, axis=1)
                    )
                    avg_flow = np.mean(flow_magnitudes) if len(flow_magnitudes) > 0 else 0
                    # Map flow magnitude to a speed change: higher flow means
                    # faster movement. Simplified mapping; would need
                    # calibration for real-world use.
                    flow_threshold = 1.0  # adjust based on testing
                    if avg_flow > flow_threshold:
                        # Movement detected; estimate acceleration, clamped to ±5
                        speed_change = min(5, max(-5, (avg_flow - flow_threshold) * 2))
                        # Temporal smoothing to avoid sudden changes
                        speed_change = speed_change * 0.3
                    else:
                        # Minimal movement: slight deceleration (coasting)
                        speed_change = -0.1
                    self.speed_estimate += speed_change
                    # Keep speed in a reasonable range
                    self.speed_estimate = max(40, min(120, self.speed_estimate))
                    # Carry tracked points forward for the next frame
                    self.prev_points = good_new.reshape(-1, 1, 2)
        self.prev_gray = gray.copy()
        self.frame_idx += 1
        # A real system might also OCR the dashboard speedometer here;
        # for now use the optical-flow estimate only.
        detected_speed = self.speed_estimate
        # Feed the estimate into the abnormal-change scorer
        self.update_speed(detected_speed)
        return detected_speed

    def get_speed_change_score(self):
        """Return the current abnormal-speed-change score (0-1)."""
        return self.speed_change_score

    def reset(self):
        """Reset all detector state (history, counters, flow tracking)."""
        self.speed_history.clear()
        self.abnormal_speed_changes = 0
        self.speed_deviation_sum = 0
        self.speed_change_score = 0
        self.prev_gray = None
        self.prev_points = None
        self.frame_idx = 0
        self.speed_estimate = 60  # reset to the initial estimate
class DrowsinessDetector:
    """Multi-factor drowsiness detector.

    Combines a CNN face classifier with (optional) dlib facial landmarks
    (eye aspect ratio, gaze angle, head pose) and a speed-variation score
    into a single weighted drowsiness probability.
    """

    def __init__(self):
        self.model = None
        self.input_shape = (224, 224, 3)  # matches the CNN's expected input
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.id2label = {0: "notdrowsy", 1: "drowsy"}
        self.label2id = {"notdrowsy": 0, "drowsy": 1}
        # Speed detector and its weight in the final drowsiness score
        self.speed_detector = SpeedDetector()
        self.SPEED_CHANGE_WEIGHT = 0.15
        # Try to import dlib dynamically; fall back gracefully if unavailable
        self.landmark_detection_enabled = False
        try:
            import dlib
            self.detector = dlib.get_frontal_face_detector()
            predictor_path = "shape_predictor_68_face_landmarks.dat"
            if not os.path.exists(predictor_path):
                print(f"Warning: {predictor_path} not found. Downloading...")
                import urllib.request
                urllib.request.urlretrieve(
                    "https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat",
                    predictor_path
                )
            self.predictor = dlib.shape_predictor(predictor_path)
            self.landmark_detection_enabled = True
            print("Facial landmark detection enabled")
        except Exception as e:
            print(f"Warning: Facial landmark detection disabled: {e}")
            print("The system will use a simpler detection method. For better accuracy, install CMake and dlib.")
        # Thresholds for drowsiness detection
        self.EAR_THRESHOLD = 0.25       # eye aspect ratio below this = closing eyes
        self.CONSECUTIVE_FRAMES = 20    # frames of closed eyes before alert (reserved)
        self.ear_counter = 0
        self.GAZE_THRESHOLD = 0.2       # gaze direction threshold (reserved)
        self.HEAD_POSE_THRESHOLD = 0.3  # head pose threshold (reserved)
        # Weights for the ensemble of indicators (sum with speed weight = 1.0)
        self.MODEL_WEIGHT = 0.45  # reduced to accommodate the speed factor
        self.EAR_WEIGHT = 0.2
        self.GAZE_WEIGHT = 0.1
        self.HEAD_POSE_WEIGHT = 0.1
        # Per-session tracking state
        self.prev_drowsy_count = 0
        self.drowsy_history = []  # recent drowsiness probabilities for smoothing
        self.current_speed = 0    # current speed in km/h

    def update_speed(self, speed_km_h):
        """Record the current speed and return the speed-change score."""
        self.current_speed = speed_km_h
        return self.speed_detector.update_speed(speed_km_h)

    def reset_speed_detector(self):
        """Reset the speed detector state (e.g. at the start of a new video)."""
        self.speed_detector.reset()

    def load_model(self):
        """Load the CNN model from local files.

        Raises whatever exception occurs on failure after logging it, so
        startup fails loudly rather than running without a model.
        """
        try:
            # Use local model files
            config_path = "huggingface_model/config.json"
            model_path = "drowsiness_model.h5"
            # Load config (kept for parity with the exported model bundle)
            with open(config_path, 'r') as f:
                config = json.load(f)
            # Load the Keras model directly
            self.model = keras.models.load_model(model_path)
            # Print model summary for debugging
            print("Model loaded successfully")
            print(f"Model input shape: {self.model.input_shape}")
            self.model.summary()
        except Exception as e:
            print(f"Error loading CNN model: {str(e)}")
            raise

    def eye_aspect_ratio(self, eye):
        """Compute the eye aspect ratio (EAR) from six (x, y) eye landmarks.

        EAR = (|p1-p5| + |p2-p4|) / (2 * |p0-p3|); low values indicate a
        closing/closed eye.

        BUG FIX: this method previously called ``dist.euclidean``, but
        ``dist`` was never in scope here (scipy's distance module was only
        imported locally inside ``predict``), so every call raised
        ``NameError`` and silently disabled the EAR/gaze/head-pose path.
        Use the stdlib ``math.dist`` instead.
        """
        # Vertical distances between the two pairs of vertical eye landmarks
        A = math.dist(eye[1], eye[5])
        B = math.dist(eye[2], eye[4])
        # Horizontal distance between the eye corners
        C = math.dist(eye[0], eye[3])
        ear = (A + B) / (2.0 * C)
        return ear

    def calculate_gaze(self, eye_points, facial_landmarks):
        """Return a normalized (0-1) inter-eye tilt angle as a gaze proxy.

        ``eye_points`` is unused but kept for interface compatibility.
        """
        left_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(36, 42)])
        right_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(42, 48)])
        # Eye centers
        left_eye_center = left_eye_region.mean(axis=0).astype("int")
        right_eye_center = right_eye_region.mean(axis=0).astype("int")
        # Angle between the eye centers
        dY = right_eye_center[1] - left_eye_center[1]
        dX = right_eye_center[0] - left_eye_center[0]
        angle = np.degrees(np.arctan2(dY, dX))
        # Normalize |angle| into 0-1
        return abs(angle) / 180.0

    def get_head_pose(self, shape):
        """Estimate head droop from facial landmarks; 0 = upright, 1 = drooping."""
        # Landmarks commonly used for head pose estimation
        image_points = np.array([
            (shape.part(30).x, shape.part(30).y),  # Nose tip
            (shape.part(8).x, shape.part(8).y),    # Chin
            (shape.part(36).x, shape.part(36).y),  # Left eye left corner
            (shape.part(45).x, shape.part(45).y),  # Right eye right corner
            (shape.part(48).x, shape.part(48).y),  # Left mouth corner
            (shape.part(54).x, shape.part(54).y)   # Right mouth corner
        ], dtype="double")
        # Face center (currently informational only)
        center_x = np.mean([p[0] for p in image_points])
        center_y = np.mean([p[1] for p in image_points])
        # Angle of the nose-chin axis with respect to horizontal
        angle = 0
        if len(image_points) > 2:
            point1 = image_points[0]  # Nose
            point2 = image_points[1]  # Chin
            angle = abs(math.atan2(point2[1] - point1[1], point2[0] - point1[0]))
        # Normalize to 0-1 where 0 is upright and 1 is drooping
        normalized_pose = min(1.0, abs(angle) / (math.pi/2))
        return normalized_pose

    def detect_face(self, frame):
        """Detect the first face via Haar cascade; returns (face_crop, (x, y, w, h)) or (None, None)."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.1, 4)
        if len(faces) > 0:
            (x, y, w, h) = faces[0]  # use the first detected face
            face = frame[y:y+h, x:x+w]
            return face, (x, y, w, h)
        return None, None

    def preprocess_image(self, image):
        """Convert a BGR face crop to a normalized (1, 224, 224, 3) CNN input."""
        if image is None:
            return None
        # BGR -> RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Resize to the model input size (224x224)
        image = cv2.resize(image, (self.input_shape[0], self.input_shape[1]))
        # Scale pixel values to [0, 1]
        image = image.astype(np.float32) / 255.0
        # Add batch dimension
        image = np.expand_dims(image, axis=0)
        return image

    def predict(self, image):
        """Predict drowsiness for one BGR frame using all available features.

        Returns (drowsy_prob, face_coords, error_message_or_None, metrics).
        Raises ValueError if the model has not been loaded.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")
        # Defaults assuming an alert, eyes-open driver
        drowsy_prob = 0.0
        face_coords = None
        ear_value = 1.0  # default: wide-open eyes
        gaze_value = 0.0
        head_pose_value = 0.0
        landmark_detection_success = False
        # Detect face
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        face, face_coords = self.detect_face(image)
        if face is None:
            return 0.0, None, "No face detected", {}
        # CNN prediction on the face crop
        inputs = self.preprocess_image(face)
        if inputs is None:
            return 0.0, face_coords, "Error processing image", {}
        outputs = self.model.predict(inputs)
        # Extract the drowsiness probability from the model output
        if outputs.shape[1] == 1:
            model_prob = outputs[0][0]
            # Apply sigmoid if the output looks like a raw logit
            if model_prob < 0 or model_prob > 1:
                model_prob = 1 / (1 + np.exp(-model_prob))
        else:
            # Multi-class model: softmax then take class 1 (drowsy)
            probs = tf.nn.softmax(outputs, axis=1).numpy()
            model_prob = probs[0, 1]
        # Speed-change score from the speed detector
        speed_change_score = self.speed_detector.get_speed_change_score()
        metrics = {
            "model_prob": model_prob,
            "ear": 1.0,
            "gaze": 0.0,
            "head_pose": 0.0,
            "speed_change": speed_change_score
        }
        if self.landmark_detection_enabled:
            try:
                import dlib
                # Detect faces with dlib for landmark detection
                rects = self.detector(gray, 0)
                if len(rects) > 0:
                    shape = self.predictor(gray, rects[0])
                    # Eye aspect ratio from the 68-landmark model
                    left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)]
                    right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)]
                    left_ear = self.eye_aspect_ratio(left_eye)
                    right_ear = self.eye_aspect_ratio(right_eye)
                    ear_value = (left_ear + right_ear) / 2.0
                    # Gaze direction and head pose
                    gaze_value = self.calculate_gaze(None, shape)
                    head_pose_value = self.get_head_pose(shape)
                    metrics["ear"] = ear_value
                    metrics["gaze"] = gaze_value
                    metrics["head_pose"] = head_pose_value
                    landmark_detection_success = True
            except Exception as e:
                print(f"Error in landmark detection: {e}")
        else:
            # Simplified heuristic when dlib is unavailable: estimate eye
            # closure from grayscale intensity in approximate eye regions.
            # Far less accurate than the true EAR method.
            if face_coords is not None:
                try:
                    face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
                    face_height, face_width = face_gray.shape[:2]
                    # Approximate eye regions from face proportions
                    left_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.2):int(face_width*0.4)]
                    right_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.6):int(face_width*0.8)]
                    if left_eye_region.size > 0 and right_eye_region.size > 0:
                        # Lower mean intensity may indicate closed eyes
                        left_eye_avg = np.mean(left_eye_region) / 255.0
                        right_eye_avg = np.mean(right_eye_region) / 255.0
                        # Invert so darker (potentially closed) eyes score higher
                        left_eye_closed = 1.0 - left_eye_avg
                        right_eye_closed = 1.0 - right_eye_avg
                        eye_closure = (left_eye_closed + right_eye_closed) / 2.0
                        # Map to a rough EAR-like value (lower = more closed)
                        estimated_ear = max(0.15, 0.4 - (eye_closure * 0.25))
                        ear_value = estimated_ear
                        metrics["ear"] = ear_value
                except Exception as e:
                    print(f"Error in simplified eye detection: {e}")
        # Combine features into the final drowsiness probability
        if landmark_detection_success:
            # Eye-state factor: 1.0 when eyes closed, 0.0 when fully open
            eye_state = max(0, min(1, (self.EAR_THRESHOLD - ear_value) * 5))
            weighted_avg = (
                self.MODEL_WEIGHT * model_prob +
                self.EAR_WEIGHT * eye_state +
                self.GAZE_WEIGHT * gaze_value +
                self.HEAD_POSE_WEIGHT * head_pose_value +
                self.SPEED_CHANGE_WEIGHT * speed_change_score
            )
            drowsy_prob = weighted_avg
        else:
            # Landmark detection unavailable/failed: lean on the model
            if "ear" in metrics and metrics["ear"] < 1.0:
                # We have the simplified eye metric
                eye_state = max(0, min(1, (self.EAR_THRESHOLD - metrics["ear"]) * 5))
                drowsy_prob = (self.MODEL_WEIGHT * model_prob) + ((1 - self.MODEL_WEIGHT - self.SPEED_CHANGE_WEIGHT) * eye_state) + (self.SPEED_CHANGE_WEIGHT * speed_change_score)
            else:
                # Only model and speed are available
                drowsy_prob = (model_prob * 0.85) + (speed_change_score * 0.15)
        # Temporal smoothing: median over the last 10 predictions for robustness
        self.drowsy_history.append(drowsy_prob)
        if len(self.drowsy_history) > 10:
            self.drowsy_history.pop(0)
        drowsy_prob = np.median(self.drowsy_history)
        return drowsy_prob, face_coords, None, metrics
# Module-level singleton used by all the processing functions and Gradio
# callbacks below (predict, speed tracking, model loading at startup).
detector = DrowsinessDetector()
def process_image(image):
    """Annotate one BGR image with drowsiness detection results.

    Returns (annotated_image_or_None, status_message). The status message
    format is parsed by process_video, so its wording must stay stable.
    """
    if image is None:
        return None, "No image provided"
    try:
        # Reject degenerate inputs early
        if image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0:
            return None, "Invalid image dimensions"
        # Work on a copy so the caller's array is untouched
        annotated = image.copy()
        drowsy_prob, face_coords, error, metrics = detector.predict(annotated)
        if error:
            return None, error
        if face_coords is None:
            # No face found: stamp the image and return it anyway
            cv2.putText(annotated, "No face detected", (30, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            return annotated, "No face detected"
        x, y, w, h = face_coords
        # Higher threshold (0.7) to reduce false positives
        is_drowsy = drowsy_prob >= 0.7
        # Alert level and its display color
        if drowsy_prob >= 0.85:
            alert_level, color = "High Risk", (0, 0, 255)    # red
        elif drowsy_prob >= 0.7:
            alert_level, color = "Medium Risk", (0, 165, 255)  # orange
        else:
            alert_level, color = "Alert", (0, 255, 0)          # green
        cv2.rectangle(annotated, (x, y), (x + w, y + h), color, 2)
        # Headline state and alert level above the face box
        state_label = 'Drowsy' if is_drowsy else 'Alert'
        cv2.putText(annotated, f"{state_label} ({drowsy_prob:.2f})",
                    (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
        cv2.putText(annotated, alert_level, (x, y - 35),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        # Per-feature metrics in the bottom-left corner
        line_step = 25
        bottom = annotated.shape[0] - 10
        for rows_up, text in (
            (3, f"Model: {metrics['model_prob']:.2f}"),
            (2, f"Eye Ratio: {metrics['ear']:.2f}"),
            (1, f"Head Pose: {metrics['head_pose']:.2f}"),
        ):
            cv2.putText(annotated, text, (10, bottom - line_step * rows_up),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        # Flag a likely conflict: model says drowsy but eyes look open
        if metrics['model_prob'] > 0.9 and metrics['ear'] > 0.25:
            cv2.putText(annotated, "Model conflict - verify manually",
                        (10, bottom - line_step * 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 1)
        return annotated, f"Processed successfully. Drowsiness: {drowsy_prob:.2f}, Alert level: {alert_level}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing image: {str(e)}\n{error_details}")
        return None, f"Error processing image: {str(e)}"
def process_video(video, initial_speed=60):
    """Process a whole video for drowsiness detection, frame by frame.

    Args:
        video: either a file path (str) or raw uploaded video bytes.
        initial_speed: initial speed estimate in km/h used to seed the
            optical-flow speed detector.

    Returns:
        (output_video_path_or_None, status_message). The output is written
        to a named temp file; deletion is left to the atexit cleanup hook.
    """
    if video is None:
        return None, "No video provided"
    try:
        # Temp file holding uploaded bytes (only created for non-path input)
        temp_input = None
        # Handle video input (can be file path or video data)
        if isinstance(video, str):
            print(f"Processing video from path: {video}")
            # Read the original file directly; no copy into a temp directory
            cap = cv2.VideoCapture(video)
        else:
            print(f"Processing video from uploaded data")
            # Spill the uploaded bytes to a named temp file so OpenCV can open it
            import tempfile
            temp_input = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            temp_input_path = temp_input.name
            with open(temp_input_path, "wb") as f:
                f.write(video)
            cap = cv2.VideoCapture(temp_input_path)
        if not cap.isOpened():
            return None, "Error: Could not open video"
        # Get input video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default to 30fps if invalid
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video properties: {width}x{height} at {fps}fps, total frames: {total_frames}")
        # Output goes to a named temp file (removed later by the cleanup hook)
        import io
        import base64
        import tempfile
        temp_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
        temp_output_path = temp_output.name
        # Try different codecs on Windows
        if os.name == 'nt':  # Windows
            # Use mp4v encoding for compatibility
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        else:
            # On other platforms, use MP4V
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Create video writer
        out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            return None, "Error: Could not create output video file"
        # Reset speed detector at the start of each video
        detector.reset_speed_detector()
        # Seed the speed estimate with the user-provided initial value
        current_speed = initial_speed
        detector.speed_detector.speed_estimate = initial_speed
        # Per-video statistics accumulators
        frame_count = 0
        processed_count = 0
        face_detected_count = 0
        drowsy_count = 0
        high_risk_count = 0
        ear_sum = 0
        model_prob_sum = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"End of video or error reading frame at frame {frame_count}")
                break
            frame_count += 1
            # Detect speed from the current frame (optical flow)
            current_speed = detector.speed_detector.detect_speed_from_frame(frame)
            try:
                # Try to process the frame
                processed_frame, message = process_image(frame)
                # Overlay speed info on the processed frame
                if processed_frame is not None:
                    speed_text = f"Speed: {current_speed:.1f} km/h"
                    cv2.putText(processed_frame, speed_text, (10, processed_frame.shape[0]-45),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                    # Add speed change score
                    speed_change_score = detector.speed_detector.get_speed_change_score()
                    cv2.putText(processed_frame, f"Speed Variation: {speed_change_score:.2f}",
                                (10, processed_frame.shape[0]-70),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                if processed_frame is not None:
                    out.write(processed_frame)
                    processed_count += 1
                    if "No face detected" not in message:
                        face_detected_count += 1
                        if "Drowsiness" in message:
                            # Parse drowsiness probability out of the status
                            # message (format defined by process_image)
                            try:
                                drowsy_text = message.split("Drowsiness: ")[1].split(",")[0]
                                drowsy_prob = float(drowsy_text)
                                # Track drowsiness stats
                                if drowsy_prob >= 0.7:
                                    drowsy_count += 1
                                if drowsy_prob >= 0.85:
                                    high_risk_count += 1
                                # Get metrics from the frame
                                # NOTE(review): this runs a second predict() on
                                # the same frame just for metrics - confirm the
                                # extra inference cost is intended
                                _, _, _, metrics = detector.predict(frame)
                                if 'ear' in metrics:
                                    ear_sum += metrics['ear']
                                if 'model_prob' in metrics:
                                    model_prob_sum += metrics['model_prob']
                            except:
                                pass
                else:
                    # Fallback: processing failed, write the original frame
                    # with an overlay indicating the failure
                    cv2.putText(frame, "Processing failed", (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                    out.write(frame)
                    processed_count += 1
                    print(f"Frame {frame_count}: Processing failed - {message}")
            except Exception as e:
                # Any error during processing: keep the original frame
                cv2.putText(frame, f"Error: {str(e)[:30]}", (30, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                out.write(frame)
                processed_count += 1
                print(f"Frame {frame_count}: Exception - {str(e)}")
            # Print progress for every 10th frame
            if frame_count % 10 == 0:
                print(f"Processed {frame_count}/{total_frames} frames")
        # Release resources
        cap.release()
        out.release()
        # Calculate statistics (guard against zero detected faces)
        drowsy_percentage = (drowsy_count / face_detected_count * 100) if face_detected_count > 0 else 0
        high_risk_percentage = (high_risk_count / face_detected_count * 100) if face_detected_count > 0 else 0
        avg_ear = ear_sum / face_detected_count if face_detected_count > 0 else 0
        avg_model_prob = model_prob_sum / face_detected_count if face_detected_count > 0 else 0
        speed_score = detector.speed_detector.get_speed_change_score()
        # Check if video was created successfully and return it directly
        if os.path.exists(temp_output_path) and os.path.getsize(temp_output_path) > 0:
            print(f"Video processed successfully with {processed_count} frames")
            print(f"Drowsy frames: {drowsy_count} ({drowsy_percentage:.1f}%), High risk frames: {high_risk_count} ({high_risk_percentage:.1f}%)")
            print(f"Average eye ratio: {avg_ear:.2f}, Average model probability: {avg_model_prob:.2f}")
            print(f"Speed change score: {speed_score:.2f}")
            # High model prob with open eyes suggests a false positive
            false_positive_warning = ""
            if avg_model_prob > 0.8 and avg_ear > 0.25:
                false_positive_warning = " ⚠️ Possible false positive (eyes open but model detects drowsiness)"
            result_message = (f"Video processed successfully. Frames: {frame_count}, faces detected: {face_detected_count}, "
                              f"drowsy: {drowsy_count} ({drowsy_percentage:.1f}%), high risk: {high_risk_count} ({high_risk_percentage:.1f}%)."
                              f" Avg eye ratio: {avg_ear:.2f}, Speed score: {speed_score:.2f}{false_positive_warning}")
            # Return the temp file path directly without retaining it
            video_result = temp_output_path
            return video_result, result_message
        else:
            print(f"Failed to create output video. Frames read: {frame_count}, processed: {processed_count}")
            return None, f"Error: Failed to create output video. Frames read: {frame_count}, processed: {processed_count}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing video: {str(e)}\n{error_details}")
        return None, f"Error processing video: {str(e)}"
    finally:
        # Clean up resources (guarded: these may not exist if we failed early)
        if 'out' in locals() and out is not None:
            out.release()
        if 'cap' in locals() and cap is not None:
            cap.release()
        # Delete the temporary input file, if one was created
        if temp_input is not None:
            try:
                os.unlink(temp_input.name)
            except:
                pass
def process_webcam(image):
    """Handle one webcam frame; delegates to process_image.

    Returns the (processed_image, status_message) pair unchanged.
    """
    result = process_image(image)
    return result
# Application entry point: CLI parsing, temp-file cleanup registration,
# model loading, and the Gradio UI definition.
if __name__ == "__main__":
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Driver Drowsiness Detection App")
    parser.add_argument("--share", action="store_true", help="Create a public link (may trigger security warnings)")
    parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
    args = parser.parse_args()
    # Print warning if share is enabled
    if args.share:
        print("WARNING: Running with --share may trigger security warnings on some systems.")
        print("The app will be accessible from the internet through a temporary URL.")
    # Register a cleanup function to run at interpreter exit
    import atexit
    import glob
    import shutil
    def cleanup_temp_files():
        """Remove leftover temporary files created during video processing."""
        try:
            # Delete any temp files that may have been left behind
            import tempfile
            temp_dir = tempfile.gettempdir()
            pattern = os.path.join(temp_dir, "tmp*")
            for file in glob.glob(pattern):
                try:
                    if os.path.isfile(file):
                        os.remove(file)
                except Exception as e:
                    print(f"Failed to delete {file}: {e}")
            # Make sure no stray .mp4 or .avi outputs remain
            for ext in [".mp4", ".avi"]:
                pattern = os.path.join(temp_dir, f"*{ext}")
                for file in glob.glob(pattern):
                    try:
                        os.remove(file)
                    except Exception as e:
                        print(f"Failed to delete {file}: {e}")
            print("Cleaned up temporary files")
        except Exception as e:
            print(f"Error during cleanup: {e}")
    # Register the cleanup function
    atexit.register(cleanup_temp_files)
    # Load the model at startup
    detector.load_model()
    # Create interface
    with gr.Blocks(title="Driver Drowsiness Detection") as demo:
        gr.Markdown("""
        # 🚗 Driver Drowsiness Detection System
        This system detects driver drowsiness using computer vision and deep learning.
        ## Features:
        - Image analysis
        - Video processing with speed monitoring
        - Webcam detection (PC and mobile)
        - Multi-factor drowsiness prediction (face, eyes, head pose, speed changes)
        """)
        with gr.Tabs():
            # --- Tab 1: single-image analysis ---
            with gr.Tab("Image"):
                gr.Markdown("Upload an image for drowsiness detection")
                with gr.Row():
                    image_input = gr.Image(label="Input Image", type="numpy")
                    image_output = gr.Image(label="Processed Image")
                with gr.Row():
                    status_output = gr.Textbox(label="Status")
                image_input.change(
                    fn=process_image,
                    inputs=[image_input],
                    outputs=[image_output, status_output]
                )
            # --- Tab 2: video analysis with speed detection ---
            with gr.Tab("Video"):
                gr.Markdown("""
                ### 上傳駕駛視頻進行困倦檢測
                系統將自動從視頻中檢測以下內容:
                - 駕駛員面部表情和眼睛狀態
                - 車輛速度變化 (通過視頻中的光流分析)
                - 當車速變化超過 ±5 km/h 時將被視為異常駕駛行為
                **注意:** 處理後的視頻不會保存到本地文件夾,請使用界面右上角的下載按鈕保存結果。
                """)
                with gr.Row():
                    video_input = gr.Video(label="輸入視頻")
                    video_output = gr.Video(label="處理後視頻 (點擊右上角下載)")
                with gr.Row():
                    initial_speed = gr.Slider(minimum=10, maximum=120, value=60, label="初始車速估計值 (km/h)",
                                              info="僅作為初始估計值,系統會自動從視頻中檢測實際速度變化")
                with gr.Row():
                    video_status = gr.Textbox(label="處理狀態")
                with gr.Row():
                    process_btn = gr.Button("處理視頻")
                    clear_btn = gr.Button("清除")
                process_btn.click(
                    fn=process_video,
                    inputs=[video_input, initial_speed],
                    outputs=[video_output, video_status]
                )
                clear_btn.click(
                    fn=lambda: (None, "已清除結果"),
                    inputs=[],
                    outputs=[video_output, video_status]
                )
            # --- Tab 3: live webcam with manual speed input ---
            with gr.Tab("Webcam"):
                gr.Markdown("Use your webcam or mobile camera for real-time drowsiness detection")
                with gr.Row():
                    webcam_input = gr.Image(label="Camera Feed", type="numpy", streaming=True)
                    webcam_output = gr.Image(label="Processed Feed")
                with gr.Row():
                    speed_input = gr.Slider(minimum=0, maximum=150, value=60, label="Current Speed (km/h)")
                    update_speed_btn = gr.Button("Update Speed")
                with gr.Row():
                    webcam_status = gr.Textbox(label="Status")
                def process_webcam_with_speed(image, speed):
                    """Feed the slider speed into the detector, then process the frame."""
                    detector.update_speed(speed)
                    return process_image(image)
                update_speed_btn.click(
                    fn=lambda speed: f"Speed updated to {speed} km/h",
                    inputs=[speed_input],
                    outputs=[webcam_status]
                )
                webcam_input.change(
                    fn=process_webcam_with_speed,
                    inputs=[webcam_input, speed_input],
                    outputs=[webcam_output, webcam_status]
                )
        gr.Markdown("""
        ## How It Works
        This system detects drowsiness using multiple factors:
        1. **Facial features** - Using a trained CNN model
        2. **Eye openness** - Measuring eye aspect ratio (EAR)
        3. **Head position** - Detecting head drooping
        4. **Automatic speed detection** - Using optical flow analysis to track vehicle movement and detect irregular speed changes
        The system automatically detects speed changes from the video frames using computer vision techniques:
        - **Optical flow** is used to track movement between frames
        - **Irregular speed changes** (±5 km/h) are detected as potential signs of drowsy driving
        - **No external speed data required** - everything is analyzed directly from the video content
        Combining these factors provides more reliable drowsiness detection than using facial features alone.
        """)
    # Launch the app
    demo.launch(share=args.share, server_port=args.port)