# pointnet_util.py -- wrappers around the compiled pc_util CUDA extension
# (furthest point sampling, gather/grouping, ball/kNN queries, three-NN interpolation).
import torch
from torch.autograd import Function
import torch.nn as nn
from typing import Tuple
import sys
import os

# NOTE: the dynamic linker reads LD_LIBRARY_PATH at process startup, so setting it
# here only affects child processes; the CUDA libraries must already be resolvable
# when this interpreter is launched.
os.environ['LD_LIBRARY_PATH'] = '/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64:' + os.environ.get('LD_LIBRARY_PATH', '')
sys.path.append('pc_util-1.0-py3.10-linux-x86_64.egg')
import pc_util
class FurthestPointSampling(Function):
@staticmethod
def forward(ctx, xyz: torch.Tensor, npoint: int, wd: float = 1.0, wf: float = 0.0) -> torch.Tensor:
"""
Uses iterative furthest point sampling to select a set of npoint features that have the largest
minimum distance.
:param ctx:
:param xyz: (B, N, C) where N > npoint
:param npoint: int, number of features in the sampled set
        :param wd: float, weight of the xyz (spatial) distance
        :param wf: float, weight of the feature distance
:return:
output: (B, npoint) tensor containing the set
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
xyz = xyz.contiguous()
B, N, C = xyz.size()
device = torch.device('cuda')
output = torch.zeros(B, npoint, dtype=torch.int32, device=device)
temp = torch.full((B, N), 1e10, dtype=torch.float32, device=device)
pc_util.furthest_point_sampling_wrapper(B, C, N, npoint, wd, wf, xyz, temp, output)
ctx.mark_non_differentiable(output)
return output
    @staticmethod
    def backward(ctx, grad_out):
        # sampled indices are non-differentiable; one None per forward input (xyz, npoint, wd, wf)
        return None, None, None, None
furthest_point_sample = FurthestPointSampling.apply
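# Usage sketch (hypothetical shapes; assumes a CUDA device and the compiled
# pc_util extension are available):
#   xyz = torch.rand(2, 1024, 3, device='cuda')  # B=2 clouds of N=1024 points
#   idx = furthest_point_sample(xyz, 256)        # (2, 256) int32 sample indices, N > npoint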
class GatherOperation(Function):
@staticmethod
def forward(ctx, features: torch.Tensor, idx: torch.Tensor) -> torch.Tensor:
"""
:param ctx:
:param features: (B, C, N)
:param idx: (B, npoint) index tensor of the features to gather
:return:
output: (B, C, npoint)
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
features = features.contiguous()
idx = idx.contiguous()
B, npoint = idx.size()
_, C, N = features.size()
device = torch.device('cuda')
output = torch.zeros(B, C, npoint, dtype=torch.float32, device=device)
pc_util.gather_points_wrapper(B, C, N, npoint, features, idx, output)
ctx.save_for_backward(idx, features)
return output
@staticmethod
def backward(ctx, grad_out):
idx, features = ctx.saved_tensors
B, npoint = idx.size()
_, C, N = features.size()
device = torch.device('cuda')
grad_features = torch.zeros(B, C, N, dtype=torch.float32, device=device)
grad_out_data = grad_out.contiguous()
pc_util.gather_points_grad_wrapper(B, C, N, npoint, grad_out_data, idx, grad_features)
return grad_features, None
gather_operation = GatherOperation.apply
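# Usage sketch (hypothetical shapes): pairs naturally with furthest_point_sample
# to pull out the sampled columns of a channel-first feature tensor:
#   features = torch.rand(2, 64, 1024, device='cuda')  # (B, C, N)
#   # idx: (2, 256) int32, e.g. from the furthest_point_sample sketch above
#   sampled = gather_operation(features, idx)           # (2, 64, 256); differentiable w.r.t. features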
class ThreeNN(Function):
@staticmethod
def forward(ctx, unknown: torch.Tensor, known: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Find the three nearest neighbors of unknown in known
:param ctx:
:param unknown: (B, N, 3)
:param known: (B, M, 3)
:return:
dist: (B, N, 3) l2 distance to the three nearest neighbors
idx: (B, N, 3) index of 3 nearest neighbors
"""
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
        unknown = unknown.contiguous()
        known = known.contiguous()
        B, N, _ = unknown.size()
        m = known.size(1)
        device = torch.device('cuda')
        dist2 = torch.zeros(B, N, 3, dtype=torch.float32, device=device)
        idx = torch.zeros(B, N, 3, dtype=torch.int32, device=device)
        pc_util.three_nn_wrapper(B, N, m, unknown, known, dist2, idx)
        # mark_non_differentiable must be passed the tensors actually returned from forward
        dist = torch.sqrt(dist2)
        ctx.mark_non_differentiable(dist, idx)
        return dist, idx

    @staticmethod
    def backward(ctx, grad_dist=None, grad_idx=None):
        # both outputs are non-differentiable; one None per forward input (unknown, known)
        return None, None
three_nn = ThreeNN.apply
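# Usage sketch (hypothetical shapes): for each dense point, locate its three
# nearest sparse points, e.g. before upsampling features between set-abstraction levels:
#   unknown = torch.rand(2, 1024, 3, device='cuda')  # (B, N, 3) dense positions
#   known = torch.rand(2, 256, 3, device='cuda')     # (B, M, 3) sparse positions
#   dist, idx = three_nn(unknown, known)             # both (2, 1024, 3)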
class ThreeInterpolate(Function):
@staticmethod
def forward(ctx, features: torch.Tensor, idx: torch.Tensor, weight: torch.Tensor) -> torch.Tensor:
"""
        Performs weighted linear interpolation over the three nearest features
        :param ctx:
        :param features: (B, C, M) feature descriptors to be interpolated from
:param idx: (B, n, 3) three nearest neighbors of the target features in features
:param weight: (B, n, 3) weights
:return:
output: (B, C, N) tensor of the interpolated features
"""
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
        features = features.contiguous()
        idx = idx.contiguous()
        weight = weight.contiguous()
        B, c, m = features.size()
        n = idx.size(1)
        ctx.save_for_backward(idx, weight, features)
        device = torch.device('cuda')
        output = torch.zeros(B, c, n, dtype=torch.float32, device=device)
        pc_util.three_interpolate_wrapper(B, c, m, n, features, idx, weight, output)
        return output
@staticmethod
def backward(ctx, grad_out: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
:param ctx:
:param grad_out: (B, C, N) tensor with gradients of outputs
:return:
grad_features: (B, C, M) tensor with gradients of features
None:
None:
"""
        idx, weight, features = ctx.saved_tensors
        m = features.size(2)
        B, c, n = grad_out.size()
        device = torch.device('cuda')
        grad_features = torch.zeros(B, c, m, dtype=torch.float32, device=device)
        grad_out_data = grad_out.contiguous()
        pc_util.three_interpolate_grad_wrapper(B, c, n, m, grad_out_data, idx, weight, grad_features)
        # indices and weights receive no gradient from the CUDA kernel
        return grad_features, None, None
three_interpolate = ThreeInterpolate.apply
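# Usage sketch: the customary PointNet++ feature-propagation weighting, built from
# the inverse-distance-normalized outputs of three_nn above (sparse_feats is a
# hypothetical (B, C, M) tensor):
#   dist_recip = 1.0 / (dist + 1e-8)                             # (B, N, 3)
#   weight = dist_recip / dist_recip.sum(dim=2, keepdim=True)    # rows sum to 1
#   dense_feats = three_interpolate(sparse_feats, idx, weight)   # (B, C, N)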
class GroupingOperation(Function):
@staticmethod
def forward(ctx, features: torch.Tensor, idx: torch.Tensor) -> torch.Tensor:
"""
:param ctx:
:param features: (B, C, N) tensor of features to group
:param idx: (B, npoint, nsample) tensor containing the indices of features to group with
:return:
output: (B, C, npoint, nsample) tensor
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
features = features.contiguous()
idx = idx.contiguous()
B, npoint, nsample = idx.size()
_, C, N = features.size()
device = torch.device('cuda')
output = torch.zeros(B, C, npoint, nsample, dtype=torch.float32, device=device)
pc_util.group_points_wrapper(B, C, N, npoint, nsample, features, idx, output)
ctx.save_for_backward(idx, features)
return output
@staticmethod
def backward(ctx, grad_out: torch.Tensor) -> torch.Tensor:
"""
:param ctx:
:param grad_out: (B, C, npoint, nsample) tensor of the gradients of the output from forward
:return:
grad_features: (B, C, N) gradient of the features
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
idx, features = ctx.saved_tensors
B, C, N = features.size()
_, _, npoint, nsample = grad_out.size()
device = torch.device('cuda')
grad_features = torch.zeros(B, C, N, dtype=torch.float32, device=device)
grad_out_data = grad_out.contiguous()
pc_util.group_points_grad_wrapper(B, C, N, npoint, nsample, grad_out_data, idx, grad_features)
        # indices carry no gradient
        return grad_features, None
grouping_operation = GroupingOperation.apply
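# Usage sketch (hypothetical shapes): collect each center's neighborhood into a
# dense tensor for per-group MLPs/convolutions:
#   features = torch.rand(2, 64, 1024, device='cuda')  # (B, C, N)
#   # idx: (B, npoint, nsample) int32, e.g. from ball_query or knn_query below
#   grouped = grouping_operation(features, idx)         # (2, 64, npoint, nsample)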
class BallQuery(Function):
@staticmethod
def forward(ctx, radius: float, nsample: int, xyz: torch.Tensor, new_xyz: torch.Tensor) -> torch.Tensor:
"""
:param ctx:
:param radius: float, radius of the balls
:param nsample: int, maximum number of features in the balls
:param xyz: (B, N, 3) xyz coordinates of the features
:param new_xyz: (B, npoint, 3) centers of the ball query
:return:
idx: (B, npoint, nsample) tensor with the indices of the features that form the query balls
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
new_xyz = new_xyz.contiguous()
xyz = xyz.contiguous()
B, N, _ = xyz.size()
npoint = new_xyz.size(1)
device = torch.device('cuda')
idx = torch.zeros(B, npoint, nsample, dtype=torch.int32, device=device)
pc_util.ball_query_wrapper(B, N, npoint, radius, nsample, new_xyz, xyz, idx)
ctx.mark_non_differentiable(idx)
return idx
    @staticmethod
    def backward(ctx, grad_out):
        # idx is non-differentiable; one None per forward input (radius, nsample, xyz, new_xyz)
        return None, None, None, None
ball_query = BallQuery.apply
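# Usage sketch (hypothetical values): for each query center, the indices of up to
# nsample points lying within the given radius:
#   xyz = torch.rand(2, 1024, 3, device='cuda')     # (B, N, 3) point coordinates
#   new_xyz = torch.rand(2, 256, 3, device='cuda')  # (B, npoint, 3) query centers
#   idx = ball_query(0.2, 32, xyz, new_xyz)         # (2, 256, 32) int32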
class BallCenterQuery(Function):
@staticmethod
def forward(ctx, radius: float, point: torch.Tensor, key_point: torch.Tensor) -> torch.Tensor:
"""
:param ctx:
:param radius: float, radius of the balls
:param point: (B, N, 3) xyz coordinates of the features
:param key_point: (B, npoint, 3) centers of the ball query
:return:
idx: (B, N) tensor with the indices of the features that form the query balls
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
point = point.contiguous()
key_point = key_point.contiguous()
B, N, _ = point.size()
npoint = key_point.size(1)
device = torch.device('cuda')
idx = torch.full((B, N), -1, dtype=torch.int32, device=device)
pc_util.ball_center_query_wrapper(B, N, npoint, radius, point, key_point, idx)
ctx.mark_non_differentiable(idx)
return idx
    @staticmethod
    def backward(ctx, grad_out):
        # idx is non-differentiable; one None per forward input (radius, point, key_point)
        return None, None, None
ball_center_query = BallCenterQuery.apply
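# Usage sketch (hypothetical shapes): the reverse assignment of ball_query; each of
# the N points gets the index of a covering key point, with -1 (per the
# initialization above) marking points that no ball reaches:
#   idx = ball_center_query(0.2, point, key_point)  # (B, N) int32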
class KNNQuery(Function):
@staticmethod
def forward(ctx, nsample: int, xyz: torch.Tensor, new_xyz: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
        Find the nsample nearest neighbors of new_xyz in xyz
:param ctx:
:param nsample: int, number of features in knn
:param xyz: (B, N, 3)
:param new_xyz: (B, npoint, 3)
:return:
dist: (B, npoint, nsample) l2 distance to knn
idx: (B, npoint, nsample) index of knn
"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available, and no CPU fallback is implemented.")
new_xyz = new_xyz.contiguous()
xyz = xyz.contiguous()
B, N, _ = xyz.size()
npoint = new_xyz.size(1)
device = torch.device('cuda')
dist2 = torch.full((B, npoint, nsample), 1e4, dtype=torch.float32, device=device)
idx = torch.zeros((B, npoint, nsample), dtype=torch.int32, device=device)
pc_util.knn_query_wrapper(B, N, npoint, nsample, new_xyz, xyz, dist2, idx)
        # mark_non_differentiable must be passed the tensors actually returned from forward
        dist = torch.sqrt(dist2)
        ctx.mark_non_differentiable(dist, idx)
        return dist, idx

    @staticmethod
    def backward(ctx, grad_dist=None, grad_idx=None):
        # both outputs are non-differentiable; one None per forward input (nsample, xyz, new_xyz)
        return None, None, None
knn_query = KNNQuery.apply
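# Usage sketch (hypothetical shapes): fixed-size neighborhoods with no radius cap:
#   dist, idx = knn_query(16, xyz, new_xyz)  # both (B, npoint, 16); dist is the l2 distance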