ckcl commited on
Commit
13c2616
·
verified ·
1 Parent(s): f45f602

Update drowsiness_detector.py

Browse files
Files changed (1) hide show
  1. drowsiness_detector.py +121 -121
drowsiness_detector.py CHANGED
@@ -1,122 +1,122 @@
1
- import os
2
- import cv2
3
- import numpy as np
4
- from speed_detector import SpeedDetector
5
- from face_analyzer import FaceAnalyzer
6
- import pandas as pd
7
- import time
8
-
9
- class DrowsinessDetector:
10
- def __init__(self):
11
- self.speed_detector = SpeedDetector()
12
- self.face_analyzer = FaceAnalyzer()
13
-
14
- def process_frame(self, frame_path, face_path):
15
- """
16
- 處理單個幀
17
- :param frame_path: 場景圖片路徑
18
- :param face_path: 人臉圖片路徑
19
- :return: (速度, 是否犯困)
20
- """
21
- try:
22
- # 讀取圖片
23
- frame = cv2.imread(frame_path)
24
- face = cv2.imread(face_path)
25
-
26
- if frame is None or face is None:
27
- print(f"處理 {os.path.basename(frame_path)} 時出錯: 無法讀取圖片")
28
- return None, None
29
-
30
- # 檢測速度
31
- speed = self.speed_detector.detect_speed(frame)
32
-
33
- # 檢測是否犯困
34
- is_drowsy = self.face_analyzer.is_drowsy(face)
35
-
36
- return speed, is_drowsy
37
- except Exception as e:
38
- print(f"處理 {os.path.basename(frame_path)} 時出錯: {str(e)}")
39
- return None, None
40
-
41
- def process_video_folder(self, folder_path):
42
- """
43
- 處理一個視頻文件夾中的所有幀
44
- :param folder_path: 視頻文件夾路徑
45
- :return: 處理結果列表
46
- """
47
- results = []
48
-
49
- # 獲取所有幀圖片
50
- frame_files = [f for f in os.listdir(folder_path) if f.endswith('.jpg') and not f.endswith('_face.jpg')]
51
- total_frames = len(frame_files)
52
-
53
- for i, frame_file in enumerate(frame_files, 1):
54
- # 構建完整的文件路徑
55
- frame_path = os.path.join(folder_path, frame_file)
56
- face_path = os.path.join(folder_path, frame_file.replace('.jpg', '_face.jpg'))
57
-
58
- # 顯示進度
59
- print(f"\r處理進度: {i}/{total_frames} ({i/total_frames*100:.1f}%)", end="")
60
-
61
- try:
62
- speed, is_drowsy = self.process_frame(frame_path, face_path)
63
- if speed is not None and is_drowsy is not None:
64
- results.append({
65
- 'frame': frame_file,
66
- 'speed': speed,
67
- 'is_drowsy': is_drowsy
68
- })
69
- except KeyboardInterrupt:
70
- print("\n檢測到中斷,保存當前結果...")
71
- return results
72
- except Exception as e:
73
- print(f"\n處理 {frame_file} 時出錯: {str(e)}")
74
- continue
75
-
76
- print() # 換行
77
- return results
78
-
79
- def main():
80
- # 初始化檢測器
81
- detector = DrowsinessDetector()
82
-
83
- # 獲取所有視頻文件夾
84
- dataset_path = os.path.join('dataset', 'driver')
85
- video_folders = [f for f in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, f))]
86
- total_folders = len(video_folders)
87
-
88
- all_results = []
89
- batch_size = 100 # 每處理100個文件夾保存一次結果
90
-
91
- try:
92
- # 處理每個視頻文件夾
93
- for i, folder in enumerate(video_folders, 1):
94
- print(f"\n處理文件夾 {i}/{total_folders}: {folder}")
95
- folder_path = os.path.join(dataset_path, folder)
96
- results = detector.process_video_folder(folder_path)
97
- all_results.extend(results)
98
-
99
- # 每處理完一批文件夾就保存一次結果
100
- if i % batch_size == 0 or i == total_folders:
101
- print(f"\n保存第 {i//batch_size + 1} 批結果...")
102
- df = pd.DataFrame(all_results)
103
- df.to_csv(f'drowsiness_results_batch_{i//batch_size + 1}.csv', index=False)
104
- all_results = [] # 清空結果列表
105
-
106
- except KeyboardInterrupt:
107
- print("\n檢測到中斷,保存當前結果...")
108
- if all_results:
109
- df = pd.DataFrame(all_results)
110
- df.to_csv('drowsiness_results_final.csv', index=False)
111
- print("結果已保存到 drowsiness_results_final.csv")
112
- except Exception as e:
113
- print(f"\n發生錯誤: {str(e)}")
114
- if all_results:
115
- df = pd.DataFrame(all_results)
116
- df.to_csv('drowsiness_results_error.csv', index=False)
117
- print("結果已保存到 drowsiness_results_error.csv")
118
- finally:
119
- print("\n處理完成")
120
-
121
- if __name__ == "__main__":
122
  main()
 
