Spaces:
Runtime error
Runtime error
| """ | |
| **Aim:** This is the final code of video blur along with UI | |
| **Author:** Shalu Singh | |
| **Starting Date:** 12/9/23 | |
| **Ending Date:** 14/1/24 | |
| """ | |
| # import libraries | |
| import tensorflow as tf | |
| import tensorflow_hub as hub | |
| import numpy as np | |
| from PIL import Image | |
| import cv2 | |
| import os | |
| import pandas as pd | |
| import keras | |
| import gradio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from moviepy.editor import VideoFileClip, concatenate_videoclips | |
# Path of the file written by the OpenCV-based pipeline (``out_video``).
out_video_path = 'blured_op_video.mp4'

# Class id -> human-readable label.
# The labels are shown verbatim in the UI and are the keys of ``coco_encode``,
# so they must not carry stray whitespace or stray characters.
coco_classes = {
    0: "unlabeled",
    1: "person",
    2: "bicycle",
    3: "car",
    4: "motorcycle",
    5: "airplane",
    6: "bus",
    7: "train",
    8: "truck",
    9: "boat",
    10: "traffic light",
    11: "fire hydrant",
    12: "street sign",
    13: "stop sign",
    14: "parking meter",
    15: "bench",
    16: "bird",
    17: "cat",
    18: "dog",
    19: "horse",
    20: "sheep",
    21: "cow",
    22: "elephant",
    23: "bear",
    24: "zebra",
    25: "giraffe",
    26: "hat",
    27: "backpack",
    28: "umbrella",
    29: "shoe",
    30: "eye glasses",
    31: "handbag",
    32: "tie",
    33: "suitcase",
    34: "frisbee",
    35: "skis",
    36: "snowboard",
    37: "sports ball",
    38: "kite",
    39: "baseball bat",
    40: "baseball glove",
    41: "skateboard",
    42: "surfboard",
    43: "tennis racket",
    44: "bottle",
    45: "plate",
    46: "wine glass",
    47: "cup",
    48: "fork",
    49: "knife",
    50: "spoon",
    51: "bowl",
    52: "banana",
    53: "apple",
    54: "sandwich",
    55: "orange",
    56: "broccoli",
    57: "carrot",
    58: "hot dog",
    59: "pizza",
    60: "donut",
    61: "cake",
    62: "chair",
    63: "couch",
    64: "potted plant",
    65: "bed",
    66: "mirror",
    67: "dining table",
    68: "window",
    69: "desk",
    70: "toilet",
    71: "door",
    72: "tv",
    73: "laptop",
    74: "mouse",
    75: "remote",
    76: "keyboard",
    77: "cell phone",
    78: "microwave",
    79: "oven",
    80: "toaster",
    81: "sink",
    82: "refrigerator",
    83: "blender",
    84: "book",
    85: "clock",
    86: "vase",
    87: "scissors",
    88: "teddy bear",
    89: "hair drier",
    90: "toothbrush",
}

# Reverse lookup: label -> class id.
coco_encode = {label: class_id for class_id, label in coco_classes.items()}

# Labels in class-id order (index 0 is "unlabeled").
coco_labels = list(coco_classes.values())
def blur_image(image=None, coordinates=None, blur_value=3):
    """Return a copy of ``image`` with every box in ``coordinates`` Gaussian-blurred.

    Args:
        image: Array indexed as ``[row, column, ...]``. It is copied first, so
            the caller's array is never modified.
        coordinates: Iterable of ``(ymin, xmin, ymax, xmax)`` boxes expressed
            as fractions of the image height / width, or ``None`` to blur
            nothing.
        blur_value: Requested Gaussian kernel size. ``cv2.GaussianBlur`` only
            accepts positive odd sizes, so the value is truncated to an int,
            raised to at least 1, and bumped to the next odd number when even
            (e.g. 6 -> 7).

    Returns:
        The blurred copy of ``image``.
    """
    img = image.copy()  # work on a copy so the input stays untouched
    if coordinates is None:
        return img

    # Normalise the kernel size up front; an even size would make OpenCV
    # raise and the region would silently stay unblurred.
    kernel = max(1, int(blur_value))
    if kernel % 2 == 0:
        kernel += 1

    height, width = img.shape[0], img.shape[1]
    for ymin, xmin, ymax, xmax in coordinates:
        # Scale to pixels and clamp to the image; a negative index would
        # otherwise wrap around when slicing.
        top = min(max(int(ymin * height), 0), height)
        left = min(max(int(xmin * width), 0), width)
        bottom = min(max(int(ymax * height), 0), height)
        right = min(max(int(xmax * width), 0), width)

        # Nothing to blur for empty or inverted boxes.
        if bottom <= top or right <= left:
            continue

        roi = img[top:bottom, left:right]
        try:
            img[top:bottom, left:right] = cv2.GaussianBlur(
                roi, ksize=(kernel, kernel), sigmaX=0
            )
        except cv2.error as err:
            # Best effort: leave this region unblurred, but say so.
            print('Skipping blur for box', (top, left, bottom, right), ':', err)
    return img
