# Uploaded by Yeqing0814 via huggingface_hub (commit a6dd040, verified)
# Copyright (c) 2020 NVIDIA CORPORATION.
# Copyright (c) 2018-2020 Chris Choy (chrischoy@ai.stanford.edu).
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Please cite "4D Spatio-Temporal ConvNets: Minkowski Convolutional Neural
# Networks", CVPR'19 (https://arxiv.org/abs/1904.08755) if you use any part
# of the code.
import torch
import unittest
import time
import numpy as np
import MinkowskiEngineBackend._C as _C
from MinkowskiEngine import (
SparseTensor,
MinkowskiAlgorithm,
MinkowskiConvolution,
MinkowskiConvolutionFunction,
MinkowskiConvolutionTranspose,
MinkowskiConvolutionTransposeFunction,
MinkowskiGenerativeConvolutionTranspose,
MinkowskiChannelwiseConvolution,
KernelGenerator,
)
from MinkowskiEngine.utils import batched_coordinates
from tests.python.common import data_loader, load_file
from utils.gradcheck import gradcheck
LEAK_TEST_ITER = 100000
class TestConvolution(unittest.TestCase):
    """Forward, backward, and CPU/GPU consistency tests for MinkowskiConvolution."""

    def test_expansion(self):
        """expand_coordinates=True must generate new output coordinates."""
        print(f"{self.__class__.__name__}: test_expansion")
        in_channels, out_channels, D = 2, 2, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=2,
            bias=False,
            expand_coordinates=True,
            dimension=D,
        ).double()
        input = SparseTensor(
            feats,
            coordinates=coords,
            minkowski_algorithm=MinkowskiAlgorithm.SPEED_OPTIMIZED,
        )
        print(input)
        output = conv(input)
        print(output)
        # Repeat the same forward pass on the GPU when one is available.
        if not torch.cuda.is_available():
            return
        input = SparseTensor(
            feats,
            coordinates=coords,
            minkowski_algorithm=MinkowskiAlgorithm.SPEED_OPTIMIZED,
            device="cuda",
        )
        conv = conv.to("cuda")
        print(input)
        output = conv(input)
        print(output)

    def test_kernel_map(self):
        """Chain two strided GPU convolutions to exercise kernel-map generation."""
        # BUGFIX: the log label previously read "test_gpu" (copy-paste error).
        print(f"{self.__class__.__name__}: test_kernel_map")
        if not torch.cuda.is_available():
            return
        in_channels, out_channels, D = 2, 2, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        # Initialize context
        conv1 = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=2, stride=2, bias=True, dimension=D
        ).double()
        conv2 = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=True, dimension=D
        ).double()
        device = torch.device("cuda")
        input = SparseTensor(
            feats,
            coordinates=coords,
            device=device,
            minkowski_algorithm=MinkowskiAlgorithm.SPEED_OPTIMIZED,
        )
        print(input)
        conv1 = conv1.to(device)
        conv2 = conv2.to(device)
        output = conv2(conv1(input))
        print(output)

    def test_gpu(self):
        """Run the same convolution on CPU then GPU and gradcheck the GPU path."""
        print(f"{self.__class__.__name__}: test_gpu")
        if not torch.cuda.is_available():
            return
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=True, dimension=D
        )
        print(conv)
        input = SparseTensor(feats, coordinates=coords)
        conv = conv.double()
        output = conv(input)
        print(output)
        device = torch.device("cuda")
        input = SparseTensor(feats.to(device), coordinates=coords.to(device))
        conv = conv.to(device)
        output = conv(input)
        print(output)
        # Check backward: seed a one-hot output gradient, then run a full
        # numerical gradcheck on the convolution function.
        fn = MinkowskiConvolutionFunction()
        grad = output.F.clone().zero_()
        grad[0] = 1
        output.F.backward(grad)
        self.assertTrue(
            gradcheck(
                fn,
                (
                    input.F,
                    conv.kernel,
                    conv.kernel_generator,
                    conv.convolution_mode,
                    input.coordinate_map_key,
                    None,
                    input.coordinate_manager,
                ),
            )
        )

    def test(self):
        """CPU forward/backward, CPU-GPU statistics parity, and a leak check."""
        print(f"{self.__class__.__name__}: test")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats, coordinates=coords)
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=True, dimension=D
        )
        conv = conv.double()
        output = conv(input)
        print(output)
        self.assertEqual(input.coordinate_map_key.get_tensor_stride(), [1, 1])
        self.assertEqual(output.coordinate_map_key.get_tensor_stride(), [2, 2])
        if torch.cuda.is_available():
            input_gpu = SparseTensor(feats, coordinates=coords, device="cuda")
            conv_gpu = conv.cuda()
            output_gpu = conv_gpu(input_gpu)
            # CPU and GPU outputs should agree statistically (per-channel
            # mean/variance) even if row order differs.
            self.assertTrue(torch.allclose(output_gpu.F.var(0).cpu(), output.F.var(0)))
            self.assertTrue(
                torch.allclose(output_gpu.F.mean(0).cpu(), output.F.mean(0))
            )
        # kernel_map = input.coords_man.kernel_map(
        #     1, 2, stride=2, kernel_size=3)
        # print(kernel_map)
        # Check backward
        fn = MinkowskiConvolutionFunction()
        conv = conv.cpu()
        self.assertTrue(
            gradcheck(
                fn,
                (
                    input.F,
                    conv.kernel,
                    conv.kernel_generator,
                    conv.convolution_mode,
                    input.coordinate_map_key,
                    output.coordinate_map_key,
                    input.coordinate_manager,
                ),
            )
        )
        # Repeated forward/backward passes to surface reference leaks in the
        # coordinate manager; intentionally long (LEAK_TEST_ITER iterations).
        for i in range(LEAK_TEST_ITER):
            input = SparseTensor(feats, coordinates=coords)
            conv(input).F.sum().backward()
            if i % 1000 == 0:
                print(i)

    def test_analytic(self):
        """1-D convolution with a hand-set kernel for analytically checkable output."""
        # BUGFIX: the log label previously read "test" (copy-paste error).
        print(f"{self.__class__.__name__}: test_analytic")
        in_channels, out_channels, D = 2, 2, 1
        coords = torch.IntTensor([[0, 0], [0, 1], [0, 2]])
        feats = torch.FloatTensor([[0, 1], [1, 0], [1, 1]])
        input = SparseTensor(feats, coordinates=coords)
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=2, stride=2, bias=False, dimension=D
        )
        conv.kernel[:] = torch.FloatTensor([[[1, 2], [2, 1]], [[0, 1], [1, 0]]])
        output = conv(input)
        print(output)
        # Same kernel, stride 1: output coordinates stay on the input lattice.
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=2, stride=1, bias=False, dimension=D
        )
        conv.kernel[:] = torch.FloatTensor([[[1, 2], [2, 1]], [[0, 1], [1, 0]]])
        output = conv(input)
        print(output)
