Spaces:
Runtime error
Runtime error
| #https://streamlit-webrtc-example.herokuapp.com/ | |
| #https://github.com/amineHY/WebApp-Computer-Vision-streamlit | |
| import streamlit as st | |
| import os | |
| from os import listdir | |
| import wget | |
| from PIL import Image | |
| import io | |
| import numpy as np | |
| import cv2 | |
| import itertools | |
| import sys | |
| from deepface import DeepFace | |
def load_model():
    """Make sure the CrowdHuman YOLOv5m weights are available locally.

    When the weights file is missing, the Python packages needed by the
    detector are installed with pip and the weights are then downloaded
    with ``wget`` while a Streamlit spinner is displayed.  When the file is
    already present, a short message is printed and nothing else happens.

    Both external commands are best-effort: their exit status is ignored.
    """
    import subprocess

    wpath = 'test_detection/yolov5/weights/crowdhuman_yolov5m.pt'
    if os.path.exists(wpath):
        print("Model is here.")
        return

    # The requirements are passed as an argument list, not a shell string:
    # in a shell an unquoted "PyYAML>=5.3.1" is parsed as the argument
    # "PyYAML" plus a redirection of stdout into a file named "=5.3.1", so
    # the version constraint would be silently dropped.
    requirements = [
        "numpy",
        "torch",
        "pandas",
        "Pillow",
        "opencv-python-headless",
        "PyYAML>=5.3.1",
        "torchvision>=0.8.1",
        "matplotlib",
        "seaborn>=0.11.0",
        "easydict",
    ]
    subprocess.run(
        [sys.executable, "-m", "pip", "install", *requirements],
        check=False,
    )

    # wget cannot create the file if its parent directory is missing.
    os.makedirs(os.path.dirname(wpath), exist_ok=True)
    weights_url = (
        "https://github.com/mikel-brostrom/Yolov5_DeepSort_Pytorch/"
        "releases/download/v.2.0/crowdhuman_yolov5m.pt"
    )
    with st.spinner('Downloading model weights for crowdhuman_yolov5m'):
        try:
            subprocess.run(
                ["wget", "-nc", weights_url, "-O", wpath],
                check=False,
            )
        except OSError as err:
            # Keep the download best-effort when wget is not installed.
            print(f"wget could not be started: {err}")
| #=================================================================================== | |
| # Save an uploaded video to the data directory | |
def save_uploaded_vid(uploadedfile):
    """Store an uploaded video inside ``test_detection/data``.

    Args:
        uploadedfile: object exposing a ``name`` attribute (used as the
            file name on disk) and a ``getbuffer()`` method returning the
            bytes to write.

    Returns:
        None.
    """
    destination = os.path.join("test_detection/data", uploadedfile.name)
    with open(destination, "wb") as out_file:
        # The buffer is fetched only once the target file has been opened.
        out_file.write(uploadedfile.getbuffer())
    return None
| #@st.cache(ttl=3600, max_entries=10) | |
def load_output_video(vid):
    """Return the raw bytes of a video, ready to be passed to ``st.video``.

    Args:
        vid: either a ``str`` path to a video file on disk, or an
            uploaded-file object exposing ``read()`` (and whatever
            ``save_uploaded_vid`` needs).

    Returns:
        bytes: the full content of the video.  Both branches return bytes,
        so no open file handle is handed back to the caller.

    Side effects:
        An uploaded file is also written to disk through
        ``save_uploaded_vid``.
    """
    if isinstance(vid, str):
        # Read inside a context manager so the handle is always closed.
        with open(vid, 'rb') as video_file:
            return video_file.read()

    video = vid.read()
    save_uploaded_vid(vid)
    return video
