Spaces:
Build error
Build error
| import streamlit as st | |
| import joblib | |
| import numpy as np | |
| import cv2 | |
| import tempfile | |
| import os | |
# Page title rendered at the top of the Streamlit app
st.title("Motion-Based One-Class SVM Model")

#==========================IMPORTING MODELS==========================
# Both artifacts are deserialized with joblib from the ``pages/`` directory,
# resolved relative to the process working directory. The scaler is loaded
# first, then the model. joblib files are pickle-based, so only artifacts
# shipped with the app should ever be placed at these paths.
motion_scaler, ocsvm_motion = (
    joblib.load(artifact_path)
    for artifact_path in ('pages/motion_scaler.pkl', 'pages/ocsvm_motion.pkl')
)
#====================================================================
| #==========================FUNCTIONS========================== | |
def calculate_optical_flow_and_motion_energy(video_path):
    """Extract motion features from a video using dense optical flow.

    Farneback optical flow is computed between every pair of consecutive
    frames, and each pair is summarised by three numbers.

    Despite the function name, no motion-energy value is part of the
    returned features; only the three columns listed below are produced.

    Args:
        video_path: Path of the video file, passed to ``cv2.VideoCapture``.

    Returns:
        A ``numpy.ndarray`` with one row per consecutive frame pair
        (``n_frames - 1`` rows) and the columns
        ``[velocity, avg_mag, flow_coherence]``. A video with a single
        readable frame yields an empty array of shape ``(0, 3)``.

    Raises:
        ValueError: If the video cannot be opened or its first frame cannot
            be read.
    """
    cap = cv2.VideoCapture(video_path)
    motion_features = []
    # try/finally guarantees the capture is released even when an error is
    # raised while opening or decoding the video.
    try:
        if not cap.isOpened():
            raise ValueError(f"Error opening video file: {video_path}")

        # The first frame only serves as the reference for the first flow.
        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Dense optical flow between the previous and the current frame
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )
            # Polar form of the per-pixel flow vectors
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

            avg_mag = np.mean(mag)

            # Change relative to the previous row; 0 for the first pair.
            # NOTE(review): index 0 of a stored row is that row's *velocity*,
            # not its average magnitude (index 1). The computation is kept
            # exactly as it was because the saved scaler/model were
            # presumably fitted on features built this way -- confirm
            # against the training code before changing it.
            if motion_features:
                velocity = avg_mag - motion_features[-1][0]
            else:
                velocity = 0

            # Mean cosine of the flow angles
            flow_coherence = np.mean(np.cos(ang))

            motion_features.append(
                np.array([velocity, avg_mag, flow_coherence])
            )

            prev_gray = gray
    finally:
        cap.release()

    if not motion_features:
        # Keep a consistent 2-D shape even when no frame pair exists.
        return np.empty((0, 3))
    return np.array(motion_features)
def predict_uploaded_video(video_uploaded):
    """Classify an uploaded video and offer the annotated result for download.

    The upload is written to a temporary ``.mp4`` file, motion features are
    extracted from it, scaled with ``motion_scaler`` and classified with
    ``ocsvm_motion``. The video is then annotated by ``annotate_video`` and
    the result is exposed through a Streamlit download button. Both
    temporary files are removed before the function returns, whether or not
    processing succeeded.

    Args:
        video_uploaded: File-like object with a ``read()`` method returning
            the video bytes (the value returned by ``st.file_uploader``).

    Returns:
        None. Results are reported through Streamlit widgets.

    Raises:
        ValueError: Propagated from the feature extraction or annotation
            step when the video cannot be opened or read.
    """
    temp_file_path = None
    output_video_path = None
    try:
        # OpenCV needs a real file path, so persist the upload first.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(video_uploaded.read())

        motion_features = calculate_optical_flow_and_motion_energy(temp_file_path)
        if motion_features.size == 0:
            # Fewer than two frames: there is no frame pair to compute
            # optical flow on, so there is nothing to scale or classify.
            st.error("The uploaded video must contain at least two frames to be analysed.")
            return

        motion_features_scaled = motion_scaler.transform(motion_features)
        predictions = ocsvm_motion.predict(motion_features_scaled).tolist()
        # Features exist per frame *pair*; pad with one placeholder so the
        # list has one entry per video frame.
        predictions.append(1)

        # Build "<name>_annotated.mp4" from the extension only, so a ".mp4"
        # occurring elsewhere in the path is left alone.
        path_root, path_ext = os.path.splitext(temp_file_path)
        output_video_path = f"{path_root}_annotated{path_ext}"
        panic_percentage = annotate_video(temp_file_path, predictions, output_video_path)

        # Load the result into memory so the file handle is closed and the
        # file on disk can be deleted right away.
        with open(output_video_path, "rb") as annotated_file:
            annotated_bytes = annotated_file.read()
    finally:
        for stale_path in (temp_file_path, output_video_path):
            if stale_path is not None and os.path.exists(stale_path):
                os.remove(stale_path)

    # Provide a download link for the annotated video
    st.success(f"Video processed successfully! {panic_percentage:.2f}% of frames detected as panic. Download the annotated video below:")
    st.download_button(
        label="Download Annotated Video",
        data=annotated_bytes,
        file_name="annotated_video.mp4",
        mime="video/mp4",
    )