def filter_detection(detector_output, select_classes, thr=0.6):
    """Keep only detections of the requested classes whose score reaches ``thr``.

    Args:
        detector_output: Mapping with the keys ``'detection_boxes'``,
            ``'detection_classes'`` and ``'detection_scores'``. The values
            must support boolean-mask indexing, and the boxes must expose a
            ``.numpy()`` method (presumably TensorFlow eager tensors - confirm
            against the detector in use).
        select_classes: Class ids to keep; matched with ``np.isin``.
        thr: Minimum score (inclusive) for a detection to be kept.

    Returns:
        ``(boxes, classes, scores)`` of the surviving detections. Only the
        boxes are converted with ``.numpy()``; classes and scores are returned
        as produced by the indexing.
    """
    boxes = detector_output['detection_boxes']
    labels = detector_output['detection_classes']
    confidences = detector_output['detection_scores']

    # First pass: keep only the classes the user selected.
    wanted = np.isin(labels, select_classes)
    labels, boxes, confidences = labels[wanted], boxes[wanted], confidences[wanted]

    # Second pass: keep only sufficiently confident detections.
    confident = confidences >= thr
    return boxes[confident].numpy(), labels[confident], confidences[confident]
def load_input(ip_path):
    """Open the video at ``ip_path`` with OpenCV.

    ``cv2.VideoCapture`` does not raise for a missing or unreadable file; it
    returns a capture that is simply not opened, so success is checked with
    ``isOpened()`` rather than by catching exceptions.

    Args:
        ip_path: Path (or other source accepted by ``cv2.VideoCapture``) of
            the input video.

    Returns:
        The opened ``cv2.VideoCapture``, or ``None`` when the video could not
        be opened.
    """
    try:
        cap = cv2.VideoCapture(ip_path)
    except (cv2.error, TypeError) as err:
        # Raised for arguments OpenCV cannot interpret at all (e.g. ``None``).
        print("Failed! to load video")
        print('Reason:', err)
        return None

    if not cap.isOpened():
        cap.release()
        print("Failed! to load video")
        return None

    print('Video loaded successfully!')
    return cap
def out_video(cap):
    """Read the capture's properties and create a matching ``cv2.VideoWriter``.

    The writer targets the module-level ``out_video_path`` and uses the
    capture's frame size and (integer-truncated) frame rate.

    Args:
        cap: An opened ``cv2.VideoCapture``.

    Returns:
        ``(out, fps, total_frames, video_duration)`` where ``out`` is the
        ``cv2.VideoWriter``, ``fps`` is the frame rate truncated to an int,
        ``total_frames`` is the reported frame count, and ``video_duration``
        is ``total_frames / fps`` in seconds (``0.0`` when the capture
        reports no frame rate).
    """
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # A capture that reports 0 fps would otherwise raise ZeroDivisionError.
    video_duration = total_frames / fps if fps > 0 else 0.0

    # FOURCC tag handed to the writer for the output container.
    codecc = cv2.VideoWriter_fourcc(*'mp4V')

    # video property info
    print('Frame Width:', frame_width)
    print('Frame height:', frame_height)
    print('Frame Per Second:', fps)
    print('Total frames:', total_frames)
    print('video_duration: {} minutes'.format(round(video_duration / 60, 2)))

    # VideoWriter object used to save the blurred video
    out = cv2.VideoWriter(out_video_path, codecc, fps, (frame_width, frame_height))
    return out, fps, total_frames, video_duration
