Spaces:
Paused
Paused
| import cv2 | |
| import os | |
| import numpy as np | |
| import yaml | |
| from paddle.inference import Config, create_predictor, PrecisionType | |
| from PIL import Image | |
| from .download import get_model_path | |
| from .preprocess import preprocess, Resize, NormalizeImage, Permute, PadStride, decode_image | |
| from .visualize import draw_det | |
class Detector(object):
    """Object detector backed by a Paddle Inference predictor.

    The constructor reads ``configs/<model_name>.yml`` (looked up in the
    parent of the directory containing this file), builds a predictor from
    it and instantiates the preprocessing operators listed under the
    ``Preprocess`` key.  Calling the instance with an image runs detection
    and returns the annotated image together with the boxes that were kept.
    """

    # ``__call__`` always feeds exactly one image to the predictor, so the
    # TensorRT engine and its dynamic-shape profiles are sized for a batch
    # of one.  (The original code referenced an undefined ``batch_size``
    # here, which raised NameError for every ``trt_*`` mode.)
    _BATCH_SIZE = 1

    def __init__(self, model_name):
        """Load the YAML config for *model_name* and create the predictor.

        Args:
            model_name: Base name (without ``.yml``) of the config file.

        Raises:
            OSError: If the config file cannot be opened.
            KeyError: If a required key is missing from the config
                (``model_path``, ``param_path``, ``Preprocess``,
                ``label_list``; ``min_subgraph_size`` for ``trt_*`` modes).
        """
        yml_conf = self._load_yml_conf(model_name)
        config = self._build_config(yml_conf)

        self.predictor = create_predictor(config)
        self.yml_conf = yml_conf
        self.preprocess_ops = self.create_preprocess_ops(yml_conf)
        self.input_names = self.predictor.get_input_names()
        self.output_names = self.predictor.get_output_names()
        self.draw_threshold = yml_conf.get('draw_threshold', 0.5)
        self.class_names = yml_conf['label_list']

    @staticmethod
    def _load_yml_conf(model_name):
        """Return the parsed ``configs/<model_name>.yml`` mapping."""
        parent_path = os.path.abspath(os.path.join(__file__, *(['..'] * 2)))
        yml_file = os.path.join(parent_path, 'configs/{}.yml'.format(model_name))
        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's locale encoding.
        with open(yml_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _build_config(self, yml_conf):
        """Translate the YAML settings into a Paddle Inference ``Config``."""
        infer_model = get_model_path(yml_conf['model_path'])
        infer_params = get_model_path(yml_conf['param_path'])
        config = Config(infer_model, infer_params)

        device = yml_conf.get('device', 'CPU')
        run_mode = yml_conf.get('mode', 'paddle')
        cpu_threads = yml_conf.get('cpu_threads', 1)

        if device == 'CPU':
            config.disable_gpu()
            config.set_cpu_math_library_num_threads(cpu_threads)
        elif device == 'GPU':
            # initial GPU memory(M), device ID
            config.enable_use_gpu(200, 0)
            # optimize graph and fuse op
            config.switch_ir_optim(True)

        precision_map = {
            'trt_int8': Config.Precision.Int8,
            'trt_fp32': Config.Precision.Float32,
            'trt_fp16': Config.Precision.Half,
        }
        if run_mode in precision_map:
            batch_size = self._BATCH_SIZE
            config.enable_tensorrt_engine(
                workspace_size=(1 << 25) * batch_size,
                max_batch_size=batch_size,
                min_subgraph_size=yml_conf['min_subgraph_size'],
                precision_mode=precision_map[run_mode],
                use_static=True,
                use_calib_mode=False)

            # A config without the key simply means "no dynamic shapes"
            # instead of failing with KeyError.
            if yml_conf.get('use_dynamic_shape', False):

                def shape_info(side):
                    # Square input of the given side plus its scale factor.
                    return {
                        'image': [batch_size, 3, side, side],
                        'scale_factor': [batch_size, 2],
                    }

                config.set_trt_dynamic_shape_info(
                    shape_info(640),    # min
                    shape_info(1280),   # max
                    shape_info(1024))   # opt

        # disable print log when predict
        config.disable_glog_info()
        # enable shared memory
        config.enable_memory_optim()
        # disable feed, fetch OP, needed by zero_copy_run
        config.switch_use_feed_fetch_ops(False)
        return config

    def create_preprocess_ops(self, yml_conf):
        """Instantiate the operators listed under ``yml_conf['Preprocess']``.

        Each entry is a mapping whose ``type`` names the operator and whose
        remaining items are passed to it as keyword arguments.  The entries
        themselves are not modified.

        Args:
            yml_conf: Parsed config mapping containing a ``Preprocess`` list.

        Returns:
            List of constructed operators, in config order.
        """
        preprocess_ops = []
        for op_info in yml_conf['Preprocess']:
            op_kwargs = op_info.copy()
            op_type = op_kwargs.pop('type')
            # NOTE(security): ``eval`` executes whatever expression the
            # config holds in ``type``.  Only load config files you trust;
            # if configs can ever come from outside the package, switch to
            # an explicit name -> class allow-list.
            preprocess_ops.append(eval(op_type)(**op_kwargs))
        return preprocess_ops

    def create_inputs(self, image_files):
        """Preprocess the images and stack them into predictor inputs.

        Args:
            image_files: Non-empty sequence of images accepted by
                ``preprocess`` (``np.stack`` raises ValueError when empty).

        Returns:
            Dict with float32 arrays under ``im_shape``, ``scale_factor``
            and ``image``, each stacked along a new leading axis.
        """
        im_list, im_info_list = [], []
        for im_path in image_files:
            im, im_info = preprocess(im_path, self.preprocess_ops)
            im_list.append(im)
            im_info_list.append(im_info)

        return {
            'im_shape': np.stack(
                [info['im_shape'] for info in im_info_list], axis=0).astype('float32'),
            'scale_factor': np.stack(
                [info['scale_factor'] for info in im_info_list], axis=0).astype('float32'),
            'image': np.stack(im_list, axis=0).astype('float32'),
        }

    def __call__(self, image_file):
        """Detect objects in one image and draw the kept boxes on it.

        Args:
            image_file: Path to an image (``str``) or an image array
                (``np.ndarray``).

        Returns:
            Tuple ``(image, result)`` where ``image`` is whatever
            ``draw_det`` returns and ``result`` is
            ``{'bboxes': <kept box rows as nested lists>}``.

        Raises:
            TypeError: If *image_file* is neither ``str`` nor ``np.ndarray``.
        """
        # Reject unsupported inputs before doing any work.  Previously such
        # inputs went through inference and then crashed with
        # UnboundLocalError because ``image`` was never assigned.
        if not isinstance(image_file, (str, np.ndarray)):
            raise TypeError(
                'image_file must be a path (str) or a numpy.ndarray, got {}'.format(
                    type(image_file).__name__))

        inputs = self.create_inputs([image_file])
        for name in self.input_names:
            input_tensor = self.predictor.get_input_handle(name)
            input_tensor.copy_from_cpu(inputs[name])
        self.predictor.run()

        boxes_tensor = self.predictor.get_output_handle(self.output_names[0])
        np_boxes = boxes_tensor.copy_to_cpu()
        boxes_num = self.predictor.get_output_handle(self.output_names[1])
        np_boxes_num = boxes_num.copy_to_cpu()
        if np_boxes_num.sum() <= 0:
            # Nothing detected: use an empty 6-column array so the
            # filtering below still works.
            np_boxes = np.zeros([0, 6])

        if isinstance(image_file, str):
            # ``convert`` returns a new, fully loaded image, so the file
            # handle can be closed right away.
            with Image.open(image_file) as opened:
                image = opened.convert('RGB')
        else:
            image = image_file

        # Keep rows whose column 1 exceeds the draw threshold and whose
        # column 0 is above -1 (presumably score and class id -- confirm
        # against the model's output layout).
        expect_boxes = (np_boxes[:, 1] > self.draw_threshold) & (np_boxes[:, 0] > -1)
        np_boxes = np_boxes[expect_boxes, :]
        image = draw_det(image, np_boxes, self.class_names)
        return image, {'bboxes': np_boxes.tolist()}