# Streamlit app: YOLOv8 dark-scene object detection over images, videos, webcam, RTSP, and WebRTC streams.
import os

# Must be set before torch / intel_extension_for_pytorch are imported
# (OpenMP duplicate-runtime workaround).
os.environ["MY_ENV_VARIABLE"] = "True"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import json
import shutil
import subprocess
import time
from collections import Counter, defaultdict
from _collections import deque
from pathlib import Path

import av
import cv2
import numpy as np
import pandas as pd
import torch
import intel_extension_for_pytorch as ipex
import openvino as ov
import streamlit as st
from deep_sort_realtime.deepsort_tracker import DeepSort
from stqdm import stqdm
from streamlit_autorefresh import st_autorefresh
from streamlit_webrtc import RTCConfiguration, VideoTransformerBase, webrtc_streamer
from ultralytics import YOLO
from ultralytics.engine.results import Results

from model_utils import get_yolo, get_system_stat
from DistanceEstimation import *
# colors for visualization for image visualization
# BGR palette, cycled by class id (COLORS[class_id % len(COLORS)]) for boxes, masks, and trails.
COLORS = [(56, 56, 255), (151, 157, 255), (31, 112, 255), (29, 178, 255), (49, 210, 207), (10, 249, 72), (23, 204, 146),
          (134, 219, 61), (52, 147, 26), (187, 212, 0), (168, 153, 44), (255, 194, 0), (147, 69, 52), (255, 115, 100),
          (236, 24, 0), (255, 56, 132), (133, 0, 82), (255, 56, 203), (200, 149, 255), (199, 55, 255)]
def result_to_json(result: Results, tracker=None):
    """
    Convert result from ultralytics YOLOv8 prediction to json format
    Parameters:
        result: Results from ultralytics YOLOv8 prediction
        tracker: DeepSort tracker; when provided, matched detections additionally
            get an 'object_id' key with the DeepSort track id
    Returns:
        result_list_json: detection result in json format — a list of dicts with
            'class_id', 'class', 'confidence', 'bbox', and optionally
            'mask'/'segments' (segmentation models) and 'object_id' (tracking)
    """
    len_results = len(result.boxes)
    # One entry per detected box: class id/name, confidence, absolute-pixel bbox corners.
    result_list_json = [
        {
            'class_id': int(result.boxes.cls[idx]),
            'class': result.names[int(result.boxes.cls[idx])],
            'confidence': float(result.boxes.conf[idx]),
            'bbox': {
                'x_min': int(result.boxes.xyxy[idx][0]),
                'y_min': int(result.boxes.xyxy[idx][1]),
                'x_max': int(result.boxes.xyxy[idx][2]),
                'y_max': int(result.boxes.xyxy[idx][3]),
            },
        } for idx in range(len_results)
    ]
    if result.masks is not None:
        for idx in range(len_results):
            # Resize each mask back to the original frame size so it overlays the frame directly.
            result_list_json[idx]['mask'] = cv2.resize(result.masks.data[idx].cpu().numpy(), (result.orig_shape[1], result.orig_shape[0])).tolist()
            result_list_json[idx]['segments'] = [seg.tolist() for seg in result.masks.xy[idx]]
    if tracker is not None:
        # DeepSort expects ([x, y, w, h], confidence, class) tuples.
        bbs = [
            (
                [
                    result_list_json[idx]['bbox']['x_min'],
                    result_list_json[idx]['bbox']['y_min'],
                    result_list_json[idx]['bbox']['x_max'] - result_list_json[idx]['bbox']['x_min'],
                    result_list_json[idx]['bbox']['y_max'] - result_list_json[idx]['bbox']['y_min']
                ],
                result_list_json[idx]['confidence'],
                result_list_json[idx]['class'],
            ) for idx in range(len_results)
        ]
        tracks = tracker.update_tracks(bbs, frame=result.orig_img)
        for idx in range(len(result_list_json)):
            # NOTE(review): tracks are re-associated with detections by (near-)equal
            # confidence value, not by index or IoU — two detections with identical
            # confidence could be cross-assigned. Confirm against DeepSort's API.
            track_idx = next((i for i, track in enumerate(tracks) if track.det_conf is not None and np.isclose(track.det_conf, result_list_json[idx]['confidence'])), -1)
            if track_idx != -1:
                result_list_json[idx]['object_id'] = int(tracks[track_idx].track_id)
    return result_list_json
