| import os |
| import torchaudio |
| import numpy as np |
| from tqdm import tqdm |
| import torch |
| import torch.nn.functional as F |
| from scipy.signal import get_window |
| from librosa.util import pad_center, tiny, normalize |
| from librosa.filters import mel as librosa_mel_fn |
| from argparse import ArgumentParser |
|
|
def window_sumsquare(
    window,
    n_frames,
    hop_length,
    win_length,
    n_fft,
    dtype=np.float32,
    norm=None,
):
    """Accumulate the squared window at every frame offset.

    PARAMS
    ------
    window: window specification forwarded to ``scipy.signal.get_window``
    n_frames: number of frames whose windows are overlapped
    hop_length: offset, in samples, between consecutive frames
    win_length: window length; ``None`` falls back to ``n_fft``
    n_fft: frame length the window is centre-padded to
    dtype: dtype of the returned array
    norm: forwarded to ``normalize`` before the window is squared

    RETURNS
    -------
    1-D array of length ``n_fft + hop_length * (n_frames - 1)`` holding the
    summed squared window.
    """
    effective_win = n_fft if win_length is None else win_length

    total = n_fft + hop_length * (n_frames - 1)
    envelope = np.zeros(total, dtype=dtype)

    # Squared (optionally normalised) window, centre-padded to the frame size.
    squared = normalize(
        get_window(window, effective_win, fftbins=True), norm=norm
    ) ** 2
    squared = pad_center(squared, size=n_fft)

    # Overlap-add the squared window at each frame position.
    for frame_idx in range(n_frames):
        start = frame_idx * hop_length
        stop = min(total, start + n_fft)
        usable = max(0, min(n_fft, total - start))
        envelope[start:stop] += squared[:usable]
    return envelope
|
|
def dynamic_range_compression(x, normalize_fun=torch.log, C=1, clip_val=1e-5):
    """
    Clamp ``x`` from below, scale it, and apply ``normalize_fun``.

    PARAMS
    ------
    x: input tensor
    normalize_fun: function applied to the clamped, scaled tensor
    C: compression factor
    clip_val: lower bound applied to ``x`` before scaling
    """
    floored = torch.clamp(x, min=clip_val)
    scaled = floored * C
    return normalize_fun(scaled)
|
|
|
|
def dynamic_range_decompression(x, C=1):
    """
    Exponentiate ``x`` and divide by the compression factor.

    PARAMS
    ------
    x: tensor to decompress
    C: compression factor used to compress
    """
    expanded = torch.exp(x)
    return torch.div(expanded, C)