class TestConvolutionMode(unittest.TestCase):
    """Check that both GEMM convolution modes run and are differentiable on GPU."""

    def test_gpu(self):
        """Forward, manual backward, and gradcheck for DIRECT_GEMM and COPY_GEMM."""
        print(f"{self.__class__.__name__}: test_gpu")
        if not torch.cuda.is_available():
            return

        in_channels, out_channels, D = 3, 2, 2
        coords, feats, labels = data_loader(in_channels, batch_size=20)
        feats = feats.double()
        feats.requires_grad_()
        device = torch.device("cuda")

        # One shared convolution module; only its mode is swapped per iteration.
        conv = MinkowskiConvolution(
            in_channels,
            out_channels,
            kernel_size=2,
            stride=1,
            bias=False,
            dimension=D,
        )
        conv = conv.to(device).double()

        # Initialize context
        for mode in [_C.ConvolutionMode.DIRECT_GEMM, _C.ConvolutionMode.COPY_GEMM]:
            conv.convolution_mode = mode
            input = SparseTensor(feats, coordinates=coords, device=device)
            print(mode, input.F.numel(), len(input), input)
            output = conv(input)
            print(output)

            # Seed a one-hot output gradient and run a manual backward pass,
            # then verify gradients numerically with gradcheck.
            fn = MinkowskiConvolutionFunction()
            grad = output.F.clone().zero_()
            grad[0] = 1
            output.F.backward(grad)
            args = (
                input.F,
                conv.kernel,
                conv.kernel_generator,
                conv.convolution_mode,
                input.coordinate_map_key,
                None,
                input.coordinate_manager,
            )
            self.assertTrue(gradcheck(fn, args))