def starter():
    """Render the page header and video picker, then grab the first frame.

    Shows the logo and title, lets the user upload an ``mp4``/``avi`` file
    (falling back to the bundled ``ParisManif.mp4`` when nothing is
    uploaded), plays the selected video and reads its first frame from
    ``test_detection/data``.

    Returns:
        tuple: ``(vname, frame0)`` where ``vname`` is the video file name
        inside ``test_detection/data`` and ``frame0`` is the first frame
        converted from BGR to RGB.

    Raises:
        RuntimeError: if no frame can be read from the video.
    """
    st.image('test_detection/data/LOGOGlob.png', width=400)
    st.title("Test of Person detection")
    st.text("")
    st.text("")

    default_name = 'ParisManif.mp4'
    vid_upload = st.file_uploader(
        label='Welcome! Please upload a video! ',
        type=['mp4', 'avi'],
    )
    if vid_upload is None:
        vname = default_name
        vid_open = "test_detection/data/" + default_name
    else:
        vname = vid_upload.name
        vid_open = vid_upload

    video = load_output_video(vid_open)
    st.video(video)

    vidcap = cv2.VideoCapture("test_detection/data/" + vname)
    try:
        success, frame0 = vidcap.read()
    finally:
        # Release the capture handle even if reading raises.
        vidcap.release()
    if not success:
        # Without this check cvtColor would fail on a None frame with an
        # error that does not mention the video at all.
        raise RuntimeError(
            "Could not read the first frame of test_detection/data/" + vname
        )

    frame0 = cv2.cvtColor(frame0, cv2.COLOR_BGR2RGB)
    return vname, frame0
| #=================================================================================== | |
def prediction(vname):
    """Run the tracker on a video and display the re-encoded result.

    Nothing happens when the model weights are missing.  Otherwise
    ``test_detection/track.py`` is run on ``test_detection/data/<vname>``,
    the content of the working directories is written to the page, the
    tracker output is re-encoded to H.264 with ``ffmpeg`` and, when that
    output file exists, it is shown with ``st.video``.

    Args:
        vname: file name of the video inside ``test_detection/data``.
    """
    import subprocess

    vpath = 'test_detection/data/' + vname
    wpath = 'test_detection/yolov5/weights/crowdhuman_yolov5m.pt'
    if not os.path.exists(wpath):
        return

    with st.spinner('Running detection...'):
        st.write('vpath befor calling track : ', vpath)
        # Argument lists (no shell) keep a file name containing spaces or
        # shell metacharacters from breaking or hijacking the command;
        # vname can come from a user upload.
        subprocess.run(
            [
                sys.executable, "test_detection/track.py",
                "--yolo_weights", wpath,
                "--img", "352",
                "--save-vid",
                "--save-txt",
                "--classes", "1",
                "--conf-thres", "0.4",
                "--source", vpath,
            ],
            check=False,
        )

    path = 'inference/output/'
    st.write(os.listdir('test_detection/'))
    st.write(os.listdir(path))

    with st.spinner('Video loading...'):
        try:
            subprocess.run(
                [
                    "ffmpeg",
                    "-i", "inference/output/" + vname,
                    "-vcodec", "libx264",
                    "-y", "inference/output/output_" + vname,
                ],
                check=False,
            )
        except OSError as err:
            # Keep the conversion best-effort when ffmpeg is not installed.
            st.error(f"ffmpeg could not be started: {err}")

    path = 'inference/output/output_' + vname
    if os.path.exists(path):
        with open(path, 'rb') as video_file:
            video_bytes = video_file.read()
        st.video(video_bytes)
| #=================================================================================== | |
def extract_heads(filepath, frame0):
    """Crop up to 25 detected heads out of a frame.

    The detections file is loaded as a table of integers.  Column 1 is
    used to count distinct persons (presumably a track id -- confirm
    against the tracker's output format); columns 2 to 5 are used as
    left, top, width and height of each box.

    Args:
        filepath: path of the text file written by the tracker.
        frame0: image array indexed as ``[row, column, channel]``.

    Returns:
        tuple: ``(nbperson, listhead)`` -- the number of distinct values in
        column 1 and the list of cropped image arrays.  ``(0, [])`` when
        the file does not exist or holds no detection.
    """
    nbperson = 0
    listhead = []
    if not os.path.exists(filepath):
        return nbperson, listhead

    # ndmin=2 keeps a file with a single detection two-dimensional, so the
    # column operations below work for any number of rows.
    detections = np.loadtxt(filepath, dtype=int, ndmin=2)
    if detections.size == 0:
        return nbperson, listhead

    detections = np.delete(detections, np.s_[7:10], axis=1)
    nbperson = np.unique(detections[:, 1]).shape[0]
    st.subheader('Display some detected heads :')
    st.write(' Number of detected heads : ', nbperson)

    max_heads = 5 * 5
    # Slicing the table caps the loop at the rows that actually exist.
    for row in detections[:max_heads]:
        left, top, width, height = (int(value) for value in row[2:6])
        # Clamp at 0: a negative bound would be read by numpy as an offset
        # from the end of the axis and select the wrong region.
        head = frame0[
            max(top, 0):max(top + height, 0),
            max(left, 0):max(left + width, 0),
            :,
        ]
        listhead.append(head)
    return nbperson, listhead