1
+ import os
2
+ import cv2
3
+ import numpy as np
4
+ from speed_detector import SpeedDetector
5
+ from face_analyzer import FaceAnalyzer
6
+ import pandas as pd
7
+ import time
8
+
9
+ class DrowsinessDetector:
10
+ def __init__(self):
11
+ self.speed_detector = SpeedDetector()
12
+ self.face_analyzer = FaceAnalyzer()
13
+
14
+ def process_frame(self, frame_path, face_path):
15
+ """
16
+ Process a single frame
17
+ :param frame_path: Path to scene image
18
+ :param face_path: Path to face image
19
+ :return: (speed, is_drowsy)
20
+ """
21
+ try:
22
+ # Read images
23
+ frame = cv2.imread(frame_path)
24
+ face = cv2.imread(face_path)
25
+
26
+ if frame is None or face is None:
27
+ print(f"Error processing {os.path.basename(frame_path)}: Unable to read image")
28
+ return None, None
29
+
30
+ # Detect speed
31
+ speed = self.speed_detector.detect_speed(frame)
32
+
33
+ # Detect drowsiness
34
+ is_drowsy = self.face_analyzer.is_drowsy(face)
35
+
36
+ return speed, is_drowsy
37
+ except Exception as e:
38
+ print(f"Error processing {os.path.basename(frame_path)}: {str(e)}")
39
+ return None, None
40
+
41
+ def process_video_folder(self, folder_path):
42
+ """
43
+ Process all frames in a video folder
44
+ :param folder_path: Path to video folder
45
+ :return: Processing results list
46
+ """
47
+ results = []
48
+
49
+ # Get all frame images
50
+ frame_files = [f for f in os.listdir(folder_path) if f.endswith('.jpg') and not f.endswith('_face.jpg')]
51
+ total_frames = len(frame_files)
52
+
53
+ for i, frame_file in enumerate(frame_files, 1):
54
+ # Build full file path
55
+ frame_path = os.path.join(folder_path, frame_file)
56
+ face_path = os.path.join(folder_path, frame_file.replace('.jpg', '_face.jpg'))
57
+
58
+ # Show progress
59
+ print(f"\rProcessing progress: {i}/{total_frames} ({i/total_frames*100:.1f}%)", end="")
60
+
61
+ try:
62
+ speed, is_drowsy = self.process_frame(frame_path, face_path)
63
+ if speed is not None and is_drowsy is not None:
64
+ results.append({
65
+ 'frame': frame_file,
66
+ 'speed': speed,
67
+ 'is_drowsy': is_drowsy
68
+ })
69
+ except KeyboardInterrupt:
70
+ print("\nInterrupt detected, saving current results...")
71
+ return results
72
+ except Exception as e:
73
+ print(f"\nError processing {frame_file}: {str(e)}")
74
+ continue
75
+
76
+ print() # New line
77
+ return results
78
+
79
+ def main():
80
+ # Initialize detector
81
+ detector = DrowsinessDetector()
82
+
83
+ # Get all video folders
84
+ dataset_path = os.path.join('dataset', 'driver')
85
+ video_folders = [f for f in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, f))]
86
+ total_folders = len(video_folders)
87
+
88
+ all_results = []
89
+ batch_size = 100 # Save results after processing 100 folders
90
+
91
+ try:
92
+ # Process each video folder
93
+ for i, folder in enumerate(video_folders, 1):
94
+ print(f"\nProcessing folder {i}/{total_folders}: {folder}")
95
+ folder_path = os.path.join(dataset_path, folder)
96
+ results = detector.process_video_folder(folder_path)
97
+ all_results.extend(results)
98
+
99
+ # Save results after processing each batch of folders
100
+ if i % batch_size == 0 or i == total_folders:
101
+ print(f"\nSaving results for batch {i//batch_size + 1}...")
102
+ df = pd.DataFrame(all_results)
103
+ df.to_csv(f'drowsiness_results_batch_{i//batch_size + 1}.csv', index=False)
104
+ all_results = [] # Clear results list
105
+
106
+ except KeyboardInterrupt:
107
+ print("\nInterrupt detected, saving current results...")
108
+ if all_results:
109
+ df = pd.DataFrame(all_results)
110
+ df.to_csv('drowsiness_results_final.csv', index=False)
111
+ print("The result has been saved to drowsiness_results_final.csv")
112
+ except Exception as e:
113
+ print(f"\nA error occurred: {str(e)}")
114
+ if all_results:
115
+ df = pd.DataFrame(all_results)
116
+ df.to_csv('drowsiness_results_error.csv', index=False)
117
+ print("Results saved to drawsiness_results_error.csv")
118
+ finally:
119
+ print("\nProcessing completed")
120
+
121
+ if __name__ == "__main__":
122
  main()