Spaces:
Build error
Build error
| import gradio as gr | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image | |
| import io | |
| import os | |
| import json | |
| import time | |
| import argparse | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| import dlib | |
| from scipy.spatial import distance as dist | |
| import math | |
| from collections import deque | |
class SpeedDetector:
    """Estimate vehicle speed from video frames and score abnormal speed changes.

    Speed is approximated with sparse optical flow over the lower half of each
    frame (assumed to show the road). Irregular speed variation over a sliding
    window is summarised as a 0-1 score that feeds into the drowsiness ensemble.
    """

    def __init__(self, history_size=30):
        """
        Args:
            history_size: number of recent speed samples kept for the
                variation statistics (sliding window length).
        """
        self.speed_history = deque(maxlen=history_size)
        self.last_update_time = None  # kept for interface compatibility; currently unused
        self.current_speed = 0
        self.speed_change_threshold = 5  # km/h jump between samples treated as abrupt
        self.abnormal_speed_changes = 0  # abrupt changes within the CURRENT window
        self.speed_deviation_sum = 0
        self.speed_change_score = 0
        # State for optical-flow speed estimation
        self.prev_gray = None
        self.prev_points = None
        self.frame_idx = 0
        self.speed_estimate = 60  # km/h, initial estimate

    def update_speed(self, speed_km_h):
        """Record a speed sample (km/h) and return the updated 0-1 variation score.

        Returns 0 until at least 5 samples have been collected.
        """
        # Add to history
        self.speed_history.append(speed_km_h)
        self.current_speed = speed_km_h
        # Not enough data yet
        if len(self.speed_history) < 5:
            return 0
        # Calculate speed variation metrics over the sliding window
        speed_arr = np.array(self.speed_history)
        # 1. Standard deviation of speed
        speed_std = np.std(speed_arr)
        # 2. Abrupt sample-to-sample changes.
        #    Bug fix: previously this ACCUMULATED across calls while re-scanning
        #    the overlapping window each frame, double-counting the same
        #    transitions and permanently saturating the abnormal-change factor.
        #    Count within the current window only.
        changes = np.abs(np.diff(speed_arr))
        self.abnormal_speed_changes = int(np.count_nonzero(changes >= self.speed_change_threshold))
        # 3. Average rate of change between consecutive samples
        avg_change = np.mean(changes) if len(changes) > 0 else 0
        # Normalise each factor to the 0-1 range
        self.speed_deviation_sum = min(5, speed_std) / 5
        abnormal_change_factor = min(1, self.abnormal_speed_changes / 5)
        avg_change_factor = min(1, avg_change / self.speed_change_threshold)
        # Weighted combination (weights sum to 1, so the score stays in 0-1)
        self.speed_change_score = (
            0.4 * self.speed_deviation_sum +
            0.4 * abnormal_change_factor +
            0.2 * avg_change_factor
        )
        return self.speed_change_score

    def detect_speed_from_frame(self, frame):
        """Estimate speed (km/h) from a BGR video frame using optical flow.

        Updates internal tracking state, feeds the new estimate through
        update_speed(), and returns the estimate. Returns the last known
        speed when the frame is None or no trackable points are found.
        """
        if frame is None:
            return self.current_speed
        # Convert frame to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # For the first frame (and every 30th frame), (re)initialize points to track
        if self.prev_gray is None or self.frame_idx % 30 == 0:
            # Detect good features to track, restricted to the lower half
            # of the frame (road region) via a mask.
            mask = np.zeros_like(gray)
            h, w = gray.shape
            mask[h//2:, :] = 255
            corners = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.01, minDistance=10, mask=mask)
            if corners is not None and len(corners) > 0:
                self.prev_points = corners
                self.prev_gray = gray.copy()
            else:
                # No good points to track this frame
                self.frame_idx += 1
                return self.current_speed
        # Calculate optical flow if we have previous points
        if self.prev_gray is not None and self.prev_points is not None:
            new_points, status, _ = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_points, None)
            # Filter only the successfully tracked points
            if new_points is not None and status is not None:
                good_new = new_points[status == 1]
                good_old = self.prev_points[status == 1]
                if len(good_new) > 0 and len(good_old) > 0:
                    # Average per-point displacement magnitude between frames
                    flow_magnitudes = np.sqrt(
                        np.sum((good_new - good_old)**2, axis=1)
                    )
                    avg_flow = np.mean(flow_magnitudes) if len(flow_magnitudes) > 0 else 0
                    # Map optical flow to a speed change: higher flow = faster
                    # movement. Simplified mapping — would need calibration for
                    # real-world use.
                    flow_threshold = 1.0  # adjust based on testing
                    if avg_flow > flow_threshold:
                        # Movement detected; estimate acceleration, clamped to ±5
                        speed_change = min(5, max(-5, (avg_flow - flow_threshold) * 2))
                        # Temporal smoothing to avoid sudden jumps
                        speed_change = speed_change * 0.3
                    else:
                        # Minimal movement: slight deceleration (coasting)
                        speed_change = -0.1
                    # Apply the change and keep speed in a plausible range
                    self.speed_estimate += speed_change
                    self.speed_estimate = max(40, min(120, self.speed_estimate))
                    # Carry tracked points forward for the next frame
                    self.prev_points = good_new.reshape(-1, 1, 2)
        # Update previous gray frame and frame counter
        self.prev_gray = gray.copy()
        self.frame_idx += 1
        # A real system could OCR a dashboard speedometer here; for now use
        # the optical-flow estimate only.
        detected_speed = self.speed_estimate
        # Push the estimate through the variation scorer
        self.update_speed(detected_speed)
        return detected_speed

    def get_speed_change_score(self):
        """Return the current 0-1 score indicating abnormal speed changes."""
        return self.speed_change_score

    def reset(self):
        """Reset all detector state (history, counters, and optical-flow tracking)."""
        self.speed_history.clear()
        self.abnormal_speed_changes = 0
        self.speed_deviation_sum = 0
        self.speed_change_score = 0
        self.prev_gray = None
        self.prev_points = None
        self.frame_idx = 0
        self.speed_estimate = 60  # Reset to initial estimate
