Respair's picture
Upload folder using huggingface_hub
b386992 verified
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import librosa
import matplotlib.pyplot as plt
import numpy as np
import pytest
import scipy
import torch
from nemo.collections.audio.parts.utils.audio import SOUND_VELOCITY as sound_velocity
from nemo.collections.audio.parts.utils.audio import (
calculate_sdr_numpy,
convmtx_mc_numpy,
covariance_matrix,
db2mag,
estimated_coherence,
generate_approximate_noise_field,
get_segment_start,
mag2db,
pow2db,
rms,
theoretical_coherence,
toeplitz,
)
try:
import torchaudio
HAVE_TORCHAUDIO = True
except ModuleNotFoundError:
HAVE_TORCHAUDIO = False
class TestGenerateApproximateNoiseField:
    """Tests for `theoretical_coherence` and `generate_approximate_noise_field`."""

    @pytest.mark.unit
    @pytest.mark.parametrize('num_mics', [5])
    @pytest.mark.parametrize('mic_spacing', [0.05])
    @pytest.mark.parametrize('fft_length', [512, 2048])
    @pytest.mark.parametrize('sample_rate', [8000, 16000])
    @pytest.mark.parametrize('field', ['spherical'])
    def test_theoretical_coherence_matrix(
        self, num_mics: int, mic_spacing: float, fft_length: int, sample_rate: float, field: str
    ):
        """Test calculation of a theoretical coherence matrix.

        A golden coherence matrix is built element by element for a linear
        array and compared against the output of `theoretical_coherence`.

        Args:
            num_mics: number of microphones in the linear array
            mic_spacing: distance between two adjacent microphones
            fft_length: FFT length, the reference has `fft_length // 2 + 1` subbands
            sample_rate: sampling rate used to compute the subband frequencies
            field: noise field type, the golden reference supports only 'spherical'
        """
        # test setup
        max_diff_tol = 1e-9

        # golden reference: spherical coherence
        num_subbands = fft_length // 2 + 1
        angular_freq = 2 * np.pi * sample_rate * np.arange(0, num_subbands) / fft_length
        golden_coherence = np.zeros((num_subbands, num_mics, num_mics))

        for p in range(num_mics):
            for q in range(num_mics):
                if p == q:
                    # a channel is fully coherent with itself
                    golden_coherence[:, p, q] = 1.0
                elif field == 'spherical':
                    dist_pq = abs(p - q) * mic_spacing
                    sinc_arg = angular_freq * dist_pq / sound_velocity
                    # np.sinc is the normalized sinc, sin(pi x) / (pi x), hence the division by pi
                    golden_coherence[:, p, q] = np.sinc(sinc_arg / np.pi)
                else:
                    raise NotImplementedError(f'Field {field} not supported.')

        # assume linear array
        mic_positions = np.zeros((num_mics, 3))
        mic_positions[:, 0] = mic_spacing * np.arange(num_mics)

        # UUT: use the parametrized field so the UUT and the golden reference always agree
        uut_coherence = theoretical_coherence(
            mic_positions, sample_rate=sample_rate, fft_length=fft_length, field=field
        )

        # Check difference
        max_diff = np.max(np.abs(uut_coherence - golden_coherence))
        assert max_diff < max_diff_tol

    @pytest.mark.unit
    @pytest.mark.parametrize('num_mics', [5])
    @pytest.mark.parametrize('mic_spacing', [0.10])
    @pytest.mark.parametrize('fft_length', [256, 512])
    @pytest.mark.parametrize('sample_rate', [8000, 16000])
    @pytest.mark.parametrize('field', ['spherical'])
    def test_generate_approximate_noise_field(
        self,
        num_mics: int,
        mic_spacing: float,
        fft_length: int,
        sample_rate: float,
        field: str,
        save_figures: bool = False,
    ):
        """Test approximate noise field with white noise as the input noise.

        The coherence estimated from the generated noise field is compared
        against the theoretical coherence for the same array geometry.

        Args:
            num_mics: number of microphones in the linear array
            mic_spacing: distance between two adjacent microphones
            fft_length: FFT length used for generation and for the STFT analysis
            sample_rate: sampling rate, also determines the number of samples
            field: noise field type passed to the UUT and to the reference
            save_figures: if True, save coherence plots to `~/_coherence` for debugging
        """
        duration_in_sec = 20
        relative_mse_tol_dB = -30
        relative_mse_tol = 10 ** (relative_mse_tol_dB / 10)

        num_samples = sample_rate * duration_in_sec
        # NOTE(review): drawn from the global NumPy RNG, no seed is set in this test
        noise_signal = np.random.rand(num_samples, num_mics)
        # random channel-wise power scaling
        noise_signal *= np.random.randn(num_mics)

        # assume linear array
        mic_positions = np.zeros((num_mics, 3))
        mic_positions[:, 0] = mic_spacing * np.arange(num_mics)

        # UUT
        noise_field = generate_approximate_noise_field(
            mic_positions, noise_signal, sample_rate=sample_rate, field=field, fft_length=fft_length
        )

        # Compare the estimated coherence with the theoretical coherence
        # reference
        golden_coherence = theoretical_coherence(
            mic_positions, sample_rate=sample_rate, field=field, fft_length=fft_length
        )

        # estimated
        N = librosa.stft(noise_field.transpose(), n_fft=fft_length)
        # (channel, subband, frame) -> (subband, frame, channel)
        N = N.transpose(1, 2, 0)
        uut_coherence = estimated_coherence(N)

        # Check difference: real part against the reference, imaginary part against zero
        relative_mse_real = np.mean((uut_coherence.real - golden_coherence) ** 2)
        assert relative_mse_real < relative_mse_tol
        relative_mse_imag = np.mean((uut_coherence.imag) ** 2)
        assert relative_mse_imag < relative_mse_tol

        if save_figures:
            # For debugging and visualization template
            figure_dir = os.path.expanduser('~/_coherence')
            # exist_ok avoids the check-then-create race of exists() + mkdir()
            os.makedirs(figure_dir, exist_ok=True)

            freq = librosa.fft_frequencies(sr=sample_rate, n_fft=fft_length)
            freq = freq / 1e3  # kHz

            plt.figure(figsize=(7, 10))
            for n in range(1, num_mics):
                # left column: real part of the coherence between mic 0 and mic n
                plt.subplot(num_mics - 1, 2, 2 * n - 1)
                plt.plot(freq, golden_coherence[:, 0, n].real, label='golden')
                plt.plot(freq, uut_coherence[:, 0, n].real, label='estimated')
                plt.title(f'Real(coherence), p=0, q={n}')
                plt.xlabel('f / kHz')
                plt.grid()
                plt.legend(loc='upper right')

                # right column: imaginary part, plotted against frequency to match the axis label
                plt.subplot(num_mics - 1, 2, 2 * n)
                plt.plot(freq, golden_coherence[:, 0, n].imag, label='golden')
                plt.plot(freq, uut_coherence[:, 0, n].imag, label='estimated')
                plt.title(f'Imag(coherence), p=0, q={n}')
                plt.xlabel('f / kHz')
                plt.grid()
                plt.legend(loc='upper right')

            plt.tight_layout()
            plt.savefig(
                os.path.join(
                    figure_dir, f'num_mics_{num_mics}_sample_rate_{sample_rate}_fft_length_{fft_length}_{field}.png'
                )
            )
            plt.close()
