| from PIL import Image |
| from ultralytics import YOLO |
| import numpy as np |
| import cv2 |
| import torch |
|
|
|
|
|
|
| from utils import readb64, img2base64 |
|
|
# TorchScript-exported YOLO detector; `task` must be passed explicitly because
# a .torchscript file carries no task metadata. (Name suggests int8-quantized
# weights — TODO confirm against the export step.)
model_int8 = YOLO('weights/best.torchscript', task='detect')


# Class-id -> human-readable label for the mask-detection model.
labels = {
    0: 'mask_weared_incorrect',
    1: 'with_mask',
    2: 'without_mask',
}
|
|
|
|
def inference_on_image(path):
    """Run mask detection on one image file and display the annotated result.

    Args:
        path: filesystem path to the image.

    Returns:
        The ultralytics results list produced by the model.
    """
    results = model_int8(path)

    # Bug fix: cv2.COLOR_BGR2RGB is a cvtColor conversion code, not an imread
    # flag; passing it here silently selected an unrelated read mode. Read with
    # the default flag (BGR), which is also what cv2.imshow expects.
    img = cv2.imread(path)
    for box in results[0].boxes:
        img = draw_bbox_prediction(img, box)

    cv2.imshow('Detected Image', img)
    cv2.waitKey(0)
    # Bug fix: window was never destroyed after the key press.
    cv2.destroyAllWindows()

    return results
|
|
def inference_on_video(path, vid_stride=10):
    """Run mask detection on a video, drawing boxes on every `vid_stride`-th frame.

    Frames are shown in an OpenCV window; press ESC to stop early.

    Args:
        path: filesystem path to the video.
        vid_stride: run the detector only on every `vid_stride`-th frame
            (intermediate frames are shown unannotated).

    Returns:
        The (partially consumed) results generator from the model.
    """
    # Bug fix: the stride was hard-coded to 10, silently ignoring the
    # `vid_stride` parameter both here and in the modulo test below.
    results = model_int8(path, vid_stride=vid_stride, stream=True)

    cap = cv2.VideoCapture(path)

    # Bug fix: an extra cap.read() before the loop used to discard frame 0,
    # so every drawn result was applied to the wrong (next) frame.
    frame_counter = 0
    while True:
        ret, img = cap.read()
        if not ret:
            cap.release()
            break

        if frame_counter % vid_stride == 0:
            result = next(results)
            for box in result.boxes:
                img = draw_bbox_prediction(img, box)

        cv2.imshow('Detected Image', img)
        frame_counter += 1

        # ESC aborts playback.
        k = cv2.waitKey(5) & 0xFF
        if k == 27:
            cap.release()
            cv2.destroyAllWindows()
            break

    return results
|
|
def draw_bbox_prediction(img, box):
    """Draw one predicted bounding box with a label/confidence banner on `img`.

    Args:
        img: image array (annotated in place and returned).
        box: a single ultralytics detection box exposing .cls, .conf, .xyxy
            (assumes tensors live on CPU — .numpy() would fail otherwise).

    Returns:
        The annotated image.
    """
    # .item() returns a float; cast so the labels lookup is an honest int key.
    cls = int(box.cls.item())
    confidence = box.conf.item()
    label = labels[cls]

    x1, y1, x2, y2 = map(int, list(box.xyxy.numpy()[0]))
    # Scale stroke/text size with box width, using 80 px (640/8) as reference.
    scaler = (x2 - x1) / (640 / 8)
    # Bug fix: for boxes narrower than ~40 px, int(2*scaler) (and int(1*scaler)
    # below) truncated to 0, and cv2 rejects non-positive thickness. Clamp to 1.
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 102, 255), max(1, int(2 * scaler)))
    # Filled banner above the box to hold the label text.
    img = cv2.rectangle(img, (x1, y1 - int(20 * scaler)), (x1 + (x2 - x1) * 3, y1), (0, 102, 255), -1)
    img = cv2.putText(img, "{}: {:.3f}".format(label, confidence), (x1, y1 - 5),
                      cv2.FONT_HERSHEY_SIMPLEX, 0.6 * scaler, (255, 255, 255),
                      max(1, int(1 * scaler)))
    return img