|
|
|
|
class STFT(torch.nn.Module):
    """Short-time Fourier transform built from 1-D (transposed) convolutions.

    adapted from Prem Seetharaman's https://github.com/pseeth/pytorch-stft

    The real and imaginary Fourier bases are stacked into one convolution
    kernel (``forward_basis``); its scaled pseudo-inverse (``inverse_basis``)
    is used for resynthesis. Both are registered as buffers so they follow
    the module across devices.
    """

    def __init__(self, filter_length, hop_length, win_length, window="hann"):
        """Precompute the analysis and synthesis bases.

        PARAMS
        ------
        filter_length: frame (FFT) size in samples
        hop_length: stride between consecutive frames in samples
        win_length: window length; must not exceed ``filter_length`` when a
            window is used
        window: window specification forwarded to ``scipy.signal.get_window``,
            or ``None`` to leave the bases unwindowed
        """
        super(STFT, self).__init__()
        self.filter_length = filter_length
        self.hop_length = hop_length
        self.win_length = win_length
        self.window = window
        self.forward_transform = None
        scale = self.filter_length / self.hop_length
        fourier_basis = np.fft.fft(np.eye(self.filter_length))

        # Keep only the non-redundant half of the spectrum and stack the real
        # part on top of the imaginary part.
        cutoff = int((self.filter_length / 2 + 1))
        fourier_basis = np.vstack(
            [np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
        )

        forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
        inverse_basis = torch.FloatTensor(
            np.linalg.pinv(scale * fourier_basis).T[:, None, :]
        )

        if window is not None:
            assert filter_length >= win_length
            # Build the window and centre-pad it to the frame size.
            fft_window = get_window(window, win_length, fftbins=True)
            fft_window = pad_center(fft_window, size=filter_length)
            fft_window = torch.from_numpy(fft_window).float()

            # Window both bases.
            forward_basis *= fft_window
            inverse_basis *= fft_window

        self.register_buffer("forward_basis", forward_basis.float())
        self.register_buffer("inverse_basis", inverse_basis.float())

    def transform(self, input_data):
        """Return ``(magnitude, phase)`` of the STFT of ``input_data``.

        ``input_data`` is viewed as ``(batch, samples)``; it is reflect-padded
        by half a frame on both sides before framing. Both outputs are moved
        to the CPU and have ``filter_length // 2 + 1`` frequency rows. The
        phase is computed from detached values.
        """
        num_batches = input_data.size(0)
        num_samples = input_data.size(1)

        self.num_samples = num_samples

        half_frame = int(self.filter_length / 2)

        # Reflect-pad along the sample axis (4-D layout is what the original
        # padding call used; keep it so the result is unchanged).
        input_data = input_data.view(num_batches, 1, num_samples)
        input_data = F.pad(
            input_data.unsqueeze(1),
            (half_frame, half_frame, 0, 0),
            mode="reflect",
        )
        input_data = input_data.squeeze(1)

        # Buffers never require grad, so no autograd wrapper is needed.
        forward_transform = F.conv1d(
            input_data,
            self.forward_basis,
            stride=self.hop_length,
            padding=0,
        ).cpu()

        cutoff = int((self.filter_length / 2) + 1)
        real_part = forward_transform[:, :cutoff, :]
        imag_part = forward_transform[:, cutoff:, :]

        magnitude = torch.sqrt(real_part**2 + imag_part**2)
        phase = torch.atan2(imag_part.detach(), real_part.detach())

        return magnitude, phase

    def inverse(self, magnitude, phase):
        """Resynthesise a signal from ``magnitude`` and ``phase``.

        Returns a tensor of shape ``(batch, 1, samples)`` with the half-frame
        padding added by ``transform`` trimmed from both ends. The computation
        runs on the device that holds ``inverse_basis``.
        """
        recombine_magnitude_phase = torch.cat(
            [magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
        )
        # transform() hands back CPU tensors; align with the synthesis basis
        # so the module also works after being moved to another device.
        recombine_magnitude_phase = recombine_magnitude_phase.to(
            self.inverse_basis.device
        )

        inverse_transform = F.conv_transpose1d(
            recombine_magnitude_phase,
            self.inverse_basis,
            stride=self.hop_length,
            padding=0,
        )

        if self.window is not None:
            window_sum = window_sumsquare(
                self.window,
                magnitude.size(-1),
                hop_length=self.hop_length,
                win_length=self.win_length,
                n_fft=self.filter_length,
                dtype=np.float32,
            )
            # Remove the modulation introduced by the overlapping windows,
            # skipping positions where the envelope is numerically zero.
            device = inverse_transform.device
            approx_nonzero_indices = torch.from_numpy(
                np.where(window_sum > tiny(window_sum))[0]
            ).to(device)
            window_sum = torch.from_numpy(window_sum).to(device)
            inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
                approx_nonzero_indices
            ]

            # Scale by the hop ratio to undo the 1/scale baked into the
            # synthesis basis (the unwindowed overlap-add is already unity).
            inverse_transform *= float(self.filter_length) / self.hop_length

        half_frame = int(self.filter_length / 2)
        inverse_transform = inverse_transform[:, :, half_frame:]
        inverse_transform = inverse_transform[:, :, :-half_frame]

        return inverse_transform

    def forward(self, input_data):
        """Analyse then resynthesise ``input_data``.

        The intermediate magnitude and phase are stored on ``self.magnitude``
        and ``self.phase``.
        """
        self.magnitude, self.phase = self.transform(input_data)
        reconstruction = self.inverse(self.magnitude, self.phase)
        return reconstruction
|
|
|
|
class TacotronSTFT(torch.nn.Module):
    """Mel-spectrogram front end combining an ``STFT`` with a mel filterbank."""

    def __init__(
        self,
        filter_length,
        hop_length,
        win_length,
        n_mel_channels,
        sampling_rate,
        mel_fmin,
        mel_fmax,
    ):
        """Create the STFT and register the mel filterbank as a buffer.

        ``filter_length``, ``hop_length`` and ``win_length`` configure the
        ``STFT``; ``sampling_rate``, ``n_mel_channels``, ``mel_fmin`` and
        ``mel_fmax`` are forwarded to ``librosa.filters.mel``.
        """
        super(TacotronSTFT, self).__init__()
        self.n_mel_channels = n_mel_channels
        self.sampling_rate = sampling_rate
        self.stft_fn = STFT(filter_length, hop_length, win_length)
        mel_basis = librosa_mel_fn(
            sr=sampling_rate,
            n_fft=filter_length,
            n_mels=n_mel_channels,
            fmin=mel_fmin,
            fmax=mel_fmax,
        )
        mel_basis = torch.from_numpy(mel_basis).float()
        self.register_buffer("mel_basis", mel_basis)

    def spectral_normalize(self, magnitudes, normalize_fun):
        """Apply ``dynamic_range_compression`` with ``normalize_fun``."""
        output = dynamic_range_compression(magnitudes, normalize_fun)
        return output

    def spectral_de_normalize(self, magnitudes):
        """Apply ``dynamic_range_decompression`` with its default factor."""
        output = dynamic_range_decompression(magnitudes)
        return output

    def mel_spectrogram(self, y, normalize_fun=torch.log):
        """Compute mel, compressed linear spectrogram and frame energy.

        PARAMS
        ------
        y: waveform batch with all values in [-1, 1]
        normalize_fun: compression function applied to the spectrograms

        RETURNS
        -------
        ``(mel_output, log_magnitudes, energy)`` where ``energy`` is the norm
        of the magnitudes over dim 1.
        """
        assert torch.min(y.data) >= -1, torch.min(y.data)
        assert torch.max(y.data) <= 1, torch.max(y.data)

        magnitudes, phases = self.stft_fn.transform(y)
        magnitudes = magnitudes.data
        # The STFT returns CPU tensors while the filterbank buffer follows the
        # module's device; align them so the matmul cannot hit a device
        # mismatch. On CPU this is a no-op.
        mel_basis = self.mel_basis.to(magnitudes.device)
        mel_output = torch.matmul(mel_basis, magnitudes)
        mel_output = self.spectral_normalize(mel_output, normalize_fun)
        energy = torch.norm(magnitudes, dim=1)

        log_magnitudes = self.spectral_normalize(magnitudes, normalize_fun)

        return mel_output, log_magnitudes, energy
|
|
def get_mel_from_wav(audio, _stft):
    """Run ``_stft.mel_spectrogram`` on one waveform and return NumPy arrays.

    The waveform gets a leading batch axis and is clipped to [-1, 1]. Each of
    the three results (mel spectrogram, compressed STFT magnitudes, energy)
    has that batch axis squeezed away and is returned as ``float32``.
    """
    batch = torch.FloatTensor(audio).unsqueeze(0)
    batch = torch.clip(batch, -1, 1)
    batch = torch.autograd.Variable(batch, requires_grad=False)

    outputs = _stft.mel_spectrogram(batch)
    melspec, log_magnitudes_stft, energy = (
        torch.squeeze(tensor, 0).numpy().astype(np.float32) for tensor in outputs
    )
    return melspec, log_magnitudes_stft, energy
|
|
| def _pad_spec(fbank, target_length=1024): |
| n_frames = fbank.shape[0] |
| p = target_length - n_frames |
| |
| if p > 0: |
| m = torch.nn.ZeroPad2d((0, 0, 0, p)) |
| fbank = m(fbank) |
| elif p < 0: |
| fbank = fbank[0:target_length, :] |
|
|
| if fbank.size(-1) % 2 != 0: |
| fbank = fbank[..., :-1] |
|
|
| return fbank |
|
|
def pad_wav(waveform, segment_length):
    """Pad or truncate ``waveform`` to ``segment_length`` samples.

    The length is measured on the last axis. ``segment_length=None`` or an
    exact match returns the input unchanged. Longer inputs are truncated
    along the last axis; shorter inputs are copied into a zero-filled array
    of shape ``(1, segment_length)``.

    Raises ``AssertionError`` when the waveform has 100 samples or fewer.
    """
    waveform_length = waveform.shape[-1]
    assert waveform_length > 100, "Waveform is too short, %s" % waveform_length
    if segment_length is None or waveform_length == segment_length:
        return waveform
    if waveform_length > segment_length:
        # Slice the sample axis (the last one); slicing axis 0 would leave a
        # (1, N) waveform untouched.
        return waveform[..., :segment_length]
    temp_wav = np.zeros((1, segment_length))
    temp_wav[:, :waveform_length] = waveform
    return temp_wav
|
|
def normalize_wav(waveform):
    """Remove the mean, scale by the peak magnitude, and halve the result.

    A small epsilon (1e-8) is added to the peak so an all-zero input does
    not divide by zero.
    """
    centered = waveform - np.mean(waveform)
    peak = np.max(np.abs(centered))
    unit_peak = centered / (peak + 1e-8)
    return unit_peak * 0.5
|
|
def read_wav_file(filename, segment_length, sampling_rate=16000):
    """Load an audio file as a normalised ``(1, samples)`` NumPy array.

    PARAMS
    ------
    filename: path handed to ``torchaudio.load``
    segment_length: target number of samples passed to ``pad_wav``
    sampling_rate: rate the audio is resampled to (default 16000)

    Only the first channel is used. The signal is normalised with
    ``normalize_wav``, padded or cut with ``pad_wav``, then rescaled so its
    peak magnitude is 0.5.
    """
    waveform, sr = torchaudio.load(filename)
    waveform = torchaudio.functional.resample(
        waveform, orig_freq=sr, new_freq=sampling_rate
    )
    waveform = waveform.numpy()[0, ...]
    waveform = normalize_wav(waveform)
    waveform = waveform[None, ...]
    waveform = pad_wav(waveform, segment_length)

    # Re-normalise after padding. A silent clip has a zero peak; dividing by
    # it would turn the whole waveform into NaN, so leave it as zeros.
    peak = np.max(np.abs(waveform))
    if peak > 0:
        waveform = waveform / peak
    waveform = 0.5 * waveform

    return waveform
|
|
def wav_to_fbank(filename, target_length=1024, fn_STFT=None):
    """Read a wav file and return its mel filterbank features.

    PARAMS
    ------
    filename: path of the audio file to read
    target_length: number of frames both spectrograms are padded/cut to;
        the waveform is requested at ``target_length * 160`` samples
    fn_STFT: object passed to ``get_mel_from_wav``; must not be ``None``

    RETURNS
    -------
    ``(fbank, log_magnitudes_stft, waveform)`` as torch tensors, with the
    spectrograms laid out frames-first.
    """
    assert fn_STFT is not None

    samples = read_wav_file(filename, target_length * 160)
    waveform = torch.FloatTensor(samples[0, ...])

    mel, log_mag, _energy = get_mel_from_wav(waveform, fn_STFT)

    # Transpose so frames come first, then fix the frame count.
    fbank = _pad_spec(torch.FloatTensor(mel.T), target_length)
    log_magnitudes_stft = _pad_spec(torch.FloatTensor(log_mag.T), target_length)

    return fbank, log_magnitudes_stft, waveform
|
|
def main():
    """Extract a mel spectrogram for every ``.wav`` file in a folder.

    Each spectrogram is saved as ``<save_folder_path>/melspec/<name>.npy``.
    """
    parser = ArgumentParser(description="Inference script parameters")
    # The argument is required, so a default would never be used.
    parser.add_argument(
        "--wav_folder_path",
        type=str,
        required=True,
        help="Path to the input wav folder",
    )
    parser.add_argument(
        "--save_folder_path",
        type=str,
        default="./output",
        help="Folder to save output files",
    )

    args = parser.parse_args()

    melspec_dir = os.path.join(args.save_folder_path, "melspec")
    os.makedirs(melspec_dir, exist_ok=True)

    # STFT / mel filterbank configuration.
    filter_length = 1024
    hop_length = 160
    win_length = 1024
    n_mel_channels = 64
    sampling_rate = 16000
    mel_fmin = 0
    mel_fmax = 8000
    duration = 10

    fn_STFT = TacotronSTFT(
        filter_length,
        hop_length,
        win_length,
        n_mel_channels,
        sampling_rate,
        mel_fmin,
        mel_fmax,
    )

    # Sort so the processing order does not depend on the filesystem.
    wav_files = sorted(
        name for name in os.listdir(args.wav_folder_path) if name.endswith('.wav')
    )
    for filename in tqdm(wav_files):
        original_audio_file_path = os.path.join(args.wav_folder_path, filename)
        mel, _, _ = wav_to_fbank(
            original_audio_file_path,
            target_length=int(duration * 102.4),
            fn_STFT=fn_STFT,
        )
        output_filename = os.path.splitext(filename)[0] + '.npy'
        output_path = os.path.join(melspec_dir, output_filename)
        np.save(output_path, mel.numpy())

    print("========================================FINISH MELSPEC EXTRACTION===========================================")


if __name__ == "__main__":
    main()
|
|