File size: 8,120 Bytes
import os
import csv
import shutil # Added to copy files
from dataclasses import dataclass
import model.video_classifier as classifier
# Device label. NOTE(review): not referenced anywhere in the visible code —
# confirm whether it is still needed.
device = 'cpu'
# Path handed to classifier.get_model() below.
model_path = './model/best.pt'
# Uploaded videos are copied into this folder before processing and removed
# from it once processing has finished.
video_folder = './videos'
# Output paths for the labeled/crash videos and the CSV report are built under
# this folder.
output_folder = './results'
# Ensure the directories exist
os.makedirs(video_folder, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)
# Loaded once at import time and reused by every call to backend().
model = classifier.get_model(model_path)
@dataclass
class VideoResult:
    """Classification outcome for a single processed video.

    Instances are created by backend() and turned into CSV rows by
    result_csv().
    """

    # Path of the server-side copy that was classified.
    video_path: str
    # Count per label name (keys such as "Crash", "Flight", "No drone");
    # result_csv() reads missing labels as 0.
    label_counts: dict
    # Sequence of (start, end) pairs, one per crash; result_csv() reports
    # end - start as the duration. NOTE(review): units presumably seconds —
    # confirm against classifier.video_classification.
    crash_events: list
    # Base name of the originally uploaded file.
    video_name: str
    # 1 for a simulation video, 0 for a real one.
    is_simulation: int
def result_csv(student, simulation_results, real_results):
    """Write the per-video crash report and return the path of the CSV file.

    One row is written per result, simulation results first, then real ones.
    The file is always ``crash_results.csv`` inside ``output_folder`` and is
    overwritten on every call.

    Args:
        student: Value written to the ``Student`` column of every row.
        simulation_results: Iterable of VideoResult for simulation videos.
        real_results: Iterable of VideoResult for real videos.

    Returns:
        Path of the CSV file that was written.
    """
    csv_file = os.path.join(output_folder, 'crash_results.csv')

    # CSV column -> key in VideoResult.label_counts. Insertion order is the
    # column order in the file.
    frame_columns = {
        'CrashFrames': 'Crash',
        'FlightFrames': 'Flight',
        'NoDroneFrames': 'No drone',
        'NoSignalFrames': 'No signal',
        'NoStartedFrames': 'No started',
        'StartedFrames': 'Started',
        'UnstableFrames': 'Unstable',
        'LandingFrames': 'Landing',
    }
    csv_columns = [
        'Student', 'VideoFileName', 'IsSimulation', 'UniqueCrashes',
        *frame_columns,
        'CrashTimes', 'CrashDurations',
    ]

    # Explicit UTF-8 so student/file names with non-ASCII characters do not
    # depend on (or fail under) the platform's locale encoding.
    with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()
        for result in [*simulation_results, *real_results]:
            time_spans = []
            durations = []
            for start, end in result.crash_events:
                start_formatted = classifier.format_time(start)
                end_formatted = classifier.format_time(end)
                time_spans.append(f"[{start_formatted},{end_formatted}]")
                durations.append(f"{end - start:.2f}")

            row = {
                'Student': student,
                'VideoFileName': result.video_name,
                'IsSimulation': result.is_simulation,
                'UniqueCrashes': len(result.crash_events),
                # With no crashes both joins give '', which is exactly what
                # the csv module writes for None, so the cells stay empty.
                'CrashTimes': ",".join(time_spans),
                'CrashDurations': ",".join(durations),
            }
            for column, label in frame_columns.items():
                row[column] = result.label_counts.get(label, 0)
            writer.writerow(row)
    return csv_file
def remove_videos_and_csv(folder_path):
    """Delete the ``.mp4`` and ``.csv`` files located directly in *folder_path*.

    The extension match is case-insensitive. Sub-directories (whatever their
    name) and files with any other extension are left untouched; the folder
    is not searched recursively.

    Args:
        folder_path: Directory to clean. It must exist.
    """
    removable_suffixes = ('.mp4', '.csv')
    for entry_name in os.listdir(folder_path):
        candidate = os.path.join(folder_path, entry_name)
        # Only regular files are eligible; directories are skipped.
        if not os.path.isfile(candidate):
            continue
        suffix = os.path.splitext(entry_name)[1].lower()
        if suffix in removable_suffixes:
            os.remove(candidate)
def delete_uploaded_videos(simulation_paths, real_paths):
    """Delete the given video files, ignoring any that no longer exist.

    Args:
        simulation_paths: Iterable of file paths of simulation videos.
        real_paths: Iterable of file paths of real videos.

    Raises:
        OSError: If a path exists but cannot be removed (for example it is a
            directory or access is denied).
    """
    for video_file in [*simulation_paths, *real_paths]:
        # Attempt the removal directly instead of checking existence first:
        # the file may disappear between the check and the removal.
        try:
            os.remove(video_file)
        except FileNotFoundError:
            continue
