import base64
import streamlit as st
import cv2
import numpy as np
import time
import tempfile
from PIL import Image
import io
import os
import imageio
import sys
import threading
from datetime import datetime
import av
import requests
import google.generativeai as genai
import folium
from streamlit_folium import folium_static
import geocoder
from twilio.rest import Client
from inference_sdk import InferenceHTTPClient, InferenceConfiguration
# from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration

# Page configuration
st.set_page_config(
    page_title="Crash Detection System",
    page_icon="🚨",
    layout="wide"
)

# App title and description
st.markdown("""

Vehicle Crash Detection System

""", unsafe_allow_html=True)
st.markdown("""

Real-time vehicle crash detection and severity assessment

""", unsafe_allow_html=True)

# Sidebar for API key and Twilio configuration
with st.sidebar.expander("API Configuration", expanded=False):
    # Defaults come from the environment so that no key is committed to source.
    # GEMINI_API_KEY and ROBOFLOW_API_KEY are assumed variable names.
    api_key = st.text_input(
        "Google Gemini API Key",
        type="password",
        value=os.environ.get("GEMINI_API_KEY", "")
    )
    roboflow_api_key = st.text_input(
        "Roboflow API Key",
        type="password",
        value=os.environ.get("ROBOFLOW_API_KEY", "")
    )

    if api_key:
        genai.configure(api_key=api_key)
        st.success("Google API key configured!")

    if roboflow_api_key:
        st.success("Roboflow API key configured!")

with st.sidebar.expander("Twilio Configuration", expanded=False):
    twilio_account_sid = st.text_input("Twilio Account SID", type="password")
    twilio_auth_token = st.text_input("Twilio Auth Token", type="password")
    twilio_from_number = st.text_input("Twilio From Number")
    recipient_number = st.text_input("Recipient Phone Number")


@st.cache_resource
def initialize_roboflow_client(key):
    """Initialize the Roboflow client, cached per API key"""
    return InferenceHTTPClient(
        api_url="https://serverless.roboflow.com",
        api_key=key
    )


# Get the client (passing the key lets the cache refresh when the key changes)
CLIENT = initialize_roboflow_client(roboflow_api_key)


