|
|
from ultralytics import YOLO |
|
|
import base64 |
|
|
import cv2 |
|
|
import io |
|
|
import numpy as np |
|
|
from ultralytics.utils.plotting import Annotator |
|
|
import streamlit as st |
|
|
from streamlit_image_coordinates import streamlit_image_coordinates |
|
|
import pandas as pd |
|
|
import ollama |
|
|
import bs4 |
|
|
import tempfile |
|
|
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_community.document_loaders import WebBaseLoader |
|
|
from langchain_community.document_loaders import CSVLoader |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain_community.embeddings import OllamaEmbeddings |
|
|
from langchain_core.output_parsers import StrOutputParser |
|
|
from langchain_core.runnables import RunnablePassthrough |
|
|
|
|
|
def set_background(image_file1,image_file2): |
|
|
|
|
|
with open(image_file1, "rb") as f: |
|
|
img_data1 = f.read() |
|
|
b64_encoded1 = base64.b64encode(img_data1).decode() |
|
|
with open(image_file2, "rb") as f: |
|
|
img_data2 = f.read() |
|
|
b64_encoded2 = base64.b64encode(img_data2).decode() |
|
|
style = f""" |
|
|
<style> |
|
|
.stApp{{ |
|
|
background-image: url(data:image/png;base64,{b64_encoded1}); |
|
|
background-size: cover; |
|
|
|
|
|
}} |
|
|
.st-emotion-cache-6qob1r{{ |
|
|
background-image: url(data:image/png;base64,{b64_encoded2}); |
|
|
background-size: cover; |
|
|
border: 5px solid rgb(14, 17, 23); |
|
|
|
|
|
}} |
|
|
</style> |
|
|
""" |
|
|
st.markdown(style, unsafe_allow_html=True) |
|
|
|
|
|
set_background('pngtree-city-map-navigation-interface-picture-image_1833642.png','2024-05-18_14-57-09_5235.png') |
|
|
|
|
|
st.title("Traffic Flow and Optimization Toolkit") |
|
|
|
|
|
sb = st.sidebar |
|
|
|
|
|
sb.markdown("🛰️ **Navigation**") |
|
|
page_names = ["PS1", "PS2", "PS3","Chat with Results"] |
|
|
page = sb.radio("", page_names, index=0) |
|
|
st.session_state['n'] = sb.slider("Number of ROIs",1,5) |
|
|
|
|
|
if page == 'PS1': |
|
|
uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mpeg"]) |
|
|
model = YOLO('yolov8n.pt') |
|
|
if uploaded_file is not None: |
|
|
with tempfile.NamedTemporaryFile(delete=False) as temp: |
|
|
temp.write(uploaded_file.read()) |
|
|
if 'roi_list1' not in st.session_state: |
|
|
st.session_state['roi_list1'] = [] |
|
|
if "all_rois1" not in st.session_state: |
|
|
st.session_state['all_rois1'] = [] |
|
|
classes = model.names |
|
|
|
|
|
done_1 = st.button('Selection Done') |
|
|
|
|
|
while len(st.session_state["all_rois1"]) < st.session_state['n']: |
|
|
cap = cv2.VideoCapture(temp.name) |
|
|
while not done_1: |
|
|
ret,frame=cap.read() |
|
|
cv2.putText(frame,'SELECT ROI',(100,100),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),4) |
|
|
if not ret: |
|
|
st.write('ROI selection unsuccessfull') |
|
|
break |
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
value = streamlit_image_coordinates(frame,key='numpy',width=750) |
|
|
st.session_state["roi_list1"].append([int(value['x']*2.55),int(value['y']*2.55)]) |
|
|
st.write(st.session_state["roi_list1"]) |
|
|
if cv2.waitKey(0)&0xFF==27: |
|
|
break |
|
|
cap.release() |
|
|
st.session_state["all_rois1"].append(st.session_state["roi_list1"]) |
|
|
st.session_state["roi_list1"] = [] |
|
|
done_1 = False |
|
|
|
|
|
st.write('ROI indices: ',st.session_state["all_rois1"][0]) |
|
|
|
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(temp.name) |
|
|
st.write("Detection started") |
|
|
st.session_state['fps'] = cap.get(cv2.CAP_PROP_FPS) |
|
|
st.write(f"FPS OF VIDEO: {st.session_state['fps']}") |
|
|
avg_list = [] |
|
|
count = 0 |
|
|
frame_placeholder = st.empty() |
|
|
st.session_state["data1"] = {} |
|
|
for i in range(len(st.session_state["all_rois1"])): |
|
|
st.session_state["data1"][f"ROI{i}"] = [] |
|
|
while cap.isOpened(): |
|
|
ret,frame=cap.read() |
|
|
if not ret: |
|
|
break |
|
|
count += 1 |
|
|
if count % 3 != 0: |
|
|
continue |
|
|
k = 0 |
|
|
for roi_list_here1 in st.session_state["all_rois1"]: |
|
|
max = [0,0] |
|
|
min = [10000,10000] |
|
|
roi_list_here = roi_list_here1[1:] |
|
|
for i in range(len(roi_list_here)): |
|
|
if roi_list_here[i][0] > max[0]: |
|
|
max[0] = roi_list_here[i][0] |
|
|
if roi_list_here[i][1] > max[1]: |
|
|
max[1] = roi_list_here[i][1] |
|
|
if roi_list_here[i][0] < min[0]: |
|
|
min[0] = roi_list_here[i][0] |
|
|
if roi_list_here[i][1] < min[1]: |
|
|
min[1] = roi_list_here[i][1] |
|
|
frame_cropped = frame[min[1]:max[1],min[0]:max[0]] |
|
|
roi_corners = np.array([roi_list_here],dtype=np.int32) |
|
|
mask = np.zeros(frame.shape,dtype=np.uint8) |
|
|
mask.fill(255) |
|
|
channel_count = frame.shape[2] |
|
|
ignore_mask_color = (255,)*channel_count |
|
|
cv2.fillPoly(mask,roi_corners,0) |
|
|
mask_cropped = mask[min[1]:max[1],min[0]:max[0]] |
|
|
roi = cv2.bitwise_or(frame_cropped,mask_cropped) |
|
|
|
|
|
|
|
|
number = [] |
|
|
results = model.predict(roi) |
|
|
for r in results: |
|
|
boxes = r.boxes |
|
|
counter = 0 |
|
|
for box in boxes: |
|
|
counter += 1 |
|
|
name = classes[box.cls.numpy()[0]] |
|
|
conf = str(round(box.conf.numpy()[0],2)) |
|
|
text = name+""+conf |
|
|
bbox = box.xyxy[0].numpy() |
|
|
cv2.rectangle(frame,(int(bbox[0])+min[0],int(bbox[1])+min[1]),(int(bbox[2])+min[0],int(bbox[3])+min[1]),(0,255,0),2) |
|
|
cv2.putText(frame,text,(int(bbox[0])+min[0],int(bbox[1])+min[1]-5),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),2) |
|
|
number.append(counter) |
|
|
avg = sum(number)/len(number) |
|
|
stats = str(round(avg,2)) |
|
|
if count%10 == 0: |
|
|
st.session_state["data1"][f"ROI{k}"].append(avg) |
|
|
k+=1 |
|
|
cv2.putText(frame,stats,(min[0],min[1]),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,0),4) |
|
|
cv2.polylines(frame,roi_corners,True,(255,0,0),2) |
|
|
cv2.putText(frame,'The average number of vehicles in the Regions of Interest',(100,100),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),4) |
|
|
frame_placeholder.image(frame,channels='BGR') |
|
|
cap.release() |
|
|
st.write("The resultant data is:") |
|
|
st.write(st.session_state.data1) |
|
|
else: |
|
|
st.error('PLEASE UPLOAD', icon="🚨") |
|
|
|
|
|
elif page == "PS3": |
|
|
uploaded_file1 = st.file_uploader("Choose a video...", type=["mp4", "mpeg"]) |
|
|
model1 = YOLO("yolov8n.pt") |
|
|
model2 = YOLO("best.pt") |
|
|
if uploaded_file1 is not None: |
|
|
with tempfile.NamedTemporaryFile(delete=False) as temp: |
|
|
temp.write(uploaded_file1.read()) |
|
|
if 'roi_list2' not in st.session_state: |
|
|
st.session_state['roi_list2'] = [] |
|
|
if "all_rois2" not in st.session_state: |
|
|
st.session_state['all_rois2'] = [] |
|
|
classes = model1.names |
|
|
|
|
|
done_2 = st.button('Selection Done') |
|
|
|
|
|
while len(st.session_state["all_rois2"]) < st.session_state['n']: |
|
|
cap = cv2.VideoCapture(temp.name) |
|
|
while not done_2: |
|
|
ret,frame=cap.read() |
|
|
cv2.putText(frame,'SELECT ROI',(100,100),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),4) |
|
|
if not ret: |
|
|
st.write('ROI selection has concluded') |
|
|
break |
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
value = streamlit_image_coordinates(frame,key='numpy',width=750) |
|
|
st.session_state["roi_list2"].append([int(value['x']*2.5),int(value['y']*2.5)]) |
|
|
st.write(st.session_state["roi_list2"]) |
|
|
if cv2.waitKey(0)&0xFF==27: |
|
|
break |
|
|
cap.release() |
|
|
st.session_state["all_rois2"].append(st.session_state["roi_list2"]) |
|
|
st.session_state["roi_list2"] = [] |
|
|
done_2 = False |
|
|
|
|
|
st.write('ROI indices: ',st.session_state["all_rois2"][0]) |
|
|
|
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(temp.name) |
|
|
st.write("Detection started") |
|
|
avg_list = [] |
|
|
count = 0 |
|
|
frame_placeholder = st.empty() |
|
|
st.session_state.data = {} |
|
|
for i in range(len(st.session_state["all_rois2"])): |
|
|
st.session_state["data"][f"ROI{i}"] = [] |
|
|
for i in range(len(st.session_state['all_rois2'])): |
|
|
st.session_state.data[f"ROI{i}"] = [] |
|
|
while cap.isOpened(): |
|
|
ret,frame=cap.read() |
|
|
if not ret: |
|
|
break |
|
|
count += 1 |
|
|
if count % 3 != 0: |
|
|
continue |
|
|
|
|
|
k = 0 |
|
|
for roi_list_here1 in st.session_state["all_rois2"]: |
|
|
max = [0,0] |
|
|
min = [10000,10000] |
|
|
roi_list_here = roi_list_here1[1:] |
|
|
for i in range(len(roi_list_here)-1): |
|
|
if roi_list_here[i][0] > max[0]: |
|
|
max[0] = roi_list_here[i][0] |
|
|
if roi_list_here[i][1] > max[1]: |
|
|
max[1] = roi_list_here[i][1] |
|
|
if roi_list_here[i][0] < min[0]: |
|
|
min[0] = roi_list_here[i][0] |
|
|
if roi_list_here[i][1] < min[1]: |
|
|
min[1] = roi_list_here[i][1] |
|
|
frame_cropped = frame[min[1]:max[1],min[0]:max[0]] |
|
|
roi_corners = np.array([roi_list_here],dtype=np.int32) |
|
|
mask = np.zeros(frame.shape,dtype=np.uint8) |
|
|
mask.fill(255) |
|
|
channel_count = frame.shape[2] |
|
|
ignore_mask_color = (255,)*channel_count |
|
|
cv2.fillPoly(mask,roi_corners,0) |
|
|
mask_cropped = mask[min[1]:max[1],min[0]:max[0]] |
|
|
roi = cv2.bitwise_or(frame_cropped,mask_cropped) |
|
|
|
|
|
|
|
|
number = [] |
|
|
results = model1.predict(roi) |
|
|
results_pothole = model2.predict(source=frame) |
|
|
for r in results: |
|
|
boxes = r.boxes |
|
|
counter = 0 |
|
|
for box in boxes: |
|
|
counter += 1 |
|
|
name = classes[box.cls.numpy()[0]] |
|
|
conf = str(round(box.conf.numpy()[0],2)) |
|
|
text = name+conf |
|
|
bbox = box.xyxy[0].numpy() |
|
|
cv2.rectangle(frame,(int(bbox[0])+min[0],int(bbox[1])+min[1]),(int(bbox[2])+min[0],int(bbox[3])+min[1]),(0,255,0),2) |
|
|
cv2.putText(frame,text,(int(bbox[0])+min[0],int(bbox[1])+min[1]-5),cv2.FONT_HERSHEY_SIMPLEX, 0.4,(0,0,255),2) |
|
|
number.append(counter) |
|
|
for r in results_pothole: |
|
|
masks = r.masks |
|
|
boxes = r.boxes.cpu().numpy() |
|
|
xyxys = boxes.xyxy |
|
|
confs = boxes.conf |
|
|
if masks is not None: |
|
|
shapes = np.ones_like(frame) |
|
|
for mask,conf,xyxy in zip(masks,confs,xyxys): |
|
|
polygon = mask.xy[0] |
|
|
if conf >= 0.49 and len(polygon)>=3: |
|
|
cv2.fillPoly(shapes,pts=np.int32([polygon]),color=(0,0,255,0.5)) |
|
|
frame = cv2.addWeighted(frame,0.7,shapes,0.3,gamma=0) |
|
|
cv2.rectangle(frame,(int(xyxy[0]),int(xyxy[1])),(int(xyxy[2]),int(xyxy[3])),(0,0,255),2) |
|
|
cv2.putText(frame,'Pothole '+str(conf),(int(xyxy[0]),int(xyxy[1])-5),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),2) |
|
|
|
|
|
avg = sum(number)/len(number) |
|
|
stats = str(round(avg,2)) |
|
|
if count % 10 == 0: |
|
|
st.session_state.data[f"ROI{k}"].append(avg) |
|
|
k+=1 |
|
|
cv2.putText(frame,stats,(min[0],min[1]),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,0),4) |
|
|
cv2.polylines(frame,roi_corners,True,(255,0,0),2) |
|
|
if counter >= 5: |
|
|
cv2.putText(frame,'!!CONGESTION MORE THAN '+str(counter)+' Objects',(min[0]+20,min[1]+20),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,0),4) |
|
|
cv2.polylines(frame,roi_corners,True,(255,0,0),2) |
|
|
cv2.putText(frame,'Objects in the Regions of Interest',(100,100),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),4) |
|
|
frame_placeholder.image(frame,channels='BGR') |
|
|
cap.release() |
|
|
st.write("The result is:") |
|
|
st.write(st.session.data) |
|
|
|
|
|
else: |
|
|
st.error('PLEASE UPLOAD', icon="🚨") |
|
|
|
|
|
elif page == "PS2": |
|
|
st.header("CLICK ON RUN SCRIPT TO START A TRAFFIC SIMULATION") |
|
|
script = st.button("RUN SCRIPT") |
|
|
st.error("This simulation, Unfortunately was not optimized for deployed but still works for locally run version", icon="🚨") |
|
|
st.write("check out this [link](https://github.com/pranavsrinivasa/Traffic-Optimization-Projects)") |
|
|
|
|
|
elif page == "Chat with Results": |
|
|
st.title('Chat with the Results') |
|
|
st.write("Please upload the relevant CSV data to get started") |
|
|
|
|
|
st.error("This model, Unfortunately was not optimized for deployed but still works for locally run version", icon="🚨") |
|
|
st.write("check out this [link](https://github.com/pranavsrinivasa/Traffic-Optimization-Projects)") |
|
|
|