def view_result_ultralytics(result: Results, result_list_json, centers=None):
    """
    Render detections with ultralytics' built-in plot(), then overlay motion trails.
    Parameters:
        result: Results from ultralytics YOLOv8 prediction
        result_list_json: detection result in json format
        centers: per-object-id deques of past bbox center points (trail history)
    Returns:
        result_image_ultralytics: annotated frame from result.plot() plus trails
    """
    canvas = result.plot()
    for det in result_list_json:
        color = COLORS[det['class_id'] % len(COLORS)]
        # Trails are only drawn for tracked detections when history storage is supplied.
        if centers is None or 'object_id' not in det:
            continue
        box = det['bbox']
        midpoint = (int((box['x_min'] + box['x_max']) / 2), int((box['y_min'] + box['y_max']) / 2))
        trail = centers[det['object_id']]
        trail.append(midpoint)
        for j in range(1, len(trail)):
            if trail[j - 1] is None or trail[j] is None:
                continue
            # Older segments get thinner lines (fades the trail out).
            thickness = int(np.sqrt(64 / float(j + 1)) * 2)
            cv2.line(canvas, trail[j - 1], trail[j], color, thickness)
    return canvas
def view_result_default(result: Results, result_list_json, centers=None, image=None):
    """
    Visualize result from ultralytics YOLOv8 prediction using the default drawing routine.
    Parameters:
        result: Results from ultralytics YOLOv8 prediction (supplies the fallback frame)
        result_list_json: detection result in json format
        centers: per-object-id deques of past bbox center points (trail history)
        image: optional frame to draw on; defaults to result.orig_img
    Returns:
        the annotated frame (a copy — the caller's array is never mutated)
    """
    ALPHA = 0.5
    # BUG FIX: the original made this copy but then drew on `image`, mutating the
    # caller's frame and crashing with AttributeError when image was None (which is
    # how video_processing invokes this viewer). All drawing now targets the copy.
    canvas = image.copy() if image is not None else result.orig_img.copy()
    for det in result_list_json:  # renamed: the original shadowed the `result` parameter
        class_color = COLORS[det['class_id'] % len(COLORS)]
        if 'mask' in det:
            # Tint the binary mask with the class color and alpha-blend it in.
            mask = np.array(det['mask'])
            image_mask = np.stack([mask * class_color[0], mask * class_color[1], mask * class_color[2]], axis=-1).astype(np.uint8)
            canvas = cv2.addWeighted(canvas, 1, image_mask, ALPHA, 0)
        text = f"{det['class']} {det['object_id']}: {det['confidence']:.2f}" if 'object_id' in det else f"{det['class']}: {det['confidence']:.2f}"
        bbox = det['bbox']
        cv2.rectangle(canvas, (bbox['x_min'], bbox['y_min']), (bbox['x_max'], bbox['y_max']), class_color, 2)
        # Filled label background sized to the rendered text.
        (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_DUPLEX, 0.90, 5)
        cv2.rectangle(canvas, (bbox['x_min'], bbox['y_min'] - text_height - baseline), (bbox['x_min'] + text_width, bbox['y_min']), class_color, -1)
        cv2.putText(canvas, text, (bbox['x_min'], bbox['y_min'] - baseline), cv2.FONT_HERSHEY_DUPLEX, 0.90, (255, 255, 255), 1)
        if 'object_id' in det and centers is not None:
            # Append current center and redraw the fading motion trail.
            centers[det['object_id']].append((int((bbox['x_min'] + bbox['x_max']) / 2), int((bbox['y_min'] + bbox['y_max']) / 2)))
            trail = centers[det['object_id']]
            for j in range(1, len(trail)):
                if trail[j - 1] is None or trail[j] is None:
                    continue
                thickness = int(np.sqrt(64 / float(j + 1)) * 2)
                cv2.line(canvas, trail[j - 1], trail[j], class_color, thickness)
    return canvas
