| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import argparse |
| import json |
| import os |
| from pathlib import Path |
|
|
# Strip any 'expandable_segments' option from PYTORCH_CUDA_ALLOC_CONF before
# torch is imported below; if nothing else remains, drop the variable entirely.
_alloc_conf = os.environ.get('PYTORCH_CUDA_ALLOC_CONF')
if _alloc_conf is not None and 'expandable_segments' in _alloc_conf:
    _kept = [opt for opt in _alloc_conf.split(',') if 'expandable_segments' not in opt]
    if _kept:
        os.environ['PYTORCH_CUDA_ALLOC_CONF'] = ','.join(_kept)
    else:
        os.environ.pop('PYTORCH_CUDA_ALLOC_CONF', None)
|
|
| import cv2 |
| import numpy as np |
| import torch |
| from loguru import logger |
| from PIL import Image |
| from tqdm import tqdm |
|
|
| from damo.base_models.core.ops import RepConv |
| from damo.config.base import parse_config |
| from damo.detectors.detector import build_local_model |
| from damo.utils import postprocess |
| from damo.utils.demo_utils import transform_img |
| from damo.structures.image_list import ImageList |
| from damo.structures.bounding_box import BoxList |
|
|
|
|
def pad_image(img, target_size):
    """Zero-pad a single-image batch tensor up to ``target_size``.

    Args:
        img: tensor of shape (1, C, H, W); H and W must not exceed the target.
        target_size: (height, width) to pad the spatial dimensions to.

    Returns:
        ImageList wrapping the padded tensor together with the original and
        padded spatial sizes.
    """
    n, c, h, w = img.shape
    assert n == 1
    assert h <= target_size[0] and w <= target_size[1]
    # new_zeros keeps the input's dtype and device; plain torch.zeros would
    # always allocate float32 on CPU regardless of the input tensor.
    pad_imgs = img.new_zeros((n, c, target_size[0], target_size[1]))
    pad_imgs[:, :, :h, :w].copy_(img)

    img_sizes = [img.shape[-2:]]
    pad_sizes = [pad_imgs.shape[-2:]]

    return ImageList(pad_imgs, img_sizes, pad_sizes)