| #********************************************************************************************* | |
def display_heads_(nbperson, listhead):
    """Show head crops in a grid of at most 5 rows of 5 Streamlit columns.

    Every crop of ``listhead`` (up to 25) is displayed, starting with the
    first one in the first column, under a 1-based "Person N" header.
    An empty list displays nothing.

    Args:
        nbperson: number of detected persons (not used for the layout).
        listhead: sequence of image arrays to display.
    """
    rows_ = 5
    cols_ = 5
    shown = listhead[:rows_ * cols_]
    # Walk the crops one grid row at a time; a new row of columns is only
    # created when there is something to put in it.
    for start in range(0, len(shown), cols_):
        cols = st.columns(cols_)
        for offset, head in enumerate(shown[start:start + cols_]):
            cols[offset].header("Person " + str(start + offset + 1))
            cols[offset].image(head, use_column_width=True, width=150)
    return
| #********************************************************************************************* | |
def display_heads_analysis(nbperson, listhead):
    """Show head crops in a grid, captioned with DeepFace age and gender.

    Every crop of ``listhead`` (up to 25, 5 per row) is analysed with
    ``DeepFace.analyze`` and displayed, starting with the first one in the
    first column, under a 1-based "Person N" header.  An empty list
    displays nothing.

    Args:
        nbperson: number of detected persons (not used for the layout).
        listhead: sequence of image arrays to analyse and display.
    """
    rows_ = 5
    cols_ = 5
    shown = listhead[:rows_ * cols_]
    for start in range(0, len(shown), cols_):
        cols = st.columns(cols_)
        for offset, head in enumerate(shown[start:start + cols_]):
            cols[offset].header("Person " + str(start + offset + 1))
            obj = DeepFace.analyze(
                img_path=head,
                actions=['age', 'gender'],
                enforce_detection=False,
            )
            # NOTE(review): some DeepFace releases appear to return a list
            # with one result per face instead of a single dict -- confirm
            # against the installed version.
            if isinstance(obj, list):
                obj = obj[0]
            capt = 'age : ' + str(obj['age']) + ', gender : ' + str(obj['gender'])
            cols[offset].image(head, use_column_width=True, width=150, caption=capt)
    return
| #=================================================================================== | |
def main():
    """Drive the page: pick a video, then detect and show heads on demand.

    The header and video picker are always rendered.  The detection
    pipeline (tracking, head extraction, head display) only runs when the
    'Heads detection!' button is pressed.
    """
    vname, frame0 = starter()
    if not st.button('Heads detection!'):
        return

    prediction(vname)
    # The tracker's text output is looked up under the video's name, with
    # the last three characters replaced by "txt".
    filepath = ('inference/output/' + vname)[:-3] + 'txt'
    st.success("Click again to retry or try a different video by uploading")
    nbperson, listhead = extract_heads(filepath, frame0)
    display_heads_(nbperson, listhead)
    return
if __name__ == '__main__':
    # Streamlit re-executes the script on each interaction, so only clone
    # the detection repository when its checkout is not there yet.
    if not os.path.isdir('test_detection'):
        os.system('git clone --recurse-submodules https://github.com/nnassime/test_detection.git')
    load_model()
    # Directory listings written to the page as a deployment sanity check.
    st.write(os.listdir('test_detection/'))
    st.write(os.listdir('test_detection/yolov5/weights/'))
    st.write(os.listdir('test_detection/deep_sort_pytorch/deep_sort/deep/checkpoint/'))
    main()