def image_processing(frame, model, image_viewer=view_result_default, tracker=None, centers=None):
    """
    Process image frame using ultralytics YOLOv8 model and possibly DeepSort tracker if it is provided
    Parameters:
        frame: image frame (BGR ndarray)
        model: ultralytics YOLOv8 model
        image_viewer: function to visualize result, default is view_result_default, can be view_result_ultralytics
        tracker: DeepSort tracker
        centers: list of deque of center points of bounding boxes
    Returns:
        result_image: result image with bounding boxes, class names, confidence scores, object masks, and possibly object IDs
        result_list_json: detection result in json format
    """
    # Preserve the original image for visualization
    original_image = frame.copy()
    # Low-light preprocessing pipeline: grayscale -> blur -> CLAHE contrast enhancement.
    gray_image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # NOTE(review): a (1, 1) Gaussian kernel is effectively a no-op; real denoising
    # would need a larger odd kernel (e.g. (3, 3)) — confirm intent.
    denoised_image = cv2.GaussianBlur(gray_image, (1, 1), 0)
    # equalized_image = cv2.equalizeHist(denoised_image)
    clahe = cv2.createCLAHE(clipLimit=0.5, tileGridSize=(8, 8))
    enhanced_image = clahe.apply(denoised_image)
    # Replicate the enhanced channel to 3 channels so the detector receives a BGR-shaped input.
    image = cv2.merge([enhanced_image, enhanced_image, enhanced_image])
    processed_image = image
    # NOTE(review): this renders the intermediate frame on every invocation — the live
    # loops call this per frame, adding a new image each time; confirm it is intended.
    st.image(processed_image, caption="Processed image", channels="BGR")
    results = model.predict(processed_image)
    result_list_json = result_to_json(results[0], tracker=tracker)
    # Detection runs on the enhanced frame, but annotation is drawn on the untouched original.
    result_image = image_viewer(results[0], result_list_json, centers=centers, image=original_image)
    return result_image, result_list_json
def video_processing(video_file, model, image_viewer=view_result_default, tracker=None, centers=None):
    """
    Run YOLOv8 over a video file, write an annotated MP4 plus a per-frame JSON report.
    Parameters:
        video_file: path to the input video
        model: ultralytics YOLOv8 model
        image_viewer: function to visualize each frame's result
        tracker: DeepSort tracker (optional)
        centers: per-object-id deques of bbox centers for trail drawing (optional)
    Returns:
        (video_file_name_out, result_video_json_file): paths of the rendered video
        and the JSON detection report
    Raises:
        FileNotFoundError: if no frames were processed or the output video is empty
    """
    results = model.predict(video_file)
    # os.path.basename instead of split('/') so Windows-style checkpoint paths also work.
    model_name = os.path.splitext(os.path.basename(model.ckpt_path))[0]
    video_stem = os.path.splitext(os.path.basename(video_file))[0]
    output_folder = os.path.join('output_videos', video_stem)
    os.makedirs(output_folder, exist_ok=True)
    video_file_name_out = os.path.join(output_folder, f"{video_stem}_{model_name}_output.mp4")
    result_video_json_file = os.path.join(output_folder, f"{video_stem}_{model_name}_output.json")
    # Remove stale outputs from a previous run.
    for file_path in (video_file_name_out, result_video_json_file):
        if os.path.exists(file_path):
            os.remove(file_path)
    first_frame = results[0].orig_img
    height, width = first_frame.shape[:2]
    video_writer = cv2.VideoWriter(video_file_name_out, cv2.VideoWriter_fourcc(*'mp4v'), 30, (width, height))
    result_list = []
    frame_count = 0
    try:
        for result in stqdm(results, desc="Processing video"):
            result_list_json = result_to_json(result, tracker=tracker)
            result_image = image_viewer(result, result_list_json, centers=centers)
            video_writer.write(result_image)
            result_list.append(result_list_json)
            frame_count += 1
    finally:
        # Release the writer even if a frame fails, so the partial file is flushed.
        video_writer.release()
    # FIX: the original opened the JSON file before the loop and leaked the handle on
    # any exception; writing once at the end under a context manager closes it reliably.
    with open(result_video_json_file, 'w') as json_file:
        json.dump(result_list, json_file, indent=2)
    if frame_count == 0 or os.path.getsize(video_file_name_out) == 0:
        raise FileNotFoundError(f"The video file {video_file_name_out} was not created or is empty.")
    return video_file_name_out, result_video_json_file
# App and sidebar branding (runs at import time, Streamlit script style).
st.image("assets/nsidelogoo.png")
st.sidebar.image("assets/nsidelogoo.png")
def load_vino_model(model_path, device="CPU"):
    """
    Read an OpenVINO IR model and compile it for the requested device.
    Parameters:
        model_path: path to the OpenVINO model (IR / exported model)
        device: OpenVINO device string, e.g. "CPU", "GPU", "AUTO"
    Returns:
        the compiled OpenVINO model ready for inference
    """
    core = ov.Core()
    network = core.read_model(model_path)
    # Non-CPU targets get a fixed 1x3x640x640 input shape pinned before compilation.
    if device != "CPU":
        network.reshape({0: [1, 3, 640, 640]})
    gpu_targeted = "GPU" in device or ("AUTO" in device and "GPU" in core.available_devices)
    # Winograd convolutions are disabled on GPU targets.
    config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"} if gpu_targeted else {}
    return core.compile_model(network, device, config)