class TestConvolutionTranspose(unittest.TestCase):
    """Forward/backward tests for MinkowskiConvolutionTranspose."""

    def test_gpu(self):
        """GPU round trip: strided convolution followed by its transpose, plus gradcheck."""
        print(f"{self.__class__.__name__}: test_gpu")
        if not torch.cuda.is_available():
            return
        device = torch.device("cuda")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats.to(device), coordinates=coords.to(device))
        # Initialize context
        conv = (
            MinkowskiConvolution(
                in_channels,
                out_channels,
                kernel_size=3,
                stride=2,
                bias=True,
                dimension=D,
            )
            .double()
            .to(device)
        )
        conv_tr = (
            MinkowskiConvolutionTranspose(
                out_channels,
                in_channels,
                kernel_size=3,
                stride=2,
                bias=True,
                dimension=D,
            )
            .double()
            .to(device)
        )
        tr_input = conv(input)
        print(tr_input)
        output = conv_tr(tr_input)
        print(output)
        # Check backward
        fn = MinkowskiConvolutionTransposeFunction()
        self.assertTrue(
            gradcheck(
                fn,
                (
                    tr_input.F,
                    conv_tr.kernel,
                    conv_tr.kernel_generator,
                    conv_tr.convolution_mode,
                    tr_input.coordinate_map_key,
                    output.coordinate_map_key,
                    tr_input.coordinate_manager,
                ),
            )
        )

    def test(self):
        """CPU round trip: strided convolution then transpose, plus gradcheck."""
        print(f"{self.__class__.__name__}: test")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats, coordinates=coords)
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=True, dimension=D
        ).double()
        conv_tr = MinkowskiConvolutionTranspose(
            out_channels, in_channels, kernel_size=2, stride=2, bias=True, dimension=D
        ).double()
        print("Initial input: ", input)
        input = conv(input)
        print("Conv output: ", input)
        output = conv_tr(input)
        print("Conv tr output: ", output)
        # Check backward
        fn = MinkowskiConvolutionTransposeFunction()
        self.assertTrue(
            gradcheck(
                fn,
                (
                    input.F,
                    conv_tr.kernel,
                    conv_tr.kernel_generator,
                    conv_tr.convolution_mode,
                    input.coordinate_map_key,
                    output.coordinate_map_key,
                    input.coordinate_manager,
                ),
            )
        )

    def test_analytic(self):
        """Even kernel, hand-set weights: conv then transpose-conv with printed outputs."""
        # BUGFIX: the log label previously read "test" (copy-paste error).
        print(f"{self.__class__.__name__}: test_analytic")
        in_channels, out_channels, D = 2, 2, 2
        coords = torch.IntTensor([[0, 0, 0], [0, 1, 1], [0, 2, 1]])
        feats = torch.FloatTensor([[0, 1], [1, 0], [1, 1]])
        input = SparseTensor(feats, coordinates=coords)
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=2, stride=2, bias=False, dimension=D
        )
        conv.kernel[:] = torch.FloatTensor(
            [[[1, 2], [2, 1]], [[0, 1], [1, 0]], [[0, 1], [1, 1]], [[1, 1], [1, 0]]]
        )
        output = conv(input)
        print(output)
        conv_tr = MinkowskiConvolutionTranspose(
            in_channels, out_channels, kernel_size=2, stride=2, bias=False, dimension=D
        )
        conv_tr.kernel[:] = torch.FloatTensor(
            [[[1, 2], [2, 1]], [[0, 1], [1, 0]], [[0, 1], [1, 1]], [[1, 1], [1, 0]]]
        )
        output_tr = conv_tr(output)
        print(output_tr)

    def test_analytic_odd(self):
        """Odd kernel, hand-set weights: conv then transpose-conv with printed outputs."""
        # BUGFIX: the log label previously read "test" (copy-paste error).
        print(f"{self.__class__.__name__}: test_analytic_odd")
        in_channels, out_channels, D = 2, 2, 2
        coords = torch.IntTensor([[0, 0, 0], [0, 1, 1], [0, 2, 1]])
        feats = torch.FloatTensor([[0, 1], [1, 0], [1, 1]])
        input = SparseTensor(feats, coordinates=coords)
        # Initialize context
        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=False, dimension=D
        )
        conv.kernel[:] = torch.FloatTensor(
            [
                [[1, 2], [2, 1]],
                [[0, 1], [1, 0]],
                [[0, 1], [1, 1]],
                [[1, 1], [1, 0]],
                [[1, 1], [1, 0]],
                [[2, 1], [1, 0.5]],
                [[1, 1], [1, 0.1]],
                [[1, 1], [1, 0.7]],
                [[1, 0.3], [1, 0.5]],
            ]
        )
        output = conv(input)
        print(output)
        conv_tr = MinkowskiConvolutionTranspose(
            in_channels, out_channels, kernel_size=3, stride=2, bias=False, dimension=D
        )
        conv_tr.kernel[:] = torch.FloatTensor(
            [
                [[1, 2], [2, 1]],
                [[0, 1], [1, 0]],
                [[0, 1], [1, 1]],
                [[1, 1], [1, 0]],
                [[1, 1], [1, 0]],
                [[2, 1], [1, 0.5]],
                [[1, 1], [1, 0.1]],
                [[1, 1], [1, 0.7]],
                [[1, 0.3], [1, 0.5]],
            ]
        )
        output_tr = conv_tr(output)
        print(output_tr)