class DrowsinessDetector:
    """Multi-factor driver drowsiness detector.

    Combines a CNN face classifier with (optionally) dlib facial-landmark
    features — eye aspect ratio, gaze/roll angle, head pose — and the
    SpeedDetector's speed-variation score into a single weighted drowsiness
    probability, smoothed over recent frames with a median filter.
    """

    def __init__(self):
        # Keras model; loaded lazily via load_model().
        self.model = None
        self.input_shape = (224, 224, 3)  # Updated to match model's expected input shape
        # Haar cascade for coarse face localisation (ships with OpenCV).
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.id2label = {0: "notdrowsy", 1: "drowsy"}
        self.label2id = {"notdrowsy": 0, "drowsy": 1}
        # Speed detector
        self.speed_detector = SpeedDetector()
        self.SPEED_CHANGE_WEIGHT = 0.15  # Weight for speed changes in drowsiness calculation
        # Try to load dlib and facial landmark predictor (but make it optional)
        self.landmark_detection_enabled = False
        try:
            import dlib
            self.detector = dlib.get_frontal_face_detector()
            # Check if the shape predictor file exists
            predictor_path = "shape_predictor_68_face_landmarks.dat"
            if not os.path.exists(predictor_path):
                # NOTE(review): downloads the predictor at construction time;
                # any network failure here simply disables landmark detection
                # via the except below.
                print(f"Warning: {predictor_path} not found. Downloading...")
                import urllib.request
                urllib.request.urlretrieve(
                    "https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat",
                    predictor_path
                )
            self.predictor = dlib.shape_predictor(predictor_path)
            self.landmark_detection_enabled = True
            print("Facial landmark detection enabled")
        except Exception as e:
            print(f"Warning: Facial landmark detection disabled: {e}")
            print("The system will use a simpler detection method. For better accuracy, install CMake and dlib.")
        # Constants for drowsiness detection
        self.EAR_THRESHOLD = 0.25  # Eye aspect ratio threshold (below = closing eyes)
        self.CONSECUTIVE_FRAMES = 20  # NOTE(review): declared but not used in this class
        self.ear_counter = 0
        self.GAZE_THRESHOLD = 0.2  # Gaze direction threshold (unused in predict())
        self.HEAD_POSE_THRESHOLD = 0.3  # Head pose threshold (unused in predict())
        # Parameters for weighted ensemble (with SPEED_CHANGE_WEIGHT these sum to 1.0)
        self.MODEL_WEIGHT = 0.45  # Reduced to accommodate speed factor
        self.EAR_WEIGHT = 0.2
        self.GAZE_WEIGHT = 0.1
        self.HEAD_POSE_WEIGHT = 0.1
        # For tracking across frames
        self.prev_drowsy_count = 0
        self.drowsy_history = []  # recent drowsiness probabilities for median smoothing
        self.current_speed = 0  # Current speed in km/h

    def update_speed(self, speed_km_h):
        """Record the current speed (km/h) and return the speed-variation score."""
        self.current_speed = speed_km_h
        return self.speed_detector.update_speed(speed_km_h)

    def reset_speed_detector(self):
        """Reset the speed detector state (called at the start of each video)."""
        self.speed_detector.reset()

    def load_model(self):
        """Load the CNN model from local files.

        Raises:
            Exception: re-raised after logging if the config or model file
                cannot be read.
        """
        try:
            # Use local model files
            config_path = "huggingface_model/config.json"
            model_path = "drowsiness_model.h5"
            # Load config
            # NOTE(review): `config` is not used afterwards — this effectively
            # only validates that the file exists and is valid JSON.
            with open(config_path, 'r') as f:
                config = json.load(f)
            # Load the Keras model directly
            self.model = keras.models.load_model(model_path)
            # Print model summary for debugging
            print("Model loaded successfully")
            print(f"Model input shape: {self.model.input_shape}")
            self.model.summary()
        except Exception as e:
            print(f"Error loading CNN model: {str(e)}")
            raise

    def eye_aspect_ratio(self, eye):
        """Calculate the eye aspect ratio (EAR) for one eye.

        Expects 6 (x, y) landmark points in the standard dlib eye ordering.
        Lower values mean a more closed eye.
        """
        # Compute the euclidean distances between the two sets of vertical eye landmarks
        A = dist.euclidean(eye[1], eye[5])
        B = dist.euclidean(eye[2], eye[4])
        # Compute the euclidean distance between the horizontal eye landmarks
        C = dist.euclidean(eye[0], eye[3])
        # Calculate the eye aspect ratio
        ear = (A + B) / (2.0 * C)
        return ear

    def calculate_gaze(self, eye_points, facial_landmarks):
        """Return a 0-1 value from the angle between the two eye centres.

        NOTE(review): `eye_points` is unused, and the quantity measured is the
        inter-ocular line tilt (head roll), not gaze direction in the usual
        sense — confirm intent before relying on the name.
        """
        left_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(36, 42)])
        right_eye_region = np.array([(facial_landmarks.part(i).x, facial_landmarks.part(i).y) for i in range(42, 48)])
        # Compute eye centers
        left_eye_center = left_eye_region.mean(axis=0).astype("int")
        right_eye_center = right_eye_region.mean(axis=0).astype("int")
        # Compute the angle between eye centers
        dY = right_eye_center[1] - left_eye_center[1]
        dX = right_eye_center[0] - left_eye_center[0]
        angle = np.degrees(np.arctan2(dY, dX))
        # Normalize the angle to 0-1 (180 degrees -> 1.0)
        return abs(angle) / 180.0

    def get_head_pose(self, shape):
        """Estimate head droop as a 0-1 value from nose/chin landmark geometry.

        NOTE(review): for an upright face the chin is directly below the nose,
        so atan2 gives an angle near pi/2 and the normalised value is near 1 —
        the "0 is upright" comment below may be inverted; verify the intended
        mapping against real data.
        """
        # Get specific facial landmarks for head pose estimation
        image_points = np.array([
            (shape.part(30).x, shape.part(30).y),  # Nose tip
            (shape.part(8).x, shape.part(8).y),    # Chin
            (shape.part(36).x, shape.part(36).y),  # Left eye left corner
            (shape.part(45).x, shape.part(45).y),  # Right eye right corner
            (shape.part(48).x, shape.part(48).y),  # Left mouth corner
            (shape.part(54).x, shape.part(54).y)   # Right mouth corner
        ], dtype="double")
        # A simple head pose estimation using the angle of the face
        # Calculate center of the face
        # NOTE(review): center_x / center_y are computed but never used.
        center_x = np.mean([p[0] for p in image_points])
        center_y = np.mean([p[1] for p in image_points])
        # Calculate angle with respect to vertical using the nose-chin line
        angle = 0
        if len(image_points) > 2:
            point1 = image_points[0]  # Nose
            point2 = image_points[1]  # Chin
            angle = abs(math.atan2(point2[1] - point1[1], point2[0] - point1[0]))
        # Normalize to 0-1 range where 0 is upright and 1 is drooping
        normalized_pose = min(1.0, abs(angle) / (math.pi/2))
        return normalized_pose

    def detect_face(self, frame):
        """Detect the first face in a BGR frame with the Haar cascade.

        Returns:
            (face_crop, (x, y, w, h)) for the first detection, or (None, None)
            when no face is found.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.1, 4)
        if len(faces) > 0:
            (x, y, w, h) = faces[0]  # Get the first face
            face = frame[y:y+h, x:x+w]
            return face, (x, y, w, h)
        return None, None

    def preprocess_image(self, image):
        """Preprocess a BGR face crop for the CNN.

        Converts BGR->RGB, resizes to the model's input size, scales pixel
        values to [0, 1], and adds a batch dimension. Returns None when the
        input is None.
        """
        if image is None:
            return None
        # Convert to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Resize to model input size (224x224)
        image = cv2.resize(image, (self.input_shape[0], self.input_shape[1]))
        # Normalize
        image = image.astype(np.float32) / 255.0
        # Add batch dimension
        image = np.expand_dims(image, axis=0)
        return image

    def predict(self, image):
        """Predict a drowsiness probability for a BGR frame using all factors.

        Returns:
            (drowsy_prob, face_coords, error_message, metrics) where
            error_message is None on success and metrics is a dict with
            "model_prob", "ear", "gaze", "head_pose", "speed_change".

        Raises:
            ValueError: if load_model() has not been called.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")
        # Initialize results with "wide awake" defaults
        drowsy_prob = 0.0
        face_coords = None
        ear_value = 1.0  # Default to wide open eyes
        gaze_value = 0.0
        head_pose_value = 0.0
        landmark_detection_success = False
        # Detect face
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        face, face_coords = self.detect_face(image)
        if face is None:
            return 0.0, None, "No face detected", {}
        # Get model prediction on the cropped face
        inputs = self.preprocess_image(face)
        if inputs is None:
            return 0.0, face_coords, "Error processing image", {}
        outputs = self.model.predict(inputs)
        # Get the drowsiness probability from the model
        if outputs.shape[1] == 1:
            # Single-output model: treat the value as a probability...
            model_prob = outputs[0][0]
            # ...unless it is outside [0, 1], in which case treat it as a
            # logit and squash with a sigmoid.
            # NOTE(review): a raw logit that happens to fall inside [0, 1]
            # would NOT be converted by this check.
            if model_prob < 0 or model_prob > 1:
                model_prob = 1 / (1 + np.exp(-model_prob))
        else:
            # For multi-class model: softmax over classes
            probs = tf.nn.softmax(outputs, axis=1).numpy()
            model_prob = probs[0, 1]  # Probability of class 1 (drowsy)
        # Get speed change score from detector
        speed_change_score = self.speed_detector.get_speed_change_score()
        # Metrics returned to callers for display/diagnostics
        metrics = {
            "model_prob": model_prob,
            "ear": 1.0,
            "gaze": 0.0,
            "head_pose": 0.0,
            "speed_change": speed_change_score
        }
        if self.landmark_detection_enabled:
            try:
                # Import dlib here to avoid errors if it's not installed
                import dlib
                from scipy.spatial import distance as dist
                # Detect faces with dlib for landmark detection
                rects = self.detector(gray, 0)
                if len(rects) > 0:
                    # Get facial landmarks for the first dlib detection
                    shape = self.predictor(gray, rects[0])
                    # Eye aspect ratio averaged over both eyes
                    left_eye = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)]
                    right_eye = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)]
                    left_ear = self.eye_aspect_ratio(left_eye)
                    right_ear = self.eye_aspect_ratio(right_eye)
                    ear_value = (left_ear + right_ear) / 2.0
                    # Get gaze direction (see calculate_gaze note)
                    gaze_value = self.calculate_gaze(None, shape)
                    # Get head pose
                    head_pose_value = self.get_head_pose(shape)
                    # Update metrics
                    metrics["ear"] = ear_value
                    metrics["gaze"] = gaze_value
                    metrics["head_pose"] = head_pose_value
                    landmark_detection_success = True
            except Exception as e:
                print(f"Error in landmark detection: {e}")
        else:
            # Use a simplified heuristic approach when dlib is not available:
            # estimate eye closure from grayscale intensity in approximate eye
            # regions. Much less accurate than the EAR method.
            if face_coords is not None:
                try:
                    # Try to estimate eye regions based on face proportions
                    face_gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
                    face_height, face_width = face_gray.shape[:2]
                    # Estimate eye regions (approximate; may not fit all faces)
                    left_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.2):int(face_width*0.4)]
                    right_eye_region = face_gray[int(face_height*0.2):int(face_height*0.4), int(face_width*0.6):int(face_width*0.8)]
                    # Simplified metric: lower average intensity may indicate closed eyes
                    if left_eye_region.size > 0 and right_eye_region.size > 0:
                        left_eye_avg = np.mean(left_eye_region) / 255.0
                        right_eye_avg = np.mean(right_eye_region) / 255.0
                        # Invert so darker regions (potentially closed eyes) score higher
                        left_eye_closed = 1.0 - left_eye_avg
                        right_eye_closed = 1.0 - right_eye_avg
                        # Combine into a 0-1 closure metric (higher = more closed)
                        eye_closure = (left_eye_closed + right_eye_closed) / 2.0
                        # Map closure onto a rough EAR-like scale (lower = more closed),
                        # clamped to a minimum of 0.15
                        estimated_ear = max(0.15, 0.4 - (eye_closure * 0.25))
                        ear_value = estimated_ear
                        metrics["ear"] = ear_value
                except Exception as e:
                    print(f"Error in simplified eye detection: {e}")
        # Combine features for final drowsiness probability
        if landmark_detection_success:
            # Eye state factor: 1.0 when eyes closed, 0.0 when fully open
            # (linear ramp over 0.2 EAR units below the threshold)
            eye_state = max(0, min(1, (self.EAR_THRESHOLD - ear_value) * 5))
            # Weighted ensemble of all factors
            weighted_avg = (
                self.MODEL_WEIGHT * model_prob +
                self.EAR_WEIGHT * eye_state +
                self.GAZE_WEIGHT * gaze_value +
                self.HEAD_POSE_WEIGHT * head_pose_value +
                self.SPEED_CHANGE_WEIGHT * speed_change_score  # Add speed change factor
            )
            # Update drowsy probability
            drowsy_prob = weighted_avg
        else:
            # If landmark detection failed, use the simplified approach:
            # model probability carries more weight
            if "ear" in metrics and metrics["ear"] < 1.0:
                # We have the simplified eye metric
                eye_state = max(0, min(1, (self.EAR_THRESHOLD - metrics["ear"]) * 5))
                drowsy_prob = (self.MODEL_WEIGHT * model_prob) + ((1 - self.MODEL_WEIGHT - self.SPEED_CHANGE_WEIGHT) * eye_state) + (self.SPEED_CHANGE_WEIGHT * speed_change_score)
            else:
                # Only model and speed are available
                drowsy_prob = (model_prob * 0.85) + (speed_change_score * 0.15)
        # Apply smoothing with the last 10 per-frame probabilities
        self.drowsy_history.append(drowsy_prob)
        if len(self.drowsy_history) > 10:
            self.drowsy_history.pop(0)
        # Use median filtering for robustness against single-frame spikes
        drowsy_prob = np.median(self.drowsy_history)
        return drowsy_prob, face_coords, None, metrics