def _classify_videos(paths, original_names, kind, is_simulation,
                     frame_output, file_output, video_output):
    """Classify each video in *paths*, yielding progress updates along the way.

    Args:
        paths: Server-side copies of the videos to classify.
        original_names: Original upload file names, parallel to *paths*.
        kind: ``"Simulation"`` or ``"Real"``; used in the output file names
            and in the status messages.
        is_simulation: Flag stored on each VideoResult (1 or 0).
        frame_output, file_output, video_output: Values passed through
            unchanged in the yielded tuples.

    Yields:
        ``(frame, status_text, file_output, video_output)`` tuples.

    Returns:
        ``(results, output_paths)``: the VideoResult list and the crash and
        labeled video paths, two per input video, in processing order.
    """
    results = []
    output_paths = []
    for i, (path, original_name) in enumerate(zip(paths, original_names), start=1):
        crash_video_path = os.path.join(output_folder, f'{kind}_Video_{i}_Crashes.mp4')
        labeled_video_path = os.path.join(output_folder, f'{kind}_Video_{i}_Labeled.mp4')
        output_paths.extend([crash_video_path, labeled_video_path])

        yield frame_output, f"Processing {kind} Video #{i}...", file_output, video_output

        video_gen = classifier.video_classification(
            path, labeled_video_path, crash_video_path, model, min_crash_duration=2.0)
        for item in video_gen:
            if item['type'] == 'frame':
                yield item['frame'], item['progress_text'], file_output, video_output
            elif item['type'] == 'results':
                results.append(VideoResult(
                    video_path=path,
                    label_counts=item['label_counts'],
                    crash_events=item['crash_events'],
                    video_name=original_name,
                    is_simulation=is_simulation))
                # The results item ends this video; anything after it is ignored.
                break
    return results, output_paths


def backend(student_name_tb, simulation_files, real_files, frame_output, text_output, file_output, video_output):
    """Run the whole pipeline for one submission, streaming progress updates.

    Steps: clear previous ``.mp4``/``.csv`` results from ``output_folder``,
    copy the uploads into ``video_folder``, classify the simulation videos and
    then the real videos, write the CSV report, and delete the copies.

    Args:
        student_name_tb: Student name; surrounding whitespace is stripped.
            ``None`` is treated as an empty name.
        simulation_files: File paths of uploaded simulation videos. ``None``
            is treated as no uploads.
        real_files: File paths of uploaded real videos. ``None`` is treated
            as no uploads.
        frame_output: Value yielded in the frame slot whenever no new frame
            is available.
        text_output: Unused; kept so the signature stays unchanged.
        file_output: Value yielded in the file slot until the CSV is ready.
        video_output: Value yielded in the video slot until processing ends.

    Yields:
        ``(frame, status_text, file, videos)`` tuples. The final tuple carries
        the CSV path and the list of output video paths.
    """
    yield frame_output, "Starting process...", file_output, video_output
    remove_videos_and_csv(output_folder)
    yield frame_output, "Processing Input...", file_output, video_output

    name = (student_name_tb or "").strip()
    # A missing upload list is handled as "nothing uploaded" instead of
    # failing on iteration.
    simulation_files = list(simulation_files or [])
    real_files = list(real_files or [])

    # Ensure the video_folder exists
    os.makedirs(video_folder, exist_ok=True)

    # Store original filenames
    simulation_filenames = [os.path.basename(file_path) for file_path in simulation_files]
    real_filenames = [os.path.basename(file_path) for file_path in real_files]

    simulation_paths = []
    real_paths = []
    try:
        # Save the uploaded videos to the 'video_folder'
        for prefix, uploads, copies in (
            ('sim_video', simulation_files, simulation_paths),
            ('real_video', real_files, real_paths),
        ):
            for i, file_path in enumerate(uploads, start=1):
                ext = os.path.splitext(file_path)[1]
                copy_path = os.path.join(video_folder, f'{prefix}{i}{ext}')
                # Record the path before copying so a partially written copy
                # is still cleaned up if the copy fails.
                copies.append(copy_path)
                shutil.copy(file_path, copy_path)

        # Process simulation videos
        simulation_results, output_video_paths = yield from _classify_videos(
            simulation_paths, simulation_filenames, 'Simulation', 1,
            frame_output, file_output, video_output)

        # Process real videos
        real_results, real_output_paths = yield from _classify_videos(
            real_paths, real_filenames, 'Real', 0,
            frame_output, file_output, video_output)
        output_video_paths.extend(real_output_paths)

        yield frame_output, "Outputting Results to CSV...", file_output, video_output
        csv_file = result_csv(name, simulation_results, real_results)

        yield frame_output, "Finishing process...", file_output, video_output
    finally:
        # Delete the uploaded video files from the server, including when
        # processing raised or the consumer closed this generator early.
        delete_uploaded_videos(simulation_paths, real_paths)

    yield frame_output, "Download Ready.", csv_file, output_video_paths
|