import gradio as gr import numpy as np import cv2 from PIL import Image import io import os import json import time import argparse import tensorflow as tf from tensorflow import keras import math from collections import deque class SpeedDetector: def __init__(self, history_size=30): self.speed_history = deque(maxlen=history_size) self.last_update_time = None self.current_speed = 0 self.speed_change_threshold = 5 # km/h self.abnormal_speed_changes = 0 self.speed_deviation_sum = 0 self.speed_change_score = 0 # For optical flow speed estimation self.prev_gray = None self.prev_points = None self.frame_idx = 0 self.speed_estimate = 60 # Initial estimate def update_speed(self, speed_km_h): """Update with current speed in km/h""" current_time = time.time() # Add to history self.speed_history.append(speed_km_h) self.current_speed = speed_km_h # Not enough data yet if len(self.speed_history) < 5: return 0 # Calculate speed variation metrics speed_arr = np.array(self.speed_history) # 1. Standard deviation of speed speed_std = np.std(speed_arr) # 2. Detect abrupt changes for i in range(1, len(speed_arr)): change = abs(speed_arr[i] - speed_arr[i-1]) if change >= self.speed_change_threshold: self.abnormal_speed_changes += 1 # 3. Calculate average rate of change changes = np.abs(np.diff(speed_arr)) avg_change = np.mean(changes) if len(changes) > 0 else 0 # Combine into a score (0-1 range) self.speed_deviation_sum = min(5, speed_std) / 5 # Normalize to 0-1 abnormal_change_factor = min(1, self.abnormal_speed_changes / 5) avg_change_factor = min(1, avg_change / self.speed_change_threshold) # Weighted combination self.speed_change_score = ( 0.4 * self.speed_deviation_sum + 0.4 * abnormal_change_factor + 0.2 * avg_change_factor ) return self.speed_change_score def detect_speed_from_frame(self, frame): """Detect speed from video frame using optical flow""" if frame is None: return self.current_speed # Convert frame to grayscale gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # For the first frame, initialize points to track if self.prev_gray is None or self.frame_idx % 30 == 0: # Reset tracking points every 30 frames # Detect good features to track mask = np.zeros_like(gray) # Focus on the lower portion of the frame (road) h, w = gray.shape mask[h//2:, :] = 255 corners = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.01, minDistance=10, mask=mask) if corners is not None and len(corners) > 0: self.prev_points = corners self.prev_gray = gray.copy() else: # No good points to track self.frame_idx += 1 return self.current_speed # Calculate optical flow if we have previous points if self.prev_gray is not None and self.prev_points is not None: # Calculate optical flow new_points, status, _ = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_points, None) # Filter only valid points if new_points is not None and status is not None: good_new = new_points[status == 1] good_old = self.prev_points[status == 1] # Calculate flow magnitude if len(good_new) > 0 and len(good_old) > 0: flow_magnitudes = np.sqrt( np.sum((good_new - good_old)**2, axis=1) ) avg_flow = np.mean(flow_magnitudes) if len(flow_magnitudes) > 0 else 0 # Map optical flow to speed change # Higher flow = faster movement # This is a simplified mapping and would need calibration for real-world use flow_threshold = 1.0 # Adjust based on testing if avg_flow > flow_threshold: # Movement detected, estimate acceleration speed_change = min(5, max(-5, (avg_flow - flow_threshold) * 2)) # Add some temporal smoothing to avoid sudden changes speed_change = speed_change * 0.3 # Reduce magnitude for smoother change else: # Minimal movement, slight deceleration (coasting) speed_change = -0.1 # Update speed with detected change self.speed_estimate += speed_change # Keep speed in reasonable range self.speed_estimate = max(40, min(120, self.speed_estimate)) # Update tracking points self.prev_points = good_new.reshape(-1, 1, 2) # Update previous gray frame self.prev_gray = gray.copy() self.frame_idx += 1 # Check for dashboard speedometer (would require more sophisticated OCR in a real system) # For now, just use our estimated speed detected_speed = self.speed_estimate # Update current speed and trigger speed change detection self.update_speed(detected_speed) return detected_speed def get_speed_change_score(self): """Return a score from 0-1 indicating abnormal speed changes""" return self.speed_change_score def reset(self): """Reset the detector state""" self.speed_history.clear() self.abnormal_speed_changes = 0 self.speed_deviation_sum = 0 self.speed_change_score = 0 self.prev_gray = None self.prev_points = None self.frame_idx = 0 self.speed_estimate = 60 # Reset to initial estimate class DrowsinessDetector: def __init__(self): self.model = None self.input_shape = (224, 224, 3) # Updated to match model's expected input shape self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') self.id2label = {0: "notdrowsy", 1: "drowsy"} self.label2id = {"notdrowsy": 0, "drowsy": 1} # Speed detector self.speed_detector = SpeedDetector() self.SPEED_CHANGE_WEIGHT = 0.15 # Weight for speed changes in drowsiness calculation # 嘗試動態 import dlib,並設置 fallback self.landmark_detection_enabled = False try: import dlib self.detector = dlib.get_frontal_face_detector() predictor_path = "shape_predictor_68_face_landmarks.dat" if not os.path.exists(predictor_path): print(f"Warning: {predictor_path} not found. Downloading...") import urllib.request urllib.request.urlretrieve( "https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat", predictor_path ) self.predictor = dlib.shape_predictor(predictor_path) self.landmark_detection_enabled = True print("Facial landmark detection enabled") except Exception as e: print(f"Warning: Facial landmark detection disabled: {e}") print("The system will use a simpler detection method. For better accuracy, install CMake and dlib.") # Constants for drowsiness detection self.EAR_THRESHOLD = 0.25 # Eye aspect ratio threshold self.CONSECUTIVE_FRAMES = 20 self.ear_counter = 0 self.GAZE_THRESHOLD = 0.2 # Gaze direction threshold self.HEAD_POSE_THRESHOLD = 0.3 # Head pose threshold # Parameters for weighted ensemble self.MODEL_WEIGHT = 0.45 # Reduced to accommodate speed factor self.EAR_WEIGHT = 0.2 self.GAZE_WEIGHT = 0.1 self.HEAD_POSE_WEIGHT = 0.1 # For tracking across frames self.prev_drowsy_count = 0 self.drowsy_history = [] self.current_speed = 0 # Current speed in km/h def update_speed(self, speed_km_h): """Update the current speed""" self.current_speed = speed_km_h return self.speed_detector.update_speed(speed_km_h) def reset_speed_detector(self): """Reset the speed detector""" self.speed_detector.reset() def load_model(self): """Load the CNN model from local files""" try: # Use local model files config_path = "huggingface_model/config.json" model_path = "drowsiness_model.h5" # Load config with open(config_path, 'r') as f: config = json.load(f) # Load the Keras model directly self.model = keras.models.load_model(model_path) # Print model summary for debugging print("Model loaded successfully") print(f"Model input shape: {self.model.input_shape}") self.model.summary() except Exception as e: print(f"Error loading CNN model: {str(e)}") raise def eye_aspect_ratio(self, eye): """Calculate the eye aspect ratio""" # Compute the euclidean distances between the two sets of vertical eye landmarks A = dist.euclidean(eye[1], eye[5]) B = dist.euclidean(eye[2], eye[4]) # Compute the euclidean distance between the horizontal eye landmarks C = dist.euclidean(eye[0], eye[3]) # Calculate the eye aspect ratio ear = (A + B) / (2.0 * C) return ear def calculate_gaze(self, eye_points, facial_landmarks): """Calculate gaze direction""" left_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(36, 42)]) right_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(42, 48)]) # Compute eye centers left_eye_center = left_eye_region.mean(axis=0).astype("int") right_eye_center = right_eye_region.mean(axis=0).astype("int") # Compute the angle between eye centers dY = right_eye_center[1] - left_eye_center[1] dX = right_eye_center[0] - left_eye_center[0] angle = np.degrees(np.arctan2(dY, dX)) # Normalize the angle return abs(angle) / 180.0 def get_head_pose(self, shape): """Calculate the head pose""" # Get specific facial landmarks for head pose estimation image_points = np.array([ (shape.part(30).x, shape.part(30).y), # Nose tip (shape.part(8).x, shape.part(8).y), # Chin (shape.part(36).x, shape.part(36).y), # Left eye left corner (shape.part(45).x, shape.part(45).y), # Right eye right corner (shape.part(48).x, shape.part(48).y), # Left mouth corner (shape.part(54).x, shape.part(54).y) # Right mouth corner ], dtype="double") # A simple head pose estimation using the angle of the face # Calculate center of the face center_x = np.mean([p[0] for p in image_points]) center_y = np.mean([p[1] for p in image_points]) # Calculate angle with respect to vertical angle = 0 if len(image_points) > 2: point1 = image_points[0] # Nose point2 = image_points[1] # Chin angle = abs(math.atan2(point2[1] - point1[1], point2[0] - point1[0])) # Normalize to 0-1 range where 0 is upright and 1 is drooping normalized_pose = min(1.0, abs(angle) / (math.pi/2)) return normalized_pose def detect_face(self, frame): """Detect face in the frame""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self.face_cascade.detectMultiScale(gray, 1.1, 4) if len(faces) > 0: (x, y, w, h) = faces[0] # Get the first face face = frame[y:y+h, x:x+w] return face, (x, y, w, h) return None, None def preprocess_image(self, image): """Preprocess the input image for CNN""" if image is None: return None # Convert to RGB image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Resize to model input size (224x224) image = cv2.resize(image, (self.input_shape[0], self.input_shape[1])) # Normalize image = image.astype(np.float32) / 255.0 # Add batch dimension image = np.expand_dims(image, axis=0) return image def predict(self, image): """Make prediction on the input image using multiple features""" if self.model is None: raise ValueError("Model not loaded. Call load_model() first.") # Initialize results drowsy_prob = 0.0 face_coords = None ear_value = 1.0 # Default to wide open eyes gaze_value = 0.0 head_pose_value = 0.0 landmark_detection_success = False # Detect face gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) face, face_coords = self.detect_face(image) if face is None: return 0.0, None, "No face detected", {} # Get model prediction inputs = self.preprocess_image(face) if inputs is None: return 0.0, face_coords, "Error processing image", {} outputs = self.model.predict(inputs) # Get the drowsiness probability from the model if outputs.shape[1] == 1: model_prob = outputs[0][0] # Convert to probability if needed if model_prob < 0 or model_prob > 1: model_prob = 1 / (1 + np.exp(-model_prob)) else: # For multi-class model probs = tf.nn.softmax(outputs, axis=1).numpy() model_prob = probs[0, 1] # Probability of class 1 (drowsy) # Get speed change score from detector speed_change_score = self.speed_detector.get_speed_change_score() # Get additional features if landmark detection is enabled metrics = { "model_prob": model_prob, "ear": 1.0, "gaze": 0.0, "head_pose": 0.0, "speed_change": speed_change_score } if self.landmark_detection_enabled: try: import dlib from scipy.spatial import distance as dist # Detect faces with dlib for landmark detection rects = self.detector(gray, 0) if len(rects) > 0: # Get facial landmarks shape = self.predictor(gray, rects[0]) # Get eye aspect ratio left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)] right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)] left_ear = self.eye_aspect_ratio(left_eye) right_ear = self.eye_aspect_ratio(right_eye) ear_value = (left_ear + right_ear) / 2.0 # Get gaze direction gaze_value = self.calculate_gaze(None, shape) # Get head pose head_pose_value = self.get_head_pose(shape) # Update metrics metrics["ear"] = ear_value metrics["gaze"] = gaze_value metrics["head_pose"] = head_pose_value landmark_detection_success = True except Exception as e: print(f"Error in landmark detection: {e}") else: # Use a simplified heuristic approach when dlib is not available # Calculate an estimated eye ratio from the grayscale intensity in eye regions # This is a simplified approach that is not as accurate as the EAR method if face_coords is not None: try: # Try to estimate eye regions based on face proportions face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY) face_height, face_width = face_gray.shape[:2] # Estimate eye regions (these are approximate and may not be accurate for all faces) left_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.2):int(face_width*0.4)] right_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.6):int(face_width*0.8)] # Simplified metric: use average intensity - lower values might indicate closed eyes if left_eye_region.size > 0 and right_eye_region.size > 0: left_eye_avg = np.mean(left_eye_region) / 255.0 right_eye_avg = np.mean(right_eye_region) / 255.0 # Invert so that darker regions (potentially closed eyes) have higher values left_eye_closed = 1.0 - left_eye_avg right_eye_closed = 1.0 - right_eye_avg # Combine into a simple eye closure metric (0-1 range, higher means more closed) eye_closure = (left_eye_closed + right_eye_closed) / 2.0 # Convert to a rough approximation of EAR # Lower values indicate more closed eyes (like EAR) estimated_ear = max(0.15, 0.4 - (eye_closure * 0.25)) ear_value = estimated_ear metrics["ear"] = ear_value except Exception as e: print(f"Error in simplified eye detection: {e}") # Combine features for final drowsiness probability if landmark_detection_success: # Calculate eye state factor (1.0 when eyes closed, 0.0 when fully open) eye_state = max(0, min(1, (self.EAR_THRESHOLD - ear_value) * 5)) # Weight the factors weighted_avg = ( self.MODEL_WEIGHT * model_prob + self.EAR_WEIGHT * eye_state + self.GAZE_WEIGHT * gaze_value + self.HEAD_POSE_WEIGHT * head_pose_value + self.SPEED_CHANGE_WEIGHT * speed_change_score # Add speed change factor ) # Update drowsy probability drowsy_prob = weighted_avg else: # If landmark detection failed, use simplified approach # Use model probability with higher weight if "ear" in metrics and metrics["ear"] < 1.0: # We have the simplified eye metric eye_state = max(0, min(1, (self.EAR_THRESHOLD - metrics["ear"]) * 5)) drowsy_prob = (self.MODEL_WEIGHT * model_prob) + ((1 - self.MODEL_WEIGHT - self.SPEED_CHANGE_WEIGHT) * eye_state) + (self.SPEED_CHANGE_WEIGHT * speed_change_score) else: # Only model and speed are available drowsy_prob = (model_prob * 0.85) + (speed_change_score * 0.15) # Apply smoothing with history self.drowsy_history.append(drowsy_prob) if len(self.drowsy_history) > 10: self.drowsy_history.pop(0) # Use median filtering for robustness drowsy_prob = np.median(self.drowsy_history) return drowsy_prob, face_coords, None, metrics # Create a global instance detector = DrowsinessDetector() def process_image(image): """Process image input""" if image is None: return None, "No image provided" try: # Check for valid image if image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0: return None, "Invalid image dimensions" # Make a copy of the image to avoid modifying the original processed_image = image.copy() # Make prediction drowsy_prob, face_coords, error, metrics = detector.predict(processed_image) if error: return None, error if face_coords is None: # No face detected - add text to the image and return it cv2.putText(processed_image, "No face detected", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) return processed_image, "No face detected" # Draw bounding box x, y, w, h = face_coords # Use a higher threshold (0.7) to reduce false positives is_drowsy = drowsy_prob >= 0.7 # Determine alert level and color if drowsy_prob >= 0.85: alert_level = "High Risk" color = (0, 0, 255) # Red elif drowsy_prob >= 0.7: alert_level = "Medium Risk" color = (0, 165, 255) # Orange else: alert_level = "Alert" color = (0, 255, 0) # Green cv2.rectangle(processed_image, (x, y), (x+w, y+h), color, 2) # Add the metrics as text on image y_offset = 25 cv2.putText(processed_image, f"{'Drowsy' if is_drowsy else 'Alert'} ({drowsy_prob:.2f})", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2) # Add alert level cv2.putText(processed_image, alert_level, (x, y-35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) # Add metrics in bottom left cv2.putText(processed_image, f"Model: {metrics['model_prob']:.2f}", (10, processed_image.shape[0]-10-y_offset*3), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) cv2.putText(processed_image, f"Eye Ratio: {metrics['ear']:.2f}", (10, processed_image.shape[0]-10-y_offset*2), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) cv2.putText(processed_image, f"Head Pose: {metrics['head_pose']:.2f}", (10, processed_image.shape[0]-10-y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # Add confidence disclaimer for high model probabilities but good eye metrics if metrics['model_prob'] > 0.9 and metrics['ear'] > 0.25: cv2.putText(processed_image, "Model conflict - verify manually", (10, processed_image.shape[0]-10-y_offset*4), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 1) return processed_image, f"Processed successfully. Drowsiness: {drowsy_prob:.2f}, Alert level: {alert_level}" except Exception as e: import traceback error_details = traceback.format_exc() print(f"Error processing image: {str(e)}\n{error_details}") return None, f"Error processing image: {str(e)}" def process_video(video, initial_speed=60): """Process video input""" if video is None: return None, "No video provided" try: # 创建内存缓冲区而不是临时文件 temp_input = None # Handle video input (can be file path or video data) if isinstance(video, str): print(f"Processing video from path: {video}") # 直接读取原始文件,不复制到临时目录 cap = cv2.VideoCapture(video) else: print(f"Processing video from uploaded data") # 读取上传的视频数据到内存 import tempfile temp_input = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) temp_input_path = temp_input.name with open(temp_input_path, "wb") as f: f.write(video) cap = cv2.VideoCapture(temp_input_path) if not cap.isOpened(): return None, "Error: Could not open video" # Get input video properties fps = cap.get(cv2.CAP_PROP_FPS) if fps <= 0: fps = 30 # Default to 30fps if invalid width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Video properties: {width}x{height} at {fps}fps, total frames: {total_frames}") # 创建内存缓冲区而不是临时输出文件 import io import base64 # 使用临时文件来存储处理后的视频(处理完毕后会删除) import tempfile temp_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) temp_output_path = temp_output.name # Try different codecs on Windows if os.name == 'nt': # Windows # 使用mp4v编码以确保兼容性 fourcc = cv2.VideoWriter_fourcc(*'mp4v') else: # On other platforms, use MP4V fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Create video writer out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height)) if not out.isOpened(): return None, "Error: Could not create output video file" # Reset speed detector at the start of each video detector.reset_speed_detector() # Initialize speed value with the provided initial speed current_speed = initial_speed detector.speed_detector.speed_estimate = initial_speed # Process each frame frame_count = 0 processed_count = 0 face_detected_count = 0 drowsy_count = 0 high_risk_count = 0 ear_sum = 0 model_prob_sum = 0 while True: ret, frame = cap.read() if not ret: print(f"End of video or error reading frame at frame {frame_count}") break frame_count += 1 # Detect speed from the current frame current_speed = detector.speed_detector.detect_speed_from_frame(frame) try: # Try to process the frame processed_frame, message = process_image(frame) # Add speed info to the frame if processed_frame is not None: speed_text = f"Speed: {current_speed:.1f} km/h" cv2.putText(processed_frame, speed_text, (10, processed_frame.shape[0]-45), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # Add speed change score speed_change_score = detector.speed_detector.get_speed_change_score() cv2.putText(processed_frame, f"Speed Variation: {speed_change_score:.2f}", (10, processed_frame.shape[0]-70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) if processed_frame is not None: out.write(processed_frame) processed_count += 1 if "No face detected" not in message: face_detected_count += 1 if "Drowsiness" in message: # Extract drowsiness probability try: drowsy_text = message.split("Drowsiness: ")[1].split(",")[0] drowsy_prob = float(drowsy_text) # Track drowsiness stats if drowsy_prob >= 0.7: drowsy_count += 1 if drowsy_prob >= 0.85: high_risk_count += 1 # Get metrics from the frame _, _, _, metrics = detector.predict(frame) if 'ear' in metrics: ear_sum += metrics['ear'] if 'model_prob' in metrics: model_prob_sum += metrics['model_prob'] except: pass else: # Fallback: If processing fails, just use the original frame # Add text indicating processing failed cv2.putText(frame, "Processing failed", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) out.write(frame) processed_count += 1 print(f"Frame {frame_count}: Processing failed - {message}") except Exception as e: # If any error occurs during processing, use original frame cv2.putText(frame, f"Error: {str(e)[:30]}", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) out.write(frame) processed_count += 1 print(f"Frame {frame_count}: Exception - {str(e)}") # Print progress for every 10th frame if frame_count % 10 == 0: print(f"Processed {frame_count}/{total_frames} frames") # Release resources cap.release() out.release() # Calculate statistics drowsy_percentage = (drowsy_count / face_detected_count * 100) if face_detected_count > 0 else 0 high_risk_percentage = (high_risk_count / face_detected_count * 100) if face_detected_count > 0 else 0 avg_ear = ear_sum / face_detected_count if face_detected_count > 0 else 0 avg_model_prob = model_prob_sum / face_detected_count if face_detected_count > 0 else 0 speed_score = detector.speed_detector.get_speed_change_score() # Check if video was created successfully and return it directly if os.path.exists(temp_output_path) and os.path.getsize(temp_output_path) > 0: print(f"Video processed successfully with {processed_count} frames") print(f"Drowsy frames: {drowsy_count} ({drowsy_percentage:.1f}%), High risk frames: {high_risk_count} ({high_risk_percentage:.1f}%)") print(f"Average eye ratio: {avg_ear:.2f}, Average model probability: {avg_model_prob:.2f}") print(f"Speed change score: {speed_score:.2f}") # If model prob is high but eye ratio is also high (open eyes), flag potential false positive false_positive_warning = "" if avg_model_prob > 0.8 and avg_ear > 0.25: false_positive_warning = " ⚠️ Possible false positive (eyes open but model detects drowsiness)" result_message = (f"Video processed successfully. Frames: {frame_count}, faces detected: {face_detected_count}, " f"drowsy: {drowsy_count} ({drowsy_percentage:.1f}%), high risk: {high_risk_count} ({high_risk_percentage:.1f}%)." f" Avg eye ratio: {avg_ear:.2f}, Speed score: {speed_score:.2f}{false_positive_warning}") # 直接返回文件而不保留它 video_result = temp_output_path return video_result, result_message else: print(f"Failed to create output video. Frames read: {frame_count}, processed: {processed_count}") return None, f"Error: Failed to create output video. Frames read: {frame_count}, processed: {processed_count}" except Exception as e: import traceback error_details = traceback.format_exc() print(f"Error processing video: {str(e)}\n{error_details}") return None, f"Error processing video: {str(e)}" finally: # Clean up resources if 'out' in locals() and out is not None: out.release() if 'cap' in locals() and cap is not None: cap.release() # 删除临时输入文件(如果存在) if temp_input is not None: try: os.unlink(temp_input.name) except: pass def process_webcam(image): """Process webcam input - returns processed image and status message""" return process_image(image) # Launch the app if __name__ == "__main__": # Parse command line arguments parser = argparse.ArgumentParser(description="Driver Drowsiness Detection App") parser.add_argument("--share", action="store_true", help="Create a public link (may trigger security warnings)") parser.add_argument("--port", type=int, default=7860, help="Port to run the app on") args = parser.parse_args() # Print warning if share is enabled if args.share: print("WARNING: Running with --share may trigger security warnings on some systems.") print("The app will be accessible from the internet through a temporary URL.") # 注册退出时的清理函数 import atexit import glob import shutil def cleanup_temp_files(): """清理所有临时文件""" try: # 删除所有可能留下的临时文件 import tempfile temp_dir = tempfile.gettempdir() pattern = os.path.join(temp_dir, "tmp*") for file in glob.glob(pattern): try: if os.path.isfile(file): os.remove(file) except Exception as e: print(f"Failed to delete {file}: {e}") # 确保没有留下.mp4或.avi文件 for ext in [".mp4", ".avi"]: pattern = os.path.join(temp_dir, f"*{ext}") for file in glob.glob(pattern): try: os.remove(file) except Exception as e: print(f"Failed to delete {file}: {e}") print("Cleaned up temporary files") except Exception as e: print(f"Error during cleanup: {e}") # 注册清理函数 atexit.register(cleanup_temp_files) # Load the model at startup detector.load_model() # Create interface with gr.Blocks(title="Driver Drowsiness Detection") as demo: gr.Markdown(""" # 🚗 Driver Drowsiness Detection System This system detects driver drowsiness using computer vision and deep learning. ## Features: - Image analysis - Video processing with speed monitoring - Webcam detection (PC and mobile) - Multi-factor drowsiness prediction (face, eyes, head pose, speed changes) """) with gr.Tabs(): with gr.Tab("Image"): gr.Markdown("Upload an image for drowsiness detection") with gr.Row(): image_input = gr.Image(label="Input Image", type="numpy") image_output = gr.Image(label="Processed Image") with gr.Row(): status_output = gr.Textbox(label="Status") image_input.change( fn=process_image, inputs=[image_input], outputs=[image_output, status_output] ) with gr.Tab("Video"): gr.Markdown(""" ### 上傳駕駛視頻進行困倦檢測 系統將自動從視頻中檢測以下內容: - 駕駛員面部表情和眼睛狀態 - 車輛速度變化 (通過視頻中的光流分析) - 當車速變化超過 ±5 km/h 時將被視為異常駕駛行為 **注意:** 處理後的視頻不會保存到本地文件夾,請使用界面右上角的下載按鈕保存結果。 """) with gr.Row(): video_input = gr.Video(label="輸入視頻") video_output = gr.Video(label="處理後視頻 (點擊右上角下載)") with gr.Row(): initial_speed = gr.Slider(minimum=10, maximum=120, value=60, label="初始車速估計值 (km/h)", info="僅作為初始估計值,系統會自動從視頻中檢測實際速度變化") with gr.Row(): video_status = gr.Textbox(label="處理狀態") with gr.Row(): process_btn = gr.Button("處理視頻") clear_btn = gr.Button("清除") process_btn.click( fn=process_video, inputs=[video_input, initial_speed], outputs=[video_output, video_status] ) clear_btn.click( fn=lambda: (None, "已清除結果"), inputs=[], outputs=[video_output, video_status] ) with gr.Tab("Webcam"): gr.Markdown("Use your webcam or mobile camera for real-time drowsiness detection") with gr.Row(): webcam_input = gr.Image(label="Camera Feed", type="numpy", streaming=True) webcam_output = gr.Image(label="Processed Feed") with gr.Row(): speed_input = gr.Slider(minimum=0, maximum=150, value=60, label="Current Speed (km/h)") update_speed_btn = gr.Button("Update Speed") with gr.Row(): webcam_status = gr.Textbox(label="Status") def process_webcam_with_speed(image, speed): detector.update_speed(speed) return process_image(image) update_speed_btn.click( fn=lambda speed: f"Speed updated to {speed} km/h", inputs=[speed_input], outputs=[webcam_status] ) webcam_input.change( fn=process_webcam_with_speed, inputs=[webcam_input, speed_input], outputs=[webcam_output, webcam_status] ) gr.Markdown(""" ## How It Works This system detects drowsiness using multiple factors: 1. **Facial features** - Using a trained CNN model 2. **Eye openness** - Measuring eye aspect ratio (EAR) 3. **Head position** - Detecting head drooping 4. **Automatic speed detection** - Using optical flow analysis to track vehicle movement and detect irregular speed changes The system automatically detects speed changes from the video frames using computer vision techniques: - **Optical flow** is used to track movement between frames - **Irregular speed changes** (±5 km/h) are detected as potential signs of drowsy driving - **No external speed data required** - everything is analyzed directly from the video content Combining these factors provides more reliable drowsiness detection than using facial features alone. """) # Launch the app demo.launch(share=args.share, server_port=args.port)