|
|
r"""Importing this file includes common utility methods for checking quantized |
|
|
tensors and modules. |
|
|
""" |
|
|
import numpy as np |
|
|
import torch |
|
|
from contextlib import contextmanager |
|
|
from torch.testing._internal.common_utils import TEST_WITH_ASAN, TEST_WITH_TSAN, TEST_WITH_UBSAN, IS_PPC, IS_MACOS, IS_WINDOWS |
|
|
|
|
|
# Engines compiled into this torch build; remove 'none' (presumably the
# no-op placeholder backend) so tests only iterate over real engines.
supported_qengines = torch.backends.quantized.supported_engines
supported_qengines.remove('none')

# NOTE(review): qnnpack is excluded on PPC/macOS/Windows and under
# ASAN/TSAN/UBSAN builds — presumably unsupported or flaky in those
# environments; confirm against the CI configuration.
if 'qnnpack' in supported_qengines and any([IS_PPC, TEST_WITH_ASAN, TEST_WITH_TSAN, TEST_WITH_UBSAN, IS_MACOS, IS_WINDOWS]):
    supported_qengines.remove('qnnpack')
|
|
|
|
|
def _conv_output_shape(input_size, kernel_size, padding, stride, dilation, |
|
|
output_padding=0): |
|
|
"""Computes the output shape given convolution parameters.""" |
|
|
return np.floor((input_size + 2 * padding - kernel_size - (kernel_size - 1) |
|
|
* (dilation - 1)) / stride) + 2 * output_padding + 1 |
|
|
|
|
|
|
|
|
def _quantize(x, scale, zero_point, qmin=None, qmax=None, dtype=np.uint8): |
|
|
"""Quantizes a numpy array.""" |
|
|
if qmin is None: |
|
|
qmin = np.iinfo(dtype).min |
|
|
if qmax is None: |
|
|
qmax = np.iinfo(dtype).max |
|
|
qx = np.round(x / scale + zero_point).astype(np.int64) |
|
|
qx = np.clip(qx, qmin, qmax) |
|
|
qx = qx.astype(dtype) |
|
|
return qx |
|
|
|
|
|
|
|
|
def _dequantize(qx, scale, zero_point): |
|
|
"""Dequantizes a numpy array.""" |
|
|
x = (qx.astype(float) - zero_point) * scale |
|
|
return x |
|
|
|
|
|
|
|
|
def _requantize(x, multiplier, zero_point, qmin=0, qmax=255, qtype=np.uint8): |
|
|
"""Requantizes a numpy array, i.e., intermediate int32 or int16 values are |
|
|
converted back to given type""" |
|
|
qx = (x * multiplier).round() + zero_point |
|
|
qx = np.clip(qx, qmin, qmax).astype(qtype) |
|
|
return qx |
|
|
|
|
|
def _calculate_dynamic_qparams(X, dtype, reduce_range=False, qscheme=torch.per_tensor_affine): |
|
|
"""Calculate the dynamic quantization parameters (scale, zero_point) |
|
|
according to the min and max element of the tensor""" |
|
|
assert qscheme in (torch.per_tensor_affine, torch.per_tensor_symmetric) |
|
|
if qscheme == torch.per_tensor_symmetric: |
|
|
assert dtype == torch.qint8 |
|
|
if isinstance(X, torch.Tensor): |
|
|
X = X.numpy() |
|
|
if dtype == torch.qint8: |
|
|
if reduce_range: |
|
|
qmin, qmax = -64, 63 |
|
|
else: |
|
|
qmin, qmax = -128, 127 |
|
|
else: |
|
|
if reduce_range: |
|
|
qmin, qmax = 0, 127 |
|
|
else: |
|
|
qmin, qmax = 0, 255 |
|
|
min_val = X.min() |
|
|
max_val = X.max() |
|
|
is_symmetric = (qscheme == torch.per_tensor_symmetric) |
|
|
if min_val == max_val: |
|
|
scale = 1.0 |
|
|
zero_point = 0 |
|
|
else: |
|
|
if is_symmetric: |
|
|
max_val = max(max_val, -min_val) |
|
|
min_val = -max_val |
|
|
scale = (max_val - min_val) / (qmax - qmin) |
|
|
scale = max(scale, np.finfo(np.float32).eps) |
|
|
zero_point = 0 |
|
|
else: |
|
|
max_val = max(max_val, 0.0) |
|
|
min_val = min(min_val, 0.0) |
|
|
scale = (max_val - min_val) / (qmax - qmin) |
|
|
scale = max(scale, np.finfo(np.float32).eps) |
|
|
zero_point = qmin - round(min_val / scale) |
|
|
zero_point = max(qmin, zero_point) |
|
|
zero_point = min(qmax, zero_point) |
|
|
return [float(scale), int(zero_point)] |
|
|
|
|
|
def _calculate_dynamic_per_channel_qparams(X, dtype): |
|
|
"""Calculate the dynamic quantization parameters (scale, zero_point) |
|
|
according to the min and max element of the tensor""" |
|
|
if isinstance(X, torch.Tensor): |
|
|
X = X.numpy() |
|
|
qmin, qmax = torch.iinfo(dtype).min, torch.iinfo(dtype).max |
|
|
n_levels = qmax - qmin |
|
|
scale = np.zeros(X.shape[0], dtype=np.float64) |
|
|
zero_point = np.zeros(X.shape[0], dtype=np.int64) |
|
|
for i in range(zero_point.shape[0]): |
|
|
min_val = X.min() |
|
|
max_val = X.max() |
|
|
if min_val == max_val: |
|
|
scale[i] = 1.0 |
|
|
zero_point[i] = 0 |
|
|
else: |
|
|
max_val = max(max_val, 0.0) |
|
|
min_val = min(min_val, 0.0) |
|
|
scale[i] = (max_val - min_val) / n_levels |
|
|
scale[i] = max(scale[i], np.finfo(np.float32).eps) |
|
|
zero_point[i] = qmin - round(min_val / scale[i]) |
|
|
zero_point[i] = max(qmin, zero_point[i]) |
|
|
zero_point[i] = min(qmax, zero_point[i]) |
|
|
|
|
|
return scale, zero_point |
|
|
|
|
|
def _snr(x, x_hat): |
|
|
"""Calculates the signal to noise ratio and returns the signal and noise |
|
|
power, as well as the SNR in dB. |
|
|
If the input is a list/tuple this function is called recursively on each |
|
|
element. The result will have the same nested structure as the inputs. |
|
|
|
|
|
Args: |
|
|
x, x_hat: Either a tensor or a nested list/tuple of tensors. |
|
|
Returns: |
|
|
signal, noise, SNR(in dB): Either floats or a nested list of floats |
|
|
""" |
|
|
if isinstance(x, (list, tuple)): |
|
|
assert(len(x) == len(x_hat)) |
|
|
res = [] |
|
|
for idx in range(len(x)): |
|
|
res.append(_snr(x[idx], x_hat[idx])) |
|
|
return res |
|
|
if x_hat.is_quantized: |
|
|
x_hat = x_hat.dequantize() |
|
|
if x.is_quantized: |
|
|
x = x.dequantize() |
|
|
noise = (x - x_hat).norm() |
|
|
if noise == 0: |
|
|
return 0.0, float('inf'), float('inf') |
|
|
signal = x.norm() |
|
|
snr = signal / noise |
|
|
snr_db = 20 * snr.log10() |
|
|
return signal, noise, snr_db |
|
|
|
|
|
@contextmanager
def override_quantized_engine(qengine):
    """Context manager that temporarily activates the quantized engine
    `qengine`, restoring the previously active engine on exit — even if the
    body raises.
    """
    saved_engine = torch.backends.quantized.engine
    try:
        torch.backends.quantized.engine = qengine
        yield
    finally:
        torch.backends.quantized.engine = saved_engine