# Create a global instance shared by all Gradio handlers below.
# NOTE(review): DrowsinessDetector.__init__ may attempt a network download of
# the dlib shape predictor, so constructing it at import time can block.
detector = DrowsinessDetector()
def process_image(image):
    """Run drowsiness detection on a single image and annotate it.

    Args:
        image: BGR numpy array, or None.

    Returns:
        (annotated_image, status_message); the image is None whenever a
        result could not be produced.
    """
    if image is None:
        return None, "No image provided"
    try:
        # Reject degenerate arrays before doing any work.
        if image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0:
            return None, "Invalid image dimensions"

        # Draw on a copy so the caller's array stays untouched.
        annotated = image.copy()

        drowsy_prob, face_coords, error, metrics = detector.predict(annotated)
        if error:
            return None, error

        # No face: annotate and report, but still return the image.
        if face_coords is None:
            cv2.putText(annotated, "No face detected", (30, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            return annotated, "No face detected"

        x, y, w, h = face_coords
        # Higher threshold (0.7) to reduce false positives.
        is_drowsy = drowsy_prob >= 0.7

        # Alert level and box colour from the probability band.
        if drowsy_prob >= 0.85:
            alert_level, color = "High Risk", (0, 0, 255)      # red
        elif drowsy_prob >= 0.7:
            alert_level, color = "Medium Risk", (0, 165, 255)  # orange
        else:
            alert_level, color = "Alert", (0, 255, 0)          # green

        cv2.rectangle(annotated, (x, y), (x + w, y + h), color, 2)

        # Headline label above the face box.
        state_label = 'Drowsy' if is_drowsy else 'Alert'
        cv2.putText(annotated, f"{state_label} ({drowsy_prob:.2f})",
                    (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
        cv2.putText(annotated, alert_level, (x, y - 35),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        # Per-factor metrics stacked in the bottom-left corner.
        y_offset = 25
        baseline = annotated.shape[0] - 10
        white = (255, 255, 255)
        cv2.putText(annotated, f"Model: {metrics['model_prob']:.2f}",
                    (10, baseline - y_offset * 3),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, white, 1)
        cv2.putText(annotated, f"Eye Ratio: {metrics['ear']:.2f}",
                    (10, baseline - y_offset * 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, white, 1)
        cv2.putText(annotated, f"Head Pose: {metrics['head_pose']:.2f}",
                    (10, baseline - y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, white, 1)

        # Flag disagreement: CNN very confident but eyes measured wide open.
        if metrics['model_prob'] > 0.9 and metrics['ear'] > 0.25:
            cv2.putText(annotated, "Model conflict - verify manually",
                        (10, baseline - y_offset * 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 1)

        return annotated, f"Processed successfully. Drowsiness: {drowsy_prob:.2f}, Alert level: {alert_level}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing image: {str(e)}\n{error_details}")
        return None, f"Error processing image: {str(e)}"
def process_video(video, initial_speed=60):
    """Process a driving video frame by frame.

    Runs drowsiness detection on every frame (via process_image), overlays the
    optical-flow speed estimate and speed-variation score, and writes an
    annotated copy to a temporary MP4 file.

    Args:
        video: path to a video file (str) or raw uploaded video bytes.
        initial_speed: starting speed estimate in km/h for the optical-flow
            speed tracker.

    Returns:
        (output_video_path, status_message); the path is None on failure.
    """
    if video is None:
        return None, "No video provided"
    try:
        # Use an in-memory buffer rather than a temporary file
        temp_input = None
        # Handle video input (can be file path or video data)
        if isinstance(video, str):
            print(f"Processing video from path: {video}")
            # Read the original file directly; do not copy it to a temp directory
            cap = cv2.VideoCapture(video)
        else:
            print(f"Processing video from uploaded data")
            # Write the uploaded video data to a temporary file so OpenCV can read it
            import tempfile
            # NOTE(review): the NamedTemporaryFile handle is left open while the
            # path is re-opened for writing below — this can fail on Windows;
            # closing temp_input before the open() would be safer.
            temp_input = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            temp_input_path = temp_input.name
            with open(temp_input_path, "wb") as f:
                f.write(video)
            cap = cv2.VideoCapture(temp_input_path)
        if not cap.isOpened():
            return None, "Error: Could not open video"
        # Get input video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default to 30fps if invalid
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video properties: {width}x{height} at {fps}fps, total frames: {total_frames}")
        # Create an in-memory buffer rather than a temporary output file
        # NOTE(review): io/base64 are imported but not used below.
        import io
        import base64
        # Use a temporary file to store the processed video (removed after processing)
        import tempfile
        temp_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
        temp_output_path = temp_output.name
        # Try different codecs on Windows
        if os.name == 'nt':  # Windows
            # Use mp4v encoding to ensure compatibility
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        else:
            # On other platforms, use MP4V (same codec on both branches)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Create video writer
        out = cv2.VideoWriter(temp_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            return None, "Error: Could not create output video file"
        # Reset speed detector at the start of each video
        detector.reset_speed_detector()
        # Initialize speed value with the provided initial speed
        current_speed = initial_speed
        detector.speed_detector.speed_estimate = initial_speed
        # Per-run statistics counters
        frame_count = 0
        processed_count = 0
        face_detected_count = 0
        drowsy_count = 0
        high_risk_count = 0
        ear_sum = 0
        model_prob_sum = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"End of video or error reading frame at frame {frame_count}")
                break
            frame_count += 1
            # Detect speed from the current frame (optical flow)
            current_speed = detector.speed_detector.detect_speed_from_frame(frame)
            try:
                # Try to process the frame
                processed_frame, message = process_image(frame)
                # Add speed info overlay to the frame
                if processed_frame is not None:
                    speed_text = f"Speed: {current_speed:.1f} km/h"
                    cv2.putText(processed_frame, speed_text, (10, processed_frame.shape[0]-45),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                    # Add speed change score
                    speed_change_score = detector.speed_detector.get_speed_change_score()
                    cv2.putText(processed_frame, f"Speed Variation: {speed_change_score:.2f}",
                                (10, processed_frame.shape[0]-70),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                if processed_frame is not None:
                    out.write(processed_frame)
                    processed_count += 1
                    if "No face detected" not in message:
                        face_detected_count += 1
                        if "Drowsiness" in message:
                            # Extract drowsiness probability from the status string.
                            # NOTE(review): parsing statistics back out of a
                            # human-readable message is fragile; returning the
                            # metrics from process_image directly would be
                            # more robust.
                            try:
                                drowsy_text = message.split("Drowsiness: ")[1].split(",")[0]
                                drowsy_prob = float(drowsy_text)
                                # Track drowsiness stats
                                if drowsy_prob >= 0.7:
                                    drowsy_count += 1
                                if drowsy_prob >= 0.85:
                                    high_risk_count += 1
                                # Get metrics from the frame
                                # NOTE(review): this runs the full model a
                                # second time on the same frame just to read
                                # metrics — doubles inference cost per frame.
                                _, _, _, metrics = detector.predict(frame)
                                if 'ear' in metrics:
                                    ear_sum += metrics['ear']
                                if 'model_prob' in metrics:
                                    model_prob_sum += metrics['model_prob']
                            except:
                                pass
                else:
                    # Fallback: If processing fails, just use the original frame
                    # Add text indicating processing failed
                    cv2.putText(frame, "Processing failed", (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                    out.write(frame)
                    processed_count += 1
                    print(f"Frame {frame_count}: Processing failed - {message}")
            except Exception as e:
                # If any error occurs during processing, use the original frame
                cv2.putText(frame, f"Error: {str(e)[:30]}", (30, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                out.write(frame)
                processed_count += 1
                print(f"Frame {frame_count}: Exception - {str(e)}")
            # Print progress for every 10th frame
            if frame_count % 10 == 0:
                print(f"Processed {frame_count}/{total_frames} frames")
        # Release resources
        cap.release()
        out.release()
        # Calculate statistics (guarded against division by zero when no faces were seen)
        drowsy_percentage = (drowsy_count / face_detected_count * 100) if face_detected_count > 0 else 0
        high_risk_percentage = (high_risk_count / face_detected_count * 100) if face_detected_count > 0 else 0
        avg_ear = ear_sum / face_detected_count if face_detected_count > 0 else 0
        avg_model_prob = model_prob_sum / face_detected_count if face_detected_count > 0 else 0
        speed_score = detector.speed_detector.get_speed_change_score()
        # Check if video was created successfully and return it directly
        if os.path.exists(temp_output_path) and os.path.getsize(temp_output_path) > 0:
            print(f"Video processed successfully with {processed_count} frames")
            print(f"Drowsy frames: {drowsy_count} ({drowsy_percentage:.1f}%), High risk frames: {high_risk_count} ({high_risk_percentage:.1f}%)")
            print(f"Average eye ratio: {avg_ear:.2f}, Average model probability: {avg_model_prob:.2f}")
            print(f"Speed change score: {speed_score:.2f}")
            # If model prob is high but eye ratio is also high (open eyes), flag potential false positive
            false_positive_warning = ""
            if avg_model_prob > 0.8 and avg_ear > 0.25:
                false_positive_warning = " ⚠️ Possible false positive (eyes open but model detects drowsiness)"
            result_message = (f"Video processed successfully. Frames: {frame_count}, faces detected: {face_detected_count}, "
                              f"drowsy: {drowsy_count} ({drowsy_percentage:.1f}%), high risk: {high_risk_count} ({high_risk_percentage:.1f}%)."
                              f" Avg eye ratio: {avg_ear:.2f}, Speed score: {speed_score:.2f}{false_positive_warning}")
            # Return the file path directly rather than keeping the file around
            video_result = temp_output_path
            return video_result, result_message
        else:
            print(f"Failed to create output video. Frames read: {frame_count}, processed: {processed_count}")
            return None, f"Error: Failed to create output video. Frames read: {frame_count}, processed: {processed_count}"
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error processing video: {str(e)}\n{error_details}")
        return None, f"Error processing video: {str(e)}"
    finally:
        # Clean up resources
        if 'out' in locals() and out is not None:
            out.release()
        if 'cap' in locals() and cap is not None:
            cap.release()
        # Delete the temporary input file (if one was created)
        if temp_input is not None:
            try:
                os.unlink(temp_input.name)
            except:
                pass
def process_webcam(image):
    """Handle a single webcam frame.

    Thin wrapper: delegates to process_image and passes through its
    (processed_image, status_message) pair unchanged.
    """
    result = process_image(image)
    return result
# Launch the app
if __name__ == "__main__":
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Driver Drowsiness Detection App")
    parser.add_argument("--share", action="store_true", help="Create a public link (may trigger security warnings)")
    parser.add_argument("--port", type=int, default=7860, help="Port to run the app on")
    args = parser.parse_args()
    # Print warning if share is enabled
    if args.share:
        print("WARNING: Running with --share may trigger security warnings on some systems.")
        print("The app will be accessible from the internet through a temporary URL.")
    # Register a cleanup function to run when the interpreter exits
    import atexit
    import glob
    import shutil

    def cleanup_temp_files():
        """Clean up all temporary files.

        NOTE(review): this sweeps ALL "tmp*", "*.mp4" and "*.avi" files in
        the SYSTEM temp directory, not just files this app created — it can
        delete files belonging to other processes. Tracking the paths
        created by process_video and removing only those would be safer.
        """
        try:
            # Delete any temporary files that may have been left behind
            import tempfile
            temp_dir = tempfile.gettempdir()
            pattern = os.path.join(temp_dir, "tmp*")
            for file in glob.glob(pattern):
                try:
                    if os.path.isfile(file):
                        os.remove(file)
                except Exception as e:
                    print(f"Failed to delete {file}: {e}")
            # Make sure no .mp4 or .avi files are left behind
            for ext in [".mp4", ".avi"]:
                pattern = os.path.join(temp_dir, f"*{ext}")
                for file in glob.glob(pattern):
                    try:
                        os.remove(file)
                    except Exception as e:
                        print(f"Failed to delete {file}: {e}")
            print("Cleaned up temporary files")
        except Exception as e:
            print(f"Error during cleanup: {e}")

    # Register the cleanup function
    atexit.register(cleanup_temp_files)
    # Load the model at startup (raises and aborts launch if files are missing)
    detector.load_model()
    # Create the Gradio interface
    with gr.Blocks(title="Driver Drowsiness Detection") as demo:
        gr.Markdown("""
# 🚗 Driver Drowsiness Detection System
This system detects driver drowsiness using computer vision and deep learning.
## Features:
- Image analysis
- Video processing with speed monitoring
- Webcam detection (PC and mobile)
- Multi-factor drowsiness prediction (face, eyes, head pose, speed changes)
""")
        with gr.Tabs():
            with gr.Tab("Image"):
                gr.Markdown("Upload an image for drowsiness detection")
                with gr.Row():
                    image_input = gr.Image(label="Input Image", type="numpy")
                    image_output = gr.Image(label="Processed Image")
                with gr.Row():
                    status_output = gr.Textbox(label="Status")
                # Re-run detection whenever a new image is supplied.
                image_input.change(
                    fn=process_image,
                    inputs=[image_input],
                    outputs=[image_output, status_output]
                )
            with gr.Tab("Video"):
                gr.Markdown("""
### 上傳駕駛視頻進行困倦檢測
系統將自動從視頻中檢測以下內容:
- 駕駛員面部表情和眼睛狀態
- 車輛速度變化 (通過視頻中的光流分析)
- 當車速變化超過 ±5 km/h 時將被視為異常駕駛行為
**注意:** 處理後的視頻不會保存到本地文件夾,請使用界面右上角的下載按鈕保存結果。
""")
                with gr.Row():
                    video_input = gr.Video(label="輸入視頻")
                    video_output = gr.Video(label="處理後視頻 (點擊右上角下載)")
                with gr.Row():
                    initial_speed = gr.Slider(minimum=10, maximum=120, value=60, label="初始車速估計值 (km/h)",
                                              info="僅作為初始估計值,系統會自動從視頻中檢測實際速度變化")
                with gr.Row():
                    video_status = gr.Textbox(label="處理狀態")
                with gr.Row():
                    process_btn = gr.Button("處理視頻")
                    clear_btn = gr.Button("清除")
                # Run the full video pipeline on demand.
                process_btn.click(
                    fn=process_video,
                    inputs=[video_input, initial_speed],
                    outputs=[video_output, video_status]
                )
                # Clear just the output widgets, not the input.
                clear_btn.click(
                    fn=lambda: (None, "已清除結果"),
                    inputs=[],
                    outputs=[video_output, video_status]
                )
            with gr.Tab("Webcam"):
                gr.Markdown("Use your webcam or mobile camera for real-time drowsiness detection")
                with gr.Row():
                    # NOTE(review): gr.Image(source=..., streaming=...) is the
                    # Gradio 3.x API; newer Gradio versions changed these
                    # arguments — confirm the pinned Gradio version.
                    webcam_input = gr.Image(source="webcam", streaming=True, label="Camera Feed", type="numpy")
                    webcam_output = gr.Image(label="Processed Feed")
                with gr.Row():
                    speed_input = gr.Slider(minimum=0, maximum=150, value=60, label="Current Speed (km/h)")
                    update_speed_btn = gr.Button("Update Speed")
                with gr.Row():
                    webcam_status = gr.Textbox(label="Status")

                def process_webcam_with_speed(image, speed):
                    # Push the manually-set speed into the shared detector
                    # before running detection on the webcam frame.
                    detector.update_speed(speed)
                    return process_image(image)

                update_speed_btn.click(
                    fn=lambda speed: f"Speed updated to {speed} km/h",
                    inputs=[speed_input],
                    outputs=[webcam_status]
                )
                # Process each streamed webcam frame with the current slider speed.
                webcam_input.change(
                    fn=process_webcam_with_speed,
                    inputs=[webcam_input, speed_input],
                    outputs=[webcam_output, webcam_status]
                )
        gr.Markdown("""
## How It Works
This system detects drowsiness using multiple factors:
1. **Facial features** - Using a trained CNN model
2. **Eye openness** - Measuring eye aspect ratio (EAR)
3. **Head position** - Detecting head drooping
4. **Automatic speed detection** - Using optical flow analysis to track vehicle movement and detect irregular speed changes
The system automatically detects speed changes from the video frames using computer vision techniques:
- **Optical flow** is used to track movement between frames
- **Irregular speed changes** (±5 km/h) are detected as potential signs of drowsy driving
- **No external speed data required** - everything is analyzed directly from the video content
Combining these factors provides more reliable drowsiness detection than using facial features alone.
""")
    # Launch the app
    demo.launch(share=args.share, server_port=args.port)