def infer_with_vino_model(compiled_model, *args):
    """
    Invoke a compiled OpenVINO model on the given inputs.
    Parameters:
        compiled_model: compiled OpenVINO model (callable on a tuple of inputs)
        *args: model inputs, forwarded as a single tuple
    Returns:
        the model's first output converted to a torch.Tensor
    """
    outputs = compiled_model(args)
    first_output = outputs[0]
    return torch.from_numpy(first_output)
def load_model(model_path):
    """Instantiate and return a YOLO model from a weights file or exported model directory."""
    yolo_model = YOLO(model_path)
    return yolo_model
# --- Model bootstrap (executed once per Streamlit script run) ---
device = "CPU"
model_vino_path = Path("yolov8xcdark_openvino_model/")
# NOTE(review): despite the name, this loads the exported OpenVINO directory through
# ultralytics' YOLO wrapper, not load_vino_model (commented out below) — confirm intent.
model_openvino = load_model(model_vino_path)
# model_openvino = load_vino_model(model_vino_path, device)
model_path = "yolov8xcdark.pt"
model = load_model(model_path)
st.write("Optimized Openvino Yolov8c Models loaded successfully!")
# Separate segmentation checkpoint used by the "Segmentation result" button.
model_seg_path = "yolov8xcdark-seg.pt"
model_seg = load_model(model_seg_path)
# Input-source picker; source_index drives the top-level if-sections below.
source = ("Image Detection📸", "Video Detections📽️", "Live Camera Detection🤳🏻", "RTSP", "MOBILE CAM")
source_index = st.sidebar.selectbox("Select Input type", range(
    len(source)), format_func=lambda x: source[x])
# Image detection section
if source_index == 0:
    st.title("Image Processing using YOLOv8c Dark Detector")
    image_file = st.file_uploader("Upload an image 🔽", type=["jpg", "jpeg", "png"])
    process_image_button = st.button("Detect")
    process_seg_button = st.button("Click here for Segmentation result")
    with st.spinner("Detecting with 💕"):
        if image_file is None and process_image_button:
            st.warning("Please upload an image file to be processed!")
        # Detection path: plain bounding-box model.
        if image_file is not None and process_image_button:
            st.write(" ")
            st.sidebar.success("Successfully uploaded")
            st.sidebar.image(image_file, caption="Uploaded image")
            # Decode the uploaded bytes into a BGR ndarray for OpenCV.
            img = cv2.imdecode(np.frombuffer(image_file.read(), np.uint8), 1)
            img, result_list_json = image_processing(img, model_openvino)
            st.success("✅ Task Detect : Detection using custom-trained v8 model")
            st.image(img, caption="Detected image", channels="BGR")
            # Per-class detection counts shown as a table.
            detected_classes = [item['class'] for item in result_list_json]
            class_fq = Counter(detected_classes)
            df_fq = pd.DataFrame(class_fq.items(), columns=['Class', 'Number'])
            st.write("Class Frequency:")
            st.dataframe(df_fq)
        # Segmentation path: same flow with the segmentation checkpoint.
        if image_file is not None and process_seg_button:
            st.write(" ")
            st.sidebar.success("Successfully uploaded")
            st.sidebar.image(image_file, caption="Uploaded image")
            img = cv2.imdecode(np.frombuffer(image_file.read(), np.uint8), 1)
            ## for detection with bb & segmentation masks
            img, result_list_json = image_processing(img, model_seg)
            st.success("✅ Task Segment: Segmentation using v8 model")
            st.image(img, caption="Segmented image", channels="BGR")
            detected_classes = [item['class'] for item in result_list_json]
            class_fq = Counter(detected_classes)
            df_fq = pd.DataFrame(class_fq.items(), columns=['Class', 'Number'])
            st.write("Class Frequency:")
            st.dataframe(df_fq)
