# ------------------------------------------------------------------------
# RF-DETR
# Copyright (c) 2025 Roboflow. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 [see LICENSE for details]
# ------------------------------------------------------------------------
# Modified from LW-DETR (https://github.com/Atten4Vis/LW-DETR)
# Copyright (c) 2024 Baidu. All Rights Reserved.
# ------------------------------------------------------------------------
| """ | |
| export ONNX model and TensorRT engine for deployment | |
| """ | |
import argparse
import ast
import os
import random
import re
import subprocess
import sys
import time
from collections import defaultdict
from pathlib import Path

import numpy as np
import onnx
import onnxsim
import torch
import torch.nn as nn
from PIL import Image

import rfdetr.datasets.transforms as T
import rfdetr.util.misc as utils
from rfdetr.deploy._onnx import OnnxOptimizer
from rfdetr.models import build_model

def run_command_shell(command, dry_run: bool = False) -> subprocess.CompletedProcess:
    if dry_run:
        print("")
        print(f"CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']} {command}")
        print("")
    try:
        # check=True makes subprocess raise CalledProcessError on a non-zero exit
        # code; without it the except branch below would never trigger.
        result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
        return result
    except subprocess.CalledProcessError as e:
        print(f"Command failed with exit code {e.returncode}")
        # text=True means stderr is already a str, so no decode() is needed
        print(f"Error output:\n{e.stderr}")
        raise

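# Usage sketch (the command string is illustrative): the returned
# CompletedProcess carries stdout for later parsing, e.g. by
# parse_trtexec_output below.
#   result = run_command_shell("trtexec --onnx=inference_model.sim.onnx")
#   print(result.returncode, result.stdout[:200])
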
def make_infer_image(infer_dir, shape, batch_size, device="cuda") -> torch.Tensor:
    if infer_dir is None:
        dummy = np.random.randint(0, 256, (shape[0], shape[1], 3), dtype=np.uint8)
        image = Image.fromarray(dummy, mode="RGB")
    else:
        image = Image.open(infer_dir).convert("RGB")
    transforms = T.Compose([
        T.SquareResize([shape[0]]),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    inps, _ = transforms(image, None)
    inps = inps.to(device)
    # inps = utils.nested_tensor_from_tensor_list([inps for _ in range(args.batch_size)])
    inps = torch.stack([inps for _ in range(batch_size)])
    return inps

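# Usage sketch: build a batch of 2 dummy inputs at 640x640 (shape and batch
# size here are illustrative; pass a real image path as infer_dir to trace
# with a photo instead of random noise).
#   inps = make_infer_image(None, shape=(640, 640), batch_size=2, device="cpu")
#   assert inps.shape == (2, 3, 640, 640)
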
def export_onnx(output_dir, model, input_names, input_tensors, output_names, dynamic_axes,
                backbone_only=False, verbose=True, opset_version=17):
    export_name = "backbone_model" if backbone_only else "inference_model"
    output_file = os.path.join(output_dir, f"{export_name}.onnx")
    # Prepare model for export
    if hasattr(model, "export"):
        model.export()
    torch.onnx.export(
        model,
        input_tensors,
        output_file,
        input_names=input_names,
        output_names=output_names,
        export_params=True,
        keep_initializers_as_inputs=False,
        do_constant_folding=True,
        verbose=verbose,
        opset_version=opset_version,
        dynamic_axes=dynamic_axes)
    print(f'\nSuccessfully exported ONNX model: {output_file}')
    return output_file

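# Usage sketch: model and inps are assumed to come from build_model() and
# make_infer_image() as in main() below; "./output" is an illustrative path.
#   onnx_file = export_onnx("./output", model, ["input"], inps,
#                           ["dets", "labels"], dynamic_axes=None)
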
def onnx_simplify(onnx_dir: str, input_names, input_tensors, force=False):
    sim_onnx_dir = onnx_dir.replace(".onnx", ".sim.onnx")
    if os.path.isfile(sim_onnx_dir) and not force:
        return sim_onnx_dir
    if isinstance(input_tensors, torch.Tensor):
        input_tensors = [input_tensors]
    print(f'start simplify ONNX model: {onnx_dir}')
    # First pass: graph-level optimizations from the bundled OnnxOptimizer
    opt = OnnxOptimizer(onnx_dir)
    opt.info('Model: original')
    opt.common_opt()
    opt.info('Model: optimized')
    opt.save_onnx(sim_onnx_dir)
    # Second pass: onnxsim, fed the same sample inputs used for tracing
    input_dict = {name: tensor.detach().cpu().numpy() for name, tensor in zip(input_names, input_tensors)}
    model_opt, check_ok = onnxsim.simplify(
        onnx_dir,
        check_n=3,
        input_data=input_dict,
        dynamic_input_shape=False)
    if check_ok:
        onnx.save(model_opt, sim_onnx_dir)
    else:
        raise RuntimeError("Failed to simplify ONNX model.")
    print(f'Successfully simplified ONNX model: {sim_onnx_dir}')
    return sim_onnx_dir

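# Usage sketch: simplify with the same sample inputs used for export; the
# *.sim.onnx path is returned, and an existing file is reused unless
# force=True.
#   sim_file = onnx_simplify(onnx_file, ["input"], inps, force=True)
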
def trtexec(onnx_dir: str, args) -> str:
    engine_dir = onnx_dir.replace(".onnx", ".engine")
    # Base trtexec command
    trt_command = " ".join([
        "trtexec",
        f"--onnx={onnx_dir}",
        f"--saveEngine={engine_dir}",
        "--memPoolSize=workspace:4096 --fp16",
        "--useCudaGraph --useSpinWait --warmUp=500 --avgRuns=1000 --duration=10",
        "--verbose" if args.verbose else ""])
    if args.profile:
        profile_dir = onnx_dir.replace(".onnx", ".nsys-rep")
        # Wrap with nsys profile command
        command = " ".join([
            "nsys profile",
            f"--output={profile_dir}",
            "--trace=cuda,nvtx",
            "--force-overwrite true",
            trt_command
        ])
        print(f'Profile data will be saved to: {profile_dir}')
    else:
        command = trt_command
    output = run_command_shell(command, args.dry_run)
    stats = parse_trtexec_output(output.stdout)
    print(stats)
    return engine_dir

def parse_trtexec_output(output_text):
    print(output_text)
    # Common patterns in trtexec output
    gpu_compute_pattern = r"GPU Compute Time: min = (\d+\.\d+) ms, max = (\d+\.\d+) ms, mean = (\d+\.\d+) ms, median = (\d+\.\d+) ms"
    h2d_pattern = r"Host to Device Transfer Time: min = (\d+\.\d+) ms, max = (\d+\.\d+) ms, mean = (\d+\.\d+) ms"
    d2h_pattern = r"Device to Host Transfer Time: min = (\d+\.\d+) ms, max = (\d+\.\d+) ms, mean = (\d+\.\d+) ms"
    latency_pattern = r"Latency: min = (\d+\.\d+) ms, max = (\d+\.\d+) ms, mean = (\d+\.\d+) ms"
    throughput_pattern = r"Throughput: (\d+\.\d+) qps"
    stats = {}
    # Extract compute times
    if match := re.search(gpu_compute_pattern, output_text):
        stats.update({
            'compute_min_ms': float(match.group(1)),
            'compute_max_ms': float(match.group(2)),
            'compute_mean_ms': float(match.group(3)),
            'compute_median_ms': float(match.group(4))
        })
    # Extract H2D times
    if match := re.search(h2d_pattern, output_text):
        stats.update({
            'h2d_min_ms': float(match.group(1)),
            'h2d_max_ms': float(match.group(2)),
            'h2d_mean_ms': float(match.group(3))
        })
    # Extract D2H times
    if match := re.search(d2h_pattern, output_text):
        stats.update({
            'd2h_min_ms': float(match.group(1)),
            'd2h_max_ms': float(match.group(2)),
            'd2h_mean_ms': float(match.group(3))
        })
    # Extract end-to-end latency
    if match := re.search(latency_pattern, output_text):
        stats.update({
            'latency_min_ms': float(match.group(1)),
            'latency_max_ms': float(match.group(2)),
            'latency_mean_ms': float(match.group(3))
        })
    # Extract throughput
    if match := re.search(throughput_pattern, output_text):
        stats['throughput_qps'] = float(match.group(1))
    return stats

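# Illustrative input, in the format trtexec prints (values made up):
#   parse_trtexec_output("Latency: min = 1.20 ms, max = 2.50 ms, mean = 1.60 ms")
#   # -> {'latency_min_ms': 1.2, 'latency_max_ms': 2.5, 'latency_mean_ms': 1.6}
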
def no_batch_norm(model):
    for module in model.modules():
        if isinstance(module, nn.BatchNorm2d):
            raise ValueError("BatchNorm2d found in the model. Please remove it.")

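# Example: passes silently for a BatchNorm-free model, raises otherwise.
#   no_batch_norm(nn.Linear(8, 8))                   # OK
#   no_batch_norm(nn.Sequential(nn.BatchNorm2d(8)))  # ValueError
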
def main(args):
    print("git:\n {}\n".format(utils.get_sha()))
    print(args)
    # convert device to device_id
    if args.device == 'cuda':
        device_id = "0"
    elif args.device == 'cpu':
        device_id = ""
    else:
        # a bare GPU index like "1" selects that device
        device_id = str(int(args.device))
        args.device = f"cuda:{device_id}"
    # device for ONNX export
    # TODO: exporting ONNX on cuda currently fails with an onnx error
    device = torch.device("cpu")
    os.environ["CUDA_VISIBLE_DEVICES"] = device_id

    # fix the seed for reproducibility
    seed = args.seed + utils.get_rank()
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

    model, criterion, postprocessors = build_model(args)
    n_parameters = sum(p.numel() for p in model.parameters())
    print(f"number of parameters: {n_parameters}")
    n_backbone_parameters = sum(p.numel() for p in model.backbone.parameters())
    print(f"number of backbone parameters: {n_backbone_parameters}")
    n_projector_parameters = sum(p.numel() for p in model.backbone[0].projector.parameters())
    print(f"number of projector parameters: {n_projector_parameters}")
    n_backbone_encoder_parameters = sum(p.numel() for p in model.backbone[0].encoder.parameters())
    print(f"number of backbone encoder parameters: {n_backbone_encoder_parameters}")
    n_transformer_parameters = sum(p.numel() for p in model.transformer.parameters())
    print(f"number of transformer parameters: {n_transformer_parameters}")

    if args.resume:
        checkpoint = torch.load(args.resume, map_location='cpu')
        model.load_state_dict(checkpoint['model'], strict=True)
        print(f"loaded checkpoint {args.resume}")
    if args.layer_norm:
        no_batch_norm(model)
    model.to(device)

    input_tensors = make_infer_image(args.infer_dir, args.shape, args.batch_size, device)
    input_names = ['input']
    output_names = ['features'] if args.backbone_only else ['dets', 'labels']
    dynamic_axes = None

    # Run model inference in pytorch mode to sanity-check outputs before export
    model.eval().to("cuda")
    input_tensors = input_tensors.to("cuda")
    with torch.no_grad():
        if args.backbone_only:
            features = model(input_tensors)
            print(f"PyTorch inference output shape: {features.shape}")
        else:
            outputs = model(input_tensors)
            dets = outputs['pred_boxes']
            labels = outputs['pred_logits']
            print(f"PyTorch inference output shapes - Boxes: {dets.shape}, Labels: {labels.shape}")
    model.cpu()
    input_tensors = input_tensors.cpu()

    output_file = export_onnx(args.output_dir, model, input_names, input_tensors, output_names,
                              dynamic_axes, backbone_only=args.backbone_only, verbose=args.verbose)
    if args.simplify:
        output_file = onnx_simplify(output_file, input_names, input_tensors)
    if args.tensorrt:
        output_file = trtexec(output_file, args)
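
# Entry-point sketch: the real CLI flags live in the repository's argument
# parser, so the snippet below is an assumption for illustration only.
#   if __name__ == "__main__":
#       parser = argparse.ArgumentParser("RF-DETR deployment export")
#       # ... model/export flags (e.g. --resume, --shape, --batch_size) ...
#       main(parser.parse_args())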