def detect_crash(image):
    """
    Detects crashes in an image using Roboflow YOLO model

    Args:
        image: PIL Image or numpy array

    Returns:
        Dictionary with crash detection results, annotated image, and crash details
    """
    try:
        # Convert to PIL Image if it's a numpy array
        if isinstance(image, np.ndarray):
            # Convert BGR (OpenCV) to RGB (PIL)
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(image_rgb)
        else:
            pil_image = image

        # Save image temporarily with reduced quality for faster upload
        temp_img_path = "temp_detection_image.jpg"
        pil_image.save(temp_img_path, "JPEG", quality=70)

        # Send to Roboflow for inference
        custom_configuration = InferenceConfiguration(confidence_threshold=0.85, iou_threshold=0.6)
        with CLIENT.use_configuration(custom_configuration):
            result = CLIENT.infer(temp_img_path, model_id="accident-detection-cwbvs/2")

        # Clean up temp file
        if os.path.exists(temp_img_path):
            os.remove(temp_img_path)

        # Initialize default response
        crash_detected = False
        severity = "None"
        annotated_image = None

        # Create annotated image (with bounding boxes)
        if isinstance(image, np.ndarray):
            annotated_image = image.copy()
        else:
            annotated_image = np.array(pil_image)
            # Convert back to BGR for OpenCV operations
            annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)

        # Process predictions if available
        if "predictions" in result and result["predictions"]:
            for pred in result["predictions"]:
                crash_detected = True

                # Extract severity based on class_id
                class_id = pred.get("class_id", 0)
                if class_id == 1:
                    severity = "Minor"
                elif class_id == 2:
                    severity = "Moderate"
                elif class_id == 3:
                    severity = "Severe"
                else:
                    # Default to Moderate for unclassified crashes
                    severity = "Moderate"

                # Draw bounding box on the image
                x, y = pred.get("x", 0), pred.get("y", 0)
                width, height = pred.get("width", 0), pred.get("height", 0)
                confidence = pred.get("confidence", 0)

                # Calculate coordinates for rectangle (x, y is the box center)
                x1 = int(x - width/2)
                y1 = int(y - height/2)
                x2 = int(x + width/2)
                y2 = int(y + height/2)

                # Ensure coordinates are within image bounds
                img_height, img_width = annotated_image.shape[:2]
                x1 = max(0, min(x1, img_width-1))
                y1 = max(0, min(y1, img_height-1))
                x2 = max(0, min(x2, img_width-1))
                y2 = max(0, min(y2, img_height-1))

                # Set color based on severity (BGR order)
                if severity == "Minor":
                    color = (0, 255, 255)  # Yellow
                elif severity == "Moderate":
                    color = (0, 165, 255)  # Orange
                else:
                    color = (0, 0, 255)  # Red

                # Draw rectangle and label
                cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2)
                label = f"{severity} crash: {confidence:.2f}"
                cv2.putText(annotated_image, label, (x1, y1-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        return {
            "crash_detected": crash_detected,
            "severity": severity,
            "annotated_image": annotated_image,
            "raw_result": result
        }

    except Exception as e:
        st.error(f"Error in crash detection: {str(e)}")
        # Return original image if error occurs
        if isinstance(image, np.ndarray):
            return {"crash_detected": False, "severity": "Error", "annotated_image": image, "raw_result": {}}
        else:
            return {"crash_detected": False, "severity": "Error", "annotated_image": np.array(pil_image), "raw_result": {}}


def assess_crash_severity(image, crash_info):
    """
    Uses Google Gemini AI to assess crash severity in an image.

    Args:
        image: PIL Image or numpy array
        crash_info: Dictionary with crash detection results

    Returns:
        Detailed analysis as a string
    """
    if not api_key:
        return "API key not configured for detailed analysis"

    if not crash_info["crash_detected"]:
        return "No crash detected in this image"

    try:
        # Convert to PIL Image if it's a numpy array
        if isinstance(image, np.ndarray):
            image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        else:
            image_pil = image

        # Use Gemini to assess severity
        model_gemini = genai.GenerativeModel("gemini-1.5-flash")

        prompt = f"""
        Analyze this vehicle crash image.

        Detected crash severity: {crash_info['severity']}
        Raw detection data: {crash_info['raw_result']}

        Give a short, 2-line analysis of the crash. Focus on:
        1. Apparent damage level and potential injuries
        2. Possible cause or contributing factors

        Keep your response concise and direct.
        """

        response = model_gemini.generate_content([prompt, image_pil])

        if response and hasattr(response, "text"):
            return response.text.strip()
        else:
            return "Unable to generate detailed analysis"

    except Exception as e:
        return f"Error in analysis: {str(e)}"


def get_current_location():
    """
    Get the current geolocation.
    Returns approximate location as a dictionary with lat, lng, address
    """
    fallback = {
        "lat": 40.7128,  # Default to NYC coordinates
        "lng": -74.0060,
        "address": "Location unavailable"
    }
    try:
        g = geocoder.ip('me')
        # A failed lookup can leave the coordinates unset instead of raising,
        # and folium cannot center a map on None
        if g.lat is None or g.lng is None:
            return fallback
        return {
            "lat": g.lat,
            "lng": g.lng,
            "address": g.address
        }
    except Exception:
        return fallback


def send_crash_alert_twilio(crash_data):
    """
    Send a text message alert using Twilio

    Args:
        crash_data: Dictionary with crash details

    Returns:
        Tuple of (success flag, status message)
    """
    messaging_service_sid = "MGf47912734231e47b941784b93376839d"
    if not twilio_account_sid or not twilio_auth_token or not recipient_number:
        return False, "Twilio configuration incomplete"

    try:
        # Initialize Twilio client
        client = Client(twilio_account_sid, twilio_auth_token)

        # Create message content
        message_body = f"""
        🚨 CRASH ALERT 🚨
        Time: {crash_data['timestamp']}
        Severity: {crash_data['severity']}
        """

        # Send message from the configured number, else via the messaging service
        if twilio_from_number:
            message = client.messages.create(
                body=message_body,
                from_=twilio_from_number,
                to=recipient_number
            )
        else:
            message = client.messages.create(
                body=message_body,
                messaging_service_sid=messaging_service_sid,
                to=recipient_number
            )

        return True, f"Message sent with SID: {message.sid}"

    except Exception as e:
        return False, f"Failed to send alert: {str(e)}"


# FRONTEND IMPLEMENTATION
# Create sidebar for controls
st.sidebar.title("Controls")

# Detection settings
confidence_threshold = st.sidebar.slider(
    "Detection Confidence",
    min_value=0.0,
    max_value=1.0,
    value=0.45
)

# Input method selection
input_method = st.sidebar.radio(
    "Input Source",
    ["Webcam", "Upload Image", "Upload Video"]
)

# Global variables for tracking detections
last_crash_time = None
crash_detected = False
crash_severity = "None"
crash_analysis = "None"
latest_crash_image = None
alert_sent = False

# Statistics (kept in session state so they survive Streamlit reruns)
for counter in ('total_detections', 'total_crashes', 'severe_crashes', 'alerts_sent'):
    if counter not in st.session_state:
        st.session_state[counter] = 0

# Create columns for the main display area
col1, col2 = st.columns([2, 1])

# Create a single map placeholder that will be populated only once
map_container = st.container()

# Image display area in column 1
with col1:
    frame_placeholder = st.empty()
    # Initial image
    sample_img = np.zeros((480, 640, 3), dtype=np.uint8)
    frame_placeholder.image(sample_img, channels="BGR")

# Info area in column 2
with col2:
    status_placeholder = st.empty()
    severity_placeholder = st.empty()
    analysis_placeholder = st.empty()
    timestamp_placeholder = st.empty()
    location_placeholder = st.empty()
    alert_status_placeholder = st.empty()

# Initialize displays
status_placeholder.markdown("""

Status: Monitoring

""", unsafe_allow_html=True)
severity_placeholder.markdown("""

Crash Severity:

None

""", unsafe_allow_html=True)
analysis_placeholder.markdown("""

Crash Analysis:

None

""", unsafe_allow_html=True)
timestamp_placeholder.markdown("""

Timestamp:

N/A

""", unsafe_allow_html=True)
location_placeholder.markdown("""

Location:

N/A

""", unsafe_allow_html=True)

# Add a placeholder for the map in the map container
with map_container:
    map_placeholder = st.empty()

# Initialize the map only once
initial_location = get_current_location()
m = folium.Map(location=[initial_location["lat"], initial_location["lng"]], zoom_start=15)
folium.Marker(
    [initial_location["lat"], initial_location["lng"]],
    popup="Current Location",
    tooltip="Current Location"
).add_to(m)
map_placeholder.empty()  # Clear initially, will be populated when needed


def update_info_display(custom_analysis=None):
    """Update the information display"""
    global crash_detected, crash_severity, crash_analysis, last_crash_time

    # Update status
    if crash_detected:
        status_placeholder.markdown(f"""

Status: CRASH DETECTED! 🚨

""", unsafe_allow_html=True)
    else:
        status_placeholder.markdown("""

Status: Monitoring

""", unsafe_allow_html=True)

    # Update severity
    if crash_detected:
        severity_color = "#FF4B4B" if crash_severity.lower() == "severe" else "#FFA500"
        severity_placeholder.markdown(f"""

Crash Severity:

{crash_severity}

""", unsafe_allow_html=True)

        # Update analysis - use custom if provided
        display_analysis = custom_analysis if custom_analysis else crash_analysis
        analysis_placeholder.markdown(f"""

Crash Analysis:

{display_analysis}

""", unsafe_allow_html=True)
    else:
        severity_placeholder.markdown("""

Crash Severity:

None

""", unsafe_allow_html=True)
        analysis_placeholder.markdown("""

Crash Analysis:

None

""", unsafe_allow_html=True)

    # Update timestamp
    if last_crash_time:
        crash_time = datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S')
        timestamp_placeholder.markdown(f"""

Last Crash Detected:

{crash_time}

""", unsafe_allow_html=True)

        # Update location
        location = get_current_location()
        location_placeholder.markdown(f"""

Location:

{location['address']}

""", unsafe_allow_html=True)

        # Update map with crash location
        m = folium.Map(location=[location["lat"], location["lng"]], zoom_start=15)
        folium.Marker(
            [location["lat"], location["lng"]],
            popup=f"Crash Location<br>Severity: {crash_severity}<br>Time: {crash_time}",
            tooltip="Crash Location",
            icon=folium.Icon(color='red', icon='warning-sign')
        ).add_to(m)

        # Only update the map once in the map container
        with map_container:
            folium_static(m)
    else:
        timestamp_placeholder.markdown("""

Timestamp:

N/A

""", unsafe_allow_html=True)
        location_placeholder.markdown("""

Location:

N/A

""", unsafe_allow_html=True)


# Display status message
# status_placeholder.info("Starting camera... Please accept camera permissions when prompted.")
from camera_input_live import camera_input_live

# Live camera input (camera_input_live component)
if input_method == "Webcam":
    # No need for webcam selection as camera_input_live handles that

    # Add start/stop buttons in sidebar
    start_col, stop_col = st.sidebar.columns(2)
    start_button = start_col.button("Start Detection")
    stop_button = stop_col.button("Stop Detection")

    if start_button:
        st.session_state.webcam_running = True

    if stop_button:
        st.session_state.webcam_running = False
        st.success("Detection stopped")

    # Display status initially
    if not st.session_state.get("webcam_running", False):
        status_placeholder.markdown("""

Status: Idle

""", unsafe_allow_html=True)

    # Only show camera when running
    if st.session_state.get("webcam_running", False):
        # Get image from camera_input_live
        frame = camera_input_live()

        # Process frame if available
        if frame is not None:
            # Convert camera_input_live output to OpenCV format
            st.image(frame)
            bytes_data = frame.getvalue()
            cv2_frame = cv2.imdecode(np.frombuffer(bytes_data, np.uint8), cv2.IMREAD_COLOR)

            # Process frame for detection
            st.session_state.total_detections += 1
            detection_result = detect_crash(cv2_frame)

            # Update image with annotations
            # annotated_image = detection_result["annotated_image"]
            # frame_placeholder.image(annotated_image, channels="BGR")

            # If crash detected, process further.
            # The alert flag is read from session state: module-level variables
            # are reset on every Streamlit rerun, so a plain global could not
            # suppress repeat alerts across frames.
            if detection_result["crash_detected"] and not st.session_state.get("alert_sent", False):
                # Set crash data
                crash_detected = True
                crash_severity = detection_result["severity"]
                last_crash_time = time.time()

                # Update statistics
                st.session_state.total_crashes += 1
                if crash_severity.lower() == "severe":
                    st.session_state.severe_crashes += 1

                # Get analysis from Gemini
                crash_analysis = assess_crash_severity(cv2_frame, detection_result)

                # Update info display
                update_info_display()

                # Send alert only once
                location = get_current_location()
                crash_data = {
                    "timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'),
                    "severity": crash_severity,
                    "analysis": crash_analysis,
                    "location": location,
                    "raw_detection": detection_result["raw_result"]
                }

                success, message = send_crash_alert_twilio(crash_data)
                if success:
                    st.session_state.alerts_sent += 1
                    alert_status_placeholder.success(f"Alert sent: {message}")
                    st.session_state.alert_sent = True
                else:
                    alert_status_placeholder.error(f"Alert failed: {message}")

            # Status update if no crash
            if not detection_result["crash_detected"]:
                status_placeholder.markdown("""

Status: Monitoring

""", unsafe_allow_html=True)

elif input_method == "Upload Image":
    uploaded_file = st.sidebar.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])

    if uploaded_file is not None:
        # Read image
        image_bytes = uploaded_file.read()
        image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)

        # Display original image
        frame_placeholder.image(image, channels="BGR", caption="Uploaded Image")

        # Process button
        if st.sidebar.button("Process Image"):
            # Increment counter
            st.session_state.total_detections += 1

            # Process the image
            with st.spinner("Processing image..."):
                detection_result = detect_crash(image)

            # Display annotated image
            annotated_image = detection_result["annotated_image"]
            frame_placeholder.image(annotated_image, channels="BGR", caption="Processed Image")

            # If crash detected
            if detection_result["crash_detected"]:
                # Set crash data
                crash_detected = True
                crash_severity = detection_result["severity"]
                last_crash_time = time.time()

                # Update statistics
                st.session_state.total_crashes += 1
                if crash_severity.lower() == "severe":
                    st.session_state.severe_crashes += 1

                # Get analysis from Gemini
                crash_analysis = assess_crash_severity(image, detection_result)

                # Update info display
                update_info_display()

                # Send alert
                location = get_current_location()
                crash_data = {
                    "timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'),
                    "severity": crash_severity,
                    "analysis": crash_analysis,
                    "location": location,
                    "raw_detection": detection_result["raw_result"]
                }

                success, message = send_crash_alert_twilio(crash_data)
                if success:
                    st.session_state.alerts_sent += 1
                    alert_status_placeholder.success(f"Alert sent: {message}")
                else:
                    alert_status_placeholder.error(f"Alert failed: {message}")
            else:
                st.info("No crash detected in this image.")
                # Update monitoring status
                status_placeholder.markdown("""

Status: Monitoring

""", unsafe_allow_html=True)