|
|
|
|
|
@contextmanager
def override_cpu_allocator_for_qnnpack(qengine_is_qnnpack):
    """Context manager that switches to the mobile CPU allocator while the
    body runs, but only when `qengine_is_qnnpack` is true; the default
    allocator is restored on exit, even if the body raises.
    """
    if not qengine_is_qnnpack:
        # Nothing to override — just run the body.
        yield
        return
    try:
        torch._C._set_default_mobile_cpu_allocator()
        yield
    finally:
        torch._C._unset_default_mobile_cpu_allocator()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def override_qengines(qfunction):
    """Decorator that runs the wrapped test once under every engine in
    `supported_qengines`, restoring the active engine between runs.

    The wrapped function's return value is discarded; the wrapper returns
    None after iterating all engines.
    """
    # Local import keeps the file's top-level imports unchanged.
    import functools

    # Fix: preserve the wrapped function's __name__/__doc__ so test
    # frameworks and tracebacks report the real test, not "test_fn".
    @functools.wraps(qfunction)
    def test_fn(*args, **kwargs):
        for qengine in supported_qengines:
            with override_quantized_engine(qengine):
                qfunction(*args, **kwargs)
    return test_fn
|
|
|
|
|
def qengine_is_fbgemm():
    """True iff the currently active quantized backend is 'fbgemm'."""
    active = torch.backends.quantized.engine
    return active == 'fbgemm'
