|
|
from typing import Optional, Tuple, Union |
|
|
|
|
|
import librosa |
|
|
import numpy as np |
|
|
import torch |
|
|
from packaging.version import parse as V |
|
|
from torch_complex.tensor import ComplexTensor |
|
|
from typeguard import check_argument_types |
|
|
|
|
|
from ..utils.complex_utils import is_complex |
|
|
from ..utils.inversible_interface import InversibleInterface |
|
|
from ..utils.nets_utils import make_pad_mask |
|
|
|
|
|
# Feature gates derived from the installed torch release. The version string
# is parsed once and compared against each threshold below.
_installed_torch = V(torch.__version__)

# True when torch is at least 1.10.0.
is_torch_1_10_plus = _installed_torch >= V("1.10.0")

# True when torch is at least 1.9.0.
is_torch_1_9_plus = _installed_torch >= V("1.9.0")

# True when torch is at least 1.7.
is_torch_1_7_plus = _installed_torch >= V("1.7")
|
|
|
|
|
|
|
|
class Stft(torch.nn.Module, InversibleInterface):
    """Short-time Fourier transform layer with a matching inverse.

    ``forward`` computes the STFT with ``torch.stft`` when possible and falls
    back to ``librosa.stft`` otherwise (evaluation mode only). ``inverse``
    reconstructs waveforms with ``istft``.

    Args:
        n_fft: FFT size.
        win_length: Window length; defaults to ``n_fft`` when ``None``.
        hop_length: Hop between successive frames, in samples.
        window: Name prefix of a ``torch.<name>_window`` function
            (e.g. ``"hann"``), or ``None`` to pass no window.
        center: Forwarded to the STFT/ISTFT backend as ``center``.
        normalized: Forwarded to the STFT/ISTFT backend as ``normalized``.
        onesided: Forwarded to the STFT/ISTFT backend as ``onesided``.

    Raises:
        ValueError: If ``torch`` has no ``<window>_window`` attribute.
    """

    def __init__(
        self,
        n_fft: int = 512,
        win_length: Optional[int] = None,
        hop_length: int = 128,
        window: Optional[str] = "hann",
        center: bool = True,
        normalized: bool = False,
        onesided: bool = True,
    ):
        assert check_argument_types()
        super().__init__()
        self.n_fft = n_fft
        # An unspecified window length means "span the whole FFT frame".
        self.win_length = n_fft if win_length is None else win_length
        self.hop_length = hop_length
        self.center = center
        self.normalized = normalized
        self.onesided = onesided
        if window is not None and not hasattr(torch, f"{window}_window"):
            raise ValueError(f"{window} window is not implemented")
        self.window = window

    def extra_repr(self):
        """Return the configuration summary shown in the module's repr."""
        return (
            f"n_fft={self.n_fft}, "
            f"win_length={self.win_length}, "
            f"hop_length={self.hop_length}, "
            f"center={self.center}, "
            f"normalized={self.normalized}, "
            f"onesided={self.onesided}"
        )

    def _build_window(self, dtype, device) -> Optional[torch.Tensor]:
        """Create the configured window tensor, or None when no window is set."""
        if self.window is None:
            return None
        window_func = getattr(torch, f"{self.window}_window")
        return window_func(self.win_length, dtype=dtype, device=device)

    def _librosa_stft(
        self, input: torch.Tensor, window: Optional[torch.Tensor]
    ) -> torch.Tensor:
        """Compute the STFT of a (Batch, Nsamples) tensor with librosa.

        Returns a real tensor shaped (Batch, Freq, Frames, 2).
        """
        if window is None:
            # torch.stft treats a missing window as a rectangular window of
            # win_length samples; mirror that here.
            window = torch.ones(self.win_length, dtype=input.dtype)

        # librosa is always called with win_length == n_fft, so the window is
        # zero-padded symmetrically up to n_fft and handed over as an ndarray.
        n_pad_left = (self.n_fft - window.shape[0]) // 2
        n_pad_right = self.n_fft - window.shape[0] - n_pad_left
        padded_window = torch.cat(
            [window.new_zeros(n_pad_left), window, window.new_zeros(n_pad_right)],
            0,
        ).numpy()

        stft_kwargs = dict(
            n_fft=self.n_fft,
            win_length=self.n_fft,
            hop_length=self.hop_length,
            center=self.center,
            window=padded_window,
            pad_mode="reflect",
        )

        # Transform each instance of the batch separately.
        spectra = []
        for signal in input:
            spec = librosa.stft(signal.numpy(), **stft_kwargs)
            spectra.append(torch.tensor(np.stack([spec.real, spec.imag], -1)))
        output = torch.stack(spectra, 0)

        if not self.onesided:
            # Rebuild the missing bins as the complex conjugate of the
            # mirrored bins: flip along frequency and negate the imaginary part.
            len_conj = self.n_fft - output.shape[1]
            conj = output[:, 1 : 1 + len_conj].flip(1)
            conj[..., -1].neg_()
            output = torch.cat([output, conj], 1)
        if self.normalized:
            # Scale by (frame length) ** -0.5, the frame length being n_fft.
            output = output * (padded_window.shape[0] ** (-0.5))
        return output

    def forward(
        self, input: torch.Tensor, ilens: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """STFT forward function.

        Args:
            input: (Batch, Nsamples) or (Batch, Nsample, Channels)
            ilens: (Batch)
        Returns:
            output: (Batch, Frames, Freq, 2) or (Batch, Frames, Channels, Freq, 2)
            olens: Number of frames per instance computed from ``ilens``,
                or ``None`` when ``ilens`` is ``None``.
        Raises:
            NotImplementedError: If the librosa fallback is needed while the
                module is in training mode.
        """
        bs = input.size(0)
        multi_channel = input.dim() == 3
        if multi_channel:
            # (Batch, Nsample, Channels) -> (Batch * Channels, Nsample)
            input = input.transpose(1, 2).reshape(-1, input.size(1))

        window = self._build_window(input.dtype, input.device)

        if is_torch_1_10_plus or input.is_cuda or torch.backends.mkl.is_available():
            stft_kwargs = dict(
                n_fft=self.n_fft,
                win_length=self.win_length,
                hop_length=self.hop_length,
                center=self.center,
                window=window,
                normalized=self.normalized,
                onesided=self.onesided,
            )
            if is_torch_1_7_plus:
                # return_complex=False is deprecated; request the complex
                # result and view it as (..., 2) to keep the same layout.
                stft_kwargs["return_complex"] = True
                output = torch.view_as_real(torch.stft(input, **stft_kwargs))
            else:
                output = torch.stft(input, **stft_kwargs)
        else:
            if self.training:
                raise NotImplementedError(
                    "stft is implemented with librosa on this device, which does not "
                    "support the training mode."
                )
            output = self._librosa_stft(input, window)

        # (Batch, Freq, Frames, 2) -> (Batch, Frames, Freq, 2)
        output = output.transpose(1, 2)
        if multi_channel:
            # (Batch * Channels, Frames, Freq, 2)
            # -> (Batch, Frames, Channels, Freq, 2)
            output = output.view(bs, -1, output.size(1), output.size(2), 2).transpose(
                1, 2
            )

        if ilens is None:
            return output, None

        if self.center:
            # Centered framing pads n_fft // 2 samples on both sides.
            pad = self.n_fft // 2
            ilens = ilens + 2 * pad

        if is_torch_1_9_plus:
            olens = (
                torch.div(ilens - self.n_fft, self.hop_length, rounding_mode="trunc")
                + 1
            )
        else:
            olens = (ilens - self.n_fft) // self.hop_length + 1
        # Zero every position flagged by the padding mask built from olens.
        output.masked_fill_(make_pad_mask(olens, output, 1), 0.0)

        return output, olens

    def inverse(
        self,
        input: Union[torch.Tensor, ComplexTensor],
        ilens: Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Inverse STFT.

        Args:
            input: Tensor(batch, T, F, 2) or ComplexTensor(batch, T, F)
            ilens: (batch,)
        Returns:
            wavs: (batch, samples)
            ilens: (batch,)
        Raises:
            ImportError: If torch is older than 1.6.0 and torchaudio (with
                ``functional.istft``) is not available.
            TypeError: If a non-complex input does not have a last
                dimension of size 2.
        """
        if V(torch.__version__) >= V("1.6.0"):
            istft = torch.functional.istft
        else:
            try:
                import torchaudio
            except ImportError as err:
                raise ImportError(
                    "Please install torchaudio>=0.3.0 or use torch>=1.6.0"
                ) from err

            if not hasattr(torchaudio.functional, "istft"):
                raise ImportError(
                    "Please install torchaudio>=0.3.0 or use torch>=1.6.0"
                )
            istft = torchaudio.functional.istft

        # The window must use the real dtype underlying the spectrum.
        real_dtype = input.real.dtype if is_complex(input) else input.dtype
        window = self._build_window(real_dtype, input.device)

        if is_complex(input):
            input = torch.stack([input.real, input.imag], dim=-1)
        elif input.shape[-1] != 2:
            raise TypeError("Invalid input type")
        # (batch, T, F, 2) -> (batch, F, T, 2)
        input = input.transpose(1, 2)
        if is_torch_1_7_plus:
            # Hand a native complex tensor to istft when torch supports it;
            # older backends keep the real (..., 2) layout.
            input = torch.complex(input[..., 0], input[..., 1])

        wavs = istft(
            input,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
            window=window,
            center=self.center,
            normalized=self.normalized,
            onesided=self.onesided,
            # istft expects a plain int (or None) for the output length.
            length=int(ilens.max()) if ilens is not None else None,
        )

        return wavs, ilens