class TestGenerativeConvolutionTranspose(unittest.TestCase):
    """Forward/backward tests for MinkowskiGenerativeConvolutionTranspose."""

    def _assert_tr_gradcheck(self, conv_tr, tr_input, output):
        # Numerically verify the transpose-convolution gradients.
        fn = MinkowskiConvolutionTransposeFunction()
        self.assertTrue(
            gradcheck(
                fn,
                (
                    tr_input.F,
                    conv_tr.kernel,
                    conv_tr.kernel_generator,
                    conv_tr.convolution_mode,
                    tr_input.coordinate_map_key,
                    output.coordinate_map_key,
                    tr_input.coordinate_manager,
                ),
            )
        )

    def test_gpu(self):
        """GPU round trip: strided conv followed by a generative transpose conv."""
        print(f"{self.__class__.__name__}: test_gpu")
        if not torch.cuda.is_available():
            return
        device = torch.device("cuda")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats.to(device), coordinates=coords.to(device))

        down = MinkowskiConvolution(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=2,
            bias=True,
            dimension=D,
        )
        up = MinkowskiGenerativeConvolutionTranspose(
            out_channels,
            in_channels,
            kernel_size=3,
            stride=2,
            bias=True,
            dimension=D,
        )
        conv = down.double().to(device)
        conv_tr = up.double().to(device)

        tr_input = conv(input)
        print(tr_input)
        output = conv_tr(tr_input)
        print(output)
        self._assert_tr_gradcheck(conv_tr, tr_input, output)

    def test(self):
        """CPU round trip with the generative transpose convolution."""
        print(f"{self.__class__.__name__}: test")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats, coordinates=coords)

        conv = MinkowskiConvolution(
            in_channels, out_channels, kernel_size=3, stride=2, bias=True, dimension=D
        ).double()
        conv_tr = MinkowskiGenerativeConvolutionTranspose(
            out_channels, in_channels, kernel_size=3, stride=2, bias=True, dimension=D
        ).double()

        print("Initial input: ", input)
        input = conv(input)
        print("Conv output: ", input)
        output = conv_tr(input)
        print("Conv tr output: ", output)
        self._assert_tr_gradcheck(conv_tr, input, output)
class TestChannelwiseConvolution(unittest.TestCase):
    """Smoke test for MinkowskiChannelwiseConvolution."""

    def test(self):
        """Forward pass on CPU; stride 2 must double the output tensor stride."""
        print(f"{self.__class__.__name__}: test")
        in_channels, out_channels, D = 2, 3, 2
        coords, feats, labels = data_loader(in_channels)
        feats = feats.double()
        feats.requires_grad_()
        input = SparseTensor(feats, coordinates=coords)

        # Channelwise (depthwise) convolution: one kernel per input channel.
        conv = MinkowskiChannelwiseConvolution(
            in_channels, kernel_size=3, stride=2, bias=True, dimension=D
        ).double()
        output = conv(input)
        print(output)

        # Input sits on the base lattice; the strided output is downsampled by 2.
        for tensor, expected_stride in ((input, [1, 1]), (output, [2, 2])):
            self.assertEqual(
                tensor.coordinate_map_key.get_tensor_stride(), expected_stride
            )