elif input_method == "Upload Video":
    uploaded_file = st.sidebar.file_uploader("Choose a video...", type=["mp4", "avi", "mov"])

    if uploaded_file is not None:
        # Save uploaded file temporarily
        tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        tfile.write(uploaded_file.read())
        tfile_path = tfile.name
        tfile.close()

        if st.sidebar.button("Process Video"):
            try:
                cap = cv2.VideoCapture(tfile_path)
                if not cap.isOpened():
                    st.error("Cannot open video file")
                else:
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                    processed_frames = []
                    crash_frames = []

                    progress_bar = st.progress(0)
                    status_text = st.empty()
                    status_text.text(f"Analyzing video... (0/{frame_count} frames)")

                    frame_number = 0
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break

                        st.session_state.total_detections += 1
                        detection_result = detect_crash(frame)

                        processed_frames.append({
                            "frame": detection_result["annotated_image"],
                            "result": detection_result
                        })

                        if detection_result["crash_detected"]:
                            crash_frames.append({
                                "frame_number": frame_number,
                                "frame": frame.copy(),
                                "detection_result": detection_result,
                                "timestamp": time.time(),
                                "severity": detection_result["severity"]
                            })

                        frame_number += 1
                        if frame_number % 5 == 0 or frame_number == frame_count:
                            # max(..., 1) avoids dividing by a reported frame count of 0
                            progress_value = min(frame_number / max(frame_count, 1), 1.0)
                            progress_bar.progress(progress_value)
                            status_text.text(f"Analyzing video... ({frame_number}/{frame_count} frames)")

                    cap.release()
                    status_text.text("Creating processed video...")

                    output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name

                    if processed_frames:
                        first_frame = processed_frames[0]["frame"]
                        h, w = first_frame.shape[:2]
                        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use 'mp4v' for compatibility
                        out = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h))

                        for frame_data in processed_frames:
                            out.write(frame_data["frame"])

                        out.release()
                        time.sleep(0.5)  # Ensure the file is fully flushed

                        try:
                            with open(output_video_path, 'rb') as video_file:
                                video_bytes = video_file.read()

                            # Generate download link as a base64 data URI
                            b64 = base64.b64encode(video_bytes).decode()
                            href = f'<a href="data:video/mp4;base64,{b64}" download>📥 Click here to download the processed video</a>'

                            status_text.empty()
                            progress_bar.empty()
                            st.markdown(href, unsafe_allow_html=True)
                        except Exception as e:
                            st.error(f"Error preparing video download: {e}")

                    st.success(f"Video analysis complete. {len(processed_frames)} frames processed, {len(crash_frames)} crashes detected.")

                    if crash_frames:
                        crash_frames.sort(key=lambda x: x["frame_number"])
                        last_crash = crash_frames[-1]

                        frame_placeholder.image(
                            last_crash["detection_result"]["annotated_image"],
                            channels="BGR",
                            caption=f"Last Detected Crash (Frame {last_crash['frame_number']})",
                            use_column_width=True
                        )

                        with st.spinner("Analyzing crash severity..."):
                            crash_analysis = assess_crash_severity(last_crash["frame"], last_crash["detection_result"])

                        crash_detected = True
                        crash_severity = last_crash["severity"]
                        last_crash_time = last_crash["timestamp"]

                        st.session_state.total_crashes += len(crash_frames)
                        severe_count = sum(1 for crash in crash_frames if crash["severity"].lower() == "severe")
                        st.session_state.severe_crashes += severe_count

                        update_info_display(custom_analysis=crash_analysis)

                        location = get_current_location()
                        crash_data = {
                            "timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'),
                            "severity": crash_severity,
                            "analysis": crash_analysis,
                            "location": location,
                            "raw_detection": last_crash["detection_result"]["raw_result"]
                        }

                        success, message = send_crash_alert_twilio(crash_data)
                        if success:
                            st.session_state.alerts_sent += 1
                            alert_status_placeholder.success(f"Alert sent: {message}")
                        else:
                            alert_status_placeholder.error(f"Alert failed: {message}")
                    else:
                        st.info("No crashes detected in this video.")

                    try:
                        os.remove(output_video_path)
                    except OSError:
                        pass

            except Exception as e:
                st.error(f"Error processing video: {e}")
                st.exception(e)
            finally:
                try:
                    if os.path.exists(tfile_path):
                        os.remove(tfile_path)
                except OSError:
                    pass

st.markdown("---")
col1, col2, col3, col4 = st.columns(4)
with col1:
    st.markdown(f"""

Total Detections

{st.session_state.total_detections}

""", unsafe_allow_html=True)
with col2:
    st.markdown(f"""

Total Crashes

{st.session_state.total_crashes}

""", unsafe_allow_html=True)
with col3:
    st.markdown(f"""

Severe Crashes

{st.session_state.severe_crashes}

""", unsafe_allow_html=True)
with col4:
    st.markdown(f"""

SMS Alerts Sent

{st.session_state.alerts_sent}

""", unsafe_allow_html=True)

# Footer
st.markdown("---")
st.markdown("""

Vehicle Crash Detection and Severity Assessment System

© 2025

""", unsafe_allow_html=True)