|
|
from typing import * |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
from .. import SparseTensor |
|
|
|
|
|
|
|
|
class SparseSpatial2Channel(nn.Module): |
|
|
""" |
|
|
Downsample a sparse tensor by a factor of `factor`. |
|
|
Implemented as rearranging its features from spatial to channel. |
|
|
""" |
|
|
def __init__(self, factor: int = 2): |
|
|
super(SparseSpatial2Channel, self).__init__() |
|
|
self.factor = factor |
|
|
|
|
|
def forward(self, x: SparseTensor) -> SparseTensor: |
|
|
DIM = x.coords.shape[-1] - 1 |
|
|
cache = x.get_spatial_cache(f'spatial2channel_{self.factor}') |
|
|
if cache is None: |
|
|
coord = list(x.coords.unbind(dim=-1)) |
|
|
for i in range(DIM): |
|
|
coord[i+1] = coord[i+1] // self.factor |
|
|
subidx = x.coords[:, 1:] % self.factor |
|
|
subidx = sum([subidx[..., i] * self.factor ** i for i in range(DIM)]) |
|
|
|
|
|
MAX = [(s + self.factor - 1) // self.factor for s in x.spatial_shape] |
|
|
OFFSET = torch.cumprod(torch.tensor(MAX[::-1]), 0).tolist()[::-1] + [1] |
|
|
code = sum([c * o for c, o in zip(coord, OFFSET)]) |
|
|
code, idx = code.unique(return_inverse=True) |
|
|
|
|
|
new_coords = torch.stack( |
|
|
[code // OFFSET[0]] + |
|
|
[(code // OFFSET[i+1]) % MAX[i] for i in range(DIM)], |
|
|
dim=-1 |
|
|
) |
|
|
else: |
|
|
new_coords, idx, subidx = cache |
|
|
|
|
|
new_feats = torch.zeros(new_coords.shape[0] * self.factor ** DIM, x.feats.shape[1], device=x.feats.device, dtype=x.feats.dtype) |
|
|
new_feats[idx * self.factor ** DIM + subidx] = x.feats |
|
|
|
|
|
out = SparseTensor(new_feats.reshape(new_coords.shape[0], -1), new_coords, None if x._shape is None else torch.Size([x._shape[0], x._shape[1] * self.factor ** DIM])) |
|
|
out._scale = tuple([s * self.factor for s in x._scale]) |
|
|
out._spatial_cache = x._spatial_cache |
|
|
|
|
|
if cache is None: |
|
|
x.register_spatial_cache(f'spatial2channel_{self.factor}', (new_coords, idx, subidx)) |
|
|
out.register_spatial_cache(f'channel2spatial_{self.factor}', (x.coords, idx, subidx)) |
|
|
out.register_spatial_cache(f'shape', torch.Size(MAX)) |
|
|
if self.training: |
|
|
subdivision = torch.zeros((new_coords.shape[0], self.factor ** DIM), device=x.device, dtype=torch.bool) |
|
|
subdivision[idx, subidx] = True |
|
|
out.register_spatial_cache(f'subdivision', subdivision) |
|
|
|
|
|
return out |
|
|
|
|
|
|
|
|
class SparseChannel2Spatial(nn.Module): |
|
|
""" |
|
|
Upsample a sparse tensor by a factor of `factor`. |
|
|
Implemented as rearranging its features from channel to spatial. |
|
|
""" |
|
|
def __init__(self, factor: int = 2): |
|
|
super(SparseChannel2Spatial, self).__init__() |
|
|
self.factor = factor |
|
|
|
|
|
def forward(self, x: SparseTensor, subdivision: Optional[SparseTensor] = None) -> SparseTensor: |
|
|
DIM = x.coords.shape[-1] - 1 |
|
|
|
|
|
cache = x.get_spatial_cache(f'channel2spatial_{self.factor}') |
|
|
if cache is None: |
|
|
if subdivision is None: |
|
|
raise ValueError('Cache not found. Provide subdivision tensor or pair SparseChannel2Spatial with SparseSpatial2Channel.') |
|
|
else: |
|
|
sub = subdivision.feats |
|
|
N_leaf = sub.sum(dim=-1) |
|
|
subidx = sub.nonzero()[:, -1] |
|
|
new_coords = x.coords.clone().detach() |
|
|
new_coords[:, 1:] *= self.factor |
|
|
new_coords = torch.repeat_interleave(new_coords, N_leaf, dim=0, output_size=subidx.shape[0]) |
|
|
for i in range(DIM): |
|
|
new_coords[:, i+1] += subidx // self.factor ** i % self.factor |
|
|
idx = torch.repeat_interleave(torch.arange(x.coords.shape[0], device=x.device), N_leaf, dim=0, output_size=subidx.shape[0]) |
|
|
else: |
|
|
new_coords, idx, subidx = cache |
|
|
|
|
|
x_feats = x.feats.reshape(x.feats.shape[0] * self.factor ** DIM, -1) |
|
|
new_feats = x_feats[idx * self.factor ** DIM + subidx] |
|
|
out = SparseTensor(new_feats, new_coords, None if x._shape is None else torch.Size([x._shape[0], x._shape[1] // self.factor ** DIM])) |
|
|
out._scale = tuple([s / self.factor for s in x._scale]) |
|
|
if cache is not None: |
|
|
out._spatial_cache = x._spatial_cache |
|
|
return out |
|
|
|