import unittest
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
from caffe2.python.transformations import optimizeForMKLDNN
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu


@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class ConvTest(hu.HypothesisTestCase):
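    """Device and gradient checks for the Conv operator with MKL-DNN/IDEEP."""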
    @given(stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           kernel=st.integers(3, 5),
           size=st.integers(8, 10),
           input_channels=st.integers(1, 3),
           output_channels=st.integers(1, 5),
           batch_size=st.integers(1, 3),
           use_bias=st.booleans(),
           training_mode=st.booleans(),
           group=st.integers(1, 2),
           **mu.gcs)
    @settings(max_examples=10, deadline=None)
    def test_convolution(self, stride, pad, kernel, size,
                         input_channels, output_channels,
                         batch_size, use_bias, training_mode, group, gc, dc):
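        # Compare Conv outputs across the device options in `dc` and, when
        # training_mode is set, gradient-check every input.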
        training = 1 if training_mode else 0
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"] if use_bias else ["X", "w"],
            ["Y"],
            stride=stride,
            pad=pad,
            kernel=kernel,
            group=group,
            training_mode=training,
        )
        X = np.random.rand(
            batch_size, input_channels * group, size, size).astype(np.float32) - 0.5
        w = np.random.rand(output_channels * group, input_channels, kernel, kernel) \
            .astype(np.float32) - 0.5
        b = np.random.rand(output_channels * group).astype(np.float32) - 0.5

        inputs = [X, w, b] if use_bias else [X, w]
        self.assertDeviceChecks(dc, op, inputs, [0])

        if training_mode:
            for i in range(len(inputs)):
                self.assertGradientChecks(gc, op, inputs, i, [0], threshold=0.01)

    @settings(max_examples=10, deadline=None)
    @given(stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           size=st.integers(8, 10),
           input_channels=st.integers(16, 32),
           output_channels=st.integers(16, 32),
           batch_size=st.integers(1, 3),
           use_bias=st.booleans(),
           training_mode=st.booleans(),
           **mu.gcs)
    def test_winograd_convolution(self, stride, pad, size,
                                  input_channels, output_channels,
                                  batch_size, use_bias, training_mode, gc, dc):
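        # Same device/gradient checks as above, but with a fixed 3x3 kernel
        # and the Winograd algorithm requested via the `algorithm` argument.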
        training = 1 if training_mode else 0
        conv3x3_winograd_algorithm = 1
        kernel = 3
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"] if use_bias else ["X", "w"],
            ["Y"],
            stride=stride,
            pad=pad,
            kernel=kernel,
            training_mode=training,
            algorithm=conv3x3_winograd_algorithm
        )
        X = np.random.rand(
            batch_size, input_channels, size, size).astype(np.float32) - 0.5
        w = np.random.rand(
            output_channels, input_channels, kernel, kernel) \
            .astype(np.float32) - 0.5
        b = np.random.rand(output_channels).astype(np.float32) - 0.5

        inputs = [X, w, b] if use_bias else [X, w]
        self.assertDeviceChecks(dc, op, inputs, [0])

        if training_mode:
            for i in range(len(inputs)):
                self.assertGradientChecks(gc, op, inputs, i, [0], threshold=0.01)

    @given(batch_size=st.integers(1, 3), **mu.gcs)
    def test_depthwise_convolution(self, batch_size, gc, dc):
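        # Grouped 1x1 convolution (group=4): compute a reference result on
        # dc[0], then compare it against dc[1] run both through
        # optimizeForMKLDNN and directly.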
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"],
            ["Y"],
            stride=1,
            pad=0,
            kernel=1,
            group=4,
            device_option=dc[0]
        )
        op1 = core.CreateOperator(
            "Conv",
            ["X", "w", "b"],
            ["Y"],
            stride=1,
            pad=0,
            kernel=1,
            group=4,
            device_option=dc[1]
        )
        X = np.random.rand(batch_size, 544, 14, 14).astype(np.float32)
        w = np.random.rand(544, 136, 1, 1).astype(np.float32)
        b = np.random.rand(544).astype(np.float32)

        workspace.SwitchWorkspace("_device_check_", True)
        workspace.FeedBlob('X', X, dc[0])
        workspace.FeedBlob('w', w, dc[0])
        workspace.FeedBlob('b', b, dc[0])
        workspace.RunOperatorOnce(op)
        Y0 = workspace.FetchBlob('Y')

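        # Run the same convolution on dc[1], wrapping the operator in a net
        # so optimizeForMKLDNN can rewrite it before execution.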
        workspace.ResetWorkspace()
        workspace.FeedBlob('X', X, dc[1])
        workspace.FeedBlob('w', w, dc[1])
        workspace.FeedBlob('b', b, dc[1])
        net = core.Net("net")
        old_net = caffe2_pb2.NetDef()
        old_net.op.extend([op1])
        net.Proto().CopyFrom(old_net)
        optimizeForMKLDNN(net)
        workspace.RunOperatorOnce(net.Proto().op[0])
        Y1 = workspace.FetchBlob('Y')

        if not np.allclose(Y0, Y1, atol=0.01, rtol=0.01):
            print(Y1.flatten())
            print(Y0.flatten())
            print(np.max(np.abs(Y1 - Y0)))
            self.assertTrue(False)

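        # Run op1 on dc[1] directly, without the optimizeForMKLDNN pass.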
        workspace.ResetWorkspace()
        workspace.FeedBlob('X', X, dc[1])
        workspace.FeedBlob('w', w, dc[1])
        workspace.FeedBlob('b', b, dc[1])
        workspace.RunOperatorOnce(op1)
        Y2 = workspace.FetchBlob('Y')

        if not np.allclose(Y0, Y2, atol=0.01, rtol=0.01):
            print(Y2.flatten())
            print(Y0.flatten())
            print(np.max(np.abs(Y2 - Y0)))
            self.assertTrue(False)


if __name__ == "__main__":
    unittest.main()