# Video & Live cam section
if source_index == 1:
    st.header("Video Detections using YOLOv8c Dark Detector")
    video_file = st.file_uploader("Upload a video", type=["mp4"])
    process_video_button = st.button("Process Video")
    if video_file is None and process_video_button:
        st.warning("Please upload a video file to be processed!")
    if video_file is not None and process_video_button:
        with st.spinner(text='Detecting with 💕...'):
            tracker = DeepSort(max_age=5)
            # One trail per possible track id; each deque keeps the last 30 centers.
            centers = [deque(maxlen=30) for _ in range(10000)]
            # Persist the upload to disk because model.predict takes a file path.
            with open(video_file.name, "wb") as f:
                f.write(video_file.read())
            video_file_out, result_video_json_file = video_processing(video_file.name, model, tracker=tracker, centers=centers)
            # The temporary upload is no longer needed once processing finishes.
            os.remove(video_file.name)
            st.write("Processing video...")
            st.video(video_file_out)
            # Show the per-frame JSON report produced by video_processing.
            with open(result_video_json_file, "r") as f:
                result_json = json.load(f)
            st.json(result_json)
# Live webcam section
if source_index == 2:
    st.header("Live Stream Processing using YOLOv8c Dark Detector")
    tab_webcam = st.tabs(["Webcam Detections"])
    p_time = 0
    st.sidebar.title('Settings')
    # Choose the model
    model_type = "YOLOv8"
    sample_img = cv2.imread('assets/detective.png')
    FRAME_WINDOW = st.image(sample_img, channels='BGR')
    cap = None
    # FIX: give pred a defined value before the channel selector branch so the
    # guard below never touches an unbound name.
    pred = False
    if not model_type == 'YOLO Model':
        if model_type == 'YOLOv8':
            # GPU/CPU choice is currently a no-op: the same model is used either way.
            gpu_option = st.sidebar.radio(
                'Choose between:', ('CPU', 'GPU'))
            if gpu_option == 'CPU':
                model = model
            if gpu_option == 'GPU':
                model = model
        # Load Class names
        class_labels = model.names
        # Confidence
        confidence = st.sidebar.slider(
            'Detection Confidence', min_value=0.0, max_value=1.0, value=0.25)
        # Draw thickness
        draw_thick = st.sidebar.slider(
            'Draw Thickness:', min_value=1,
            max_value=20, value=3
        )
        # Web-cam channel selection
        cam_options = st.selectbox('Webcam Channel',
                                   ('Select Channel', '0', '1', '2', '3'))
        if not cam_options == 'Select Channel':
            pred = st.checkbox(f'Predict Using {model_type}')
            cap = cv2.VideoCapture(int(cam_options))
        if cap is not None and pred:  # idiom fix: identity check, not != None
            stframe1 = st.empty()
            stframe2 = st.empty()
            stframe3 = st.empty()
            tracker = DeepSort(max_age=5)
            # FIX: a fixed-size list raised IndexError once a track id passed 10000;
            # a defaultdict grows trails on demand with the same indexing syntax.
            centers = defaultdict(lambda: deque(maxlen=30))
            while True:
                success, img = cap.read()
                if not success:
                    st.error(
                        f" NOT working\nCheck {cam_options} properly!!",
                        icon="🚨"
                    )
                    break
                # Detection + DeepSort tracking on the current frame.
                img, result_list_json = image_processing(img, model, image_viewer=view_result_default, tracker=tracker, centers=centers)
                # Overlay distance information (from DistanceEstimation).
                processed_frame = get_live_frame_output(img, result_list_json)
                FRAME_WINDOW.image(processed_frame, channels='BGR')
                # NOTE(review): clearing the data cache every frame looks expensive — confirm intent.
                st.cache_data.clear()
                # FPS from wall-clock delta between consecutive frames.
                c_time = time.time()
                fps = 1 / (c_time - p_time)
                p_time = c_time
                # Current number of detections per class.
                detected_classes = [item['class'] for item in result_list_json]
                class_fq = Counter(detected_classes)
                df_fq = pd.DataFrame(class_fq.items(), columns=['Class', 'Number'])
                # Updating Inference results
                get_system_stat(stframe1, stframe2, stframe3, fps, df_fq)
