Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| import os | |
| import time | |
| import ffmpeg | |
class CaesarYolo:
    """YOLOv3 object detector built on OpenCV's DNN module.

    The detector can annotate single images, or frames pulled from a video
    previously attached with :meth:`video_load`. In video mode every
    annotated frame is also written to ``output.avi`` next to this file.
    """

    def __init__(self) -> None:
        """Load the network, the class labels and the drawing settings.

        Reads ``cfg/yolov3.cfg``, ``weights/yolov3.weights`` and
        ``data/coco.names`` from the directory that contains this file.
        """
        # Minimum class score for a raw detection to be kept.
        self.CONFIDENCE = 0.5
        # Score and IoU thresholds handed to non-maximum suppression.
        self.SCORE_THRESHOLD = 0.5
        self.IOU_THRESHOLD = 0.5
        # dirname() works whatever the file is called and on any OS.
        self.current_dir = os.path.dirname(os.path.realpath(__file__))
        config_path = f"{self.current_dir}/cfg/yolov3.cfg"
        weights_path = f"{self.current_dir}/weights/yolov3.weights"
        self.font_scale = 1
        self.thickness = 1
        # Context manager so the label file handle is closed deterministically.
        with open(f"{self.current_dir}/data/coco.names", encoding="utf-8") as names_file:
            self.LABELS = names_file.read().strip().split("\n")
        # One random colour per class label.
        self.COLORS = np.random.randint(0, 255, size=(len(self.LABELS), 3), dtype="uint8")
        self.net = cv2.dnn.readNetFromDarknet(config_path, weights_path)
        layer_names = self.net.getLayerNames()
        # getUnconnectedOutLayers() returns either an (N, 1) or an (N,) array
        # depending on the OpenCV build; flattening handles both shapes.
        # The returned indices are 1-based.
        out_layers = np.asarray(self.net.getUnconnectedOutLayers()).flatten()
        self.ln = [layer_names[i - 1] for i in out_layers]
        # No video is attached until video_load() is called. Initialising the
        # attributes here lets caesar_object_detect() run on plain images.
        self.video_file = None
        self.cap = None
        self.out = None
        self.duration_seconds = 0
        self.overall_time_taken = []

    @staticmethod
    def compress_video(video_full_path, output_file_name, target_size):
        """Re-encode a video with two-pass H.264 aiming at a target size.

        Args:
            video_full_path: Path of the input video.
            output_file_name: Path of the compressed output file.
            target_size: Desired output size. The bitrate formula multiplies
                it by ``1024 * 8``, so it is presumably expressed in
                kilobytes -- TODO confirm with callers.

        Raises:
            ffmpeg.Error: Propagated from ``ffmpeg.probe`` / ``run``.
        """
        # Reference: https://en.wikipedia.org/wiki/Bit_rate#Encoding_bit_rate
        min_audio_bitrate = 32000
        max_audio_bitrate = 256000

        probe = ffmpeg.probe(video_full_path)
        # Video duration, in s.
        duration = float(probe['format']['duration'])
        # First audio stream, or None for a silent video.
        audio_stream = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)
        # Target total bitrate, in bps.
        target_total_bitrate = (target_size * 1024 * 8) / (1.073741824 * duration)

        if audio_stream is None:
            # Silent input: the whole budget goes to the video stream.
            audio_bitrate = 0
        else:
            # Audio bitrate, in bps.
            audio_bitrate = float(audio_stream['bit_rate'])
            # Cap audio at a tenth of the budget, clamped to [min, max].
            if 10 * audio_bitrate > target_total_bitrate:
                audio_bitrate = target_total_bitrate / 10
                if audio_bitrate < min_audio_bitrate < target_total_bitrate:
                    audio_bitrate = min_audio_bitrate
                elif audio_bitrate > max_audio_bitrate:
                    audio_bitrate = max_audio_bitrate

        # Target video bitrate, in bps.
        video_bitrate = target_total_bitrate - audio_bitrate

        source = ffmpeg.input(video_full_path)
        # Pass 1 only gathers encoder statistics; its output is discarded.
        first_pass = {'c:v': 'libx264', 'b:v': video_bitrate, 'pass': 1, 'f': 'mp4'}
        ffmpeg.output(source, os.devnull, **first_pass).overwrite_output().run()

        second_pass = {'c:v': 'libx264', 'b:v': video_bitrate, 'pass': 2}
        if audio_stream is not None:
            second_pass.update({'c:a': 'aac', 'b:a': audio_bitrate})
        ffmpeg.output(source, output_file_name, **second_pass).overwrite_output().run()

    def video_load(self, videofile):
        """Attach a video (path relative to this file) as the frame source.

        Opens the capture, records the approximate duration and creates the
        ``output.avi`` writer sized to the video's frames.

        Args:
            videofile: Video path relative to the directory of this file.

        Raises:
            OSError: If the video cannot be opened or yields no frame.
        """
        video_path = f"{self.current_dir}/{videofile}"
        capture = cv2.VideoCapture(video_path)
        ok, first_frame = capture.read()
        if not capture.isOpened() or not ok or first_frame is None:
            capture.release()
            raise OSError(f"Could not read video file: {video_path}")
        h, w = first_frame.shape[:2]
        # The first frame was only needed for its size; rewind so that it is
        # still processed by caesar_object_detect().
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)

        self.video_file = video_path
        self.cap = capture
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        frames = self.cap.get(cv2.CAP_PROP_FRAME_COUNT)
        fps = self.cap.get(cv2.CAP_PROP_FPS)
        # Duration of the video; some containers report 0 FPS, so guard the
        # division instead of crashing.
        self.duration_seconds = round(frames / fps) if fps else 0
        # NOTE(review): the writer uses a fixed 20 FPS regardless of the
        # source frame rate -- confirm this is intended.
        self.out = cv2.VideoWriter(f"{self.current_dir}/output.avi", fourcc, 20.0, (w, h))
        self.overall_time_taken = []

    def caesar_object_detect(self, image, verbose=False):
        """Detect objects in a frame and draw labelled boxes on it.

        Args:
            image: An image array, or the string ``"video"`` to pull the
                next frame from the video attached with :meth:`video_load`.
            verbose: When true, print the inference time of this call.

        Returns:
            ``(annotated_image, time_elapsed, approx_finish)``. Without an
            attached video the last two values are ``0``. ``(None, None,
            None)`` is returned when no frame is available (end of video or
            an input that is not an image array).
        """
        # Compare by value; the isinstance check keeps NumPy arrays out of
        # the string comparison.
        from_stream = bool(self.video_file) and isinstance(image, str) and image == "video"
        if from_stream:
            _, image = self.cap.read()
        try:
            frame_h, frame_w = image.shape[:2]
        except AttributeError:
            # End of stream: finalise the output file so it is playable.
            if from_stream and self.out is not None:
                self.out.release()
            return None, None, None

        blob = cv2.dnn.blobFromImage(image, 1/255.0, (416, 416), swapRB=True, crop=False)
        self.net.setInput(blob)
        start = time.perf_counter()
        layer_outputs = self.net.forward(self.ln)
        time_took = time.perf_counter() - start
        if verbose:
            print("Time took:", time_took)

        time_elapsed, approx_finish = 0, 0
        if self.video_file:
            self.overall_time_taken.append(time_took)
            time_elapsed = round(sum(self.overall_time_taken), 3)
            # Rough estimate of total processing time, in seconds; 4.6 looks
            # like an empirical per-second cost -- TODO confirm.
            approx_finish = self.duration_seconds * 4.6

        boxes, confidences, class_ids = [], [], []
        frame_scale = np.array([frame_w, frame_h, frame_w, frame_h])
        # Loop over each output layer, then over each detection in it.
        for output in layer_outputs:
            for detection in output:
                # Entries from index 5 onward are the per-class scores.
                scores = detection[5:]
                class_id = int(np.argmax(scores))
                confidence = scores[class_id]
                # Discard weak predictions.
                if confidence > self.CONFIDENCE:
                    # YOLO returns the box centre followed by width and
                    # height, all relative to the image size.
                    (center_x, center_y, width, height) = (detection[:4] * frame_scale).astype("int")
                    # Derive the top-left corner from the centre.
                    x = int(center_x - (width / 2))
                    y = int(center_y - (height / 2))
                    boxes.append([x, y, int(width), int(height)])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)

        # Non-maximum suppression drops overlapping duplicates.
        idxs = cv2.dnn.NMSBoxes(boxes, confidences, self.SCORE_THRESHOLD, self.IOU_THRESHOLD)
        self.font_scale = 1
        self.thickness = 1

        # An empty result (no detections) simply skips the drawing loop.
        for i in np.array(idxs).flatten():
            x, y = boxes[i][0], boxes[i][1]
            box_w, box_h = boxes[i][2], boxes[i][3]
            # Draw the bounding box rectangle on the image.
            color = [int(c) for c in self.COLORS[class_ids[i]]]
            cv2.rectangle(image, (x, y), (x + box_w, y + box_h), color=color, thickness=self.thickness)
            text = f"{self.LABELS[class_ids[i]]}: {confidences[i]:.2f}"
            # Measure the text to size the translucent label background.
            (text_width, text_height) = cv2.getTextSize(
                text, cv2.FONT_HERSHEY_SIMPLEX, fontScale=self.font_scale, thickness=self.thickness)[0]
            text_offset_x = x
            text_offset_y = y - 5
            box_coords = ((text_offset_x, text_offset_y),
                          (text_offset_x + text_width + 2, text_offset_y - text_height))
            overlay = image.copy()
            cv2.rectangle(overlay, box_coords[0], box_coords[1], color=color, thickness=cv2.FILLED)
            # Blend the filled rectangle in to make the background translucent.
            image = cv2.addWeighted(overlay, 0.6, image, 0.4, 0)
            # Now put the text (label: confidence).
            cv2.putText(image, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX,
                        fontScale=self.font_scale, color=(0, 0, 0), thickness=self.thickness)

        if self.video_file:
            self.out.write(image)
        return image, time_elapsed, approx_finish