class TestAudioUtilsElements:
    """Unit tests for the elementary audio utility functions."""

    @pytest.mark.unit
    def test_rms(self):
        """Compare the RMS of a sampled cosine with the reference value A / sqrt(2)."""
        # tolerance and signal parameters
        rms_threshold = 1e-4
        n_points = 1000
        omega = 100
        A = np.random.rand()

        # sampled cosine with a random amplitude
        t = np.linspace(0, 2 * np.pi, n_points)
        x = A * np.cos(2 * np.pi * omega * t)

        # UUT against the reference
        rms_error = np.abs(rms(x) - A / np.sqrt(2))
        assert rms_error < rms_threshold, f'RMS not matching for A={A}, omega={omega}, n_point={n_points}'

    @pytest.mark.unit
    def test_db_conversion(self):
        """Check that mag2db, db2mag and pow2db are mutually consistent."""
        abs_threshold = 1e-6
        num_examples = 10

        mag = np.random.rand(num_examples)
        mag_db = mag2db(mag)
        # magnitude recovered from dB with the explicit formula
        mag_from_db = 10 ** (mag_db / 20)

        # mag -> dB -> mag round trip
        assert np.all(np.abs(mag - mag_from_db) < abs_threshold)
        # db2mag agrees with the explicit formula
        assert np.all(np.abs(db2mag(mag_db) - mag_from_db) < abs_threshold)
        # power in dB equals magnitude in dB
        assert np.all(np.abs(pow2db(mag**2) - mag_db) < abs_threshold)

    @pytest.mark.unit
    def test_get_segment_start(self):
        """Check that the start of a segment cut from a random signal is recovered."""
        num_samples = 2000
        num_examples = 50
        _rng = np.random.default_rng(seed=42)

        for n in range(num_examples):
            # random signal and a segment [start, end) cut out of it;
            # start is drawn from the first half, end from [start, num_samples)
            full_signal = _rng.normal(size=num_samples)
            start = _rng.integers(low=0, high=num_samples // 2)
            end = _rng.integers(low=start, high=num_samples)

            # UUT
            estimated_start = get_segment_start(signal=full_signal, segment=full_signal[start:end])

            assert (
                estimated_start == start
            ), f'Example {n}: estimated start ({estimated_start}) not matching the actual start ({start})'

    @pytest.mark.unit
    def test_calculate_sdr_numpy(self):
        """Check SDR of a scaled target, without and with mean removal."""
        num_samples = 2000
        num_examples = 50
        atol = 1e-6
        _rng = np.random.default_rng(seed=42)

        for n in range(num_examples):
            # random target and an estimate scaled so that the SDR equals golden_sdr
            target = _rng.normal(size=num_samples)
            golden_sdr = _rng.integers(low=-10, high=10)
            scale = 1 + 10 ** (-golden_sdr / 20)
            estimate = target * scale

            # UUT, means are left untouched
            estimated_sdr = calculate_sdr_numpy(estimate=estimate, target=target, remove_mean=False)
            assert np.isclose(
                estimated_sdr, golden_sdr, atol=atol
            ), f'Example {n}: estimated ({estimated_sdr}) not matching the actual value ({golden_sdr})'

            # shift both signals by independent random offsets;
            # with remove_mean=True the SDR is expected to stay the same
            target += _rng.uniform(low=-10, high=10)
            estimate += _rng.uniform(low=-10, high=10)

            # UUT, means are removed
            estimated_sdr = calculate_sdr_numpy(estimate=estimate, target=target, remove_mean=True)
            assert np.isclose(
                estimated_sdr, golden_sdr, atol=atol
            ), f'Example {n}: estimated ({estimated_sdr}) not matching the actual value ({golden_sdr})'

    @pytest.mark.unit
    def test_calculate_sdr_numpy_scale_invariant(self):
        """Check scale-invariant SDR against plain SDR with an explicitly scaled target."""
        num_samples = 2000
        num_examples = 50
        atol = 1e-6
        _rng = np.random.default_rng(seed=42)

        for n in range(num_examples):
            # random target
            target = _rng.normal(size=num_samples)

            # estimate is the target plus noise with a random gain
            noise_gain = _rng.uniform(low=0.01, high=1)
            noise = _rng.normal(size=target.size)
            estimate = target + noise_gain * noise

            # project the estimate onto the normalized target
            target_scaled = target / (np.linalg.norm(target) + 1e-16)
            target_scaled = np.sum(estimate * target_scaled) * target_scaled

            # reference: plain SDR against the scaled target
            golden_sdr = calculate_sdr_numpy(
                estimate=estimate, target=target_scaled, scale_invariant=False, remove_mean=False
            )

            # UUT: scale-invariant SDR against the original target
            estimated_sdr = calculate_sdr_numpy(
                estimate=estimate, target=target, scale_invariant=True, remove_mean=False
            )

            assert np.isclose(
                estimated_sdr, golden_sdr, atol=atol
            ), f'Example {n}: estimated ({estimated_sdr}) not matching the actual value ({golden_sdr})'

    @pytest.mark.unit
    @pytest.mark.parametrize('num_channels', [1, 3])
    @pytest.mark.parametrize('filter_length', [10])
    @pytest.mark.parametrize('delay', [0, 5])
    def test_convmtx_mc(self, num_channels: int, filter_length: int, delay: int):
        """Test convmtx against convolve and sum.

        The product of the multi-channel convolution matrix of the input and
        the vectorized multi-channel filter should equal the sum over channels
        of each delayed input channel convolved with its filter.
        """
        num_samples = 2000
        num_examples = 10
        atol = 1e-6
        _rng = np.random.default_rng(seed=42)

        for n in range(num_examples):
            x = _rng.normal(size=(num_samples, num_channels))
            f = _rng.normal(size=(filter_length, num_channels))

            # UUT: convolution matrix times the filter stacked channel by channel
            CM = convmtx_mc_numpy(x=x, filter_length=filter_length, delay=delay)
            stacked_filter = f.transpose().reshape(-1, 1)
            uut = (CM @ stacked_filter).squeeze(1)

            # reference: delay each channel, convolve it with its filter,
            # truncate to the input length and sum over channels
            golden_ref = sum(
                np.convolve(np.concatenate([np.zeros(delay), x[:, m]]), f[:, m], mode='full')[:num_samples]
                for m in range(num_channels)
            )

            assert np.allclose(uut, golden_ref, atol=atol), f'Example {n}: UUT not matching the reference.'

    @pytest.mark.unit
    @pytest.mark.parametrize('num_channels', [1, 3])
    @pytest.mark.parametrize('filter_length', [10])
    @pytest.mark.parametrize('num_samples', [10, 100])
    def test_toeplitz(self, num_channels: int, filter_length: int, num_samples: int):
        """Compare the batched Toeplitz construction with scipy.linalg.toeplitz."""
        batch_size = 8
        num_batches = 10
        atol = 1e-6
        _rng = np.random.default_rng(seed=42)

        for n in range(num_batches):
            x = _rng.normal(size=(batch_size, num_channels, num_samples))

            # UUT operates on the whole batch at once
            Tx = toeplitz(x=torch.tensor(x))

            # reference is computed separately for every (batch, channel) pair
            for b, m in np.ndindex(batch_size, num_channels):
                T_ref = scipy.linalg.toeplitz(x[b, m, ...])
                T_uut = Tx[b, m, ...].cpu().numpy()
                assert np.allclose(
                    T_uut, T_ref, atol=atol
                ), f'Example {n}: not matching the reference for (b={b}, m={m}), .'
class TestCovarianceMatrix:
    """Unit tests for `covariance_matrix`."""

    @staticmethod
    def _random_mask(mask_type: str, shape: tuple, generator: torch.Generator) -> torch.Tensor:
        """Draw a random mask of the given shape and type ('real', 'complex' or 'bool')."""
        if mask_type == 'real':
            return torch.rand(shape, dtype=torch.float, generator=generator)
        if mask_type == 'complex':
            return torch.rand(shape, dtype=torch.cfloat, generator=generator)
        if mask_type == 'bool':
            return torch.randint(0, 2, shape, dtype=torch.bool, generator=generator)
        raise ValueError(f'Mask type {mask_type} not supported.')

    @pytest.mark.unit
    @pytest.mark.skipif(not HAVE_TORCHAUDIO, reason="Modules in this test require torchaudio")
    @pytest.mark.parametrize('num_channels', [1, 3])
    @pytest.mark.parametrize('num_freq', [17, 33])
    @pytest.mark.parametrize('use_mask', [True, False])
    @pytest.mark.parametrize('normalize_mask', [True, False])
    @pytest.mark.parametrize('mask_type', ['real', 'complex', 'bool'])
    def test_calculate_covariance_matrix_vs_psd(
        self, num_channels: int, num_freq: int, use_mask: bool, normalize_mask: bool, mask_type: str
    ):
        """Test against reference calculation using torchaudio."""
        # Absolute tolerance passed to torch.allclose
        atol = 1e-5
        num_examples = 10
        batch_size, num_steps = 8, 100

        # Seeded generator, the drawn tensors are the same on every run
        rng = torch.Generator()
        rng.manual_seed(42)

        # Reference calculation of multichannel covariance matrix
        psd_ref = torchaudio.transforms.PSD(multi_mask=False, normalize=normalize_mask, eps=1e-8)

        for n in range(num_examples):
            # the spectrogram is drawn first, then the mask (drawn even when it is not used)
            spec = torch.randn(batch_size, num_channels, num_freq, num_steps, dtype=torch.cfloat, generator=rng)
            mask = self._random_mask(mask_type, (batch_size, num_freq, num_steps), rng)
            active_mask = mask if use_mask else None

            # UUT
            uut = covariance_matrix(x=spec, mask=active_mask, normalize_mask=normalize_mask)

            # Reference
            ref = psd_ref(specgram=spec, mask=active_mask)
            if not use_mask:
                # torchaudio is summing over time, divide by num_steps to have an average over time
                ref = ref / num_steps

            # Check if the UUT matches the reference
            assert torch.allclose(uut, ref, atol=atol), f'Example {n}: UUT not matching the reference.'

    @pytest.mark.unit
    @pytest.mark.parametrize('num_channels', [1, 3])
    @pytest.mark.parametrize('num_freq', [3, 10])
    @pytest.mark.parametrize('use_mask', [True, False])
    @pytest.mark.parametrize('normalize_mask', [True, False])
    @pytest.mark.parametrize('mask_type', ['real', 'complex', 'bool'])
    def test_calculate_covariance_matrix(
        self, num_channels: int, num_freq: int, use_mask: bool, normalize_mask: bool, mask_type: str
    ):
        """Test against simple reference calculation."""
        # Absolute tolerance passed to torch.allclose
        atol = 1e-5
        num_examples = 10
        batch_size, num_steps = 8, 10

        # Seeded generator, the drawn tensors are the same on every run
        rng = torch.Generator()
        rng.manual_seed(42)

        for n in range(num_examples):
            # the spectrogram is drawn first, then the mask (drawn even when it is not used)
            spec = torch.randn(batch_size, num_channels, num_freq, num_steps, dtype=torch.cfloat, generator=rng)
            mask = self._random_mask(mask_type, (batch_size, num_freq, num_steps), rng)

            # UUT
            uut = covariance_matrix(x=spec, mask=mask if use_mask else None, normalize_mask=normalize_mask)

            # Reference: x(t) x(t)^H for each batch item, frequency bin and time step
            outer_products = torch.zeros(
                batch_size, num_freq, num_channels, num_channels, num_steps, dtype=torch.cfloat
            )
            for batch_idx in range(batch_size):
                for freq_idx in range(num_freq):
                    for step_idx in range(num_steps):
                        channels = spec[batch_idx, :, freq_idx, step_idx]
                        outer_products[batch_idx, freq_idx, :, :, step_idx] = torch.outer(channels, channels.conj())

            # Aggregate over time
            if not use_mask:
                # no mask: average over time
                ref = outer_products.mean(dim=-1)
            else:
                # mask: weighted sum over time, optionally with weights normalized along time
                weights = mask
                if normalize_mask:
                    weights = mask / (mask.sum(dim=-1, keepdim=True) + 1e-8)
                ref = (outer_products * weights[..., None, None, :]).sum(dim=-1)

            # Check if the UUT matches the reference
            assert torch.allclose(uut, ref, atol=atol), f'Example {n}: UUT not matching the reference.'

    @pytest.mark.unit
    @pytest.mark.parametrize('num_channels', [1, 3])
    @pytest.mark.parametrize('num_freq', [17, 33])
    def test_mismatch_dimensions(self, num_channels: int, num_freq: int):
        """Check that covariance_matrix raises ValueError for inconsistent input and mask shapes."""
        batch_size, num_steps = 8, 100

        # Typically-shaped inputs
        spec = torch.randn(batch_size, num_channels, num_freq, num_steps, dtype=torch.cfloat)
        mask = torch.rand(batch_size, num_freq, num_steps, dtype=torch.float)

        invalid_calls = (
            # Input has only (freq, time) dimensions -- missing at least one channel dimension
            {'x': spec[0, 0, ...]},
            # Mask has only (freq, time) dimensions -- missing batch dimension
            {'x': spec, 'mask': mask[0, ...]},
            # Mask has wrong number of time steps
            {'x': spec, 'mask': mask[..., :-1]},
            # Mask has wrong number of frequency bins
            {'x': spec, 'mask': mask[..., :-1, :]},
        )

        for kwargs in invalid_calls:
            with pytest.raises(ValueError):
                covariance_matrix(**kwargs)