class TestPCD(unittest.TestCase):
    """Benchmark DIRECT_GEMM vs COPY_GEMM convolution on a real point cloud.

    Each test sweeps channel counts, batch sizes, and voxel sizes, records
    which mode was faster (Y) and the timing gap (W) for each configuration
    (X), and pickles the results for offline analysis.
    """

    def test_forward(self):
        """Time the forward pass of both convolution modes and pickle the results."""
        # Benchmarks require a GPU, same as the other GPU tests in this file.
        if not torch.cuda.is_available():
            return
        coords, colors, pcd = load_file("1.ply")
        device = "cuda"
        X = []  # configuration: [IC, OC, len(input), len(output)]
        Y = []  # index of the faster mode (0=DIRECT_GEMM, 1=COPY_GEMM)
        W = []  # absolute timing gap between the two modes
        for IC in [3, 8, 16, 24, 32, 48, 64, 96, 128]:
            for OC in [3, 8, 16, 24, 32, 48, 64, 96, 128, 192, 256]:
                for batch_size in [1, 5, 10, 15, 20]:
                    for voxel_size in [0.2, 0.1, 0.075, 0.05, 0.025]:
                        min_times = []
                        for mode in [
                            _C.ConvolutionMode.DIRECT_GEMM,
                            _C.ConvolutionMode.COPY_GEMM,
                        ]:
                            min_time = 100000
                            dcoords = torch.from_numpy(
                                np.floor(coords / voxel_size)
                            ).int()
                            bcoords = batched_coordinates(
                                [dcoords for i in range(batch_size)]
                            )
                            # BUGFIX: use `device`, not a hard-coded GPU index 0.
                            in_feats = torch.rand(len(bcoords), IC).to(device)
                            sinput = SparseTensor(
                                in_feats, coordinates=bcoords, device=device
                            )
                            conv = MinkowskiConvolution(
                                in_channels=IC,
                                out_channels=OC,
                                kernel_size=3,
                                stride=2,
                                convolution_mode=mode,
                                dimension=3,
                            ).to(device)
                            # Warm-up pass before timing.
                            soutput = conv(sinput)
                            # BUGFIX: time the forward pass (this is the forward
                            # benchmark). The old code timed loss.backward(),
                            # which also raised on the second iteration because
                            # the autograd graph is freed after the first call.
                            for i in range(10):
                                stime = time.time()
                                soutput = conv(sinput)
                                min_time = min(time.time() - stime, min_time)
                            min_times.append(min_time)
                        X.append(
                            [
                                IC,
                                OC,
                                len(sinput),
                                len(soutput),
                            ]
                        )
                        Y.append(np.argmin(min_times))
                        W.append(np.abs(min_times[0] - min_times[1]))
                        print(X[-1], Y[-1], W[-1])
        import pickle as pkl

        with open("forward-speed.pkl", "wb") as f:
            pkl.dump([X, Y, W], f)

    def test_backward(self):
        """Time the backward pass of both convolution modes and pickle the results."""
        if not torch.cuda.is_available():
            return
        coords, colors, pcd = load_file("1.ply")
        device = "cuda"
        X = []  # configuration: [IC, OC, len(input), len(output)]
        Y = []  # index of the faster mode (0=DIRECT_GEMM, 1=COPY_GEMM)
        W = []  # absolute timing gap between the two modes
        for IC in [8, 16, 24, 32, 48, 64, 96, 128]:
            for OC in [8, 16, 24, 32, 48, 64, 96, 128, 192, 256]:
                for batch_size in [1, 5, 10, 15, 20]:
                    for voxel_size in [0.2, 0.1, 0.075, 0.05, 0.025]:
                        min_times = []
                        for mode in [
                            _C.ConvolutionMode.DIRECT_GEMM,
                            _C.ConvolutionMode.COPY_GEMM,
                        ]:
                            min_time = 100000
                            dcoords = torch.from_numpy(
                                np.floor(coords / voxel_size)
                            ).int()
                            bcoords = batched_coordinates(
                                [dcoords for i in range(batch_size)]
                            )
                            # BUGFIX: use `device`, not a hard-coded GPU index 0.
                            in_feats = torch.rand(len(bcoords), IC).to(device)
                            sinput = SparseTensor(
                                in_feats, coordinates=bcoords, device=device
                            )
                            conv = MinkowskiConvolution(
                                in_channels=IC,
                                out_channels=OC,
                                kernel_size=3,
                                stride=2,
                                convolution_mode=mode,
                                dimension=3,
                            ).to(device)
                            # BUGFIX: backward() frees the autograd graph, so a
                            # fresh forward pass is needed per timed iteration;
                            # the old code called loss.backward() repeatedly on
                            # the same graph and raised after the first call.
                            for i in range(5):
                                soutput = conv(sinput)
                                loss = soutput.F.sum()
                                stime = time.time()
                                loss.backward()
                                min_time = min(time.time() - stime, min_time)
                            min_times.append(min_time)
                        X.append(
                            [
                                IC,
                                OC,
                                len(sinput),
                                len(soutput),
                            ]
                        )
                        Y.append(np.argmin(min_times))
                        W.append(np.abs(min_times[0] - min_times[1]))
                        print(X[-1], Y[-1], W[-1])
        import pickle as pkl

        with open("backward-speed.pkl", "wb") as f:
            pkl.dump([X, Y, W], f)