|
|
import base64 |
|
|
import streamlit as st |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import time |
|
|
import tempfile |
|
|
from PIL import Image |
|
|
import io |
|
|
import os |
|
|
import imageio |
|
|
import sys |
|
|
import threading |
|
|
from datetime import datetime |
|
|
import av |
|
|
import requests |
|
|
import google.generativeai as genai |
|
|
import folium |
|
|
from streamlit_folium import folium_static |
|
|
import geocoder |
|
|
from twilio.rest import Client |
|
|
from inference_sdk import InferenceHTTPClient |
|
|
from inference_sdk import InferenceHTTPClient, InferenceConfiguration |
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config( |
|
|
page_title="Crash Detection System", |
|
|
page_icon="🚨", |
|
|
layout="wide" |
|
|
) |
|
|
|
|
|
|
|
|
st.markdown("<h1 style='text-align: center; color: #FF4B4B;'>Vehicle Crash Detection System</h1>", unsafe_allow_html=True) |
|
|
st.markdown(""" |
|
|
<p style='text-align: center; font-size: 1.2em;'>Real-time vehicle crash detection and severity assessment</p> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
with st.sidebar.expander("API Configuration", expanded=False): |
|
|
api_key = st.text_input("Google Gemini API Key", type="password",value="AIzaSyCcf3s3GS7_925D7t2fgODc5WIKOMZSOzc") |
|
|
roboflow_api_key = st.text_input("Roboflow API Key", value="fWfYhVuhRbuPSffMaLMr", type="password") |
|
|
if api_key: |
|
|
genai.configure(api_key=api_key) |
|
|
st.success("Google API key configured!") |
|
|
if roboflow_api_key: |
|
|
st.success("Roboflow API key configured!") |
|
|
|
|
|
with st.sidebar.expander("Twilio Configuration", expanded=False): |
|
|
twilio_account_sid = st.text_input("Twilio Account SID", type="password") |
|
|
twilio_auth_token = st.text_input("Twilio Auth Token", type="password") |
|
|
twilio_from_number = st.text_input("Twilio From Number") |
|
|
recipient_number = st.text_input("Recipient Phone Number") |
|
|
|
|
|
@st.cache_resource |
|
|
def initialize_roboflow_client(): |
|
|
"""Initialize the Roboflow client with caching""" |
|
|
return InferenceHTTPClient( |
|
|
api_url="https://serverless.roboflow.com", |
|
|
api_key=roboflow_api_key |
|
|
) |
|
|
|
|
|
|
|
|
CLIENT = initialize_roboflow_client() |
|
|
|
|
|
def detect_crash(image): |
|
|
""" |
|
|
Detects crashes in an image using Roboflow YOLO model |
|
|
|
|
|
Args: |
|
|
image: PIL Image or numpy array |
|
|
|
|
|
Returns: |
|
|
Dictionary with crash detection results, annotated image, and crash details |
|
|
""" |
|
|
try: |
|
|
|
|
|
if isinstance(image, np.ndarray): |
|
|
|
|
|
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
|
|
pil_image = Image.fromarray(image_rgb) |
|
|
else: |
|
|
pil_image = image |
|
|
|
|
|
|
|
|
temp_img_path = "temp_detection_image.jpg" |
|
|
pil_image.save(temp_img_path, "JPEG", quality=70) |
|
|
|
|
|
|
|
|
custom_configuration = InferenceConfiguration(confidence_threshold=0.85, iou_threshold=0.6) |
|
|
with CLIENT.use_configuration(custom_configuration): |
|
|
result = CLIENT.infer(temp_img_path, model_id="accident-detection-cwbvs/2") |
|
|
|
|
|
|
|
|
if os.path.exists(temp_img_path): |
|
|
os.remove(temp_img_path) |
|
|
|
|
|
|
|
|
crash_detected = False |
|
|
severity = "None" |
|
|
annotated_image = None |
|
|
|
|
|
|
|
|
if isinstance(image, np.ndarray): |
|
|
annotated_image = image.copy() |
|
|
else: |
|
|
annotated_image = np.array(pil_image) |
|
|
|
|
|
annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
|
|
|
if "predictions" in result and result["predictions"]: |
|
|
for pred in result["predictions"]: |
|
|
crash_detected = True |
|
|
|
|
|
|
|
|
class_id = pred.get("class_id", 0) |
|
|
if class_id == 1: |
|
|
severity = "Minor" |
|
|
elif class_id == 2: |
|
|
severity = "Moderate" |
|
|
elif class_id == 3: |
|
|
severity = "Severe" |
|
|
else: |
|
|
|
|
|
severity = "Moderate" |
|
|
|
|
|
|
|
|
x, y = pred.get("x", 0), pred.get("y", 0) |
|
|
width, height = pred.get("width", 0), pred.get("height", 0) |
|
|
confidence = pred.get("confidence", 0) |
|
|
|
|
|
|
|
|
x1 = int(x - width/2) |
|
|
y1 = int(y - height/2) |
|
|
x2 = int(x + width/2) |
|
|
y2 = int(y + height/2) |
|
|
|
|
|
|
|
|
img_height, img_width = annotated_image.shape[:2] |
|
|
x1 = max(0, min(x1, img_width-1)) |
|
|
y1 = max(0, min(y1, img_height-1)) |
|
|
x2 = max(0, min(x2, img_width-1)) |
|
|
y2 = max(0, min(y2, img_height-1)) |
|
|
|
|
|
|
|
|
if severity == "Minor": |
|
|
color = (0, 255, 255) |
|
|
elif severity == "Moderate": |
|
|
color = (0, 165, 255) |
|
|
else: |
|
|
color = (0, 0, 255) |
|
|
|
|
|
|
|
|
cv2.rectangle(annotated_image, (x1, y1), (x2, y2), color, 2) |
|
|
label = f"{severity} crash: {confidence:.2f}" |
|
|
cv2.putText(annotated_image, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) |
|
|
|
|
|
return { |
|
|
"crash_detected": crash_detected, |
|
|
"severity": severity, |
|
|
"annotated_image": annotated_image, |
|
|
"raw_result": result |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error in crash detection: {str(e)}") |
|
|
|
|
|
if isinstance(image, np.ndarray): |
|
|
return {"crash_detected": False, "severity": "Error", "annotated_image": image, "raw_result": {}} |
|
|
else: |
|
|
return {"crash_detected": False, "severity": "Error", "annotated_image": np.array(pil_image), "raw_result": {}} |
|
|
|
|
|
def assess_crash_severity(image, crash_info): |
|
|
""" |
|
|
Uses Google Gemini AI to assess crash severity in an image. |
|
|
|
|
|
Args: |
|
|
image: PIL Image or numpy array |
|
|
crash_info: Dictionary with crash detection results |
|
|
|
|
|
Returns: |
|
|
Detailed analysis as a string |
|
|
""" |
|
|
if not api_key: |
|
|
return "API key not configured for detailed analysis" |
|
|
|
|
|
if not crash_info["crash_detected"]: |
|
|
return "No crash detected in this image" |
|
|
|
|
|
try: |
|
|
|
|
|
if isinstance(image, np.ndarray): |
|
|
image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) |
|
|
else: |
|
|
image_pil = image |
|
|
|
|
|
|
|
|
model_gemini = genai.GenerativeModel("gemini-1.5-flash") |
|
|
prompt = f""" |
|
|
Analyze this vehicle crash image. |
|
|
Detected crash severity: {crash_info['severity']} |
|
|
Raw detection data: {crash_info['raw_result']} |
|
|
|
|
|
Give a short, 2-line analysis of the crash. Focus on: |
|
|
1. Apparent damage level and potential injuries |
|
|
2. Possible cause or contributing factors |
|
|
|
|
|
Keep your response concise and direct. |
|
|
""" |
|
|
|
|
|
response = model_gemini.generate_content([prompt, image_pil]) |
|
|
|
|
|
if response and hasattr(response, "text"): |
|
|
return response.text.strip() |
|
|
else: |
|
|
return "Unable to generate detailed analysis" |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error in analysis: {str(e)}" |
|
|
|
|
|
def get_current_location(): |
|
|
""" |
|
|
Get the current geolocation. |
|
|
Returns approximate location as a dictionary with lat, lng, address |
|
|
""" |
|
|
try: |
|
|
g = geocoder.ip('me') |
|
|
return { |
|
|
"lat": g.lat, |
|
|
"lng": g.lng, |
|
|
"address": g.address |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"lat": 40.7128, |
|
|
"lng": -74.0060, |
|
|
"address": "Location unavailable" |
|
|
} |
|
|
|
|
|
def send_crash_alert_twilio(crash_data): |
|
|
""" |
|
|
Send a text message alert using Twilio |
|
|
|
|
|
Args: |
|
|
crash_data: Dictionary with crash details |
|
|
|
|
|
Returns: |
|
|
Boolean indicating success and message |
|
|
""" |
|
|
messaging_service_sid = "MGf47912734231e47b941784b93376839d" |
|
|
if not twilio_account_sid or not twilio_auth_token or not recipient_number: |
|
|
return False, "Twilio configuration incomplete" |
|
|
|
|
|
try: |
|
|
|
|
|
client = Client(twilio_account_sid, twilio_auth_token) |
|
|
|
|
|
|
|
|
message_body = f""" |
|
|
🚨 CRASH ALERT 🚨 |
|
|
Time: {crash_data['timestamp']} |
|
|
Severity: {crash_data['severity']} |
|
|
""" |
|
|
|
|
|
|
|
|
if twilio_from_number: |
|
|
message = client.messages.create( |
|
|
body=message_body, |
|
|
from_=twilio_from_number, |
|
|
to=recipient_number |
|
|
) |
|
|
else: |
|
|
message = client.messages.create( |
|
|
body=message_body, |
|
|
messaging_service_sid=messaging_service_sid, |
|
|
to=recipient_number |
|
|
) |
|
|
|
|
|
return True, f"Message sent with SID: {message.sid}" |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Failed to send alert: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
st.sidebar.title("Controls") |
|
|
|
|
|
|
|
|
confidence_threshold = st.sidebar.slider( |
|
|
"Detection Confidence", |
|
|
min_value=0.0, |
|
|
max_value=1.0, |
|
|
value=0.45 |
|
|
) |
|
|
|
|
|
|
|
|
input_method = st.sidebar.radio( |
|
|
"Input Source", |
|
|
["Webcam", "Upload Image", "Upload Video"] |
|
|
) |
|
|
|
|
|
|
|
|
last_crash_time = None |
|
|
crash_detected = False |
|
|
crash_severity = "None" |
|
|
crash_analysis = "None" |
|
|
latest_crash_image = None |
|
|
alert_sent = False |
|
|
|
|
|
|
|
|
if 'total_detections' not in st.session_state: |
|
|
st.session_state.total_detections = 0 |
|
|
if 'total_crashes' not in st.session_state: |
|
|
st.session_state.total_crashes = 0 |
|
|
if 'severe_crashes' not in st.session_state: |
|
|
st.session_state.severe_crashes = 0 |
|
|
if 'alerts_sent' not in st.session_state: |
|
|
st.session_state.alerts_sent = 0 |
|
|
|
|
|
|
|
|
col1, col2 = st.columns([2, 1]) |
|
|
|
|
|
|
|
|
map_container = st.container() |
|
|
|
|
|
|
|
|
with col1: |
|
|
frame_placeholder = st.empty() |
|
|
|
|
|
sample_img = np.zeros((480, 640, 3), dtype=np.uint8) |
|
|
frame_placeholder.image(sample_img, channels="BGR") |
|
|
|
|
|
|
|
|
with col2: |
|
|
status_placeholder = st.empty() |
|
|
severity_placeholder = st.empty() |
|
|
analysis_placeholder = st.empty() |
|
|
timestamp_placeholder = st.empty() |
|
|
location_placeholder = st.empty() |
|
|
alert_status_placeholder = st.empty() |
|
|
|
|
|
|
|
|
status_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h3>Status: Monitoring</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
severity_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Crash Severity:</h4> |
|
|
<p>None</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
analysis_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Crash Analysis:</h4> |
|
|
<p>None</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
timestamp_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Timestamp:</h4> |
|
|
<p>N/A</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
location_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Location:</h4> |
|
|
<p>N/A</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
with map_container: |
|
|
map_placeholder = st.empty() |
|
|
|
|
|
|
|
|
initial_location = get_current_location() |
|
|
m = folium.Map(location=[initial_location["lat"], initial_location["lng"]], zoom_start=15) |
|
|
folium.Marker( |
|
|
[initial_location["lat"], initial_location["lng"]], |
|
|
popup="Current Location", |
|
|
tooltip="Current Location" |
|
|
).add_to(m) |
|
|
map_placeholder.empty() |
|
|
|
|
|
def update_info_display(custom_analysis=None): |
|
|
"""Update the information display""" |
|
|
global crash_detected, crash_severity, crash_analysis, last_crash_time |
|
|
|
|
|
|
|
|
if crash_detected: |
|
|
status_placeholder.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #FF4B4B; color: white;"> |
|
|
<h3>Status: CRASH DETECTED! 🚨</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
else: |
|
|
status_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #4CAF50; color: white;"> |
|
|
<h3>Status: Monitoring</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
if crash_detected: |
|
|
severity_color = "#FF4B4B" if crash_severity.lower() == "severe" else "#FFA500" |
|
|
severity_placeholder.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: {severity_color}; color: white;"> |
|
|
<h4>Crash Severity:</h4> |
|
|
<p>{crash_severity}</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
display_analysis = custom_analysis if custom_analysis else crash_analysis |
|
|
analysis_placeholder.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Crash Analysis:</h4> |
|
|
<p>{display_analysis}</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
else: |
|
|
severity_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Crash Severity:</h4> |
|
|
<p>None</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
analysis_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Crash Analysis:</h4> |
|
|
<p>None</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
if last_crash_time: |
|
|
crash_time = datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S') |
|
|
timestamp_placeholder.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Last Crash Detected:</h4> |
|
|
<p>{crash_time}</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
location = get_current_location() |
|
|
location_placeholder.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Location:</h4> |
|
|
<p>{location['address']}</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
m = folium.Map(location=[location["lat"], location["lng"]], zoom_start=15) |
|
|
folium.Marker( |
|
|
[location["lat"], location["lng"]], |
|
|
popup=f"Crash Location<br>Severity: {crash_severity}<br>Time: {crash_time}", |
|
|
tooltip="Crash Location", |
|
|
icon=folium.Icon(color='red', icon='warning-sign') |
|
|
).add_to(m) |
|
|
|
|
|
|
|
|
with map_container: |
|
|
folium_static(m) |
|
|
else: |
|
|
timestamp_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Timestamp:</h4> |
|
|
<p>N/A</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
location_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0;"> |
|
|
<h4>Location:</h4> |
|
|
<p>N/A</p> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from camera_input_live import camera_input_live |
|
|
|
|
|
|
|
|
if input_method == "Webcam": |
|
|
|
|
|
|
|
|
|
|
|
col1, col2 = st.sidebar.columns(2) |
|
|
start_button = col1.button("Start Detection") |
|
|
stop_button = col2.button("Stop Detection") |
|
|
|
|
|
if start_button: |
|
|
st.session_state.webcam_running = True |
|
|
|
|
|
if stop_button: |
|
|
st.session_state.webcam_running = False |
|
|
st.success("Detection stopped") |
|
|
|
|
|
|
|
|
if not st.session_state.get("webcam_running", False): |
|
|
status_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #858585; color: white;"> |
|
|
<h3>Status: Idle</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
if st.session_state.get("webcam_running", False): |
|
|
|
|
|
frame = camera_input_live() |
|
|
|
|
|
|
|
|
if frame is not None: |
|
|
|
|
|
st.image(frame) |
|
|
bytes_data = frame.getvalue() |
|
|
cv2_frame = cv2.imdecode(np.frombuffer(bytes_data, np.uint8), cv2.IMREAD_COLOR) |
|
|
|
|
|
|
|
|
st.session_state.total_detections += 1 |
|
|
detection_result = detect_crash(cv2_frame) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if detection_result["crash_detected"] and not alert_sent: |
|
|
|
|
|
crash_detected = True |
|
|
crash_severity = detection_result["severity"] |
|
|
last_crash_time = time.time() |
|
|
|
|
|
|
|
|
st.session_state.total_crashes += 1 |
|
|
if crash_severity.lower() == "severe": |
|
|
st.session_state.severe_crashes += 1 |
|
|
|
|
|
|
|
|
crash_analysis = assess_crash_severity(cv2_frame, detection_result) |
|
|
|
|
|
|
|
|
update_info_display() |
|
|
|
|
|
|
|
|
location = get_current_location() |
|
|
crash_data = { |
|
|
"timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'), |
|
|
"severity": crash_severity, |
|
|
"analysis": crash_analysis, |
|
|
"location": location, |
|
|
"raw_detection": detection_result["raw_result"] |
|
|
} |
|
|
|
|
|
success, message = send_crash_alert_twilio(crash_data) |
|
|
if success: |
|
|
st.session_state.alerts_sent += 1 |
|
|
alert_status_placeholder.success(f"Alert sent: {message}") |
|
|
alert_sent = True |
|
|
else: |
|
|
alert_status_placeholder.error(f"Alert failed: {message}") |
|
|
|
|
|
|
|
|
if not detection_result["crash_detected"]: |
|
|
status_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #4CAF50; color: white;"> |
|
|
<h3>Status: Monitoring</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
elif input_method == "Upload Image": |
|
|
uploaded_file = st.sidebar.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"]) |
|
|
|
|
|
if uploaded_file is not None: |
|
|
|
|
|
image_bytes = uploaded_file.read() |
|
|
image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR) |
|
|
|
|
|
|
|
|
frame_placeholder.image(image, channels="BGR", caption="Uploaded Image") |
|
|
|
|
|
|
|
|
if st.sidebar.button("Process Image"): |
|
|
|
|
|
st.session_state.total_detections += 1 |
|
|
|
|
|
|
|
|
with st.spinner("Processing image..."): |
|
|
detection_result = detect_crash(image) |
|
|
|
|
|
|
|
|
annotated_image = detection_result["annotated_image"] |
|
|
frame_placeholder.image(annotated_image, channels="BGR", caption="Processed Image") |
|
|
|
|
|
|
|
|
if detection_result["crash_detected"]: |
|
|
|
|
|
crash_detected = True |
|
|
crash_severity = detection_result["severity"] |
|
|
last_crash_time = time.time() |
|
|
|
|
|
|
|
|
st.session_state.total_crashes += 1 |
|
|
if crash_severity.lower() == "severe": |
|
|
st.session_state.severe_crashes += 1 |
|
|
|
|
|
|
|
|
crash_analysis = assess_crash_severity(image, detection_result) |
|
|
|
|
|
|
|
|
update_info_display() |
|
|
|
|
|
|
|
|
location = get_current_location() |
|
|
crash_data = { |
|
|
"timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'), |
|
|
"severity": crash_severity, |
|
|
"analysis": crash_analysis, |
|
|
"location": location, |
|
|
"raw_detection": detection_result["raw_result"] |
|
|
} |
|
|
|
|
|
success, message = send_crash_alert_twilio(crash_data) |
|
|
if success: |
|
|
st.session_state.alerts_sent += 1 |
|
|
alert_status_placeholder.success(f"Alert sent: {message}") |
|
|
else: |
|
|
alert_status_placeholder.error(f"Alert failed: {message}") |
|
|
else: |
|
|
st.info("No crash detected in this image.") |
|
|
|
|
|
|
|
|
status_placeholder.markdown(""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #4CAF50; color: white;"> |
|
|
<h3>Status: Monitoring</h3> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
elif input_method == "Upload Video": |
|
|
uploaded_file = st.sidebar.file_uploader("Choose a video...", type=["mp4", "avi", "mov"]) |
|
|
|
|
|
if uploaded_file is not None: |
|
|
|
|
|
tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') |
|
|
tfile.write(uploaded_file.read()) |
|
|
tfile_path = tfile.name |
|
|
tfile.close() |
|
|
|
|
|
if st.sidebar.button("Process Video"): |
|
|
try: |
|
|
cap = cv2.VideoCapture(tfile_path) |
|
|
|
|
|
if not cap.isOpened(): |
|
|
st.error("Cannot open video file") |
|
|
else: |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
processed_frames = [] |
|
|
crash_frames = [] |
|
|
|
|
|
progress_bar = st.progress(0) |
|
|
status_text = st.empty() |
|
|
status_text.text(f"Analyzing video... (0/{frame_count} frames)") |
|
|
|
|
|
frame_number = 0 |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
st.session_state.total_detections += 1 |
|
|
detection_result = detect_crash(frame) |
|
|
|
|
|
processed_frames.append({ |
|
|
"frame": detection_result["annotated_image"], |
|
|
"result": detection_result |
|
|
}) |
|
|
|
|
|
if detection_result["crash_detected"]: |
|
|
crash_frames.append({ |
|
|
"frame_number": frame_number, |
|
|
"frame": frame.copy(), |
|
|
"detection_result": detection_result, |
|
|
"timestamp": time.time(), |
|
|
"severity": detection_result["severity"] |
|
|
}) |
|
|
|
|
|
frame_number += 1 |
|
|
if frame_number % 5 == 0 or frame_number == frame_count: |
|
|
progress_value = min(frame_number / frame_count, 1.0) |
|
|
progress_bar.progress(progress_value) |
|
|
status_text.text(f"Analyzing video... ({frame_number}/{frame_count} frames)") |
|
|
|
|
|
cap.release() |
|
|
|
|
|
status_text.text("Creating processed video...") |
|
|
output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name |
|
|
|
|
|
if processed_frames: |
|
|
first_frame = processed_frames[0]["frame"] |
|
|
h, w = first_frame.shape[:2] |
|
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
out = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h)) |
|
|
|
|
|
for frame_data in processed_frames: |
|
|
out.write(frame_data["frame"]) |
|
|
|
|
|
out.release() |
|
|
time.sleep(0.5) |
|
|
|
|
|
try: |
|
|
with open(output_video_path, 'rb') as video_file: |
|
|
video_bytes = video_file.read() |
|
|
|
|
|
|
|
|
b64 = base64.b64encode(video_bytes).decode() |
|
|
href = f'<a href="data:video/mp4;base64,{b64}" download="processed_video.mp4">📥 Click here to download the processed video</a>' |
|
|
|
|
|
status_text.empty() |
|
|
progress_bar.empty() |
|
|
st.markdown(href, unsafe_allow_html=True) |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error preparing video download: {e}") |
|
|
|
|
|
st.success(f"Video analysis complete. {len(processed_frames)} frames processed, {len(crash_frames)} crashes detected.") |
|
|
|
|
|
if crash_frames: |
|
|
crash_frames.sort(key=lambda x: x["frame_number"]) |
|
|
last_crash = crash_frames[-1] |
|
|
|
|
|
frame_placeholder.image( |
|
|
last_crash["detection_result"]["annotated_image"], |
|
|
channels="BGR", |
|
|
caption=f"Last Detected Crash (Frame {last_crash['frame_number']})", |
|
|
use_column_width=True |
|
|
) |
|
|
|
|
|
with st.spinner("Analyzing crash severity..."): |
|
|
crash_analysis = assess_crash_severity(last_crash["frame"], last_crash["detection_result"]) |
|
|
|
|
|
crash_detected = True |
|
|
crash_severity = last_crash["severity"] |
|
|
last_crash_time = last_crash["timestamp"] |
|
|
|
|
|
st.session_state.total_crashes += len(crash_frames) |
|
|
severe_count = sum(1 for crash in crash_frames if crash["severity"].lower() == "severe") |
|
|
st.session_state.severe_crashes += severe_count |
|
|
|
|
|
update_info_display(custom_analysis=crash_analysis) |
|
|
|
|
|
location = get_current_location() |
|
|
|
|
|
crash_data = { |
|
|
"timestamp": datetime.fromtimestamp(last_crash_time).strftime('%Y-%m-%d %H:%M:%S'), |
|
|
"severity": crash_severity, |
|
|
"analysis": crash_analysis, |
|
|
"location": location, |
|
|
"raw_detection": last_crash["detection_result"]["raw_result"] |
|
|
} |
|
|
|
|
|
success, message = send_crash_alert_twilio(crash_data) |
|
|
if success: |
|
|
st.session_state.alerts_sent += 1 |
|
|
alert_status_placeholder.success(f"Alert sent: {message}") |
|
|
else: |
|
|
alert_status_placeholder.error(f"Alert failed: {message}") |
|
|
else: |
|
|
st.info("No crashes detected in this video.") |
|
|
|
|
|
try: |
|
|
os.remove(output_video_path) |
|
|
except: |
|
|
pass |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error processing video: {e}") |
|
|
st.exception(e) |
|
|
finally: |
|
|
try: |
|
|
if os.path.exists(tfile_path): |
|
|
os.remove(tfile_path) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
st.markdown("---") |
|
|
col1, col2, col3, col4 = st.columns(4) |
|
|
|
|
|
with col1: |
|
|
st.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0; text-align: center;"> |
|
|
<h4>Total Detections</h4> |
|
|
<h2>{st.session_state.total_detections}</h2> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
with col2: |
|
|
st.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0; text-align: center;"> |
|
|
<h4>Total Crashes</h4> |
|
|
<h2>{st.session_state.total_crashes}</h2> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
with col3: |
|
|
st.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0; text-align: center;"> |
|
|
<h4>Severe Crashes</h4> |
|
|
<h2>{st.session_state.severe_crashes}</h2> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
with col4: |
|
|
st.markdown(f""" |
|
|
<div style="padding: 10px; border-radius: 5px; background-color: #f0f0f0; text-align: center;"> |
|
|
<h4>SMS Alerts Sent</h4> |
|
|
<h2>{st.session_state.alerts_sent}</h2> |
|
|
</div> |
|
|
""", unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
st.markdown("---") |
|
|
st.markdown(""" |
|
|
<p style='text-align: center;'>Vehicle Crash Detection and Severity Assessment System</p> |
|
|
<p style='text-align: center;'>© 2025</p> |
|
|
""", unsafe_allow_html=True) |