|
|
import os |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from speed_detector import SpeedDetector |
|
|
from face_analyzer import FaceAnalyzer |
|
|
import pandas as pd |
|
|
import time |
|
|
|
|
|
class DrowsinessDetector:
    """Run speed detection and drowsiness analysis on paired scene/face images.

    Holds one ``SpeedDetector`` (applied to scene images) and one
    ``FaceAnalyzer`` (applied to face images).
    """

    # File-name convention used when scanning a folder: the face image that
    # belongs to scene image ``<name>.jpg`` is looked up as ``<name>_face.jpg``.
    FRAME_EXT = '.jpg'
    FACE_SUFFIX = '_face.jpg'

    def __init__(self):
        self.speed_detector = SpeedDetector()
        self.face_analyzer = FaceAnalyzer()

    def process_frame(self, frame_path, face_path):
        """
        Process a single frame.

        :param frame_path: Path to scene image
        :param face_path: Path to face image
        :return: ``(speed, is_drowsy)`` as returned by the speed detector and
            the face analyzer, or ``(None, None)`` when either image cannot
            be read or any step raises an exception
        """
        try:
            frame = cv2.imread(frame_path)
            face = cv2.imread(face_path)

            # Name exactly which file(s) failed to load, so a missing face
            # image is not reported as a problem with the scene image.
            unreadable = [
                os.path.basename(path)
                for path, image in ((frame_path, frame), (face_path, face))
                if image is None
            ]
            if unreadable:
                print(
                    f"Error processing {os.path.basename(frame_path)}: "
                    f"Unable to read image ({', '.join(unreadable)})"
                )
                return None, None

            speed = self.speed_detector.detect_speed(frame)
            is_drowsy = self.face_analyzer.is_drowsy(face)
            return speed, is_drowsy
        except Exception as e:
            # Per-frame boundary: one bad frame must not abort the whole run.
            print(f"Error processing {os.path.basename(frame_path)}: {str(e)}")
            return None, None

    def process_video_folder(self, folder_path):
        """
        Process all frames in a video folder.

        Scene images are the ``.jpg`` files whose name does not end in
        ``_face.jpg``; they are processed in sorted file-name order so the
        result order is reproducible across runs and platforms. Frames for
        which ``process_frame`` yields ``None`` for either value are skipped.

        On ``KeyboardInterrupt`` during a frame the results gathered so far
        are returned; the interrupt itself is not re-raised.

        :param folder_path: Path to video folder
        :return: Processing results list; one dict per processed frame with
            the keys ``'frame'``, ``'speed'`` and ``'is_drowsy'``
        """
        results = []

        # os.listdir() returns entries in arbitrary order, hence the sort.
        frame_files = sorted(
            name for name in os.listdir(folder_path)
            if name.endswith(self.FRAME_EXT) and not name.endswith(self.FACE_SUFFIX)
        )
        total_frames = len(frame_files)

        for i, frame_file in enumerate(frame_files, 1):
            frame_path = os.path.join(folder_path, frame_file)
            # Swap only the trailing extension; str.replace() would also
            # rewrite any '.jpg' that appears earlier in the file name.
            face_file = frame_file[:-len(self.FRAME_EXT)] + self.FACE_SUFFIX
            face_path = os.path.join(folder_path, face_file)

            # '\r' keeps the progress indicator on a single console line.
            print(f"\rProcessing progress: {i}/{total_frames} ({i/total_frames*100:.1f}%)", end="")

            try:
                speed, is_drowsy = self.process_frame(frame_path, face_path)
                if speed is not None and is_drowsy is not None:
                    results.append({
                        'frame': frame_file,
                        'speed': speed,
                        'is_drowsy': is_drowsy
                    })
            except KeyboardInterrupt:
                print("\nInterrupt detected, saving current results...")
                return results
            except Exception as e:
                print(f"\nError processing {frame_file}: {str(e)}")
                continue

        # Terminate the progress line.
        print()
        return results
|
|
|
|
|
def _save_results(results, csv_path):
    """Write the collected result rows to ``csv_path`` as CSV without an index column."""
    pd.DataFrame(results).to_csv(csv_path, index=False)


def main():
    """Process every video folder under ``dataset/driver`` and save results as CSV.

    Folders are processed in sorted name order. Results are accumulated and
    flushed to ``drowsiness_results_batch_<n>.csv`` after every ``batch_size``
    folders and once more after the last folder. Batches are numbered from 1.

    If the run is interrupted or fails, the rows not yet flushed are written
    to ``drowsiness_results_final.csv`` (KeyboardInterrupt) or
    ``drowsiness_results_error.csv`` (any other exception).
    """
    detector = DrowsinessDetector()

    dataset_path = os.path.join('dataset', 'driver')
    # Sorted so that batch contents are reproducible; os.listdir() order is arbitrary.
    video_folders = sorted(
        f for f in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, f))
    )
    total_folders = len(video_folders)

    all_results = []
    batch_size = 100

    try:
        for i, folder in enumerate(video_folders, 1):
            print(f"\nProcessing folder {i}/{total_folders}: {folder}")
            folder_path = os.path.join(dataset_path, folder)
            results = detector.process_video_folder(folder_path)
            all_results.extend(results)

            if i % batch_size == 0 or i == total_folders:
                # (i - 1) // batch_size maps folders 1..100 to batch 1,
                # 101..200 to batch 2, and so on. Using i // batch_size + 1
                # would label a full first batch as 2 and let a trailing
                # partial batch overwrite that file.
                batch_number = (i - 1) // batch_size + 1
                print(f"\nSaving results for batch {batch_number}...")
                _save_results(all_results, f'drowsiness_results_batch_{batch_number}.csv')
                all_results = []

    except KeyboardInterrupt:
        print("\nInterrupt detected, saving current results...")
        if all_results:
            _save_results(all_results, 'drowsiness_results_final.csv')
            print("The result has been saved to drowsiness_results_final.csv")
    except Exception as e:
        print(f"\nA error occurred: {str(e)}")
        if all_results:
            _save_results(all_results, 'drowsiness_results_error.csv')
            print("Results saved to drowsiness_results_error.csv")
    finally:
        print("\nProcessing completed")
|
|
|
|
|
# Run the batch pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()