|
|
|
|
def get_image_files(image_dir):
    """Return a sorted list of image files directly inside *image_dir*.

    Both lower- and upper-case variants of each extension are globbed. The
    matches are collected into a set first: on case-insensitive filesystems
    (macOS, Windows) the two patterns match the same files, and a plain list
    would contain every image twice.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.webp'}
    image_dir = Path(image_dir)
    found = set()
    for ext in image_extensions:
        found.update(image_dir.glob(f'*{ext}'))
        found.update(image_dir.glob(f'*{ext.upper()}'))
    return sorted(found)
|
|
|
|
class BatchInfer:
    """DAMO-YOLO inference engine for processing a directory of images.

    Wraps model construction, per-image preprocessing, the forward pass, and
    conversion of predictions into COCO-style result dicts. Only the 'torch'
    engine is actually implemented; .onnx / .trt checkpoints are recognized
    but rejected with NotImplementedError.
    """
    def __init__(self, config, infer_size=[640, 640], device='cuda', ckpt=None):
        """Initialize inference engine.

        Args:
            config: parsed DAMO-YOLO config object (mutated: size_divisibility
                is forced to 0 so padding is controlled solely by infer_size).
            infer_size: [height, width] the network input is padded to.
            device: 'cuda' or 'cpu'; silently falls back to CPU when CUDA is
                unavailable.
            ckpt: checkpoint path; its file extension selects the engine type.
        """
        # NOTE(review): mutable default argument [640, 640] — harmless here
        # because it is only stored, never mutated.
        self.ckpt_path = ckpt
        # Engine type is inferred from the checkpoint file extension.
        suffix = ckpt.split('.')[-1]
        if suffix == 'onnx':
            self.engine_type = 'onnx'
        elif suffix == 'trt':
            self.engine_type = 'tensorRT'
        elif suffix in ['pt', 'pth']:
            self.engine_type = 'torch'
        else:
            raise ValueError(f'Unknown checkpoint format: {suffix}')

        if torch.cuda.is_available() and device == 'cuda':
            self.device = 'cuda'
        else:
            self.device = 'cpu'
            logger.warning('CUDA not available, using CPU')

        # Class names: taken from the dataset config when present, otherwise
        # synthesized as the stringified indices '0'..'num_classes-1'.
        if "class_names" in config.dataset:
            self.class_names = config.dataset.class_names
        else:
            self.class_names = []
            for i in range(config.model.head.num_classes):
                self.class_names.append(str(i))
            self.class_names = tuple(self.class_names)

        self.infer_size = infer_size
        # Disable divisibility-based padding so pad_image fully determines
        # the network input size.
        config.dataset.size_divisibility = 0
        self.config = config
        self.model = self._build_engine(self.config, self.engine_type)

    def _build_engine(self, config, engine_type):
        """Build inference engine.

        For 'torch': builds the model, loads the checkpoint's 'model' state
        dict strictly, fuses RepConv branches into deploy form, and switches
        to eval mode. Raises NotImplementedError for other engine types.
        """
        logger.info(f'Inference with {engine_type} engine!')
        if engine_type == 'torch':
            model = build_local_model(config, self.device)
            ckpt = torch.load(self.ckpt_path, map_location=self.device)
            model.load_state_dict(ckpt['model'], strict=True)
            # Fuse re-parameterizable conv blocks for inference.
            for layer in model.modules():
                if isinstance(layer, RepConv):
                    layer.switch_to_deploy()
            model.eval()
            return model
        elif engine_type == 'tensorRT':
            raise NotImplementedError('TensorRT inference not implemented in this script. Use demo.py instead.')
        elif engine_type == 'onnx':
            raise NotImplementedError('ONNX inference not implemented in this script. Use demo.py instead.')
        else:
            raise NotImplementedError(f'{engine_type} is not supported yet! Please use one of [onnx, torch, tensorRT]')

    def preprocess(self, origin_img):
        """Preprocess image for inference.

        Args:
            origin_img: HxWxC array (RGB, as produced by main()).

        Returns:
            Tuple of (padded image on self.device, (orig_width, orig_height)).
        """
        img = transform_img(origin_img, 0,
                            **self.config.test.augment.transform,
                            infer_size=self.infer_size)
        oh, ow, _ = origin_img.shape
        img = pad_image(img.tensors, self.infer_size)
        img = img.to(self.device)
        return img, (ow, oh)

    def forward(self, origin_image):
        """Run inference on image.

        Returns (raw model outputs, preprocessed image, original (w, h)).
        """
        image, origin_shape = self.preprocess(origin_image)
        with torch.no_grad():
            output = self.model(image)
        return output, image, origin_shape

    def postprocess_to_coco(self, preds, image, origin_shape):
        """Postprocess predictions to COCO format.

        Resizes boxes back to the original image size, converts them to xywh,
        and emits one dict per detection. 'image_id' is left as None for the
        caller to fill in.
        """
        output = preds[0]
        output = output.resize(origin_shape)
        output = output.convert('xywh')

        # No detections for this image.
        if len(output) == 0:
            return []

        bboxes = output.bbox.cpu().detach().numpy()
        scores = output.get_field('scores').cpu().detach().numpy()
        labels = output.get_field('labels').cpu().detach().numpy()

        # NOTE(review): assumes model labels are 0-based while COCO category
        # ids start at 1 — confirm against the dataset's category mapping.
        category_ids = labels + 1

        coco_results = []
        for k in range(len(bboxes)):
            coco_results.append({
                'image_id': None,
                'category_id': int(category_ids[k]),
                'bbox': bboxes[k].tolist(),
                'score': float(scores[k]),
            })

        return coco_results
|
|
|
|
def main():
    """Entry point: run detection over a directory and dump COCO-format JSON."""
    parser = argparse.ArgumentParser('DAMO-YOLO Directory Inference')
    parser.add_argument('--model_path', required=True, type=str,
                        help='Path to model checkpoint (.pth, .pt)')
    parser.add_argument('--config', required=True, type=str,
                        help='Path to config file')
    parser.add_argument('--image_dir', required=True, type=str,
                        help='Path to directory containing images')
    parser.add_argument('--output_json', required=True, type=str,
                        help='Path to output JSON file (COCO format)')
    parser.add_argument('--infer_size', nargs='+', type=int, default=[640, 640],
                        help='Inference image size [height width]')
    parser.add_argument('--device', default='cuda', type=str,
                        help='Device for inference (cuda or cpu)')
    parser.add_argument('--conf_threshold', default=None, type=float,
                        help='Confidence threshold (uses config default if not specified)')
    args = parser.parse_args()

    config = parse_config(args.config)

    # Optional CLI override of the NMS confidence threshold.
    if args.conf_threshold is not None:
        config.model.head.nms_conf_thre = args.conf_threshold

    # Accept a single value (square input) or an explicit [height, width].
    size_args = args.infer_size
    if len(size_args) == 1:
        infer_size = [size_args[0], size_args[0]]
    elif len(size_args) == 2:
        infer_size = size_args
    else:
        raise ValueError('infer_size should be 1 or 2 values')

    logger.info(f'Loading model from {args.model_path}')
    infer_engine = BatchInfer(config,
                              infer_size=infer_size,
                              device=args.device,
                              ckpt=args.model_path)

    image_files = get_image_files(args.image_dir)
    if not image_files:
        logger.error(f'No image files found in {args.image_dir}')
        return

    logger.info(f'Found {len(image_files)} images')

    all_results = []
    for image_path in tqdm(image_files, desc='Processing images'):
        # Unreadable/corrupt files are skipped with a warning.
        origin_img = cv2.imread(str(image_path))
        if origin_img is None:
            logger.warning(f'Failed to load image: {image_path}')
            continue
        origin_img = cv2.cvtColor(origin_img, cv2.COLOR_BGR2RGB)

        preds, image, origin_shape = infer_engine.forward(origin_img)
        detections = infer_engine.postprocess_to_coco(preds, image, origin_shape)

        # Use the file stem as the image_id for every detection of this image.
        stem = image_path.stem
        for det in detections:
            det['image_id'] = stem
            all_results.append(det)

    # Ensure the output directory exists before writing.
    Path(args.output_json).parent.mkdir(parents=True, exist_ok=True)
    with open(args.output_json, 'w') as f:
        json.dump(all_results, f, indent=2)

    logger.info(f'Saved {len(all_results)} detections to {args.output_json}')
    logger.info(f'Processed {len(image_files)} images')
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|
|
|
|
|