|
|
|
|
class ImagePipeline:
    """Single-image mask-detection pipeline: base64 payload in, detections out."""

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        # device/gpu_id are currently unused; kept for interface compatibility.
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        """Decode the request payload into a one-element list of images.

        Accepts either {"images": <b64 or [b64]>} or a bare base64 payload.

        Raises:
            ValueError: if more than one image is supplied.
        """
        image_base64 = data.pop("images", data)

        if not isinstance(image_base64, list):
            image_base64 = [image_base64]
        elif len(image_base64) > 1:
            # ValueError subclasses Exception, so callers catching the old
            # bare Exception still work.
            raise ValueError("ImagePipeline only accepts 1 image/frame")

        return [readb64(image) for image in image_base64]

    def inference(self, images):
        """Run the detector on the single decoded image."""
        return self.model(images[0])

    def get_response(self, inference_result):
        """Serialize detections into JSON-friendly dicts plus a summary message."""
        response = []

        # Classes 0 (incorrectly worn) and 2 (no mask) are violations; 1 is OK.
        if not {0, 2}.intersection(inference_result[0].boxes.cls.numpy()):
            message = "Everyone is wearing mask correctly"
        else:
            message = "Someone is not wearing mask or incorrectly wearing mask"

        for result in inference_result:
            for xywhn, cls, conf in zip(
                result.boxes.xywhn,
                result.boxes.cls,
                result.boxes.conf
            ):
                xywhn = list(xywhn.numpy())
                response.append({
                    # Normalized center-x/center-y/width/height coordinates.
                    'xywhn': {
                        'x': float(xywhn[0]),
                        'y': float(xywhn[1]),
                        'w': float(xywhn[2]),
                        'h': float(xywhn[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': response,
                'message': message}

    def draw_bbox(self, images, inference_result):
        """Return a copy of the input image with predicted boxes drawn on it."""
        img = np.array(images[0])
        # Boxes are drawn in reverse order — presumably so the first (highest
        # confidence?) detections end up on top; verify against model ordering.
        boxes = list(inference_result[0].boxes)
        boxes.reverse()

        for box in boxes:
            img = draw_bbox_prediction(img, box)

        return img

    def __call__(self, data, config_payload=None, draw_bbox=False):
        """Full pipeline: preprocess -> inference -> response (+ optional image)."""
        images = self.preprocess(data)
        inference_result = self.inference(images)
        response = self.get_response(inference_result)
        if draw_bbox:
            annotated_img = self.draw_bbox(images, inference_result)
            return response, annotated_img
        return response
|
|
class VideoPipeline:
    """Video mask-detection pipeline: video path in, per-frame detections out."""

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        # device/gpu_id are currently unused; kept for interface compatibility.
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        # The video path is passed straight through to the model.
        return data

    def inference(self, video_path, vid_stride=30):
        """Run the detector on the video, sampling every `vid_stride`-th frame."""
        return self.model(video_path, vid_stride=vid_stride)

    def get_response(self, inference_result):
        """Serialize all frames' detections; message flags any mask violation.

        Classes 0 (incorrectly worn) and 2 (no mask) count as violations,
        matching ImagePipeline.get_response.
        """
        response = []

        message = "Everyone is wearing mask correctly"

        for result in inference_result:
            # Bug fix: this used to test inference_result[0] on every
            # iteration (ignoring the current frame) and used issubset, which
            # required BOTH class 0 and class 2 to be present. Now any single
            # violation class in any frame flips the message.
            if {0, 2}.intersection(result.boxes.cls.numpy()):
                message = "Someone is not wearing mask or incorrectly wearing mask"

            for xywhn, cls, conf in zip(
                result.boxes.xywhn,
                result.boxes.cls,
                result.boxes.conf
            ):
                xywhn = list(xywhn.numpy())
                response.append({
                    # Normalized center-x/center-y/width/height coordinates.
                    'xywhn': {
                        'x': float(xywhn[0]),
                        'y': float(xywhn[1]),
                        'w': float(xywhn[2]),
                        'h': float(xywhn[3]),
                    },
                    'class': cls.item(),
                    'confidence': conf.item(),
                })

        return {'results': response,
                'message': message}

    def __call__(self, data, config_payload=None):
        """Full pipeline: preprocess -> inference -> serialized response."""
        data = self.preprocess(data)
        inference_result = self.inference(data)
        response = self.get_response(inference_result)
        return response
|
|
|
|
if __name__ == '__main__':
    # cv2 is already imported at module level; the old duplicate import here
    # was removed.
    import argparse

    # CLI entry point: run mask detection on a single image or a video file
    # and display the annotated output in an OpenCV window.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input_type',
                        default='image',
                        const='image',
                        nargs='?',
                        choices=['image', 'video'],
                        help='type of input (default: %(default)s)')
    parser.add_argument("-p", "--path", help="filepath")
    args = parser.parse_args()

    if args.input_type == 'image':
        results = inference_on_image(args.path)
    elif args.input_type == 'video':
        results = inference_on_video(args.path)

    print(results)
|
|
| |
| |
| |
| |