|
|
def qengine_is_qnnpack():
    """True iff the currently active quantized backend is 'qnnpack'."""
    active = torch.backends.quantized.engine
    return active == 'qnnpack'
|
|
def qengine_is_onednn():
    """True iff the currently active quantized backend is 'onednn'."""
    active = torch.backends.quantized.engine
    return active == 'onednn'
|
|
def qengine_is_x86():
    """True iff the currently active quantized backend is 'x86'."""
    active = torch.backends.quantized.engine
    return active == 'x86'
|
|
|
|
|
|
|
|
def _permute_to_axis_zero(X, axis): |
|
|
new_axis_list = list(range(X.dim())) |
|
|
new_axis_list[axis] = 0 |
|
|
new_axis_list[0] = axis |
|
|
y = X.permute(tuple(new_axis_list)) |
|
|
return y, new_axis_list |
|
|
|
|
|
|
|
|
|
|
|
def _fake_quantize_per_channel_affine_reference(X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    """Reference implementation of per-channel fake-quantize.

    Each channel along `axis` is quantized (round + clamp to
    [quant_min, quant_max]) and immediately dequantized with its own
    scale/zero_point; the result is returned in X's original dtype.
    """
    orig_dtype = X.dtype
    # Move the channel axis to dim 0 so channels can be indexed directly.
    X, permute_axis_list = _permute_to_axis_zero(X.to(torch.float32), axis)
    out = torch.zeros_like(X)

    for ch in range(X.size()[0]):
        inv_scale = 1.0 / per_channel_scale[ch]
        zp = per_channel_zero_point[ch]
        q = torch.clamp(torch.round(X[ch] * inv_scale + zp), quant_min, quant_max)
        out[ch] = (q - zp) * per_channel_scale[ch]

    # Undo the axis permutation (the permutation is its own inverse).
    return out.permute(tuple(permute_axis_list)).to(orig_dtype)
|
|
|
|
|
|
|
|
|
|
|
def _fake_quantize_per_channel_affine_grad_reference(dY, X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    """Reference gradient of per-channel fake-quantize (straight-through
    estimator): dY passes through wherever the unclamped quantized value of
    X lies inside [quant_min, quant_max], and is zero elsewhere.
    """
    orig_dtype = X.dtype
    # Move the channel axis to dim 0 so channels can be indexed directly.
    X, permute_axis_list = _permute_to_axis_zero(X.to(torch.float32), axis)
    Xq = torch.zeros_like(X)
    for ch in range(X.size()[0]):
        Xq[ch] = torch.round(X[ch] * (1.0 / per_channel_scale[ch]) + per_channel_zero_point[ch])
    # Back to the original layout before building the pass-through mask.
    Xq = Xq.permute(tuple(permute_axis_list))
    in_range = (Xq >= quant_min) * (Xq <= quant_max)
    grad = torch.zeros_like(dY)
    grad[in_range] = dY[in_range]
    return grad.to(orig_dtype)
|
|
|
|
|
def to_tensor(X, device):
    """Coerce X to a float32 tensor on `device`.

    Tensors are copied (clone + detach) so the caller's data is never
    aliased; non-tensors go through torch.tensor().
    """
    if isinstance(X, torch.Tensor):
        X = X.clone().detach()
    else:
        X = torch.tensor(X)
    return X.to(device=torch.device(device), dtype=torch.float32)
|
|
|