def time_range(start_time, end_time):
    """Return the ``(start_time, end_time)`` pair exactly as given.

    No unit conversion is performed; the values are passed through unchanged.
    """
    return (start_time, end_time)
def is_valid_time_range(start_time, end_time, video_duration):
    """Tell whether ``0 <= start_time < end_time <= video_duration`` holds."""
    # The start must not lie before the beginning of the video.
    if not 0 <= start_time:
        return False
    # The range must be non-empty and correctly ordered.
    if not start_time < end_time:
        return False
    # The end must not run past the video.
    return end_time <= video_duration
# Location of the object-detection model handed to ``hub.load``.
_OBJECT_DETECTION_MODEL_URL = "https://www.kaggle.com/models/tensorflow/efficientdet/frameworks/TensorFlow2/variations/d2/versions/1"

# Loaded once at import time and shared by every call to ``blur_process``.
object_detection_model = hub.load(_OBJECT_DETECTION_MODEL_URL)
def _write_pending(out, pending, keep):
    """Write queued frames to ``out`` in order until at most ``keep`` remain."""
    while len(pending) > keep:
        item = pending.popleft()
        # Untouched frames are queued as arrays, blurred ones as futures.
        out.write(item if isinstance(item, np.ndarray) else item.result())


def blur_video(input_video_path, u_classes, start_time, end_time, blur_value=3):
    """Blur the selected classes between two timestamps using OpenCV I/O.

    Frames inside the time range are sent to ``blur_process`` on a thread
    pool; all frames are written to ``out_video_path`` in their original
    order.

    Args:
        input_video_path: Path of the video to read.
        u_classes: Iterable of class labels (keys of ``coco_encode``) to blur.
        start_time: Start of the blurred range, in seconds.
        end_time: End of the blurred range, in seconds.
        blur_value: Blur strength forwarded to ``blur_process``.

    Returns:
        ``out_video_path``, the path of the written video. When the time
        range is not valid for the video, the frames are written unblurred.

    Raises:
        OSError: If the input video cannot be opened.
        KeyError: If a label in ``u_classes`` is not in ``coco_encode``.
    """
    from collections import deque

    print('STARTING OF PROCESSING...')
    print("u_classes:", u_classes, type(u_classes))
    label_encode = np.array([coco_encode[i] for i in u_classes], dtype='float16')
    print('label_encode:', label_encode, type(label_encode))

    cap = load_input(ip_path=input_video_path)
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        raise OSError('Could not open video: {!r}'.format(input_video_path))

    out = None
    try:
        out, fps, total_frames, video_duration = out_video(cap)
        start_time, end_time = time_range(start_time, end_time)

        if is_valid_time_range(start_time, end_time, video_duration):
            start_frame = int(start_time * fps)
            end_frame = int(end_time * fps)
            print('Start Frame:', start_frame)
            print('End Frame:', end_frame)
        else:
            # Empty frame range: every frame is copied through unchanged.
            print('Invalid time range; writing the video without blurring.')
            start_frame, end_frame = 0, -1

        # Frames wait here (raw arrays or futures) so they can be written in
        # their original order; the cap bounds how many are held in memory.
        pending = deque()
        max_pending = 64

        with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers as needed
            for i in range(total_frames):
                ret, frame = cap.read()
                if not ret:
                    break
                if start_frame <= i <= end_frame:
                    print('Blured_frame:', i)
                    # blur_process adds the batch axis itself, so the raw
                    # frame is passed as read.
                    pending.append(
                        executor.submit(blur_process, frame, label_encode, blur_value)
                    )
                else:
                    pending.append(frame)
                _write_pending(out, pending, max_pending)
            _write_pending(out, pending, 0)
    finally:
        cap.release()
        if out is not None:
            out.release()
    return out_video_path
