DroneCrashClassification / app_backend.py
EthanStanks's picture
Fixing file upload
8d70ec1
import os
import csv
import shutil # Added to copy files
from dataclasses import dataclass
import model.video_classifier as classifier
device = 'cpu'
model_path = './model/best.pt'
video_folder = './videos'
output_folder = './results'
# Ensure the directories exist
os.makedirs(video_folder, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)
model = classifier.get_model(model_path)
@dataclass
class VideoResult:
video_path: str
label_counts: dict
crash_events: list
video_name: str
is_simulation: int
def result_csv(student, simulation_results, real_results):
results = simulation_results + real_results
file_name = 'crash_results.csv'
csv_file = os.path.join(output_folder, file_name)
csv_columns = [
'Student', 'VideoFileName', 'IsSimulation', 'UniqueCrashes', 'CrashFrames', 'FlightFrames',
'NoDroneFrames', 'NoSignalFrames', 'NoStartedFrames', 'StartedFrames', 'UnstableFrames',
'LandingFrames', 'CrashTimes', 'CrashDurations'
]
with open(csv_file, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
writer.writeheader()
for result in results:
unique_crashes = len(result.crash_events)
crash_times = ''
crash_durations = ''
if unique_crashes > 0:
for j, (start, end) in enumerate(result.crash_events, start=1):
start_formatted = classifier.format_time(start)
end_formatted = classifier.format_time(end)
duration = end - start
crash_times += f"[{start_formatted},{end_formatted}]"
crash_durations += f"{duration:.2f}"
if unique_crashes - j != 0:
crash_times += ","
crash_durations += ","
else:
crash_times = None
crash_durations = None
writer.writerow({
'Student': student,
'VideoFileName': result.video_name,
'IsSimulation': result.is_simulation,
'UniqueCrashes': unique_crashes,
'CrashFrames': result.label_counts.get("Crash", 0),
'FlightFrames': result.label_counts.get("Flight", 0),
'NoDroneFrames': result.label_counts.get("No drone", 0),
'NoSignalFrames': result.label_counts.get("No signal", 0),
'NoStartedFrames': result.label_counts.get("No started", 0),
'StartedFrames': result.label_counts.get("Started", 0),
'UnstableFrames': result.label_counts.get("Unstable", 0),
'LandingFrames': result.label_counts.get("Landing", 0),
'CrashTimes': crash_times,
'CrashDurations': crash_durations
})
return csv_file
def remove_videos_and_csv(folder_path):
video_extensions = {'.mp4'}
csv_extension = '.csv'
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
_, extension = os.path.splitext(filename)
if extension.lower() in video_extensions or extension.lower() == csv_extension:
os.remove(file_path)
def delete_uploaded_videos(simulation_paths, real_paths):
all_video_paths = simulation_paths + real_paths
for video_file in all_video_paths:
if os.path.exists(video_file):
os.remove(video_file)
def backend(student_name_tb, simulation_files, real_files, frame_output, text_output, file_output, video_output):
yield frame_output, "Starting process...", file_output, video_output
remove_videos_and_csv(output_folder)
yield frame_output, "Processing Input...", file_output, video_output
name = student_name_tb.strip()
simulation_paths = []
real_paths = []
# Ensure the video_folder exists
os.makedirs(video_folder, exist_ok=True)
# Store original filenames
simulation_filenames = [os.path.basename(file_path) for file_path in simulation_files]
real_filenames = [os.path.basename(file_path) for file_path in real_files]
# Save the uploaded simulation videos to the 'video_folder'
for i, file_path in enumerate(simulation_files, start=1):
ext = os.path.splitext(file_path)[1]
sim_video_path = os.path.join(video_folder, f'sim_video{i}{ext}')
shutil.copy(file_path, sim_video_path)
simulation_paths.append(sim_video_path)
# Save the uploaded real videos to the 'video_folder'
for i, file_path in enumerate(real_files, start=1):
ext = os.path.splitext(file_path)[1]
real_video_path = os.path.join(video_folder, f'real_video{i}{ext}')
shutil.copy(file_path, real_video_path)
real_paths.append(real_video_path)
simulation_results = []
real_results = []
output_video_paths = []
# Process simulation videos
for i, path in enumerate(simulation_paths, start=1):
# Setup output paths
crash_video_filename = f'Simulation_Video_{i}_Crashes.mp4'
crash_video_path = os.path.join(output_folder, crash_video_filename)
labeled_video_filename = f'Simulation_Video_{i}_Labeled.mp4'
labeled_video_path = os.path.join(output_folder, labeled_video_filename)
output_video_paths.extend([crash_video_path, labeled_video_path])
yield frame_output, f"Processing Simulation Video #{i}...", file_output, video_output
video_gen = classifier.video_classification(
path, labeled_video_path, crash_video_path, model, min_crash_duration=2.0)
for item in video_gen:
if item['type'] == 'frame':
frame_out = item['frame']
progress_text = item['progress_text']
yield frame_out, progress_text, file_output, video_output
elif item['type'] == 'results':
label_counts = item['label_counts']
crash_events = item['crash_events']
simulation_results.append(VideoResult(
video_path=path,
label_counts=label_counts,
crash_events=crash_events,
video_name=simulation_filenames[i - 1],
is_simulation=1))
break
# Process real videos
for i, path in enumerate(real_paths, start=1):
crash_video_filename = f'Real_Video_{i}_Crashes.mp4'
crash_video_path = os.path.join(output_folder, crash_video_filename)
labeled_video_filename = f'Real_Video_{i}_Labeled.mp4'
labeled_video_path = os.path.join(output_folder, labeled_video_filename)
output_video_paths.extend([crash_video_path, labeled_video_path])
yield frame_output, f"Processing Real Video #{i}...", file_output, video_output
video_gen = classifier.video_classification(
path, labeled_video_path, crash_video_path, model, min_crash_duration=2.0)
for item in video_gen:
if item['type'] == 'frame':
frame_out = item['frame']
progress_text = item['progress_text']
yield frame_out, progress_text, file_output, video_output
elif item['type'] == 'results':
label_counts = item['label_counts']
crash_events = item['crash_events']
real_results.append(VideoResult(
video_path=path,
label_counts=label_counts,
crash_events=crash_events,
video_name=real_filenames[i - 1],
is_simulation=0))
break
yield frame_output, "Outputting Results to CSV...", file_output, video_output
csv_file = result_csv(name, simulation_results, real_results)
yield frame_output, "Finishing process...", file_output, video_output
# Delete the uploaded video files from the server
delete_uploaded_videos(simulation_paths, real_paths)
yield frame_output, "Download Ready.", csv_file, output_video_paths