# RTSP stream section
if source_index == 3:
    st.header("Live Stream Processing using YOLOv8c Dark Detector")
    tab_rtsp = st.tabs(["RTSP Detections"])
    p_time = 0
    st.sidebar.title('Settings')
    # Choose the model
    model_type = "YOLOv8"
    # FIX: the placeholder lives under assets/ (the webcam section loads
    # 'assets/detective.png'); the bare path here returned None from imread.
    sample_img = cv2.imread('assets/detective.png')
    FRAME_WINDOW = st.image(sample_img, channels='BGR')
    cap = None
    if not model_type == 'YOLO Model':
        if model_type == 'YOLOv8':
            # GPU/CPU choice is currently a no-op: the same model is used either way.
            gpu_option = st.sidebar.radio(
                'Choose between:', ('CPU', 'GPU'))
            if gpu_option == 'CPU':
                model = model
            if gpu_option == 'GPU':
                model = model
        # Load Class names
        class_labels = model.names
        # Confidence
        confidence = st.sidebar.slider(
            'Detection Confidence', min_value=0.0, max_value=1.0, value=0.25)
        # Draw thickness
        draw_thick = st.sidebar.slider(
            'Draw Thickness:', min_value=1,
            max_value=20, value=3
        )
        rtsp_url = st.text_input(
            'RTSP URL:',
            'eg: rtsp://admin:name6666@198.162.1.58/cam/realmonitor?channel=0&subtype=0'
        )
        pred1 = st.checkbox(f'Predict Using rtsp {model_type}')
        cap = cv2.VideoCapture(rtsp_url)
        if cap is not None and pred1:  # idiom fix: identity check, not != None
            stframe1 = st.empty()
            stframe2 = st.empty()
            stframe3 = st.empty()
            tracker = DeepSort(max_age=5)
            # FIX: defaultdict avoids IndexError when a track id exceeds a fixed list size.
            centers = defaultdict(lambda: deque(maxlen=30))
            while True:
                success, img = cap.read()
                if not success:
                    # FIX: this section has no cam_options variable — the original
                    # raised a NameError here instead of showing the error message.
                    st.error(
                        f" NOT working\nCheck {rtsp_url} properly!!",
                        icon="🚨"
                    )
                    break
                # Detection + DeepSort tracking on the current frame.
                img, result_list_json = image_processing(img, model, image_viewer=view_result_default, tracker=tracker, centers=centers)
                # Overlay distance information (from DistanceEstimation).
                processed_frame = get_live_frame_output(img, result_list_json)
                FRAME_WINDOW.image(processed_frame, channels='BGR')
                # FPS from wall-clock delta between consecutive frames.
                c_time = time.time()
                fps = 1 / (c_time - p_time)
                p_time = c_time
                # Current number of detections per class.
                detected_classes = [item['class'] for item in result_list_json]
                class_fq = Counter(detected_classes)
                df_fq = pd.DataFrame(class_fq.items(), columns=['Class', 'Number'])
                # Updating Inference results
                get_system_stat(stframe1, stframe2, stframe3, fps, df_fq)
class VideoTransformer(VideoTransformerBase):
    """WebRTC frame processor: YOLOv8 detection + DeepSort tracking per frame.

    Plugged into streamlit-webrtc via ``video_processor_factory``; ``recv`` is
    the callback invoked for every incoming frame.
    """

    def __init__(self) -> None:
        super().__init__()
        self.frame_count = 0  # frames seen so far (not read elsewhere in this class)
        self.tracker = DeepSort(max_age=5)  # per-connection DeepSort tracker
        # FIX: the fixed-size list raised IndexError once a track id passed 10000;
        # defaultdict creates a bounded trail on demand with the same indexing syntax.
        self.centers = defaultdict(lambda: deque(maxlen=30))

    def transform(self, frame):
        """Run detection/tracking on one frame; return the annotated BGR ndarray."""
        img = frame.to_ndarray(format="bgr24")
        img, result_list_json = image_processing(img, model, image_viewer=view_result_default, tracker=self.tracker, centers=self.centers)
        # Overlay distance information (from DistanceEstimation).
        processed_frame = get_live_frame_output(img, result_list_json)
        return processed_frame

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """streamlit-webrtc entry point: wrap the processed ndarray back into a VideoFrame."""
        new_image = self.transform(frame)
        return av.VideoFrame.from_ndarray(new_image, format="bgr24")
# Streamlit application — WebRTC "mobile cam" section
if source_index == 4:
    st.header("Live Stream Processing using YOLOv8c Dark Detector")
    webcam_st = st.tabs(["St webcam"])
    p_time = 0
    # Public Google STUN server so WebRTC can traverse NAT.
    RTC_CONFIGURATION = RTCConfiguration({"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]})
    # Periodic rerun keeps the Streamlit script alive while the stream runs.
    count = st_autorefresh(interval=4500, limit=1000000, key="fizzbuzzcounter")
    try:
        # Video only; each connection gets its own VideoTransformer instance.
        webrtc_streamer(
            key="test",
            media_stream_constraints={"video": True, "audio": False},
            video_processor_factory=VideoTransformer
        )
    except Exception as e:
        st.error(f"Error initializing WebRTC: {e}")
        st.cache_data.clear()