import unittest import hypothesis.strategies as st from hypothesis import given import numpy as np from caffe2.proto import caffe2_pb2 from caffe2.python import core, workspace import caffe2.python.hypothesis_test_util as hu import caffe2.python.ideep_test_util as mu @unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.") class ElementwiseSumTest(hu.HypothesisTestCase): @given(size=st.integers(7, 9), input_channels=st.integers(1, 3), batch_size=st.integers(1, 3), inputs=st.integers(2, 7), inplace=st.booleans(), **mu.gcs) def test_elementwise_sum(self, size, input_channels, batch_size, inputs, inplace, gc, dc): op = core.CreateOperator( "Sum", ["X_{}".format(i) for i in range(inputs)], ["X_0" if inplace else "Y"], ) Xs = [np.random.rand(batch_size, input_channels, size, size).astype( np.float32) for _ in range(inputs)] self.assertDeviceChecks(dc, op, Xs, [0]) @given(size=st.integers(7, 9), input_channels=st.integers(1, 3), batch_size=st.integers(1, 3), inputs=st.integers(2, 7), inplace=st.booleans(), **mu.gcs_cpu_ideep) def test_elementwise_sum_fallback(self, size, input_channels, batch_size, inputs, inplace, gc, dc): op = core.CreateOperator( "Sum", ["X_{}".format(i) for i in range(inputs)], ["X_0" if inplace else "Y"], device_option=dc[1] ) Xs = [np.random.rand(batch_size, input_channels, size, size).astype( np.float32) for _ in range(inputs)] sum_val = Xs[0] workspace.FeedBlob("X_0", Xs[0], dc[0]) for i, x in enumerate(Xs): if i == 0: continue sum_val += x workspace.FeedBlob("X_{}".format(i), x, dc[1]) workspace.RunOperatorOnce(op) Y = workspace.FetchBlob("X_0" if inplace else "Y") if not np.allclose(sum_val, Y, atol=0.01, rtol=0.01): print(Y.flatten()) print(sum_val.flatten()) print(np.max(np.abs(Y - sum_val))) self.assertTrue(False) @given(size=st.integers(7, 9), input_channels=st.integers(1, 3), batch_size=st.integers(1, 3), inputs=st.integers(2, 7), inplace=st.booleans(), **mu.gcs_cpu_ideep) def test_int8_elementwise_sum(self, size, input_channels, batch_size, inputs, inplace, gc, dc): sum_fp32 = core.CreateOperator( "Sum", ["X_{}".format(i) for i in range(inputs)], ["X_0" if inplace else "Y"], ) Xs = [np.random.rand(batch_size, input_channels, size, size).astype( np.float32) for _ in range(inputs)] old_ws_name = workspace.CurrentWorkspace() workspace.SwitchWorkspace("_device_check_", True) Xi_scales = [] Xi_zero_points = [] for i, X in enumerate(Xs): workspace.FeedBlob("X_{}".format(i), X, dc[0]) if X.min() >= 0: Xi_scales.append(np.absolute(X).max() / 0xFF) Xi_zero_points.append(0) else: Xi_scales.append(np.absolute(X).max() / 0x7F) Xi_zero_points.append(128) workspace.RunOperatorOnce(sum_fp32) Y = workspace.FetchBlob("X_0" if inplace else "Y") if Y.min() >= 0: Y_scale = np.absolute(Y).max() / 0xFF Y_zero_point = 0 else: Y_scale = np.absolute(Y).max() / 0x7F Y_zero_point = 128 workspace.ResetWorkspace() net = caffe2_pb2.NetDef() for i, Xi in enumerate(Xs): workspace.FeedBlob("Xi_{}".format(i), Xi, dc[1]) sw2nhwc = core.CreateOperator( "NCHW2NHWC", ["Xi_{}".format(i)], ["Xi_{}_nhwc".format(i)], device_option=dc[1] ) quantize = core.CreateOperator( "Int8Quantize", ["Xi_{}_nhwc".format(i)], ["Xi_{}_quantized".format(i)], engine="DNNLOWP", device_option=dc[1], Y_zero_point=Xi_zero_points[i], Y_scale=Xi_scales[i], ) net.op.extend([sw2nhwc, quantize]) sum = core.CreateOperator( "Int8Sum", ["Xi_{}_quantized".format(i) for i in range(inputs)], ["Xi_0_quantized" if inplace else "Y_quantized"], engine="DNNLOWP", device_option=dc[1], Y_zero_point=Y_zero_point, Y_scale=Y_scale, ) dequantize = core.CreateOperator( "Int8Dequantize", ["Xi_0_quantized" if inplace else "Y_quantized"], ["Y_nhwc"], engine="DNNLOWP", device_option=dc[1], ) sw2nchw = core.CreateOperator( "NHWC2NCHW", ["Y_nhwc"], ["Y_out"], device_option=dc[1] ) net.op.extend([sum, dequantize, sw2nchw]) workspace.RunNetOnce(net) Y_out = workspace.FetchBlob("Y_out") MSE = np.square(np.subtract(Y, Y_out)).mean() if MSE > 0.005: print(Y.flatten()) print(Y_out.flatten()) print(np.max(np.abs(Y_out - Y))) print("MSE", MSE) self.assertTrue(False) workspace.SwitchWorkspace(old_ws_name) if __name__ == "__main__": unittest.main()