"""Run speed detection and drowsiness analysis over folders of video frames.

Each video folder is expected to hold scene images named ``<name>.jpg`` next
to face crops named ``<name>_face.jpg``. Results are written to CSV files in
the current working directory.
"""

import os
import time

import cv2
import numpy as np
import pandas as pd

from face_analyzer import FaceAnalyzer
from speed_detector import SpeedDetector


class DrowsinessDetector:
    """Combine a ``SpeedDetector`` and a ``FaceAnalyzer`` for per-frame analysis."""

    def __init__(self):
        self.speed_detector = SpeedDetector()
        self.face_analyzer = FaceAnalyzer()
        # True when the most recent process_video_folder() call was cut short
        # by Ctrl-C; callers use it to stop instead of starting another folder.
        self.interrupted = False

    def process_frame(self, frame_path, face_path):
        """
        Process a single frame

        :param frame_path: Path to scene image
        :param face_path: Path to face image
        :return: (speed, is_drowsy) as returned by the two analyzers, or
                 (None, None) when an image cannot be read or analysis fails
        """
        try:
            # Read images; cv2.imread signals failure by returning None
            frame = cv2.imread(frame_path)
            face = cv2.imread(face_path)

            if frame is None or face is None:
                print(f"Error processing {os.path.basename(frame_path)}: Unable to read image")
                return None, None

            # Detect speed
            speed = self.speed_detector.detect_speed(frame)

            # Detect drowsiness
            is_drowsy = self.face_analyzer.is_drowsy(face)

            return speed, is_drowsy

        except Exception as e:
            # Best effort: one bad frame must not abort the whole folder
            print(f"Error processing {os.path.basename(frame_path)}: {str(e)}")
            return None, None

    def process_video_folder(self, folder_path):
        """
        Process all frames in a video folder

        If Ctrl-C is pressed while a frame is being processed, the results
        gathered so far are returned and ``self.interrupted`` is set to True.

        :param folder_path: Path to video folder
        :return: Processing results list (dicts with the keys 'frame',
                 'speed' and 'is_drowsy')
        """
        self.interrupted = False
        results = []
        frame_suffix = '.jpg'
        face_suffix = '_face.jpg'

        # Get all frame images. os.listdir returns entries in arbitrary order,
        # so sort to make the output reproducible between runs.
        frame_files = sorted(
            f for f in os.listdir(folder_path)
            if f.endswith(frame_suffix) and not f.endswith(face_suffix)
        )
        total_frames = len(frame_files)

        for i, frame_file in enumerate(frame_files, 1):
            # Build full file paths. Only the trailing extension is swapped,
            # so a '.jpg' occurring earlier in the name is left alone.
            frame_path = os.path.join(folder_path, frame_file)
            face_name = frame_file[:-len(frame_suffix)] + face_suffix
            face_path = os.path.join(folder_path, face_name)

            # Show progress
            print(f"\rProcessing progress: {i}/{total_frames} ({i/total_frames*100:.1f}%)", end="")

            try:
                speed, is_drowsy = self.process_frame(frame_path, face_path)
            except KeyboardInterrupt:
                print("\nInterrupt detected, saving current results...")
                # Record the interrupt so the caller can stop; without this
                # the run would simply continue with the next folder.
                self.interrupted = True
                return results
            except Exception as e:
                print(f"\nError processing {frame_file}: {str(e)}")
                continue

            if speed is not None and is_drowsy is not None:
                results.append({
                    'frame': frame_file,
                    'speed': speed,
                    'is_drowsy': is_drowsy
                })

        print()  # New line
        return results


def _save_results(results, csv_path):
    """Write results to csv_path; return False without writing when empty."""
    if not results:
        return False
    pd.DataFrame(results).to_csv(csv_path, index=False)
    return True


def main():
    """Process every video folder under dataset/driver and save CSV batches."""
    # Initialize detector
    detector = DrowsinessDetector()

    # Get all video folders
    dataset_path = os.path.join('dataset', 'driver')
    if not os.path.isdir(dataset_path):
        print(f"Dataset folder not found: {dataset_path}")
        return

    # Sorted so that batch contents are the same from run to run
    video_folders = sorted(
        f for f in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, f))
    )
    total_folders = len(video_folders)

    all_results = []
    batch_size = 100  # Save results after processing 100 folders

    try:
        # Process each video folder
        for i, folder in enumerate(video_folders, 1):
            print(f"\nProcessing folder {i}/{total_folders}: {folder}")

            folder_path = os.path.join(dataset_path, folder)
            all_results.extend(detector.process_video_folder(folder_path))

            if detector.interrupted:
                # The folder loop already reported the interrupt; persist what
                # has been collected and stop.
                if _save_results(all_results, 'drowsiness_results_final.csv'):
                    print("The result has been saved to drowsiness_results_final.csv")
                break

            # Save results after processing each batch of folders
            if i % batch_size == 0 or i == total_folders:
                # Ceiling division: folders 1..100 -> batch 1, 101..200 ->
                # batch 2, and so on. A trailing partial batch therefore gets
                # its own number instead of overwriting the previous file.
                batch_number = (i + batch_size - 1) // batch_size
                batch_file = f'drowsiness_results_batch_{batch_number}.csv'
                if all_results:
                    print(f"\nSaving results for batch {batch_number}...")
                    _save_results(all_results, batch_file)
                else:
                    print(f"\nNo results to save for batch {batch_number}")
                all_results = []  # Clear results list

    except KeyboardInterrupt:
        print("\nInterrupt detected, saving current results...")
        if _save_results(all_results, 'drowsiness_results_final.csv'):
            print("The result has been saved to drowsiness_results_final.csv")

    except Exception as e:
        print(f"\nAn error occurred: {str(e)}")
        if _save_results(all_results, 'drowsiness_results_error.csv'):
            print("Results saved to drowsiness_results_error.csv")

    finally:
        print("\nProcessing completed")


if __name__ == "__main__":
    main()