import os
import tempfile

import cv2
import joblib
import numpy as np
import streamlit as st

# Streamlit app header
st.title("Motion-Based One-Class SVM Model")

#==========================IMPORTING MODELS==========================
# NOTE(review): joblib.load unpickles arbitrary objects; these files must
# come from a trusted source.
motion_scaler = joblib.load('pages/motion_scaler.pkl')
ocsvm_motion = joblib.load('pages/ocsvm_motion.pkl')
#====================================================================


#==========================FUNCTIONS==========================
def _farneback_flow(prev_gray, gray):
    """Return the dense Farneback optical flow between two grayscale frames.

    The numeric arguments are, in order: pyr_scale=0.5, levels=3, winsize=15,
    iterations=3, poly_n=5, poly_sigma=1.2, flags=0.
    """
    return cv2.calcOpticalFlowFarneback(
        prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
    )


def calculate_optical_flow_and_motion_energy(video_path):
    """Extract one motion-feature row per consecutive frame pair of a video.

    Each row is ``[velocity, avg_mag, flow_coherence]`` where ``avg_mag`` is
    the mean optical-flow magnitude, ``flow_coherence`` is the mean cosine of
    the flow angle, and ``velocity`` is described in the note below.

    Args:
        video_path: Path of the video file to read.

    Returns:
        A NumPy array with one row per frame pair (frames - 1 rows). The
        array is empty when the video has fewer than two readable frames.

    Raises:
        ValueError: If the video cannot be opened or its first frame cannot
            be read.
    """
    motion_features = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Error opening video file: {video_path}")

        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow = _farneback_flow(prev_gray, gray)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            avg_mag = np.mean(mag)

            # NOTE(review): index 0 of a stored row is the previous
            # *velocity*, not the previous average magnitude (index 1), so
            # this is not a plain frame-to-frame magnitude difference. It is
            # kept as-is because the pickled scaler/model were presumably
            # fitted on features computed this way -- confirm against the
            # training code before changing it.
            if motion_features:
                velocity = avg_mag - motion_features[-1][0]
            else:
                velocity = 0

            flow_coherence = np.mean(np.cos(ang))
            motion_features.append(
                np.array([velocity, avg_mag, flow_coherence])
            )
            prev_gray = gray
    finally:
        # Release the capture even when one of the checks above raises.
        cap.release()

    return np.array(motion_features)


def predict_uploaded_video(video_uploaded):
    """Classify an uploaded video and offer an annotated copy for download.

    The upload is written to a temporary ``.mp4`` file, motion features are
    extracted, scaled with ``motion_scaler`` and classified with
    ``ocsvm_motion``; an annotated video is then rendered and exposed through
    a Streamlit download button. Both temporary files are deleted before the
    function returns, including when a step raises.

    Args:
        video_uploaded: File-like object providing ``read()`` (the value
            returned by ``st.file_uploader``).
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
        temp_file.write(video_uploaded.read())
        temp_file_path = temp_file.name

    # splitext only touches the real extension, unlike str.replace which
    # would rewrite every ".mp4" occurrence in the path.
    path_root, path_ext = os.path.splitext(temp_file_path)
    output_video_path = f"{path_root}_annotated{path_ext}"

    try:
        motion_features = calculate_optical_flow_and_motion_energy(temp_file_path)
        if motion_features.size == 0:
            # Fewer than two frames: there is no frame pair to compute flow on,
            # and the scaler would reject the empty array.
            st.error("The uploaded video must contain at least two frames.")
            return

        motion_features_scaled = motion_scaler.transform(motion_features)
        predictions = ocsvm_motion.predict(motion_features_scaled).tolist()

        panic_percentage = annotate_video(
            temp_file_path, predictions, output_video_path
        )

        # Read the bytes now so no file handle stays open and the annotated
        # file can be deleted below.
        with open(output_video_path, "rb") as annotated_file:
            annotated_bytes = annotated_file.read()
    finally:
        for path in (temp_file_path, output_video_path):
            if os.path.exists(path):
                os.remove(path)

    st.success(f"Video processed successfully! {panic_percentage:.2f}% of frames detected as panic. Download the annotated video below:")
    st.download_button(
        label="Download Annotated Video",
        data=annotated_bytes,
        file_name="annotated_video.mp4",
    )


def annotate_video(video_path, predictions, output_path):
    """Write a copy of the video with a flow heatmap and "Panic" labels.

    Every frame after the first is blended with a JET-coloured heatmap of its
    optical-flow magnitude and, when its prediction equals 1, stamped with the
    text "Panic". ``predictions[i]`` is applied to frame ``i + 1`` (the second
    frame of the i-th frame pair); the first frame is only used to seed the
    flow computation and is not written to the output.

    Args:
        video_path: Path of the source video.
        predictions: Sequence of per-frame-pair labels; frames without a
            matching entry are written without a label.
        output_path: Path of the annotated video to create (``mp4v`` codec).

    Returns:
        Percentage (0-100) of annotated frames labelled as panic, or 0 when
        no frame was annotated.

    Raises:
        ValueError: If the first frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    frames_written = 0
    panic_count = 0
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(
            output_path, fourcc, fps, (frame_width, frame_height)
        )

        # The first frame only initialises the optical-flow computation.
        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow = _farneback_flow(prev_gray, gray)

            # Flow magnitude, stretched to 0-255 and rendered as a heatmap.
            flow_map = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            flow_map_normalized = cv2.normalize(
                flow_map, None, 0, 255, cv2.NORM_MINMAX
            ).astype(np.uint8)
            flow_map_colored = cv2.applyColorMap(
                flow_map_normalized, cv2.COLORMAP_JET
            )
            combined_frame = cv2.addWeighted(
                frame, 0.7, flow_map_colored, 0.3, 0
            )

            # NOTE(review): scikit-learn's OneClassSVM.predict returns +1 for
            # inliers and -1 for outliers, so "+1 == Panic" is only right if
            # the model was fitted on panic footage -- confirm.
            has_prediction = frames_written < len(predictions)
            if has_prediction and predictions[frames_written] == 1:
                panic_count += 1
                text = "Panic"
                text_size = cv2.getTextSize(
                    text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2
                )[0]
                # Centre the label horizontally.
                text_x = (combined_frame.shape[1] - text_size[0]) // 2
                cv2.putText(
                    combined_frame, text, (text_x, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2
                )

            out.write(combined_frame)
            prev_gray = gray
            frames_written += 1
    finally:
        # Always release both handles, including on the error path.
        cap.release()
        if out is not None:
            out.release()

    # Divide by the frames actually annotated so the percentage is not
    # diluted by entries that never correspond to a written frame.
    if frames_written > 0:
        return (panic_count / frames_written) * 100
    return 0
#=============================================================


#==========================PREDICTING UPLOADED VIDEO==========================
video_mp4_file = st.file_uploader("Upload Video File", type=['mp4'])

if video_mp4_file is not None:
    # Add spinner to indicate loading
    with st.spinner("Processing video..."):
        predict_uploaded_video(video_mp4_file)
#==============================================================================