if __name__ == "__main__":
    import subprocess

    def test():
        """Run detection over ``car-detection.mp4`` with a live preview.

        Each annotated frame is shown in a window; press "q" to stop early.
        """
        caesaryolo = CaesarYolo()
        caesaryolo.video_load("car-detection.mp4")
        try:
            while True:
                image, time_elapsed, end_time = caesaryolo.caesar_object_detect("video")
                # A None image signals the end of the video.
                if image is None:
                    break
                print(round(time_elapsed, 3), "out of", end_time)
                cv2.imshow("image", image)
                if ord("q") == cv2.waitKey(1):
                    break
        finally:
            # Release the reader and the writer even if detection fails, so
            # that output.avi is finalised and the preview window is closed.
            caesaryolo.cap.release()
            caesaryolo.out.release()
            cv2.destroyAllWindows()

    def convert_avi_to_mp4(avi_file_path, output_name):
        """Convert a video file to another container with the ffmpeg CLI.

        Args:
            avi_file_path: Path of the input file.
            output_name: Path of the output file (overwritten if present).

        Returns:
            True when ffmpeg exits with status 0, False when it fails or
            cannot be launched.
        """
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        command = ["ffmpeg", "-y", "-i", avi_file_path, output_name]
        try:
            completed = subprocess.run(command, check=False)
        except OSError:
            # ffmpeg is not installed or is not executable.
            return False
        return completed.returncode == 0

    # Directory containing this script.
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    # Example usage:
    #   convert_avi_to_mp4(f"{CURRENT_DIR}/output.avi", f"{CURRENT_DIR}/output.mp4")