# THESIS_DGO / pages / Motion-Based_Model.py
# (repo-browser metadata removed: author "HansGan", commit 4ec29e4 "qwe" —
#  these stray lines were UI residue from a copy-paste and broke parsing)
import streamlit as st
import joblib
import numpy as np
import cv2
import tempfile
import os
# Streamlit app header
st.title("Motion-Based One-Class SVM Model")
#==========================IMPORTING MODELS==========================
# Pre-trained artifacts loaded at module level. Streamlit re-executes this
# script on every interaction, so these loads run on each rerun.
# NOTE(review): paths are relative to the process working directory —
# presumably the app is launched from the repo root; confirm.
motion_scaler = joblib.load('pages/motion_scaler.pkl')  # scaler fitted on the motion features
ocsvm_motion = joblib.load('pages/ocsvm_motion.pkl')    # One-Class SVM over scaled motion features
#====================================================================
#==========================FUNCTIONS==========================
# Function to extract and calculate motion features
def calculate_optical_flow_and_motion_energy(video_path):
    """Extract per-frame-pair motion features from a video via dense optical flow.

    For each consecutive frame pair, Farneback optical flow is computed and a
    3-element feature row is stored: ``[velocity, avg_magnitude, flow_coherence]``.

    Parameters
    ----------
    video_path : str
        Path to a video file readable by OpenCV.

    Returns
    -------
    numpy.ndarray
        Shape ``(n_frames - 1, 3)``; one row per frame transition.

    Raises
    ------
    ValueError
        If the video cannot be opened or its first frame cannot be read.
    """
    motion_features = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Error opening video file: {video_path}")
    try:
        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            avg_mag = np.mean(mag)  # Average flow magnitude for this frame pair
            # NOTE(review): index 0 of a stored row is the previous pair's
            # *velocity*, not its magnitude, so this delta mixes units.
            # Preserved deliberately — the scaler/OCSVM were presumably
            # trained on features computed exactly this way; confirm before
            # changing to motion_features[-1][1].
            # (The original branches for len>=2 / len==1 computed the same
            # velocity; an unused `acceleration`, `avg_ang` and
            # `motion_energy` were also dropped as dead code.)
            velocity = avg_mag - motion_features[-1][0] if motion_features else 0
            # Flow coherence: mean cosine of flow angles (directional agreement).
            flow_coherence = np.mean(np.cos(ang))
            motion_features.append(np.array([velocity, avg_mag, flow_coherence]))
            prev_gray = gray
    finally:
        # Release the capture even if decoding/processing raises; the
        # original leaked the handle on the first-frame ValueError path.
        cap.release()
    return np.array(motion_features)
# Function to predict panic from uploaded video and annotate the video
def predict_uploaded_video(video_uploaded):
    """Run the motion OCSVM on an uploaded video and offer an annotated copy.

    Parameters
    ----------
    video_uploaded : streamlit UploadedFile
        The .mp4 clip provided by the file-uploader widget.

    Side effects: writes and deletes a temp file, writes an annotated video
    next to it, and renders Streamlit success/download widgets.
    """
    # Persist the upload to disk so OpenCV can open it by path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
        temp_file.write(video_uploaded.read())
        temp_file_path = temp_file.name
    try:
        motion_features = calculate_optical_flow_and_motion_energy(temp_file_path)
        motion_features_scaled = motion_scaler.transform(motion_features)
        predictions = ocsvm_motion.predict(motion_features_scaled).tolist()
        # The extractor yields one row per frame *transition* (n_frames - 1);
        # pad with a placeholder so annotate_video has one label per frame.
        predictions.append(1)
        output_video_path = temp_file_path.replace(".mp4", "_annotated.mp4")
        panic_percentage = annotate_video(temp_file_path, predictions, output_video_path)
    finally:
        # Always remove the raw upload, even if inference/annotation fails
        # (the original left it behind on any exception).
        os.remove(temp_file_path)
    st.success(f"Video processed successfully! {panic_percentage:.2f}% of frames detected as panic. Download the annotated video below:")
    # Read the annotated file via a context manager so the handle is closed;
    # the original passed an open, never-closed file object to Streamlit.
    with open(output_video_path, "rb") as annotated_file:
        st.download_button(label="Download Annotated Video", data=annotated_file.read(), file_name="annotated_video.mp4")
def annotate_video(video_path, predictions, output_path):
    """Write a copy of the video with an optical-flow heatmap overlay and a
    "Panic" label on frames whose prediction equals 1.

    Parameters
    ----------
    video_path : str
        Input video readable by OpenCV.
    predictions : list[int]
        One label per frame; 1 is treated as panic.
    output_path : str
        Destination for the annotated .mp4.

    Returns
    -------
    float
        Percentage of labels equal to 1, over ``len(predictions)``.

    Raises
    ------
    ValueError
        If the first frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    frame_number = 0
    panic_count = 0
    total_frames = len(predictions)
    try:
        # The first frame only seeds the optical-flow pair; it is not written
        # to the output (the annotated video is one frame shorter than input).
        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            # Flow magnitude -> normalized JET heatmap, alpha-blended over the
            # frame. (An unused cv2.cartToPolar call was removed as dead code.)
            flow_map = np.sqrt(flow[..., 0] ** 2 + flow[..., 1] ** 2)
            flow_map_normalized = cv2.normalize(flow_map, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
            flow_map_colored = cv2.applyColorMap(flow_map_normalized, cv2.COLORMAP_JET)
            combined_frame = cv2.addWeighted(frame, 0.7, flow_map_colored, 0.3, 0)
            # Guard the lookup so a frame-count/prediction mismatch degrades
            # gracefully instead of raising IndexError mid-encode.
            if frame_number < total_frames and predictions[frame_number] == 1:
                panic_count += 1
                text = "Panic"
                text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)[0]
                text_x = (combined_frame.shape[1] - text_size[0]) // 2  # Center horizontally
                cv2.putText(combined_frame, text, (text_x, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            out.write(combined_frame)
            prev_gray = gray
            frame_number += 1
    finally:
        # Release decoder and encoder even on failure so the output file is
        # flushed and OS handles are not leaked (original leaked both if any
        # cv2 call raised inside the loop).
        cap.release()
        out.release()
    # NOTE(review): denominator is len(predictions) (padded to n_frames by the
    # caller) while only n_frames - 1 frames are examined — percentage is
    # slightly low by construction; preserved as the established metric.
    return (panic_count / total_frames) * 100 if total_frames > 0 else 0
#=============================================================
#==========================PREDICTING UPLOADED VIDEO==========================
#==========================PREDICTING UPLOADED VIDEO==========================
# Uploader restricted to .mp4; returns None until the user supplies a file.
video_mp4_file = st.file_uploader("Upload Video File", type=['mp4'])
if video_mp4_file is not None:
    # Add spinner to indicate loading while frames are decoded and scored.
    with st.spinner("Processing video..."):
        predict_uploaded_video(video_mp4_file)
#==============================================================================