dnn_space / app.py
ckcl's picture
Update app.py
ab21ecf verified
import gradio as gr
import numpy as np
import cv2
from PIL import Image
import io
import os
import json
import time
import argparse
import tensorflow as tf
from tensorflow import keras
import math
from collections import deque
class SpeedDetector:
def __init__(self, history_size=30):
self.speed_history = deque(maxlen=history_size)
self.last_update_time = None
self.current_speed = 0
self.speed_change_threshold = 5 # km/h
self.abnormal_speed_changes = 0
self.speed_deviation_sum = 0
self.speed_change_score = 0
# For optical flow speed estimation
self.prev_gray = None
self.prev_points = None
self.frame_idx = 0
self.speed_estimate = 60 # Initial estimate
def update_speed(self, speed_km_h):
"""Update with current speed in km/h"""
current_time = time.time()
# Add to history
self.speed_history.append(speed_km_h)
self.current_speed = speed_km_h
# Not enough data yet
if len(self.speed_history) < 5:
return 0
# Calculate speed variation metrics
speed_arr = np.array(self.speed_history)
# 1. Standard deviation of speed
speed_std = np.std(speed_arr)
# 2. Detect abrupt changes
for i in range(1, len(speed_arr)):
change = abs(speed_arr[i] - speed_arr[i-1])
if change >= self.speed_change_threshold:
self.abnormal_speed_changes += 1
# 3. Calculate average rate of change
changes = np.abs(np.diff(speed_arr))
avg_change = np.mean(changes) if len(changes) > 0 else 0
# Combine into a score (0-1 range)
self.speed_deviation_sum = min(5, speed_std) / 5 # Normalize to 0-1
abnormal_change_factor = min(1, self.abnormal_speed_changes / 5)
avg_change_factor = min(1, avg_change / self.speed_change_threshold)
# Weighted combination
self.speed_change_score = (
0.4 * self.speed_deviation_sum +
0.4 * abnormal_change_factor +
0.2 * avg_change_factor
)
return self.speed_change_score
def detect_speed_from_frame(self, frame):
"""Detect speed from video frame using optical flow"""
if frame is None:
return self.current_speed
# Convert frame to grayscale
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# For the first frame, initialize points to track
if self.prev_gray is None or self.frame_idx % 30 == 0: # Reset tracking points every 30 frames
# Detect good features to track
mask = np.zeros_like(gray)
# Focus on the lower portion of the frame (road)
h, w = gray.shape
mask[h//2:, :] = 255
corners = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.01, minDistance=10, mask=mask)
if corners is not None and len(corners) > 0:
self.prev_points = corners
self.prev_gray = gray.copy()
else:
# No good points to track
self.frame_idx += 1
return self.current_speed
# Calculate optical flow if we have previous points
if self.prev_gray is not None and self.prev_points is not None:
# Calculate optical flow
new_points, status, _ = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_points, None)
# Filter only valid points
if new_points is not None and status is not None:
good_new = new_points[status == 1]
good_old = self.prev_points[status == 1]
# Calculate flow magnitude
if len(good_new) > 0 and len(good_old) > 0:
flow_magnitudes = np.sqrt(
np.sum((good_new - good_old)**2, axis=1)
)
avg_flow = np.mean(flow_magnitudes) if len(flow_magnitudes) > 0 else 0
# Map optical flow to speed change
# Higher flow = faster movement
# This is a simplified mapping and would need calibration for real-world use
flow_threshold = 1.0 # Adjust based on testing
if avg_flow > flow_threshold:
# Movement detected, estimate acceleration
speed_change = min(5, max(-5, (avg_flow - flow_threshold) * 2))
# Add some temporal smoothing to avoid sudden changes
speed_change = speed_change * 0.3 # Reduce magnitude for smoother change
else:
# Minimal movement, slight deceleration (coasting)
speed_change = -0.1
# Update speed with detected change
self.speed_estimate += speed_change
# Keep speed in reasonable range
self.speed_estimate = max(40, min(120, self.speed_estimate))
# Update tracking points
self.prev_points = good_new.reshape(-1, 1, 2)
# Update previous gray frame
self.prev_gray = gray.copy()
self.frame_idx += 1
# Check for dashboard speedometer (would require more sophisticated OCR in a real system)
# For now, just use our estimated speed
detected_speed = self.speed_estimate
# Update current speed and trigger speed change detection
self.update_speed(detected_speed)
return detected_speed
def get_speed_change_score(self):
"""Return a score from 0-1 indicating abnormal speed changes"""
return self.speed_change_score
def reset(self):
"""Reset the detector state"""
self.speed_history.clear()
self.abnormal_speed_changes = 0
self.speed_deviation_sum = 0
self.speed_change_score = 0
self.prev_gray = None
self.prev_points = None
self.frame_idx = 0
self.speed_estimate = 60 # Reset to initial estimate
class DrowsinessDetector:
def __init__(self):
self.model = None
self.input_shape = (224, 224, 3) # Updated to match model's expected input shape
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
self.id2label = {0: "notdrowsy", 1: "drowsy"}
self.label2id = {"notdrowsy": 0, "drowsy": 1}
# Speed detector
self.speed_detector = SpeedDetector()
self.SPEED_CHANGE_WEIGHT = 0.15 # Weight for speed changes in drowsiness calculation
# 嘗試動態 import dlib,並設置 fallback
self.landmark_detection_enabled = False
try:
import dlib
self.detector = dlib.get_frontal_face_detector()
predictor_path = "shape_predictor_68_face_landmarks.dat"
if not os.path.exists(predictor_path):
print(f"Warning: {predictor_path} not found. Downloading...")
import urllib.request
urllib.request.urlretrieve(
"https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat",
predictor_path
)
self.predictor = dlib.shape_predictor(predictor_path)
self.landmark_detection_enabled = True
print("Facial landmark detection enabled")
except Exception as e:
print(f"Warning: Facial landmark detection disabled: {e}")
print("The system will use a simpler detection method. For better accuracy, install CMake and dlib.")
# Constants for drowsiness detection
self.EAR_THRESHOLD = 0.25 # Eye aspect ratio threshold
self.CONSECUTIVE_FRAMES = 20
self.ear_counter = 0
self.GAZE_THRESHOLD = 0.2 # Gaze direction threshold
self.HEAD_POSE_THRESHOLD = 0.3 # Head pose threshold
# Parameters for weighted ensemble
self.MODEL_WEIGHT = 0.45 # Reduced to accommodate speed factor
self.EAR_WEIGHT = 0.2
self.GAZE_WEIGHT = 0.1
self.HEAD_POSE_WEIGHT = 0.1
# For tracking across frames
self.prev_drowsy_count = 0
self.drowsy_history = []
self.current_speed = 0 # Current speed in km/h
def update_speed(self, speed_km_h):
"""Update the current speed"""
self.current_speed = speed_km_h
return self.speed_detector.update_speed(speed_km_h)
def reset_speed_detector(self):
"""Reset the speed detector"""
self.speed_detector.reset()
def load_model(self):
"""Load the CNN model from local files"""
try:
# Use local model files
config_path = "huggingface_model/config.json"
model_path = "drowsiness_model.h5"
# Load config
with open(config_path, 'r') as f:
config = json.load(f)
# Load the Keras model directly
self.model = keras.models.load_model(model_path)
# Print model summary for debugging
print("Model loaded successfully")
print(f"Model input shape: {self.model.input_shape}")
self.model.summary()
except Exception as e:
print(f"Error loading CNN model: {str(e)}")
raise
def eye_aspect_ratio(self, eye):
"""Calculate the eye aspect ratio"""
# Compute the euclidean distances between the two sets of vertical eye landmarks
A = dist.euclidean(eye[1], eye[5])
B = dist.euclidean(eye[2], eye[4])
# Compute the euclidean distance between the horizontal eye landmarks
C = dist.euclidean(eye[0], eye[3])
# Calculate the eye aspect ratio
ear = (A + B) / (2.0 * C)
return ear
def calculate_gaze(self, eye_points, facial_landmarks):
"""Calculate gaze direction"""
left_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(36, 42)])
right_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(42, 48)])
# Compute eye centers
left_eye_center = left_eye_region.mean(axis=0).astype("int")
right_eye_center = right_eye_region.mean(axis=0).astype("int")
# Compute the angle between eye centers
dY = right_eye_center[1] - left_eye_center[1]
dX = right_eye_center[0] - left_eye_center[0]
angle = np.degrees(np.arctan2(dY, dX))
# Normalize the angle
return abs(angle) / 180.0
def get_head_pose(self, shape):
"""Calculate the head pose"""
# Get specific facial landmarks for head pose estimation
image_points = np.array([
(shape.part(30).x, shape.part(30).y), # Nose tip
(shape.part(8).x, shape.part(8).y), # Chin
(shape.part(36).x, shape.part(36).y), # Left eye left corner
(shape.part(45).x, shape.part(45).y), # Right eye right corner
(shape.part(48).x, shape.part(48).y), # Left mouth corner
(shape.part(54).x, shape.part(54).y) # Right mouth corner
], dtype="double")
# A simple head pose estimation using the angle of the face
# Calculate center of the face
center_x = np.mean([p[0] for p in image_points])
center_y = np.mean([p[1] for p in image_points])
# Calculate angle with respect to vertical
angle = 0
if len(image_points) > 2:
point1 = image_points[0] # Nose
point2 = image_points[1] # Chin
angle = abs(math.atan2(point2[1] - point1[1], point2[0] - point1[0]))
# Normalize to 0-1 range where 0 is upright and 1 is drooping
normalized_pose = min(1.0, abs(angle) / (math.pi/2))
return normalized_pose
def detect_face(self, frame):
"""Detect face in the frame"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, 1.1, 4)
if len(faces) > 0:
(x, y, w, h) = faces[0] # Get the first face
face = frame[y:y+h, x:x+w]
return face, (x, y, w, h)
return None, None
def preprocess_image(self, image):
"""Preprocess the input image for CNN"""
if image is None:
return None
# Convert to RGB
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Resize to model input size (224x224)
image = cv2.resize(image, (self.input_shape[0], self.input_shape[1]))
# Normalize
image = image.astype(np.float32) / 255.0
# Add batch dimension
image = np.expand_dims(image, axis=0)
return image
def predict(self, image):
"""Make prediction on the input image using multiple features"""
if self.model is None:
raise ValueError("Model not loaded. Call load_model() first.")
# Initialize results
drowsy_prob = 0.0
face_coords = None
ear_value = 1.0 # Default to wide open eyes
gaze_value = 0.0
head_pose_value = 0.0
landmark_detection_success = False
# Detect face
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
face, face_coords = self.detect_face(image)
if face is None:
return 0.0, None, "No face detected", {}
# Get model prediction
inputs = self.preprocess_image(face)
if inputs is None:
return 0.0, face_coords, "Error processing image", {}
outputs = self.model.predict(inputs)
# Get the drowsiness probability from the model
if outputs.shape[1] == 1:
model_prob = outputs[0][0]
# Convert to probability if needed
if model_prob < 0 or model_prob > 1:
model_prob = 1 / (1 + np.exp(-model_prob))
else:
# For multi-class model
probs = tf.nn.softmax(outputs, axis=1).numpy()
model_prob = probs[0, 1] # Probability of class 1 (drowsy)
# Get speed change score from detector
speed_change_score = self.speed_detector.get_speed_change_score()
# Get additional features if landmark detection is enabled
metrics = {
"model_prob": model_prob,
"ear": 1.0,
"gaze": 0.0,
"head_pose": 0.0,
"speed_change": speed_change_score
}
if self.landmark_detection_enabled:
try:
import dlib
from scipy.spatial import distance as dist
# Detect faces with dlib for landmark detection
rects = self.detector(gray, 0)
if len(rects) > 0:
# Get facial landmarks
shape = self.predictor(gray, rects[0])
# Get eye aspect ratio
left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)]
right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)]
left_ear = self.eye_aspect_ratio(left_eye)
right_ear = self.eye_aspect_ratio(right_eye)
ear_value = (left_ear + right_ear) / 2.0
# Get gaze direction
gaze_value = self.calculate_gaze(None, shape)
# Get head pose
head_pose_value = self.get_head_pose(shape)
# Update metrics
metrics["ear"] = ear_value
metrics["gaze"] = gaze_value
metrics["head_pose"] = head_pose_value
landmark_detection_success = True
except Exception as e:
print(f"Error in landmark detection: {e}")
else:
# Use a simplified heuristic approach when dlib is not available
# Calculate an estimated eye ratio from the grayscale intensity in eye regions
# This is a simplified approach that is not as accurate as the EAR method
if face_coords is not None:
try:
# Try to estimate eye regions based on face proportions
face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
face_height, face_width = face_gray.shape[:2]
# Estimate eye regions (these are approximate and may not be accurate for all faces)
left_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.2):int(face_width*0.4)]
right_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.6):int(face_width*0.8)]
# Simplified metric: use average intensity - lower values might indicate closed eyes
if left_eye_region.size > 0 and right_eye_region.size > 0:
left_eye_avg = np.mean(left_eye_region) / 255.0
right_eye_avg = np.mean(right_eye_region) / 255.0
# Invert so that darker regions (potentially closed eyes) have higher values
left_eye_closed = 1.0 - left_eye_avg
right_eye_closed = 1.0 - right_eye_avg
# Combine into a simple eye closure metric (0-1 range, higher means more closed)
eye_closure = (left_eye_closed + right_eye_closed) / 2.0
# Convert to a rough approximation of EAR
# Lower values indicate more closed eyes (like EAR)
estimated_ear = max(0.15, 0.4 - (eye_closure * 0.25))
ear_value = estimated_ear
metrics["ear"] = ear_value
except Exception as e:
print(f"Error in simplified eye detection: {e}")
# Combine features for final drowsiness probability
if landmark_detection_success:
# Calculate eye state factor (1.0 when eyes closed, 0.0 when fully open)
eye_state = max(0, min(1, (self.EAR_THRESHOLD - ear_value) * 5))
# Weight the factors
weighted_avg = (
self.MODEL_WEIGHT * model_prob +
self.EAR_WEIGHT * eye_state +
self.GAZE_WEIGHT * gaze_value +
self.HEAD_POSE_WEIGHT * head_pose_value +
self.SPEED_CHANGE_WEIGHT * speed_change_score # Add speed change factor
)
# Update drowsy probability
drowsy_prob = weighted_avg
else:
# If landmark detection failed, use simplified approach
# Use model probability with higher weight
if "ear" in metrics and metrics["ear"] < 1.0:
# We have the simplified eye metric
eye_state = max(0, min(1, (self.EAR_THRESHOLD - metrics["ear"]) * 5))
drowsy_prob = (self.MODEL_WEIGHT * model_prob) + ((1 - self.MODEL_WEIGHT - self.SPEED_CHANGE_WEIGHT) * eye_state) + (self.SPEED_CHANGE_WEIGHT * speed_change_score)
else:
# Only model and speed are available
drowsy_prob = (model_prob * 0.85) + (speed_change_score * 0.15)
# Apply smoothing with history
self.drowsy_history.append(drowsy_prob)
if len(self.drowsy_history) > 10:
self.drowsy_history.pop(0)
# Use median filtering for robustness
drowsy_prob = np.median(self.drowsy_history)
return drowsy_prob, face_coords, None, metrics
# Create a global instance
detector = DrowsinessDetector()
def process_image(image):
"""Process image input"""
if image is None:
return None, "No image provided"
try:
# Check for valid image
if image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0:
return None, "Invalid image dimensions"
# Make a copy of the image to avoid modifying the original
processed_image = image.copy()
# Make prediction
drowsy_prob, face_coords, error, metrics = detector.predict(processed_image)
if error:
return None, error
if face_coords is None:
# No face detected - add text to the image and return it
cv2.putText(processed_image, "No face detected", (30, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
return processed_image, "No face detected"
# Draw bounding box
x, y, w, h = face_coords
# Use a higher threshold (0.7) to reduce false positives
is_drowsy = drowsy_prob >= 0.7
# Determine alert level and color
if drowsy_prob >= 0.85:
alert_level = "High Risk"
color = (0, 0, 255) # Red
elif drowsy_prob >= 0.7:
alert_level = "Medium Risk"
color = (0, 165, 255) # Orange
else:
alert_level = "Alert"
color = (0, 255, 0) # Green
cv2.rectangle(processed_image, (x, y), (x+w, y+h), color, 2)
# Add the metrics as text on image
y_offset = 25
cv2.putText(processed_image, f"{'Drowsy' if is_drowsy else 'Alert'} ({drowsy_prob:.2f})",
(x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
# Add alert level
cv2.putText(processed_image, alert_level, (x, y-35),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# Add metrics in bottom left
cv2.putText(processed_image, f"Model: {metrics['model_prob']:.2f}", (10, processed_image.shape[0]-10-y_offset*3),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(processed_image, f"Eye Ratio: {metrics['ear']:.2f}", (10, processed_image.shape[0]-10-y_offset*2),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(processed_image, f"Head Pose: {metrics['head_pose']:.2f}", (10, processed_image.shape[0]-10-y_offset),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# Add confidence disclaimer for high model probabilities but good eye metrics
if metrics['model_prob'] > 0.9 and metrics['ear'] > 0.25:
cv2.putText(processed_image, "Model conflict - verify manually",
(10, processed_image.shape[0]-10-y_offset*4),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 1)
return processed_image, f"Processed successfully. Drowsiness: {drowsy_prob:.2f}, Alert level: {alert_level}"
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Error processing image: {str(e)}\n{error_details}")
return None, f"Error processing image: {str(e)}"
def process_video(video, initial_speed=60):
"""Process video input"""
if video is None:
return None, "No video provided"
try:
# 创建内存缓冲区而不是临时文件
temp_input = None
# Handle video input (can be file path or video data)
if isinstance(video, str):
print(f"Processing video from path: {video}")
# 直接读取原始文件,不复制到临时目录
cap = cv2.VideoCapture(video)
else:
print(f"Processing video from uploaded data")
# 读取上传的视频数据到内存
import tempfile
temp_input = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
temp_input_path = temp_input.name
with open(temp_input_path, "wb") as f:
f.write(video)
cap = cv2.VideoCapture(temp_input_path)
if not cap.isOpened():
return None, "Error: Could not open video"
# Get input video properties
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
fps = 30 # Default to 30fps if invalid
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video properties: {width}x{height} at {fps}fps, total frames: {total_frames}")
# 创建内存缓冲区而不是临时输出文件
import io
import base64
# 使用临时文件来存储处理后的视频(处理完毕后会删除)
import tempfile
temp_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
temp_output_path = temp_output.name
# Try different codecs on Windows
if os.name == 'nt': # Windows
# 使用mp4v编码以确保兼容性
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
else:
# On other platforms, use MP4V
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# Create video writer
out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))
if not out.isOpened():
return None, "Error: Could not create output video file"
# Reset speed detector at the start of each video
detector.reset_speed_detector()
# Initialize speed value with the provided initial speed
current_speed = initial_speed
detector.speed_detector.speed_estimate = initial_speed
# Process each frame
frame_count = 0
processed_count = 0
face_detected_count = 0
drowsy_count = 0
high_risk_count = 0
ear_sum = 0
model_prob_sum = 0
while True:
ret, frame = cap.read()
if not ret:
print(f"End of video or error reading frame at frame {frame_count}")
break
frame_count += 1
# Detect speed from the current frame
current_speed = detector.speed_detector.detect_speed_from_frame(frame)
try:
# Try to process the frame
processed_frame, message = process_image(frame)
# Add speed info to the frame
if processed_frame is not None:
speed_text = f"Speed: {current_speed:.1f} km/h"
cv2.putText(processed_frame, speed_text, (10, processed_frame.shape[0]-45),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# Add speed change score
speed_change_score = detector.speed_detector.get_speed_change_score()
cv2.putText(processed_frame, f"Speed Variation: {speed_change_score:.2f}",
(10, processed_frame.shape[0]-70),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
if processed_frame is not None:
out.write(processed_frame)
processed_count += 1
if "No face detected" not in message:
face_detected_count += 1
if "Drowsiness" in message:
# Extract drowsiness probability
try:
drowsy_text = message.split("Drowsiness: ")[1].split(",")[0]
drowsy_prob = float(drowsy_text)
# Track drowsiness stats
if drowsy_prob >= 0.7:
drowsy_count += 1
if drowsy_prob >= 0.85:
high_risk_count += 1
# Get metrics from the frame
_, _, _, metrics = detector.predict(frame)
if 'ear' in metrics:
ear_sum += metrics['ear']
if 'model_prob' in metrics:
model_prob_sum += metrics['model_prob']
except:
pass
else:
# Fallback: If processing fails, just use the original frame
# Add text indicating processing failed
cv2.putText(frame, "Processing failed", (30, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
out.write(frame)
processed_count += 1
print(f"Frame {frame_count}: Processing failed - {message}")
except Exception as e:
# If any error occurs during processing, use original frame
cv2.putText(frame, f"Error: {str(e)[:30]}", (30, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
out.write(frame)
processed_count += 1
print(f"Frame {frame_count}: Exception - {str(e)}")
# Print progress for every 10th frame
if frame_count % 10 == 0:
print(f"Processed {frame_count}/{total_frames} frames")
# Release resources
cap.release()
out.release()
# Calculate statistics
drowsy_percentage = (drowsy_count / face_detected_count * 100) if face_detected_count > 0 else 0
high_risk_percentage = (high_risk_count / face_detected_count * 100) if face_detected_count > 0 else 0
avg_ear = ear_sum / face_detected_count if face_detected_count > 0 else 0
avg_model_prob = model_prob_sum / face_detected_count if face_detected_count > 0 else 0
speed_score = detector.speed_detector.get_speed_change_score()
# Check if video was created successfully and return it directly
if os.path.exists(temp_output_path) and os.path.getsize(temp_output_path) > 0:
print(f"Video processed successfully with {processed_count} frames")
print(f"Drowsy frames: {drowsy_count} ({drowsy_percentage:.1f}%), High risk frames: {high_risk_count} ({high_risk_percentage:.1f}%)")
print(f"Average eye ratio: {avg_ear:.2f}, Average model probability: {avg_model_prob:.2f}")
print(f"Speed change score: {speed_score:.2f}")
# If model prob is high but eye ratio is also high (open eyes), flag potential false positive
false_positive_warning = ""
if avg_model_prob > 0.8 and avg_ear > 0.25:
false_positive_warning = " ⚠️ Possible false positive (eyes open but model detects drowsiness)"
result_message = (f"Video processed successfully. Frames: {frame_count}, faces detected: {face_detected_count}, "
f"drowsy: {drowsy_count} ({drowsy_percentage:.1f}%), high risk: {high_risk_count} ({high_risk_percentage:.1f}%)."
f" Avg eye ratio: {avg_ear:.2f}, Speed score: {speed_score:.2f}{false_positive_warning}")
# 直接返回文件而不保留它
video_result = temp_output_path
return video_result, result_message
else:
print(f"Failed to create output video. Frames read: {frame_count}, processed: {processed_count}")
return None, f"Error: Failed to create output video. Frames read: {frame_count}, processed: {processed_count}"
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Error processing video: {str(e)}\n{error_details}")
return None, f"Error processing video: {str(e)}"
finally:
# Clean up resources
if 'out' in locals() and out is not None:
out.release()
if 'cap' in locals() and cap is not None:
cap.release()
# 删除临时输入文件(如果存在)
if temp_input is not None:
try:
os.unlink(temp_input.name)
except:
pass
def process_webcam(image):
"""Process webcam input - returns processed image and status message"""
return process_image(image)
# Launch the app
if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description="Driver Drowsiness Detection App")
parser.add_argument("--share", action="store_true", help="Create a public link (may trigger security warnings)")
parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
args = parser.parse_args()
# Print warning if share is enabled
if args.share:
print("WARNING: Running with --share may trigger security warnings on some systems.")
print("The app will be accessible from the internet through a temporary URL.")
# 注册退出时的清理函数
import atexit
import glob
import shutil
def cleanup_temp_files():
"""清理所有临时文件"""
try:
# 删除所有可能留下的临时文件
import tempfile
temp_dir = tempfile.gettempdir()
pattern = os.path.join(temp_dir, "tmp*")
for file in glob.glob(pattern):
try:
if os.path.isfile(file):
os.remove(file)
except Exception as e:
print(f"Failed to delete {file}: {e}")
# 确保没有留下.mp4或.avi文件
for ext in [".mp4", ".avi"]:
pattern = os.path.join(temp_dir, f"*{ext}")
for file in glob.glob(pattern):
try:
os.remove(file)
except Exception as e:
print(f"Failed to delete {file}: {e}")
print("Cleaned up temporary files")
except Exception as e:
print(f"Error during cleanup: {e}")
# 注册清理函数
atexit.register(cleanup_temp_files)
# Load the model at startup
detector.load_model()
# Create interface
with gr.Blocks(title="Driver Drowsiness Detection") as demo:
gr.Markdown("""
# 🚗 Driver Drowsiness Detection System
This system detects driver drowsiness using computer vision and deep learning.
## Features:
- Image analysis
- Video processing with speed monitoring
- Webcam detection (PC and mobile)
- Multi-factor drowsiness prediction (face, eyes, head pose, speed changes)
""")
with gr.Tabs():
with gr.Tab("Image"):
gr.Markdown("Upload an image for drowsiness detection")
with gr.Row():
image_input = gr.Image(label="Input Image", type="numpy")
image_output = gr.Image(label="Processed Image")
with gr.Row():
status_output = gr.Textbox(label="Status")
image_input.change(
fn=process_image,
inputs=[image_input],
outputs=[image_output, status_output]
)
with gr.Tab("Video"):
gr.Markdown("""
### 上傳駕駛視頻進行困倦檢測
系統將自動從視頻中檢測以下內容:
- 駕駛員面部表情和眼睛狀態
- 車輛速度變化 (通過視頻中的光流分析)
- 當車速變化超過 ±5 km/h 時將被視為異常駕駛行為
**注意:** 處理後的視頻不會保存到本地文件夾,請使用界面右上角的下載按鈕保存結果。
""")
with gr.Row():
video_input = gr.Video(label="輸入視頻")
video_output = gr.Video(label="處理後視頻 (點擊右上角下載)")
with gr.Row():
initial_speed = gr.Slider(minimum=10, maximum=120, value=60, label="初始車速估計值 (km/h)",
info="僅作為初始估計值,系統會自動從視頻中檢測實際速度變化")
with gr.Row():
video_status = gr.Textbox(label="處理狀態")
with gr.Row():
process_btn = gr.Button("處理視頻")
clear_btn = gr.Button("清除")
process_btn.click(
fn=process_video,
inputs=[video_input, initial_speed],
outputs=[video_output, video_status]
)
clear_btn.click(
fn=lambda: (None, "已清除結果"),
inputs=[],
outputs=[video_output, video_status]
)
with gr.Tab("Webcam"):
gr.Markdown("Use your webcam or mobile camera for real-time drowsiness detection")
with gr.Row():
webcam_input = gr.Image(label="Camera Feed", type="numpy", streaming=True)
webcam_output = gr.Image(label="Processed Feed")
with gr.Row():
speed_input = gr.Slider(minimum=0, maximum=150, value=60, label="Current Speed (km/h)")
update_speed_btn = gr.Button("Update Speed")
with gr.Row():
webcam_status = gr.Textbox(label="Status")
def process_webcam_with_speed(image, speed):
detector.update_speed(speed)
return process_image(image)
update_speed_btn.click(
fn=lambda speed: f"Speed updated to {speed} km/h",
inputs=[speed_input],
outputs=[webcam_status]
)
webcam_input.change(
fn=process_webcam_with_speed,
inputs=[webcam_input, speed_input],
outputs=[webcam_output, webcam_status]
)
gr.Markdown("""
## How It Works
This system detects drowsiness using multiple factors:
1. **Facial features** - Using a trained CNN model
2. **Eye openness** - Measuring eye aspect ratio (EAR)
3. **Head position** - Detecting head drooping
4. **Automatic speed detection** - Using optical flow analysis to track vehicle movement and detect irregular speed changes
The system automatically detects speed changes from the video frames using computer vision techniques:
- **Optical flow** is used to track movement between frames
- **Irregular speed changes** (±5 km/h) are detected as potential signs of drowsy driving
- **No external speed data required** - everything is analyzed directly from the video content
Combining these factors provides more reliable drowsiness detection than using facial features alone.
""")
# Launch the app
demo.launch(share=args.share, server_port=args.port)