| import os |
| import csv |
| import shutil |
| from dataclasses import dataclass |
| import model.video_classifier as classifier |
|
|
| device = 'cpu' |
| model_path = './model/best.pt' |
| video_folder = './videos' |
| output_folder = './results' |
|
|
| |
| os.makedirs(video_folder, exist_ok=True) |
| os.makedirs(output_folder, exist_ok=True) |
|
|
| model = classifier.get_model(model_path) |
|
|
| @dataclass |
| class VideoResult: |
| video_path: str |
| label_counts: dict |
| crash_events: list |
| video_name: str |
| is_simulation: int |
|
|
| def result_csv(student, simulation_results, real_results): |
| results = simulation_results + real_results |
| file_name = 'crash_results.csv' |
| csv_file = os.path.join(output_folder, file_name) |
| csv_columns = [ |
| 'Student', 'VideoFileName', 'IsSimulation', 'UniqueCrashes', 'CrashFrames', 'FlightFrames', |
| 'NoDroneFrames', 'NoSignalFrames', 'NoStartedFrames', 'StartedFrames', 'UnstableFrames', |
| 'LandingFrames', 'CrashTimes', 'CrashDurations' |
| ] |
| with open(csv_file, 'w', newline='') as csvfile: |
| writer = csv.DictWriter(csvfile, fieldnames=csv_columns) |
| writer.writeheader() |
| for result in results: |
| unique_crashes = len(result.crash_events) |
| crash_times = '' |
| crash_durations = '' |
| if unique_crashes > 0: |
| for j, (start, end) in enumerate(result.crash_events, start=1): |
| start_formatted = classifier.format_time(start) |
| end_formatted = classifier.format_time(end) |
| duration = end - start |
| crash_times += f"[{start_formatted},{end_formatted}]" |
| crash_durations += f"{duration:.2f}" |
| if unique_crashes - j != 0: |
| crash_times += "," |
| crash_durations += "," |
| else: |
| crash_times = None |
| crash_durations = None |
|
|
| writer.writerow({ |
| 'Student': student, |
| 'VideoFileName': result.video_name, |
| 'IsSimulation': result.is_simulation, |
| 'UniqueCrashes': unique_crashes, |
| 'CrashFrames': result.label_counts.get("Crash", 0), |
| 'FlightFrames': result.label_counts.get("Flight", 0), |
| 'NoDroneFrames': result.label_counts.get("No drone", 0), |
| 'NoSignalFrames': result.label_counts.get("No signal", 0), |
| 'NoStartedFrames': result.label_counts.get("No started", 0), |
| 'StartedFrames': result.label_counts.get("Started", 0), |
| 'UnstableFrames': result.label_counts.get("Unstable", 0), |
| 'LandingFrames': result.label_counts.get("Landing", 0), |
| 'CrashTimes': crash_times, |
| 'CrashDurations': crash_durations |
| }) |
| return csv_file |
|
|
| def remove_videos_and_csv(folder_path): |
| video_extensions = {'.mp4'} |
| csv_extension = '.csv' |
| for filename in os.listdir(folder_path): |
| file_path = os.path.join(folder_path, filename) |
| if os.path.isfile(file_path): |
| _, extension = os.path.splitext(filename) |
| if extension.lower() in video_extensions or extension.lower() == csv_extension: |
| os.remove(file_path) |
|
|
| def delete_uploaded_videos(simulation_paths, real_paths): |
| all_video_paths = simulation_paths + real_paths |
| for video_file in all_video_paths: |
| if os.path.exists(video_file): |
| os.remove(video_file) |
|
|
| def backend(student_name_tb, simulation_files, real_files, frame_output, text_output, file_output, video_output): |
| yield frame_output, "Starting process...", file_output, video_output |
| remove_videos_and_csv(output_folder) |
| yield frame_output, "Processing Input...", file_output, video_output |
| name = student_name_tb.strip() |
| simulation_paths = [] |
| real_paths = [] |
|
|
| |
| os.makedirs(video_folder, exist_ok=True) |
|
|
| |
| simulation_filenames = [os.path.basename(file_path) for file_path in simulation_files] |
| real_filenames = [os.path.basename(file_path) for file_path in real_files] |
|
|
| |
| for i, file_path in enumerate(simulation_files, start=1): |
| ext = os.path.splitext(file_path)[1] |
| sim_video_path = os.path.join(video_folder, f'sim_video{i}{ext}') |
| shutil.copy(file_path, sim_video_path) |
| simulation_paths.append(sim_video_path) |
|
|
| |
| for i, file_path in enumerate(real_files, start=1): |
| ext = os.path.splitext(file_path)[1] |
| real_video_path = os.path.join(video_folder, f'real_video{i}{ext}') |
| shutil.copy(file_path, real_video_path) |
| real_paths.append(real_video_path) |
|
|
| simulation_results = [] |
| real_results = [] |
| output_video_paths = [] |
|
|
| |
| for i, path in enumerate(simulation_paths, start=1): |
| |
| crash_video_filename = f'Simulation_Video_{i}_Crashes.mp4' |
| crash_video_path = os.path.join(output_folder, crash_video_filename) |
| labeled_video_filename = f'Simulation_Video_{i}_Labeled.mp4' |
| labeled_video_path = os.path.join(output_folder, labeled_video_filename) |
| output_video_paths.extend([crash_video_path, labeled_video_path]) |
|
|
| yield frame_output, f"Processing Simulation Video #{i}...", file_output, video_output |
|
|
| video_gen = classifier.video_classification( |
| path, labeled_video_path, crash_video_path, model, min_crash_duration=2.0) |
|
|
| for item in video_gen: |
| if item['type'] == 'frame': |
| frame_out = item['frame'] |
| progress_text = item['progress_text'] |
| yield frame_out, progress_text, file_output, video_output |
| elif item['type'] == 'results': |
| label_counts = item['label_counts'] |
| crash_events = item['crash_events'] |
| simulation_results.append(VideoResult( |
| video_path=path, |
| label_counts=label_counts, |
| crash_events=crash_events, |
| video_name=simulation_filenames[i - 1], |
| is_simulation=1)) |
| break |
|
|
| |
| for i, path in enumerate(real_paths, start=1): |
| crash_video_filename = f'Real_Video_{i}_Crashes.mp4' |
| crash_video_path = os.path.join(output_folder, crash_video_filename) |
| labeled_video_filename = f'Real_Video_{i}_Labeled.mp4' |
| labeled_video_path = os.path.join(output_folder, labeled_video_filename) |
| output_video_paths.extend([crash_video_path, labeled_video_path]) |
|
|
| yield frame_output, f"Processing Real Video #{i}...", file_output, video_output |
|
|
| video_gen = classifier.video_classification( |
| path, labeled_video_path, crash_video_path, model, min_crash_duration=2.0) |
|
|
| for item in video_gen: |
| if item['type'] == 'frame': |
| frame_out = item['frame'] |
| progress_text = item['progress_text'] |
| yield frame_out, progress_text, file_output, video_output |
| elif item['type'] == 'results': |
| label_counts = item['label_counts'] |
| crash_events = item['crash_events'] |
| real_results.append(VideoResult( |
| video_path=path, |
| label_counts=label_counts, |
| crash_events=crash_events, |
| video_name=real_filenames[i - 1], |
| is_simulation=0)) |
| break |
|
|
| yield frame_output, "Outputting Results to CSV...", file_output, video_output |
| csv_file = result_csv(name, simulation_results, real_results) |
| yield frame_output, "Finishing process...", file_output, video_output |
| |
| delete_uploaded_videos(simulation_paths, real_paths) |
| yield frame_output, "Download Ready.", csv_file, output_video_paths |
|
|