def annotate_video(video_path, predictions, output_path):
    """Write an annotated copy of a video and return the panic percentage.

    Every frame after the first is blended with a JET-coloured heatmap of
    the optical-flow magnitude between it and the preceding frame. Frames
    whose prediction equals ``1`` are additionally stamped with the text
    "Panic". The first frame is used only as the optical-flow reference and
    is not written to the output.

    Args:
        video_path: Path of the input video.
        predictions: Sequence indexed by frame-pair number; entry ``i``
            applies to the ``i``-th written frame. It must contain at least
            one entry per written frame.
        output_path: Destination path of the annotated video, encoded with
            the ``mp4v`` FourCC.

    Returns:
        The number of frames marked as panic divided by
        ``len(predictions)``, as a percentage (``0`` when ``predictions`` is
        empty).

    Raises:
        ValueError: If the video cannot be opened or its first frame cannot
            be read.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    panic_count = 0
    # try/finally guarantees the reader and writer are released even when
    # decoding or annotation fails midway.
    try:
        if not cap.isOpened():
            raise ValueError(f"Error opening video file: {video_path}")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Read the first frame to initialize optical flow calculations
        ret, prev_frame = cap.read()
        if not ret:
            raise ValueError("Cannot read the first frame from the video.")
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

        # The writer is created only once the input is known to be readable,
        # so no output file is produced for an unusable input.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        frame_number = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Optical flow between the previous and the current frame
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Per-pixel flow magnitude, stretched to 0-255 for visualization
            flow_map = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            flow_map_normalized = cv2.normalize(
                flow_map, None, 0, 255, cv2.NORM_MINMAX
            ).astype(np.uint8)
            flow_map_colored = cv2.applyColorMap(flow_map_normalized, cv2.COLORMAP_JET)

            # Overlay: 70% original frame, 30% heatmap
            combined_frame = cv2.addWeighted(frame, 0.7, flow_map_colored, 0.3, 0)

            if predictions[frame_number] == 1:
                panic_count += 1
                text = "Panic"
                text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)[0]
                text_x = (combined_frame.shape[1] - text_size[0]) // 2  # Center the text
                cv2.putText(combined_frame, text, (text_x, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            out.write(combined_frame)

            prev_gray = gray
            frame_number += 1
    finally:
        cap.release()
        if out is not None:
            out.release()

    # The denominator is the length of the predictions list, as passed in by
    # the caller, not the number of frames written.
    total_frames = len(predictions)
    panic_percentage = (panic_count / total_frames) * 100 if total_frames > 0 else 0
    return panic_percentage
| #============================================================= | |
#==========================PREDICTING UPLOADED VIDEO==========================
# Page entry point: wait for an .mp4 upload, then run the prediction and
# annotation pipeline while a spinner is displayed.
video_mp4_file = st.file_uploader("Upload Video File", type=['mp4'])
if video_mp4_file is not None:
    # Add spinner to indicate loading
    with st.spinner("Processing video..."):
        try:
            predict_uploaded_video(video_mp4_file)
        except ValueError as exc:
            # The video helpers in this file raise ValueError when the upload
            # cannot be opened or decoded; show that as a readable message
            # instead of an unhandled exception.
            st.error(f"Could not process the uploaded video: {exc}")
#==============================================================================