Spaces:
Build error
Build error
| import os | |
| import cv2 | |
| import numpy as np | |
| from speed_detector import SpeedDetector | |
| from face_analyzer import FaceAnalyzer | |
| import pandas as pd | |
| import time | |
class DrowsinessDetector:
    """Pair a scene-speed detector with a face-based drowsiness analyzer.

    Each sample consists of two images stored side by side in a folder: a
    scene frame ``<name>.jpg`` and its face crop ``<name>_face.jpg``.
    """

    def __init__(self):
        self.speed_detector = SpeedDetector()
        self.face_analyzer = FaceAnalyzer()

    def process_frame(self, frame_path, face_path):
        """
        Process a single frame

        :param frame_path: Path to scene image
        :param face_path: Path to face image
        :return: (speed, is_drowsy) as produced by the two detectors, or
                 (None, None) when an image cannot be read or a detector
                 raises
        """
        try:
            # cv2.imread signals an unreadable/missing file by returning None
            # instead of raising, so the result has to be checked explicitly.
            frame = cv2.imread(frame_path)
            face = cv2.imread(face_path)
            if frame is None or face is None:
                print(f"Error processing {os.path.basename(frame_path)}: Unable to read image")
                return None, None

            speed = self.speed_detector.detect_speed(frame)
            is_drowsy = self.face_analyzer.is_drowsy(face)
            return speed, is_drowsy
        except Exception as e:
            # Best effort: one bad frame must not abort the whole folder.
            print(f"Error processing {os.path.basename(frame_path)}: {str(e)}")
            return None, None

    def process_video_folder(self, folder_path):
        """
        Process all frames in a video folder

        Frames are visited in sorted file-name order so the returned rows are
        reproducible (``os.listdir`` makes no ordering guarantee).

        :param folder_path: Path to video folder
        :return: Processing results list; one dict with keys ``frame``,
                 ``speed`` and ``is_drowsy`` per successfully processed frame.
                 On KeyboardInterrupt the rows collected so far are returned
                 instead of the interrupt being propagated.
        """
        frame_ext = '.jpg'
        face_suffix = '_face.jpg'
        results = []

        # Scene frames are the .jpg files that are not themselves face crops.
        frame_files = sorted(
            f for f in os.listdir(folder_path)
            if f.endswith(frame_ext) and not f.endswith(face_suffix)
        )
        total_frames = len(frame_files)

        for i, frame_file in enumerate(frame_files, 1):
            frame_path = os.path.join(folder_path, frame_file)
            # Swap only the trailing extension; str.replace would also rewrite
            # any earlier '.jpg' inside the name and point at the wrong file.
            face_name = frame_file[:-len(frame_ext)] + face_suffix
            face_path = os.path.join(folder_path, face_name)

            # Show progress on a single, continuously rewritten line.
            print(f"\rProcessing progress: {i}/{total_frames} ({i/total_frames*100:.1f}%)", end="")

            try:
                speed, is_drowsy = self.process_frame(frame_path, face_path)
                if speed is not None and is_drowsy is not None:
                    results.append({
                        'frame': frame_file,
                        'speed': speed,
                        'is_drowsy': is_drowsy
                    })
            except KeyboardInterrupt:
                # Hand back what we have so the caller can persist it.
                print("\nInterrupt detected, saving current results...")
                return results
            except Exception as e:
                print(f"\nError processing {frame_file}: {str(e)}")
                continue

        print()  # Terminate the progress line
        return results
def main():
    """Run drowsiness detection over every video folder of the dataset.

    Sub-folders of ``dataset/driver`` are processed in sorted order. Results
    are flushed to ``drowsiness_results_batch_<n>.csv`` after every
    ``batch_size`` folders and once more after the last folder, where ``<n>``
    counts batches from 1. If the run is interrupted or fails, the rows not
    yet written to a batch file are saved to ``drowsiness_results_final.csv``
    or ``drowsiness_results_error.csv`` respectively.
    """
    # Initialize detector
    detector = DrowsinessDetector()

    # Get all video folders; sorted because os.listdir order is arbitrary and
    # batch contents should be reproducible between runs.
    dataset_path = os.path.join('dataset', 'driver')
    video_folders = sorted(
        f for f in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, f))
    )
    total_folders = len(video_folders)

    all_results = []
    batch_size = 100  # Save results after processing 100 folders

    try:
        for i, folder in enumerate(video_folders, 1):
            print(f"\nProcessing folder {i}/{total_folders}: {folder}")
            folder_path = os.path.join(dataset_path, folder)
            results = detector.process_video_folder(folder_path)
            all_results.extend(results)

            # Flush after each full batch and after the final folder.
            if i % batch_size == 0 or i == total_folders:
                # Ceiling division: folders 1..100 -> batch 1, 101..200 ->
                # batch 2, and a trailing partial batch gets its own number.
                # (i // batch_size + 1 gave a full batch and the following
                # partial batch the same number, overwriting the first file.)
                batch_number = (i + batch_size - 1) // batch_size
                print(f"\nSaving results for batch {batch_number}...")
                df = pd.DataFrame(all_results)
                df.to_csv(f'drowsiness_results_batch_{batch_number}.csv', index=False)
                all_results = []  # Rows are on disk; start the next batch
    except KeyboardInterrupt:
        print("\nInterrupt detected, saving current results...")
        if all_results:
            df = pd.DataFrame(all_results)
            df.to_csv('drowsiness_results_final.csv', index=False)
            print("The result has been saved to drowsiness_results_final.csv")
    except Exception as e:
        print(f"\nA error occurred: {str(e)}")
        if all_results:
            df = pd.DataFrame(all_results)
            df.to_csv('drowsiness_results_error.csv', index=False)
            # Message must name the file that was actually written.
            print("Results saved to drowsiness_results_error.csv")
    finally:
        print("\nProcessing completed")
# Start the batch run only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()