def blur_process(frame, l_encoder, blur_value=3):
    """Detect the selected classes in one frame and return the frame blurred.

    Args:
        frame: A single frame. A batch axis is added before the detector is
            called; input that already has four dimensions is treated as an
            already-batched frame and used as is.
        l_encoder: Class ids to blur, forwarded to ``filter_detection``.
        blur_value: Blur strength forwarded to ``blur_image``; defaults to
            the same value as ``blur_image``.

    Returns:
        The blurred frame produced by ``blur_image`` (without the batch axis).
    """
    frame = np.asarray(frame)
    # Add the batch axis only when it is missing, so both raw frames and
    # pre-batched frames are accepted.
    batch = frame if frame.ndim == 4 else np.expand_dims(frame, axis=0)
    detector_output = object_detection_model(batch)
    boxes, _classes, _scores = filter_detection(detector_output, l_encoder)
    return blur_image(batch[0], boxes, blur_value)
def process_and_concat_video(input_video_path, u_classes, blur_value, start_time, end_time):
    """Blur the selected classes in part of a video and write the full result.

    The section between ``start_time`` and ``end_time`` is run frame by frame
    through ``blur_process``; whatever precedes and follows it is kept as is,
    and the pieces are concatenated and written to ``output_video.mp4``.

    Args:
        input_video_path: Path of the video to process.
        u_classes: Iterable of class labels (keys of ``coco_encode``) to blur.
        blur_value: Blur strength forwarded to ``blur_process``.
        start_time: Start of the blurred section in seconds; ``None`` means
            the beginning of the video. Negative values are clamped to 0.
        end_time: End of the blurred section in seconds; ``None`` means the
            end of the video. Values past the end are clamped to the video
            duration.

    Returns:
        The path of the written video.

    Raises:
        ValueError: If the (clamped) start time is not before the end time.
        KeyError: If a label in ``u_classes`` is not in ``coco_encode``.
    """
    label_encode = np.array([coco_encode[i] for i in u_classes], dtype='float16')
    output_fps = 25  # Assuming desired FPS is 25
    out_video_path = "output_video.mp4"

    # Load the full video clip
    full_video_clip = VideoFileClip(input_video_path)
    try:
        duration = full_video_clip.duration
        start_time = 0 if start_time is None else max(0, start_time)
        end_time = duration if end_time is None else min(end_time, duration)
        if not start_time < end_time:
            raise ValueError(
                'Invalid time range: start={} end={} (video duration={})'.format(
                    start_time, end_time, duration
                )
            )

        # Process the specified part of the video
        processed_clip = full_video_clip.subclip(start_time, end_time).set_duration(end_time - start_time)
        processed_clip = processed_clip.fl_image(
            lambda frame: blur_process(frame, label_encode, blur_value)
        )
        print('final clip fps:', full_video_clip.fps)
        print('processed_clip fps:', processed_clip.fps)

        # Concatenate the processed and unprocessed parts, leaving out the
        # head / tail when the blurred section touches the video bounds
        # (they would be zero-length clips).
        segments = []
        if start_time > 0:
            segments.append(full_video_clip.subclip(0, start_time))
        segments.append(processed_clip)
        if end_time < duration:
            segments.append(full_video_clip.subclip(end_time, None))
        final_clip = concatenate_videoclips(segments)

        # Write the final video to an output file with the specified fps
        final_clip.write_videofile(out_video_path, codec="h264", audio_codec="aac", fps=output_fps)
    finally:
        # Release the reader held by the source clip.
        full_video_clip.close()
    return out_video_path
if __name__ == "__main__":
    import gradio as gr

    # Build and launch the web UI. The inputs are passed positionally to
    # ``process_and_concat_video`` in the order listed here.
    iface = gr.Interface(
        fn=process_and_concat_video,
        inputs=[
            gr.Video(label="Upload Video"),
            # Index 0 ("unlabeled") is not offered as a choice.
            gr.CheckboxGroup(choices=coco_labels[1:], label="Select Classes"),
            # The value is used as a Gaussian kernel size, which OpenCV
            # requires to be odd, so the slider only offers odd values.
            gr.Slider(label="blur intensity", minimum=3, maximum=89, step=2),
            gr.Number(label="Start Time (seconds)"),
            gr.Number(label="End Time (seconds)"),
        ],
        outputs="video",
        title='BlurVista 👓',
    )
    iface.launch(debug=True, inline=False)