| | |
| | import io |
| | import numpy as np |
| | import os |
| | import re |
| | import tempfile |
| | import unittest |
| | from typing import Callable |
| | import torch |
| | import torch.onnx.symbolic_helper as sym_help |
| | from packaging import version |
| | from torch._C import ListType |
| | from torch.onnx import register_custom_op_symbolic |
| |
|
| | from detectron2 import model_zoo |
| | from detectron2.config import CfgNode, LazyConfig, instantiate |
| | from detectron2.data import DatasetCatalog |
| | from detectron2.data.detection_utils import read_image |
| | from detectron2.modeling import build_model |
| | from detectron2.structures import Boxes, Instances, ROIMasks |
| | from detectron2.utils.file_io import PathManager |
| |
|
| |
|
| | """ |
| | Internal utilities for tests. Don't use except for writing tests. |
| | """ |
| |
|
| |
|
| | def get_model_no_weights(config_path): |
| | """ |
| | Like model_zoo.get, but do not load any weights (even pretrained) |
| | """ |
| | cfg = model_zoo.get_config(config_path) |
| | if isinstance(cfg, CfgNode): |
| | if not torch.cuda.is_available(): |
| | cfg.MODEL.DEVICE = "cpu" |
| | return build_model(cfg) |
| | else: |
| | return instantiate(cfg.model) |
| |
|
| |
|
| | def random_boxes(num_boxes, max_coord=100, device="cpu"): |
| | """ |
| | Create a random Nx4 boxes tensor, with coordinates < max_coord. |
| | """ |
| | boxes = torch.rand(num_boxes, 4, device=device) * (max_coord * 0.5) |
| | boxes.clamp_(min=1.0) |
| | |
| | |
| | |
| | |
| | boxes[:, 2:] += boxes[:, :2] |
| | return boxes |
| |
|
| |
|
| | def get_sample_coco_image(tensor=True): |
| | """ |
| | Args: |
| | tensor (bool): if True, returns 3xHxW tensor. |
| | else, returns a HxWx3 numpy array. |
| | |
| | Returns: |
| | an image, in BGR color. |
| | """ |
| | try: |
| | file_name = DatasetCatalog.get("coco_2017_val_100")[0]["file_name"] |
| | if not PathManager.exists(file_name): |
| | raise FileNotFoundError() |
| | except IOError: |
| | |
| | file_name = PathManager.get_local_path( |
| | "http://images.cocodataset.org/train2017/000000000009.jpg" |
| | ) |
| | ret = read_image(file_name, format="BGR") |
| | if tensor: |
| | ret = torch.from_numpy(np.ascontiguousarray(ret.transpose(2, 0, 1))) |
| | return ret |
| |
|
| |
|
| | def convert_scripted_instances(instances): |
| | """ |
| | Convert a scripted Instances object to a regular :class:`Instances` object |
| | """ |
| | assert hasattr( |
| | instances, "image_size" |
| | ), f"Expect an Instances object, but got {type(instances)}!" |
| | ret = Instances(instances.image_size) |
| | for name in instances._field_names: |
| | val = getattr(instances, "_" + name, None) |
| | if val is not None: |
| | ret.set(name, val) |
| | return ret |
| |
|
| |
|
| | def assert_instances_allclose(input, other, *, rtol=1e-5, msg="", size_as_tensor=False): |
| | """ |
| | Args: |
| | input, other (Instances): |
| | size_as_tensor: compare image_size of the Instances as tensors (instead of tuples). |
| | Useful for comparing outputs of tracing. |
| | """ |
| | if not isinstance(input, Instances): |
| | input = convert_scripted_instances(input) |
| | if not isinstance(other, Instances): |
| | other = convert_scripted_instances(other) |
| |
|
| | if not msg: |
| | msg = "Two Instances are different! " |
| | else: |
| | msg = msg.rstrip() + " " |
| |
|
| | size_error_msg = msg + f"image_size is {input.image_size} vs. {other.image_size}!" |
| | if size_as_tensor: |
| | assert torch.equal( |
| | torch.tensor(input.image_size), torch.tensor(other.image_size) |
| | ), size_error_msg |
| | else: |
| | assert input.image_size == other.image_size, size_error_msg |
| | fields = sorted(input.get_fields().keys()) |
| | fields_other = sorted(other.get_fields().keys()) |
| | assert fields == fields_other, msg + f"Fields are {fields} vs {fields_other}!" |
| |
|
| | for f in fields: |
| | val1, val2 = input.get(f), other.get(f) |
| | if isinstance(val1, (Boxes, ROIMasks)): |
| | |
| | assert torch.allclose(val1.tensor, val2.tensor, atol=100 * rtol), ( |
| | msg + f"Field {f} differs too much!" |
| | ) |
| | elif isinstance(val1, torch.Tensor): |
| | if val1.dtype.is_floating_point: |
| | mag = torch.abs(val1).max().cpu().item() |
| | assert torch.allclose(val1, val2, atol=mag * rtol), ( |
| | msg + f"Field {f} differs too much!" |
| | ) |
| | else: |
| | assert torch.equal(val1, val2), msg + f"Field {f} is different!" |
| | else: |
| | raise ValueError(f"Don't know how to compare type {type(val1)}") |
| |
|
| |
|
| | def reload_script_model(module): |
| | """ |
| | Save a jit module and load it back. |
| | Similar to the `getExportImportCopy` function in torch/testing/ |
| | """ |
| | buffer = io.BytesIO() |
| | torch.jit.save(module, buffer) |
| | buffer.seek(0) |
| | return torch.jit.load(buffer) |
| |
|
| |
|
| | def reload_lazy_config(cfg): |
| | """ |
| | Save an object by LazyConfig.save and load it back. |
| | This is used to test that a config still works the same after |
| | serialization/deserialization. |
| | """ |
| | with tempfile.TemporaryDirectory(prefix="detectron2") as d: |
| | fname = os.path.join(d, "d2_cfg_test.yaml") |
| | LazyConfig.save(cfg, fname) |
| | return LazyConfig.load(fname) |
| |
|
| |
|
| | def min_torch_version(min_version: str) -> bool: |
| | """ |
| | Returns True when torch's version is at least `min_version`. |
| | """ |
| | try: |
| | import torch |
| | except ImportError: |
| | return False |
| |
|
| | installed_version = version.parse(torch.__version__.split("+")[0]) |
| | min_version = version.parse(min_version) |
| | return installed_version >= min_version |
| |
|
| |
|
| | def has_dynamic_axes(onnx_model): |
| | """ |
| | Return True when all ONNX input/output have only dynamic axes for all ranks |
| | """ |
| | return all( |
| | not dim.dim_param.isnumeric() |
| | for inp in onnx_model.graph.input |
| | for dim in inp.type.tensor_type.shape.dim |
| | ) and all( |
| | not dim.dim_param.isnumeric() |
| | for out in onnx_model.graph.output |
| | for dim in out.type.tensor_type.shape.dim |
| | ) |
| |
|
| |
|
| | def register_custom_op_onnx_export( |
| | opname: str, symbolic_fn: Callable, opset_version: int, min_version: str |
| | ) -> None: |
| | """ |
| | Register `symbolic_fn` as PyTorch's symbolic `opname`-`opset_version` for ONNX export. |
| | The registration is performed only when current PyTorch's version is < `min_version.` |
| | IMPORTANT: symbolic must be manually unregistered after the caller function returns |
| | """ |
| | if min_torch_version(min_version): |
| | return |
| | register_custom_op_symbolic(opname, symbolic_fn, opset_version) |
| | print(f"_register_custom_op_onnx_export({opname}, {opset_version}) succeeded.") |
| |
|
| |
|
| | def unregister_custom_op_onnx_export(opname: str, opset_version: int, min_version: str) -> None: |
| | """ |
| | Unregister PyTorch's symbolic `opname`-`opset_version` for ONNX export. |
| | The un-registration is performed only when PyTorch's version is < `min_version` |
| | IMPORTANT: The symbolic must have been manually registered by the caller, otherwise |
| | the incorrect symbolic may be unregistered instead. |
| | """ |
| |
|
| | |
| | |
| | try: |
| | from torch.onnx import unregister_custom_op_symbolic as _unregister_custom_op_symbolic |
| | except ImportError: |
| |
|
| | def _unregister_custom_op_symbolic(symbolic_name, opset_version): |
| | import torch.onnx.symbolic_registry as sym_registry |
| | from torch.onnx.symbolic_helper import _onnx_main_opset, _onnx_stable_opsets |
| |
|
| | def _get_ns_op_name_from_custom_op(symbolic_name): |
| | try: |
| | from torch.onnx.utils import get_ns_op_name_from_custom_op |
| |
|
| | ns, op_name = get_ns_op_name_from_custom_op(symbolic_name) |
| | except ImportError as import_error: |
| | if not bool( |
| | re.match(r"^[a-zA-Z0-9-_]*::[a-zA-Z-_]+[a-zA-Z0-9-_]*$", symbolic_name) |
| | ): |
| | raise ValueError( |
| | f"Invalid symbolic name {symbolic_name}. Must be `domain::name`" |
| | ) from import_error |
| |
|
| | ns, op_name = symbolic_name.split("::") |
| | if ns == "onnx": |
| | raise ValueError(f"{ns} domain cannot be modified.") from import_error |
| |
|
| | if ns == "aten": |
| | ns = "" |
| |
|
| | return ns, op_name |
| |
|
| | def _unregister_op(opname: str, domain: str, version: int): |
| | try: |
| | sym_registry.unregister_op(op_name, ns, ver) |
| | except AttributeError as attribute_error: |
| | if sym_registry.is_registered_op(opname, domain, version): |
| | del sym_registry._registry[(domain, version)][opname] |
| | if not sym_registry._registry[(domain, version)]: |
| | del sym_registry._registry[(domain, version)] |
| | else: |
| | raise RuntimeError( |
| | f"The opname {opname} is not registered." |
| | ) from attribute_error |
| |
|
| | ns, op_name = _get_ns_op_name_from_custom_op(symbolic_name) |
| | for ver in _onnx_stable_opsets + [_onnx_main_opset]: |
| | if ver >= opset_version: |
| | _unregister_op(op_name, ns, ver) |
| |
|
| | if min_torch_version(min_version): |
| | return |
| | _unregister_custom_op_symbolic(opname, opset_version) |
| | print(f"_unregister_custom_op_onnx_export({opname}, {opset_version}) succeeded.") |
| |
|
| |
|
| | skipIfOnCPUCI = unittest.skipIf( |
| | os.environ.get("CI") and not torch.cuda.is_available(), |
| | "The test is too slow on CPUs and will be executed on CircleCI's GPU jobs.", |
| | ) |
| |
|
| |
|
| | def skipIfUnsupportedMinOpsetVersion(min_opset_version, current_opset_version=None): |
| | """ |
| | Skips tests for ONNX Opset versions older than min_opset_version. |
| | """ |
| |
|
| | def skip_dec(func): |
| | def wrapper(self): |
| | try: |
| | opset_version = self.opset_version |
| | except AttributeError: |
| | opset_version = current_opset_version |
| | if opset_version < min_opset_version: |
| | raise unittest.SkipTest( |
| | f"Unsupported opset_version {opset_version}" |
| | f", required is {min_opset_version}" |
| | ) |
| | return func(self) |
| |
|
| | return wrapper |
| |
|
| | return skip_dec |
| |
|
| |
|
| | def skipIfUnsupportedMinTorchVersion(min_version): |
| | """ |
| | Skips tests for PyTorch versions older than min_version. |
| | """ |
| | reason = f"module 'torch' has __version__ {torch.__version__}" f", required is: {min_version}" |
| | return unittest.skipIf(not min_torch_version(min_version), reason) |
| |
|
| |
|
| | |
| | def _pytorch1111_symbolic_opset9_to(g, self, *args): |
| | """aten::to() symbolic that must be used for testing with PyTorch < 1.11.1.""" |
| |
|
| | def is_aten_to_device_only(args): |
| | if len(args) == 4: |
| | |
| | return ( |
| | args[0].node().kind() == "prim::device" |
| | or args[0].type().isSubtypeOf(ListType.ofInts()) |
| | or ( |
| | sym_help._is_value(args[0]) |
| | and args[0].node().kind() == "onnx::Constant" |
| | and isinstance(args[0].node()["value"], str) |
| | ) |
| | ) |
| | elif len(args) == 5: |
| | |
| | |
| | dtype = sym_help._get_const(args[1], "i", "dtype") |
| | return dtype is None |
| | elif len(args) in (6, 7): |
| | |
| | |
| | |
| | dtype = sym_help._get_const(args[0], "i", "dtype") |
| | return dtype is None |
| | return False |
| |
|
| | |
| | if is_aten_to_device_only(args): |
| | return self |
| |
|
| | if len(args) == 4: |
| | |
| | |
| | |
| | dtype = args[0] |
| | if sym_help._is_value(args[0]) and args[0].node().kind() == "onnx::Constant": |
| | tval = args[0].node()["value"] |
| | if isinstance(tval, torch.Tensor): |
| | if len(tval.shape) == 0: |
| | tval = tval.item() |
| | dtype = int(tval) |
| | else: |
| | dtype = tval |
| |
|
| | if sym_help._is_value(dtype) or isinstance(dtype, torch.Tensor): |
| | |
| | dtype = args[0].type().scalarType() |
| | return g.op("Cast", self, to_i=sym_help.cast_pytorch_to_onnx[dtype]) |
| | else: |
| | |
| | |
| | return g.op("Cast", self, to_i=sym_help.scalar_type_to_onnx[dtype]) |
| | elif len(args) == 5: |
| | |
| | dtype = sym_help._get_const(args[1], "i", "dtype") |
| | |
| | return g.op("Cast", self, to_i=sym_help.scalar_type_to_onnx[dtype]) |
| | elif len(args) == 6: |
| | |
| | dtype = sym_help._get_const(args[0], "i", "dtype") |
| | |
| | return g.op("Cast", self, to_i=sym_help.scalar_type_to_onnx[dtype]) |
| | elif len(args) == 7: |
| | |
| | dtype = sym_help._get_const(args[0], "i", "dtype") |
| | |
| | return g.op("Cast", self, to_i=sym_help.scalar_type_to_onnx[dtype]) |
| | else: |
| | return sym_help._onnx_unsupported("Unknown aten::to signature") |
| |
|
| |
|
| | |
| | def _pytorch1111_symbolic_opset9_repeat_interleave(g, self, repeats, dim=None, output_size=None): |
| |
|
| | |
| | from torch.onnx.symbolic_opset9 import expand, unsqueeze |
| |
|
| | input = self |
| | |
| | |
| | if sym_help._is_none(dim): |
| | input = sym_help._reshape_helper(g, self, g.op("Constant", value_t=torch.tensor([-1]))) |
| | dim = 0 |
| | else: |
| | dim = sym_help._maybe_get_scalar(dim) |
| |
|
| | repeats_dim = sym_help._get_tensor_rank(repeats) |
| | repeats_sizes = sym_help._get_tensor_sizes(repeats) |
| | input_sizes = sym_help._get_tensor_sizes(input) |
| | if repeats_dim is None: |
| | raise RuntimeError( |
| | "Unsupported: ONNX export of repeat_interleave for unknown " "repeats rank." |
| | ) |
| | if repeats_sizes is None: |
| | raise RuntimeError( |
| | "Unsupported: ONNX export of repeat_interleave for unknown " "repeats size." |
| | ) |
| | if input_sizes is None: |
| | raise RuntimeError( |
| | "Unsupported: ONNX export of repeat_interleave for unknown " "input size." |
| | ) |
| |
|
| | input_sizes_temp = input_sizes.copy() |
| | for idx, input_size in enumerate(input_sizes): |
| | if input_size is None: |
| | input_sizes[idx], input_sizes_temp[idx] = 0, -1 |
| |
|
| | |
| | if repeats_dim == 0 or (repeats_dim == 1 and repeats_sizes[0] == 1): |
| | if not sym_help._is_tensor(repeats): |
| | repeats = g.op("Constant", value_t=torch.LongTensor(repeats)) |
| | if input_sizes[dim] == 0: |
| | return sym_help._onnx_opset_unsupported_detailed( |
| | "repeat_interleave", |
| | 9, |
| | 13, |
| | "Unsupported along dimension with unknown input size", |
| | ) |
| | else: |
| | reps = input_sizes[dim] |
| | repeats = expand(g, repeats, g.op("Constant", value_t=torch.tensor([reps])), None) |
| |
|
| | |
| | elif repeats_dim == 1: |
| | if input_sizes[dim] == 0: |
| | return sym_help._onnx_opset_unsupported_detailed( |
| | "repeat_interleave", |
| | 9, |
| | 13, |
| | "Unsupported along dimension with unknown input size", |
| | ) |
| | if repeats_sizes[0] is None: |
| | return sym_help._onnx_opset_unsupported_detailed( |
| | "repeat_interleave", 9, 13, "Unsupported for cases with dynamic repeats" |
| | ) |
| | assert ( |
| | repeats_sizes[0] == input_sizes[dim] |
| | ), "repeats must have the same size as input along dim" |
| | reps = repeats_sizes[0] |
| | else: |
| | raise RuntimeError("repeats must be 0-dim or 1-dim tensor") |
| |
|
| | final_splits = list() |
| | r_splits = sym_help._repeat_interleave_split_helper(g, repeats, reps, 0) |
| | if isinstance(r_splits, torch._C.Value): |
| | r_splits = [r_splits] |
| | i_splits = sym_help._repeat_interleave_split_helper(g, input, reps, dim) |
| | if isinstance(i_splits, torch._C.Value): |
| | i_splits = [i_splits] |
| | input_sizes[dim], input_sizes_temp[dim] = -1, 1 |
| | for idx, r_split in enumerate(r_splits): |
| | i_split = unsqueeze(g, i_splits[idx], dim + 1) |
| | r_concat = [ |
| | g.op("Constant", value_t=torch.LongTensor(input_sizes_temp[: dim + 1])), |
| | r_split, |
| | g.op("Constant", value_t=torch.LongTensor(input_sizes_temp[dim + 1 :])), |
| | ] |
| | r_concat = g.op("Concat", *r_concat, axis_i=0) |
| | i_split = expand(g, i_split, r_concat, None) |
| | i_split = sym_help._reshape_helper( |
| | g, |
| | i_split, |
| | g.op("Constant", value_t=torch.LongTensor(input_sizes)), |
| | allowzero=0, |
| | ) |
| | final_splits.append(i_split) |
| | return g.op("Concat", *final_splits, axis_i=dim) |
| |
|