import concurrent
import multiprocessing
import os
import shutil
import warnings
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple, Union

import h5py
import librosa
import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
import torch
from numpy.random import default_rng
from omegaconf import DictConfig, OmegaConf
from scipy.signal import convolve
from scipy.signal.windows import cosine, hamming, hann
from scipy.spatial.transform import Rotation
from scipy.stats import beta, gamma
from tqdm import tqdm

from nemo.collections.asr.parts.preprocessing.segment import AudioSegment
from nemo.collections.asr.parts.utils.audio_utils import db2mag, mag2db, pow2db, rms
from nemo.collections.asr.parts.utils.manifest_utils import (
    create_manifest,
    create_segment_manifest,
    read_manifest,
    write_ctm,
    write_manifest,
    write_text,
)
from nemo.collections.asr.parts.utils.speaker_utils import (
    get_overlap_range,
    is_overlap,
    labels_to_rttmfile,
    merge_float_intervals,
)
from nemo.utils import logging

try:
    import pyroomacoustics as pra
    from pyroomacoustics.directivities import CardioidFamily, DirectionVector, DirectivityPattern

    PRA = True
except ImportError:
    PRA = False

try:
    from gpuRIR import att2t_SabineEstimator, beta_SabineEstimation, simulateRIR, t2n

    GPURIR = True
except ImportError:
    GPURIR = False

def clamp_min_list(target_list: List[float], min_val: float) -> List[float]:
    """
    Clamp numbers in the given list with `min_val`.

    Args:
        target_list (list):
            List containing floating point numbers
        min_val (float):
            Desired minimum value to clamp the numbers in `target_list`

    Returns:
        (list) List containing clamped numbers
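
    Example (illustrative doctest):
        >>> clamp_min_list([0.5, -0.2, 1.3], 0.0)
        [0.5, 0.0, 1.3]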
| | """ |
| | return [max(x, min_val) for x in target_list] |
| |
|
| |
|
| | def clamp_max_list(target_list: List[float], max_val: float) -> List[float]: |
| | """ |
| | Clamp numbers in the given list with `max_val`. |
| | Args: |
| | target_list (list): |
| | List containing floating point numbers |
| | min_val (float): |
| | Desired maximum value to clamp the numbers in `target_list` |
| | |
| | Returns: |
| | (list) List containing clamped numbers |
| | """ |
| | return [min(x, max_val) for x in target_list] |
| |
|
| |
|

class MultiSpeakerSimulator(object):
    """
    Multispeaker Audio Session Simulator - Simulates multispeaker audio sessions using single-speaker audio files and
    corresponding word alignments.

    Change Log:
    v1.0: Dec 2022
        - First working version, supports multispeaker simulation with overlaps, silence and RIR
    v1.1: Feb 2023
        - Multi-GPU support for speed up
        - Faster random sampling routine
        - Fixed sentence duration bug
        - Silence and overlap length sampling algorithms are updated to guarantee `mean_silence` approximation

    Args:
        cfg: OmegaConf configuration loaded from yaml file.

    Parameters:
        manifest_filepath (str): Manifest file with paths to single speaker audio files
        sr (int): Sampling rate of the input audio files from the manifest
        random_seed (int): Seed for the random number generator

        session_config:
            num_speakers (int): Number of unique speakers per multispeaker audio session
            num_sessions (int): Number of sessions to simulate
            session_length (int): Length of each simulated multispeaker audio session (seconds). Short sessions
                (e.g. ~240 seconds) tend to fall short of the expected overlap-ratio and silence-ratio.

        session_params:
            sentence_length_params (list): k,p values for a negative_binomial distribution which is sampled to get the
                sentence length (in number of words)
            dominance_var (float): Variance in speaker dominance (where each speaker's dominance is sampled from a normal
                distribution centered on 1/`num_speakers`, and then the dominance values are together normalized to 1)
            min_dominance (float): Minimum percentage of speaking time per speaker (note that this can cause the dominance
                of the other speakers to be slightly reduced)
            turn_prob (float): Probability of switching speakers after each utterance
            mean_silence (float): Mean proportion of silence to speaking time in the audio session. Should be in range [0, 1).
            mean_silence_var (float): Variance for mean silence in all audio sessions.
                This value should be 0 <= mean_silence_var < mean_silence * (1 - mean_silence).
            per_silence_var (float): Variance for each silence in an audio session; set large values (e.g., 20) for de-correlation.
            per_silence_min (float): Minimum duration for each silence; defaults to 0.
            per_silence_max (float): Maximum duration for each silence; defaults to -1 for no maximum.
            mean_overlap (float): Mean proportion of overlap in the overall non-silence duration. Should be in range [0, 1);
                the [0, 0.15] range is recommended for accurate results.
            mean_overlap_var (float): Variance for mean overlap in all audio sessions.
                This value should be 0 <= mean_overlap_var < mean_overlap * (1 - mean_overlap).
            per_overlap_var (float): Variance for per overlap in each session; set large values to de-correlate overlap
                lengths from the latest speech segment lengths
            per_overlap_min (float): Minimum per overlap duration in seconds
            per_overlap_max (float): Maximum per overlap duration in seconds; set -1 for no maximum
            start_window (bool): Whether to window the start of sentences to smooth the audio signal (and remove silence at
                the start of the clip)
            window_type (str): Type of windowing used when segmenting utterances ("hamming", "hann", "cosine")
            window_size (float): Length of window at the start or the end of segmented utterance (seconds)
            start_buffer (float): Buffer of silence before the start of the sentence (to avoid cutting off speech or starting
                abruptly)
            split_buffer (float): Split RTTM labels if greater than twice this amount of silence (to avoid long gaps between
                utterances being labelled as speech)
            release_buffer (float): Buffer before window at end of sentence (to avoid cutting off speech or ending abruptly)
            normalize (bool): Normalize speaker volumes
            normalization_type (str): Normalizing speakers ("equal" - same volume per speaker, "variable" - variable volume
                per speaker)
            normalization_var (float): Variance in speaker volume (sampled from a normal distribution centered at 1)
            min_volume (float): Minimum speaker volume (only used when variable normalization is used)
            max_volume (float): Maximum speaker volume (only used when variable normalization is used)
            end_buffer (float): Buffer at the end of the session to leave blank

        outputs:
            output_dir (str): Output directory for audio sessions and corresponding label files
            output_filename (str): Output filename for the wav and RTTM files
            overwrite_output (bool): If true, delete the output directory if it exists
            output_precision (int): Number of decimal places in output files

        background_noise:
            add_bg (bool): Add ambient background noise if true
            background_manifest (str): Path to background noise manifest file
            snr (int): SNR for background noise (using average speaker power)

        speaker_enforcement:
            enforce_num_speakers (bool): Enforce that all requested speakers are present in the output wav file
            enforce_time (list): Percentage of the way through the audio session that enforcement mode is triggered (sampled
                between time 1 and 2)

        segment_manifest: (parameters for regenerating the segment manifest file)
            window (float): Window length for segmentation
            shift (float): Shift length for segmentation
            step_count (int): Number of the unit segments you want to create per utterance
            deci (int): Rounding decimals for segment manifest file
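
    Example (an illustrative usage sketch, not taken from the original file; the YAML
    path is hypothetical, and its keys mirror the parameter list above):
        >>> from omegaconf import OmegaConf
        >>> cfg = OmegaConf.load("conf/data_simulator.yaml")  # hypothetical config path
        >>> sim = MultiSpeakerSimulator(cfg=cfg)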
| | """ |
| |
|
| | def __init__(self, cfg): |
| | self._params = cfg |
| | |
| | self._manifest = read_manifest(self._params.data_simulator.manifest_filepath) |
| | self._speaker_samples = self._build_speaker_samples_map() |
| | self._noise_samples = [] |
| | self._sentence = None |
| | self._text = "" |
| | self._words = [] |
| | self._alignments = [] |
| | self._merged_speech_intervals = [] |
| | |
| | self._furthest_sample = [0 for n in range(self._params.data_simulator.session_config.num_speakers)] |
| | |
| | self._missing_overlap = 0 |
| | |
| | self.base_manifest_filepath = None |
| | self.segment_manifest_filepath = None |
| | self._turn_prob_min = self._params.data_simulator.session_params.get("turn_prob_min", 0.5) |
| | |
| | self._volume = None |
| | self._device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") |
| |
|
| | self._audio_read_buffer_dict = {} |
| | self._noise_read_buffer_dict = {} |
| |
|
| | self.running_speech_len_samples = 0 |
| | self.running_silence_len_samples = 0 |
| | self.running_overlap_len_samples = 0 |
| |
|
| | self.sess_silence_mean = 0 |
| | self.per_silence_min_len = 0 |
| | self.per_silence_max_len = 0 |
| |
|
| | self.sess_overlap_mean = 0 |
| | self.per_overlap_min_len = 0 |
| | self.per_overlap_max_len = 0 |
| | self.add_missing_overlap = self._params.data_simulator.session_params.get("add_missing_overlap", False) |
| |
|
| | self._check_args() |
| |
|
    def _check_args(self):
        """
        Checks YAML arguments to ensure they are within valid ranges.
        """
        if self._params.data_simulator.session_config.num_speakers < 1:
            raise Exception("At least one speaker is required for making audio sessions (num_speakers < 1)")
        if (
            self._params.data_simulator.session_params.turn_prob < 0
            or self._params.data_simulator.session_params.turn_prob > 1
        ):
            raise Exception("Turn probability is outside of [0,1]")
        elif (
            self._params.data_simulator.session_params.turn_prob < self._turn_prob_min
            and self._params.data_simulator.speaker_enforcement.enforce_num_speakers
        ):
            logging.warning(
                f"Turn probability is less than {self._turn_prob_min} while enforce_num_speakers=True, which may "
                f"result in excessive session lengths. Forcing turn_prob to {self._turn_prob_min}."
            )
            self._params.data_simulator.session_params.turn_prob = self._turn_prob_min

        if self._params.data_simulator.session_params.sentence_length_params[0] <= 0:
            raise Exception(
                "k (number of successes until the experiment ends) in sentence length parameters must be a positive number"
            )

        if not (0 < self._params.data_simulator.session_params.sentence_length_params[1] <= 1):
            raise Exception("p (success probability) value in sentence length parameters must be in range (0,1]")

        if (
            self._params.data_simulator.session_params.mean_overlap < 0
            or self._params.data_simulator.session_params.mean_overlap > 1
        ):
            raise Exception("Mean overlap is outside of [0,1]")
        if (
            self._params.data_simulator.session_params.mean_silence < 0
            or self._params.data_simulator.session_params.mean_silence > 1
        ):
            raise Exception("Mean silence is outside of [0,1]")
        if self._params.data_simulator.session_params.mean_silence_var < 0:
            raise Exception("Mean silence variance is below 0")
        if (
            self._params.data_simulator.session_params.mean_silence > 0
            and self._params.data_simulator.session_params.mean_silence_var
            >= self._params.data_simulator.session_params.mean_silence
            * (1 - self._params.data_simulator.session_params.mean_silence)
        ):
            raise Exception("Mean silence variance should be lower than mean_silence * (1 - mean_silence)")
        if self._params.data_simulator.session_params.per_silence_var < 0:
            raise Exception("Per silence variance is below 0")

        if self._params.data_simulator.session_params.mean_overlap_var < 0:
            raise Exception("Mean overlap variance is below 0")
        if (
            self._params.data_simulator.session_params.mean_overlap > 0
            and self._params.data_simulator.session_params.mean_overlap_var
            >= self._params.data_simulator.session_params.mean_overlap
            * (1 - self._params.data_simulator.session_params.mean_overlap)
        ):
            raise Exception("Mean overlap variance should be lower than mean_overlap * (1 - mean_overlap)")
        if self._params.data_simulator.session_params.per_overlap_var < 0:
            raise Exception("Per overlap variance is below 0")

        if (
            self._params.data_simulator.session_params.min_dominance < 0
            or self._params.data_simulator.session_params.min_dominance > 1
        ):
            raise Exception("Minimum dominance is outside of [0,1]")
        if (
            self._params.data_simulator.speaker_enforcement.enforce_time[0] < 0
            or self._params.data_simulator.speaker_enforcement.enforce_time[0] > 1
        ):
            raise Exception("Speaker enforcement start is outside of [0,1]")
        if (
            self._params.data_simulator.speaker_enforcement.enforce_time[1] < 0
            or self._params.data_simulator.speaker_enforcement.enforce_time[1] > 1
        ):
            raise Exception("Speaker enforcement end is outside of [0,1]")

        if (
            self._params.data_simulator.session_params.min_dominance
            * self._params.data_simulator.session_config.num_speakers
            > 1
        ):
            raise Exception("Number of speakers times minimum dominance is greater than 1")

        if (
            self._params.data_simulator.session_params.window_type not in ['hamming', 'hann', 'cosine']
            and self._params.data_simulator.session_params.window_type is not None
        ):
            raise Exception("Incorrect window type provided")

        if len(self._manifest) == 0:
            raise Exception("Manifest file is empty. Check that the source path is correct.")

    def clean_up(self):
        """
        Reset per-session state and free cached audio buffers.
        """
        self._sentence = None
        self._words = []
        self._alignments = []
        self._audio_read_buffer_dict = {}
        self._noise_read_buffer_dict = {}
        torch.cuda.empty_cache()

    def _get_speaker_ids(self) -> List[str]:
        """
        Randomly select speaker IDs from the loaded manifest file.

        Returns:
            speaker_ids (list): List of speaker IDs
        """
        all_speaker_ids = list(self._speaker_samples.keys())
        idx_list = np.random.permutation(len(all_speaker_ids))[
            : self._params.data_simulator.session_config.num_speakers
        ]
        speaker_ids = [all_speaker_ids[i] for i in idx_list]
        return speaker_ids

    def _build_speaker_samples_map(self) -> Dict:
        """
        Build a dictionary mapping each speaker ID to their list of samples.

        Returns:
            speaker_samples (Dict[list]):
                Dictionary mapping speaker ID to their list of samples
        """
        speaker_samples = defaultdict(list)
        logging.info("Building speaker to samples map...")
        for sample in tqdm(self._manifest, total=len(self._manifest)):
            speaker_id = sample['speaker_id']
            speaker_samples[speaker_id].append(sample)
        return speaker_samples

    def _sample_noise_manifest(self, noise_manifest) -> list:
        """
        Sample noise manifest to a specified count `num_noise_files` for the current simulated audio session.

        Args:
            noise_manifest (list):
                List of noise source samples to be sampled from.

        Returns:
            sampled_noise_manifest (list):
                List of noise samples to be used for the current session.
        """
        num_noise_files = min(len(noise_manifest), self._params.data_simulator.background_noise.num_noise_files)
        sampled_noise_manifest = []
        if num_noise_files > 0:
            selected_noise_ids = np.random.choice(range(len(noise_manifest)), num_noise_files, replace=False)
            for k in selected_noise_ids:
                sampled_noise_manifest.append(noise_manifest[k])
        return sampled_noise_manifest

    def _read_noise_manifest(self):
        """
        Read the noise manifest file.

        Returns:
            noise_manifest (list): List of all noise source samples.
        """
        noise_manifest = []
        if self._params.data_simulator.background_noise.add_bg is True:
            if self._params.data_simulator.background_noise.background_manifest is not None:
                if os.path.exists(self._params.data_simulator.background_noise.background_manifest):
                    noise_manifest = read_manifest(self._params.data_simulator.background_noise.background_manifest)
                else:
                    raise FileNotFoundError(
                        f"Noise manifest file not found: {self._params.data_simulator.background_noise.background_manifest}"
                    )
            else:
                raise FileNotFoundError(
                    "Noise manifest file is null. Please provide a valid noise manifest file if add_bg=True."
                )
        return noise_manifest

    def _get_speaker_samples(self, speaker_ids: List[str]) -> Dict[str, list]:
        """
        Get a list of the samples for each of the specified speakers.

        Args:
            speaker_ids (list): LibriSpeech speaker IDs for each speaker in the current session.

        Returns:
            speaker_wav_align_map (dict): Dictionary containing speaker IDs and their corresponding wav filepath and alignments.
        """
        speaker_wav_align_map = defaultdict(list)
        for sid in speaker_ids:
            speaker_wav_align_map[sid] = self._speaker_samples[sid]
        return speaker_wav_align_map

    def _load_speaker_sample(
        self, speaker_wav_align_map: List[dict], speaker_ids: List[str], speaker_turn: int
    ) -> dict:
        """
        Load a sample for the selected speaker ID.
        The first alignment and word must be silence, which determines the start of the alignments.

        Args:
            speaker_wav_align_map (dict): Dictionary containing speaker IDs and their corresponding wav filepath and alignments.
            speaker_ids (list): LibriSpeech speaker IDs for each speaker in the current session.
            speaker_turn (int): Current speaker turn.

        Returns:
            file_dict (dict): Manifest entry (wav filepath, words and alignments) for the selected sample
        """
        speaker_id = speaker_ids[speaker_turn]
        file_id = np.random.randint(0, max(len(speaker_wav_align_map[str(speaker_id)]) - 1, 1))
        file_dict = speaker_wav_align_map[str(speaker_id)][file_id]

        # insert a leading silence token (with a near-zero alignment) if the sample does not start with one
        if file_dict['words'][0] != "":
            file_dict['words'].insert(0, "")
            file_dict['alignments'].insert(0, 1 / (10 ** self._params.data_simulator.outputs.output_precision))

        return file_dict

    def _get_speaker_dominance(self) -> List[float]:
        """
        Get the dominance value for each speaker, accounting for the dominance variance and
        the minimum per-speaker dominance.

        Returns:
            dominance (list): Cumulative per-speaker dominance thresholds (the last entry is 1),
                used for inverse-CDF sampling of the next speaker
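
        Example (illustrative, with hypothetical values): for three speakers whose
        normalized dominance values are [0.4, 0.3, 0.3], the returned cumulative
        thresholds are [0.4, 0.7, 1.0].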
| | """ |
| | dominance_mean = 1.0 / self._params.data_simulator.session_config.num_speakers |
| | dominance = np.random.normal( |
| | loc=dominance_mean, |
| | scale=self._params.data_simulator.session_params.dominance_var, |
| | size=self._params.data_simulator.session_config.num_speakers, |
| | ) |
| | dominance = clamp_min_list(dominance, 0) |
| | |
| | total = np.sum(dominance) |
| | if total == 0: |
| | for i in range(len(dominance)): |
| | dominance[i] += self._params.data_simulator.session_params.min_dominance |
| | |
| | dominance = (dominance / total) * ( |
| | 1 |
| | - self._params.data_simulator.session_params.min_dominance |
| | * self._params.data_simulator.session_config.num_speakers |
| | ) |
| | for i in range(len(dominance)): |
| | dominance[i] += self._params.data_simulator.session_params.min_dominance |
| | if ( |
| | i > 0 |
| | ): |
| | dominance[i] = dominance[i] + dominance[i - 1] |
| | return dominance |
| |
|
    def _increase_speaker_dominance(
        self, base_speaker_dominance: List[float], factor: int
    ) -> Tuple[List[float], bool]:
        """
        Increase speaker dominance for unrepresented speakers (used only in enforce mode).
        Increases the dominance for these speakers by the input factor (and then re-normalizes the probabilities to 1).

        Args:
            base_speaker_dominance (list): Dominance values for each speaker.
            factor (int): Factor to increase dominance of unrepresented speakers by.
        Returns:
            dominance (list): Per-speaker dominance
            enforce (bool): Whether to keep enforce mode turned on
        """
        increase_percent = []
        for i in range(self._params.data_simulator.session_config.num_speakers):
            if self._furthest_sample[i] == 0:
                increase_percent.append(i)

        if len(increase_percent) > 0:
            # de-accumulate the cumulative thresholds back into individual probabilities
            dominance = np.copy(base_speaker_dominance)
            for i in range(len(dominance) - 1, 0, -1):
                dominance[i] = dominance[i] - dominance[i - 1]
            # increase the unrepresented speakers by the desired factor
            for i in increase_percent:
                dominance[i] = dominance[i] * factor
            # renormalize and re-accumulate into thresholds
            dominance = dominance / np.sum(dominance)
            for i in range(1, len(dominance)):
                dominance[i] = dominance[i] + dominance[i - 1]
            enforce = True
        else:  # all speakers are represented, so enforce mode can be turned off
            dominance = base_speaker_dominance
            enforce = False
        return dominance, enforce

    def _set_speaker_volume(self):
        """
        Set the volume for each speaker (either equal volume or variable speaker volume).
        """
        if self._params.data_simulator.session_params.normalization_type == 'equal':
            self._volume = np.ones(self._params.data_simulator.session_config.num_speakers)
        elif self._params.data_simulator.session_params.normalization_type == 'variable':
            self._volume = np.random.normal(
                loc=1.0,
                scale=self._params.data_simulator.session_params.normalization_var,
                size=self._params.data_simulator.session_config.num_speakers,
            )
            self._volume = clamp_min_list(self._volume, self._params.data_simulator.session_params.min_volume)
            self._volume = clamp_max_list(self._volume, self._params.data_simulator.session_params.max_volume)

    def _get_next_speaker(self, prev_speaker: int, dominance: List[float]) -> int:
        """
        Get the next speaker (accounting for turn probability and dominance distribution).

        Args:
            prev_speaker (int): Previous speaker turn.
            dominance (list): Cumulative dominance thresholds for each speaker.
        Returns:
            prev_speaker/speaker_turn (int): Speaker turn
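
        Example (illustrative, with hypothetical values): given cumulative dominance
        thresholds [0.4, 0.7, 1.0] and a uniform draw of 0.55, the inner loop stops
        at index 1, so speaker 1 is selected.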
| | """ |
| | if self._params.data_simulator.session_config.num_speakers == 1: |
| | prev_speaker = 0 if prev_speaker is None else prev_speaker |
| | return prev_speaker |
| | else: |
| | if ( |
| | np.random.uniform(0, 1) > self._params.data_simulator.session_params.turn_prob |
| | and prev_speaker is not None |
| | ): |
| | return prev_speaker |
| | else: |
| | speaker_turn = prev_speaker |
| | while speaker_turn == prev_speaker: |
| | rand = np.random.uniform(0, 1) |
| | speaker_turn = 0 |
| | while rand > dominance[speaker_turn]: |
| | speaker_turn += 1 |
| | return speaker_turn |
| |
|
    def _get_window(self, window_amount: int, start: bool = False):
        """
        Get the window curve used to alleviate abrupt changes in the time-series signal when segmenting audio samples.

        Args:
            window_amount (int): Window length (in terms of number of samples).
            start (bool): If true, return the first half of the window.

        Returns:
            window (tensor): Half window (either first half or second half)
        """
        if self._params.data_simulator.session_params.window_type == 'hamming':
            window = hamming(window_amount * 2)
        elif self._params.data_simulator.session_params.window_type == 'hann':
            window = hann(window_amount * 2)
        elif self._params.data_simulator.session_params.window_type == 'cosine':
            window = cosine(window_amount * 2)
        else:
            raise Exception("Incorrect window type provided")

        window = torch.from_numpy(window).to(self._device)

        # return the first half or the second half of the window
        if start:
            return window[:window_amount]
        else:
            return window[window_amount:]

    def _get_start_buffer_and_window(self, first_alignment: int) -> Tuple[int, int]:
        """
        Get the start cutoff and window length for smoothing the start of the sentence.

        Args:
            first_alignment (int): Start of the first word (in terms of number of samples).
        Returns:
            start_cutoff (int): Amount into the audio clip to start
            window_amount (int): Window length
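
        Example (illustrative, with hypothetical values): with sr=16000,
        window_size=0.05 (800 samples), start_buffer=0.1 (1600 samples), and
        first_alignment=3000 samples, the final branch applies and
        start_cutoff = 3000 - 1600 - 800 = 600.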
| | """ |
| | window_amount = int(self._params.data_simulator.session_params.window_size * self._params.data_simulator.sr) |
| | start_buffer = int(self._params.data_simulator.session_params.start_buffer * self._params.data_simulator.sr) |
| |
|
| | if first_alignment < start_buffer: |
| | window_amount = 0 |
| | start_cutoff = 0 |
| | elif first_alignment < start_buffer + window_amount: |
| | window_amount = first_alignment - start_buffer |
| | start_cutoff = 0 |
| | else: |
| | start_cutoff = first_alignment - start_buffer - window_amount |
| |
|
| | return start_cutoff, window_amount |
| |
|
    def _get_end_buffer_and_window(
        self, current_sample_cursor: int, remaining_dur_samples: int, remaining_len_audio_file: int
    ) -> Tuple[int, int]:
        """
        Get the end buffer and window length for smoothing the end of the sentence.

        Args:
            current_sample_cursor (int): Current location in the target file (in terms of number of samples).
            remaining_dur_samples (int): Remaining duration in the target file (in terms of number of samples).
            remaining_len_audio_file (int): Length remaining in audio file (in terms of number of samples).
        Returns:
            release_buffer (int): Amount after the end of the last alignment to include
            window_amount (int): Window length
        """
        window_amount = int(self._params.data_simulator.session_params.window_size * self._params.data_simulator.sr)
        release_buffer = int(
            self._params.data_simulator.session_params.release_buffer * self._params.data_simulator.sr
        )

        if current_sample_cursor + release_buffer > remaining_dur_samples:
            release_buffer = remaining_dur_samples - current_sample_cursor
            window_amount = 0
        elif current_sample_cursor + window_amount + release_buffer > remaining_dur_samples:
            window_amount = remaining_dur_samples - current_sample_cursor - release_buffer

        if remaining_len_audio_file < release_buffer:
            release_buffer = remaining_len_audio_file
            window_amount = 0
        elif remaining_len_audio_file < release_buffer + window_amount:
            window_amount = remaining_len_audio_file - release_buffer

        return release_buffer, window_amount

    def _sample_from_silence_model(self, running_len_samples: int, session_len_samples: int) -> int:
        """
        Sample from the silence model to determine the amount of silence to add between sentences.
        A Gamma distribution is employed to model the highly skewed distribution of silence lengths.
        When we add silence between sentences, we want to ensure that the proportion of silence meets
        `self.sess_silence_mean`. Thus, we employ the following formula to determine the amount of silence to add:

            running_ratio = running_len_samples / session_len_samples
            silence_mean = (session_len_samples * self.sess_silence_mean - self.running_silence_len_samples) * running_ratio

        `running_ratio` is the proportion of the session generated so far relative to the targeted total session length.

        Args:
            running_len_samples (int):
                Running length of the session (in terms of number of samples).
            session_len_samples (int):
                Targeted total session length (in terms of number of samples).

        Returns:
            silence_amount (int): Amount of silence to add between sentences (in terms of number of samples).
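
        Example (illustrative, with hypothetical values): for silence_mean = 16000
        samples and per_silence_var = 4000, lengths are drawn from
        gamma(a=16000 ** 2 / 4000, scale=4000 / 16000), which by moment matching
        (mean = a * scale, variance = a * scale ** 2) has mean 16000 and variance 4000.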
| | """ |
| | running_ratio = running_len_samples / session_len_samples |
| | silence_mean = ( |
| | session_len_samples * (self.sess_silence_mean) - self.running_silence_len_samples |
| | ) * running_ratio |
| | silence_mean = max(self.per_silence_min_len, min(silence_mean, self.per_silence_max_len)) |
| | if silence_mean > 0: |
| | silence_var = self._params.data_simulator.session_params.per_silence_var |
| | silence_amount = ( |
| | int(gamma(a=(silence_mean ** 2) / silence_var, scale=silence_var / silence_mean).rvs()) |
| | if silence_var > 0 |
| | else int(silence_mean) |
| | ) |
| | silence_amount = max(self.per_silence_min_len, min(silence_amount, self.per_silence_max_len)) |
| | else: |
| | silence_amount = 0 |
| |
|
| | return silence_amount |
| |
|
    def _sample_from_overlap_model(self, non_silence_len_samples: int):
        """
        Sample from the overlap model to determine the amount of overlap between segments.
        A Gamma distribution is employed to model the highly skewed distribution of overlap lengths.
        When we add an overlap occurrence, we want to meet the desired overlap ratio defined by `self.sess_overlap_mean`.
        Let `overlap_mean` be the desired overlap amount; it is chosen to satisfy:

            self.sess_overlap_mean = (overlap_mean + self.running_overlap_len_samples) / (overlap_mean + non_silence_len_samples)

        The above equation sets `overlap_mean` to yield the desired overlap ratio `self.sess_overlap_mean`.
        We use this `overlap_mean` value to sample an overlap length for each overlap occurrence.
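
        Solving the equation above for `overlap_mean` yields the expression used in the code below:

            overlap_mean = (self.sess_overlap_mean * non_silence_len_samples - self.running_overlap_len_samples) / (1 - self.sess_overlap_mean)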

        Args:
            non_silence_len_samples (int):
                The total amount of non-silence (speech) region regardless of overlap status

        Returns:
            desired_overlap_amount (int):
                Amount of overlap between segments (in terms of number of samples).
        """
        overlap_mean = ((self.sess_overlap_mean * non_silence_len_samples) - self.running_overlap_len_samples) / (
            1 - self.sess_overlap_mean
        )
        overlap_mean = max(self.per_overlap_min_len, min(max(0, overlap_mean), self.per_overlap_max_len))
        # add back previously missed overlap to keep the session on target
        if self.add_missing_overlap:
            overlap_mean += self._missing_overlap

        if overlap_mean > 0:
            overlap_var = self._params.data_simulator.session_params.per_overlap_var

            desired_overlap_amount = (
                int(gamma(a=overlap_mean ** 2 / overlap_var, scale=overlap_var / overlap_mean).rvs())
                if overlap_var > 0
                else int(overlap_mean)
            )
            desired_overlap_amount = max(
                self.per_overlap_min_len, min(desired_overlap_amount, self.per_overlap_max_len)
            )
        else:
            desired_overlap_amount = 0

        return desired_overlap_amount

    def _add_file(
        self,
        audio_manifest: dict,
        audio_file,
        sentence_word_count: int,
        max_word_count_in_sentence: int,
        max_samples_in_sentence: int,
    ) -> Tuple[int, int]:
        """
        Add audio file to current sentence (up to the desired number of words).
        Uses the alignments to segment the audio file.

        Args:
            audio_manifest (dict): Line from manifest file for current audio file
            audio_file (tensor): Current loaded audio file
            sentence_word_count (int): Running count for number of words in sentence
            max_word_count_in_sentence (int): Maximum count for number of words in sentence
            max_samples_in_sentence (int): Maximum length for sentence in terms of samples
        Returns:
            sentence_word_count + current_word_count (int): Running word count
            len(self._sentence) (int): Current length of the sentence in samples
        """
        if len(audio_manifest['alignments']) <= 1:
            raise ValueError(f"Alignment file has inappropriate length of {len(audio_manifest['alignments'])}")

        offset_idx = np.random.randint(low=1, high=len(audio_manifest['words']))

        first_alignment = int(audio_manifest['alignments'][offset_idx - 1] * self._params.data_simulator.sr)
        start_cutoff, start_window_amount = self._get_start_buffer_and_window(first_alignment)
        if not self._params.data_simulator.session_params.start_window:  # start windowing is disabled
            start_window_amount = 0

        # ensure the desired number of words is added and the sentence length is not exceeded
        sentence_samples = len(self._sentence)

        remaining_dur_samples = max_samples_in_sentence - sentence_samples
        remaining_duration = max_word_count_in_sentence - sentence_word_count  # remaining word budget
        prev_dur_samples, dur_samples, curr_dur_samples = 0, 0, 0
        current_word_count = 0
        word_idx = offset_idx
        silence_count = 1
        while (
            current_word_count < remaining_duration
            and dur_samples < remaining_dur_samples
            and word_idx < len(audio_manifest['words'])
        ):
            dur_samples = int(audio_manifest['alignments'][word_idx] * self._params.data_simulator.sr) - start_cutoff

            # check the length of the generated sentence in terms of sample count
            if curr_dur_samples + dur_samples > remaining_dur_samples:
                # the upcoming word would exceed the remaining sample count, so stop here
                break

            word = audio_manifest['words'][word_idx]

            if silence_count > 0 and word == "":
                break

            self._words.append(word)
            self._alignments.append(
                float(sentence_samples * 1.0 / self._params.data_simulator.sr)
                - float(start_cutoff * 1.0 / self._params.data_simulator.sr)
                + audio_manifest['alignments'][word_idx]
            )

            if word == "":
                word_idx += 1
                silence_count += 1
                continue
            elif self._text == "":
                self._text += word
            else:
                self._text += " " + word

            word_idx += 1
            current_word_count += 1
            prev_dur_samples = dur_samples
            curr_dur_samples += dur_samples

        # add the audio clip up to the final alignment, applying the start window if requested
        if self._params.data_simulator.session_params.window_type is not None:
            if start_window_amount > 0:  # include start window
                window = self._get_window(start_window_amount, start=True)
                self._sentence = self._sentence.to(self._device)
                self._sentence = torch.cat(
                    (
                        self._sentence,
                        torch.multiply(audio_file[start_cutoff : start_cutoff + start_window_amount], window),
                    ),
                    0,
                )
            self._sentence = torch.cat(
                (self._sentence, audio_file[start_cutoff + start_window_amount : start_cutoff + prev_dur_samples]), 0
            ).to(self._device)

        else:
            self._sentence = torch.cat(
                (self._sentence, audio_file[start_cutoff : start_cutoff + prev_dur_samples]), 0
            ).to(self._device)

        # windowing at the end of the sentence
        if (
            word_idx < len(audio_manifest['words'])
        ) and self._params.data_simulator.session_params.window_type is not None:
            release_buffer, end_window_amount = self._get_end_buffer_and_window(
                prev_dur_samples, remaining_dur_samples, len(audio_file[start_cutoff + prev_dur_samples :]),
            )
            self._sentence = torch.cat(
                (
                    self._sentence,
                    audio_file[start_cutoff + prev_dur_samples : start_cutoff + prev_dur_samples + release_buffer],
                ),
                0,
            ).to(self._device)

            if end_window_amount > 0:  # include end window
                window = self._get_window(end_window_amount, start=False)
                self._sentence = torch.cat(
                    (
                        self._sentence,
                        torch.multiply(
                            audio_file[
                                start_cutoff
                                + prev_dur_samples
                                + release_buffer : start_cutoff
                                + prev_dur_samples
                                + release_buffer
                                + end_window_amount
                            ],
                            window,
                        ),
                    ),
                    0,
                ).to(self._device)

        del audio_file
        return sentence_word_count + current_word_count, len(self._sentence)

    def _build_sentence(
        self,
        speaker_turn: int,
        speaker_ids: List[str],
        speaker_wav_align_map: Dict[str, list],
        max_samples_in_sentence: int,
    ):
        """
        Build a new sentence by attaching utterance samples together until the sentence has reached a desired length.
        While generating the sentence, alignment information is used to segment the audio.

        Args:
            speaker_turn (int): Current speaker turn.
            speaker_ids (list): LibriSpeech speaker IDs for each speaker in the current session.
            speaker_wav_align_map (dict): Dictionary containing speaker IDs and their corresponding wav filepath and alignments.
            max_samples_in_sentence (int): Maximum length for sentence in terms of samples
        """
        # select sentence length (in words) from a negative binomial distribution
        sl = (
            np.random.negative_binomial(
                self._params.data_simulator.session_params.sentence_length_params[0],
                self._params.data_simulator.session_params.sentence_length_params[1],
            )
            + 1
        )

        # initialize sentence, text, words, and alignments
        self._sentence = torch.zeros(0, dtype=torch.float64, device=self._device)
        self._text = ""
        self._words = []
        self._alignments = []
        sentence_word_count = 0
        sentence_samples = 0

        # build the sentence from randomly selected samples of the current speaker
        while sentence_word_count < sl and sentence_samples < max_samples_in_sentence:
            audio_manifest = self._load_speaker_sample(speaker_wav_align_map, speaker_ids, speaker_turn)
            if audio_manifest['audio_filepath'] in self._audio_read_buffer_dict:
                audio_file, sr = self._audio_read_buffer_dict[audio_manifest['audio_filepath']]
            else:
                audio_file, sr = sf.read(audio_manifest['audio_filepath'])
                audio_file = torch.from_numpy(audio_file).to(self._device)
                if audio_file.ndim > 1:  # mix down multichannel audio to mono
                    audio_file = torch.mean(audio_file, 1, False).to(self._device)
                self._audio_read_buffer_dict[audio_manifest['audio_filepath']] = (audio_file, sr)

            sentence_word_count, sentence_samples = self._add_file(
                audio_manifest, audio_file, sentence_word_count, sl, max_samples_in_sentence
            )

        # look for split locations
        splits = []
        new_start = 0
        for i in range(len(self._words)):
            if self._words[i] == "" and i != 0 and i != len(self._words) - 1:
                silence_length = self._alignments[i] - self._alignments[i - 1]
                if silence_length > 2 * self._params.data_simulator.session_params.split_buffer:
                    # split the utterance on this long silence
                    new_end = self._alignments[i - 1] + self._params.data_simulator.session_params.split_buffer
                    splits.append(
                        [
                            int(new_start * self._params.data_simulator.sr),
                            int(new_end * self._params.data_simulator.sr),
                        ]
                    )
                    new_start = self._alignments[i] - self._params.data_simulator.session_params.split_buffer

        splits.append([int(new_start * self._params.data_simulator.sr), len(self._sentence)])

        # per-speaker volume normalization using the RMS of the speech regions only
        if self._params.data_simulator.session_params.normalize:
            if torch.max(torch.abs(self._sentence)) > 0:
                split_length = torch.tensor(0).to(self._device).double()
                split_sum = torch.tensor(0).to(self._device).double()
                for split in splits:
                    split_length += len(self._sentence[split[0] : split[1]])
                    split_sum += torch.sum(self._sentence[split[0] : split[1]] ** 2)
                average_rms = torch.sqrt(split_sum * 1.0 / split_length)
                self._sentence = self._sentence / (1.0 * average_rms) * self._volume[speaker_turn]

    def _silence_vs_overlap_selector(self, running_len_samples: int, non_silence_len_samples: int) -> bool:
        """
        Compare the current silence ratio to the current overlap ratio. Switch to either silence or overlap mode
        according to the gap between each current ratio and its session mean in the config.

        Args:
            running_len_samples (int): Length of the current session in samples.
            non_silence_len_samples (int): Length of the signal that is not silence in samples.

        Returns:
            add_overlap (bool): True if the overlap discrepancy (current ratio minus session target) is less than or
                equal to the silence discrepancy, i.e., overlap is lagging its target more than silence is.
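
        Example (illustrative, with hypothetical values): with sess_silence_mean=0.2
        and a current silence ratio of 0.25, the silence discrepancy is +0.05; with
        sess_overlap_mean=0.1 and a current overlap ratio of 0.08, the overlap
        discrepancy is -0.02. Since -0.02 <= +0.05, overlap is chosen next.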
| | """ |
| | if running_len_samples > 0: |
| | self.current_silence_ratio = (running_len_samples - self.running_speech_len_samples) / running_len_samples |
| | self.current_overlap_ratio = self.running_overlap_len_samples / non_silence_len_samples |
| | else: |
| | self.current_silence_ratio, self.current_overlap_ratio = 0, 0 |
| |
|
| | self.silence_discrepancy = self.current_silence_ratio - self.sess_silence_mean |
| | self.overlap_discrepancy = self.current_overlap_ratio - self.sess_overlap_mean |
| | add_overlap = self.overlap_discrepancy <= self.silence_discrepancy |
| | return add_overlap |
| |
|
| | |
    def _add_silence_or_overlap(
        self,
        speaker_turn: int,
        prev_speaker: int,
        start: int,
        length: int,
        session_len_samples: int,
        prev_len_samples: int,
        enforce: bool,
    ) -> int:
        """
        Returns new overlapped (or shifted) start position after inserting overlap or silence.

        Args:
            speaker_turn (int): The integer index of the current speaker turn.
            prev_speaker (int): The integer index of the previous speaker turn.
            start (int): Current start of the audio file being inserted.
            length (int): Length of the audio file being inserted.
            session_len_samples (int): Maximum length of the session in terms of number of samples
            prev_len_samples (int): Length of previous sentence (in terms of number of samples)
            enforce (bool): Whether speaker enforcement mode is being used
        Returns:
            new_start (int): New starting position in the session accounting for overlap or silence
        """
        running_len_samples = start + length
        # total non-silence (speech) length including the sentence being inserted
        non_silence_len_samples = self.running_speech_len_samples + length

        # choose whether to insert overlap or silence next
        add_overlap = self._silence_vs_overlap_selector(running_len_samples, non_silence_len_samples)

        # overlap this sentence with the previous one (only when the speaker changes)
        if prev_speaker != speaker_turn and prev_speaker is not None and add_overlap:
            desired_overlap_amount = self._sample_from_overlap_model(non_silence_len_samples)
            new_start = start - desired_overlap_amount

            # avoid overlap at the start of the session
            if new_start < 0:
                desired_overlap_amount -= 0 - new_start
                self._missing_overlap += 0 - new_start
                new_start = 0

            # avoid overlapping the current speaker with their own previous speech
            if new_start < self._furthest_sample[speaker_turn]:
                desired_overlap_amount -= self._furthest_sample[speaker_turn] - new_start
                self._missing_overlap += self._furthest_sample[speaker_turn] - new_start
                new_start = self._furthest_sample[speaker_turn]

            prev_start = start - prev_len_samples
            prev_end = start
            new_end = new_start + length

            # track the achieved overlap against the desired amount
            overlap_amount = 0
            if is_overlap([prev_start, prev_end], [new_start, new_end]):
                overlap_range = get_overlap_range([prev_start, prev_end], [new_start, new_end])
                overlap_amount = max(overlap_range[1] - overlap_range[0], 0)

            if overlap_amount < desired_overlap_amount:
                self._missing_overlap += desired_overlap_amount - overlap_amount
            self.running_overlap_len_samples += overlap_amount

        # otherwise, add silence between this sentence and the previous one
        else:
            silence_amount = self._sample_from_silence_model(running_len_samples, session_len_samples)

            # do not add silence that would run past the session length (unless in enforce mode)
            if start + length + silence_amount > session_len_samples and not enforce:
                new_start = max(session_len_samples - length, start)
            else:
                new_start = start + silence_amount

        return new_start

    def _get_background(self, len_array: int, power_array: float):
        """
        Augment with background noise (inserting ambient background noise up to the desired SNR for the full clip).

        Args:
            len_array (int): Length of background noise required.
            power_array (float): Average power of the audio file.

        Returns:
            bg_array (tensor): Tensor containing background noise
        """
        bg_array = torch.zeros(len_array).to(self._device)
        desired_snr = self._params.data_simulator.background_noise.snr
        ratio = 10 ** (desired_snr / 20)
        desired_avg_power_noise = (power_array / ratio).to(self._device)
        running_len_samples, file_id = 0, 0
        while running_len_samples < len_array:  # build a background stream spanning the full session
            audio_manifest = self._noise_samples[file_id % len(self._noise_samples)]
            file_id += 1

            if audio_manifest['audio_filepath'] in self._noise_read_buffer_dict:
                audio_file, sr = self._noise_read_buffer_dict[audio_manifest['audio_filepath']]
            else:
                audio_file, sr = sf.read(audio_manifest['audio_filepath'])
                audio_file = torch.from_numpy(audio_file).to(self._device)
                if audio_file.ndim > 1:  # mix down multichannel audio to mono
                    audio_file = torch.mean(audio_file, 1, False)
                self._noise_read_buffer_dict[audio_manifest['audio_filepath']] = (audio_file, sr)

            if running_len_samples + len(audio_file) < len_array:
                end_audio_file = running_len_samples + len(audio_file)
            else:
                end_audio_file = len_array

            # scale the noise segment to the desired average noise power
            pow_audio_file = torch.mean(audio_file[: end_audio_file - running_len_samples] ** 2).to(self._device)
            scaled_audio_file = audio_file[: end_audio_file - running_len_samples] * torch.sqrt(
                desired_avg_power_noise / pow_audio_file
            ).to(self._device)

            bg_array[running_len_samples:end_audio_file] = scaled_audio_file
            running_len_samples = end_audio_file

        return bg_array

    def _create_new_rttm_entry(self, start: float, end: float, speaker_id: str) -> List[str]:
        """
        Create new RTTM entries (to write to the output rttm file).

        Args:
            start (float): Start of the current audio segment (seconds).
            end (float): End of the current audio segment (seconds).
            speaker_id (str): LibriSpeech speaker ID for the current entry.

        Returns:
            rttm_list (list): List of rttm entries
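
        Example (illustrative, with hypothetical values): an entry such as
        "0.03 1.42 1841" marks speech by speaker 1841 from 0.03 s to 1.42 s.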
| | """ |
| | rttm_list = [] |
| | new_start = start |
| | |
| | for i in range(len(self._words)): |
| | if self._words[i] == "" and i != 0 and i != len(self._words) - 1: |
| | silence_length = self._alignments[i] - self._alignments[i - 1] |
| | if ( |
| | silence_length > 2 * self._params.data_simulator.session_params.split_buffer |
| | ): |
| | new_end = start + self._alignments[i - 1] + self._params.data_simulator.session_params.split_buffer |
| | t_stt = float(round(new_start, self._params.data_simulator.outputs.output_precision)) |
| | t_end = float(round(new_end, self._params.data_simulator.outputs.output_precision)) |
| | rttm_list.append(f"{t_stt} {t_end} {speaker_id}") |
| | new_start = start + self._alignments[i] - self._params.data_simulator.session_params.split_buffer |
| |
|
| | t_stt = float(round(new_start, self._params.data_simulator.outputs.output_precision)) |
| | t_end = float(round(end, self._params.data_simulator.outputs.output_precision)) |
| | rttm_list.append(f"{t_stt} {t_end} {speaker_id}") |
| | return rttm_list |
| |
|
    def _create_new_json_entry(
        self, wav_filename: str, start: float, length: float, speaker_id: str, rttm_filepath: str, ctm_filepath: str
    ) -> dict:
        """
        Create a new JSON entry (to write to the output json file).

        Args:
            wav_filename (str): Output wav filepath.
            start (float): Offset of the current segment (seconds).
            length (float): Duration of the current segment (seconds).
            speaker_id (str): LibriSpeech speaker ID for the current entry.
            rttm_filepath (str): Output rttm filepath.
            ctm_filepath (str): Output ctm filepath.

        Returns:
            meta (dict): JSON entry
        """
        start = float(round(start, self._params.data_simulator.outputs.output_precision))
        length = float(round(length, self._params.data_simulator.outputs.output_precision))
        meta = {
            "audio_filepath": wav_filename,
            "offset": start,
            "duration": length,
            "label": speaker_id,
            "text": self._text,
            "num_speakers": self._params.data_simulator.session_config.num_speakers,
            "rttm_filepath": rttm_filepath,
            "ctm_filepath": ctm_filepath,
            "uem_filepath": None,
        }
        return meta

    def _create_new_ctm_entry(self, session_name: str, speaker_id: str, start: float) -> List[str]:
        """
        Create new CTM entries (to write to the output ctm file).

        Args:
            session_name (str): Current session name.
            speaker_id (str): LibriSpeech speaker ID for the current entry.
            start (float): Start of the current audio segment (seconds).

        Returns:
            arr (list): List of ctm entries
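
        Example (illustrative, with hypothetical values): an entry such as
        "session_0 1841 0.03 0.91 hello 0" lists the session name, speaker ID,
        word start time, word duration, the word itself, and a trailing placeholder.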
| | """ |
| | arr = [] |
| | start = float(round(start, self._params.data_simulator.outputs.output_precision)) |
| | for i in range(len(self._words)): |
| | word = self._words[i] |
| | if ( |
| | word != "" |
| | ): |
| | prev_align = 0 if i == 0 else self._alignments[i - 1] |
| | align1 = float(round(prev_align + start, self._params.data_simulator.outputs.output_precision)) |
| | align2 = float( |
| | round(self._alignments[i] - prev_align, self._params.data_simulator.outputs.output_precision,) |
| | ) |
| | text = f"{session_name} {speaker_id} {align1} {align2} {word} 0\n" |
| | arr.append((align1, text)) |
| | return arr |
| |
|
    def create_base_manifest_ds(self) -> str:
        """
        Create base diarization manifest file for online data simulation.

        Returns:
            self.base_manifest_filepath (str): Path to manifest file
        """
        basepath = self._params.data_simulator.outputs.output_dir
        wav_path = os.path.join(basepath, 'synthetic_wav.list')
        text_path = os.path.join(basepath, 'synthetic_txt.list')
        rttm_path = os.path.join(basepath, 'synthetic_rttm.list')
        ctm_path = os.path.join(basepath, 'synthetic_ctm.list')
        manifest_filepath = os.path.join(basepath, 'base_manifest.json')

        create_manifest(
            wav_path,
            manifest_filepath,
            text_path=text_path,
            rttm_path=rttm_path,
            ctm_path=ctm_path,
            add_duration=False,
        )

        self.base_manifest_filepath = manifest_filepath
        return self.base_manifest_filepath

    def create_segment_manifest_ds(self) -> str:
        """
        Create segmented diarization manifest file for online data simulation.

        Returns:
            self.segment_manifest_filepath (str): Path to manifest file
        """
        basepath = self._params.data_simulator.outputs.output_dir
        output_manifest_filepath = os.path.join(basepath, 'segment_manifest.json')
        input_manifest_filepath = self.base_manifest_filepath
        window = self._params.data_simulator.segment_manifest.window
        shift = self._params.data_simulator.segment_manifest.shift
        step_count = self._params.data_simulator.segment_manifest.step_count
        deci = self._params.data_simulator.segment_manifest.deci

        create_segment_manifest(input_manifest_filepath, output_manifest_filepath, window, shift, step_count, deci)

        self.segment_manifest_filepath = output_manifest_filepath
        return self.segment_manifest_filepath

    def _init_silence_params(self):
        """
        Initialize parameters for silence insertion in the current session.
        """
        self.running_silence_len_samples = 0
        self.running_speech_len_samples = 0
        self.per_silence_min_len = int(
            max(0, self._params.data_simulator.session_params.per_silence_min) * self._params.data_simulator.sr
        )
        if self._params.data_simulator.session_params.per_silence_max > 0:
            self.per_silence_max_len = int(
                self._params.data_simulator.session_params.per_silence_max * self._params.data_simulator.sr
            )
        else:  # no explicit maximum, so cap at the session length
            self.per_silence_max_len = int(
                self._params.data_simulator.session_config.session_length * self._params.data_simulator.sr
            )

    def _init_overlap_params(self):
        """
        Initialize parameters for overlap insertion in the current session.
        """
        self.running_overlap_len_samples = 0
        self.per_overlap_min_len = int(
            max(0, self._params.data_simulator.session_params.per_overlap_min) * self._params.data_simulator.sr
        )
        if self._params.data_simulator.session_params.per_overlap_max > 0:
            self.per_overlap_max_len = int(
                self._params.data_simulator.session_params.per_overlap_max * self._params.data_simulator.sr
            )
        else:  # no explicit maximum, so cap at the session length
            self.per_overlap_max_len = int(
                self._params.data_simulator.session_config.session_length * self._params.data_simulator.sr
            )

    def _get_session_silence_mean(self):
        """
        Get the target mean silence for the current session using a re-parameterized Beta distribution.
        The following constraints are applied to make a > 0 and b > 0:

            0 < mean_silence < 1
            0 < mean_silence_var < mean_silence * (1 - mean_silence)
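
        The parameters follow from moment matching: a Beta(a, b) variable has mean
        m = a / (a + b) and variance v = a * b / ((a + b)^2 * (a + b + 1)), which
        solve to a = m^2 * (1 - m) / v - m and b = m * (1 - m)^2 / v - (1 - m),
        the expressions used in the code below.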

        Returns:
            silence_mean (float):
                Target mean silence for the current session
        """
        mean = float(self._params.data_simulator.session_params.mean_silence)
        var = float(self._params.data_simulator.session_params.mean_silence_var)
        if var > 0:
            a = mean ** 2 * (1 - mean) / var - mean
            b = mean * (1 - mean) ** 2 / var - (1 - mean)
            if a < 0 or b < 0:
                raise ValueError(
                    f"Beta(a, b), a = {a:.3f} and b = {b:.3f} should be both greater than 0. "
                    f"Invalid `mean_silence_var` value {var} for sampling from Beta distribution. "
                    f"`mean_silence_var` should be less than `mean_silence * (1 - mean_silence)`. "
                    f"Please check `mean_silence_var` and try again."
                )
            silence_mean = beta(a, b).rvs()
        else:
            silence_mean = mean
        return silence_mean

    def _get_session_overlap_mean(self):
        """
        Get the target mean overlap for the current session using a re-parameterized Beta distribution.
        The following constraints are applied to make a > 0 and b > 0:

            0 < mean_overlap < 1
            0 < mean_overlap_var < mean_overlap * (1 - mean_overlap)

        Returns:
            overlap_mean (float):
                Target mean overlap for the current session
        """
        mean = float(self._params.data_simulator.session_params.mean_overlap)
        var = float(self._params.data_simulator.session_params.mean_overlap_var)
        if var > 0:
            a = mean ** 2 * (1 - mean) / var - mean
            b = mean * (1 - mean) ** 2 / var - (1 - mean)
            if a < 0 or b < 0:
                raise ValueError(
                    f"Beta(a, b), a = {a:.3f} and b = {b:.3f} should be both greater than 0. "
                    f"Invalid `mean_overlap_var` value {var} for sampling from Beta distribution. "
                    f"`mean_overlap_var` should be less than `mean_overlap * (1 - mean_overlap)`. "
                    f"Please check `mean_overlap_var` and try again."
                )
            overlap_mean = beta(a, b).rvs()
        else:
            overlap_mean = mean
        return overlap_mean

    def _get_session_silence_from_rttm(self, rttm_list: List[str], running_len_samples: int):
        """
        Calculate the total speech and silence duration in the current session using the RTTM entries.

        Args:
            rttm_list (list):
                List of RTTM timestamps
            running_len_samples (int):
                Total number of samples generated so far in the current session

        Returns:
            sess_speech_len (int):
                The total number of speech samples in the current session
            sess_silence_len (int):
                The total number of silence samples in the current session
        """
        all_sample_list = []
        for x_raw in rttm_list:
            x = x_raw.split()
            all_sample_list.append([float(x[0]), float(x[1])])

        self._merged_speech_intervals = merge_float_intervals(all_sample_list)
        total_speech_in_secs = sum([x[1] - x[0] for x in self._merged_speech_intervals])
        total_silence_in_secs = running_len_samples / self._params.data_simulator.sr - total_speech_in_secs
        sess_speech_len = int(total_speech_in_secs * self._params.data_simulator.sr)
        sess_silence_len = int(total_silence_in_secs * self._params.data_simulator.sr)
        return sess_speech_len, sess_silence_len

    def _generate_session(
        self,
        idx: int,
        basepath: str,
        filename: str,
        speaker_ids: List[str],
        speaker_wav_align_map: Dict[str, list],
        noise_samples: list,
        device: torch.device,
        enforce_counter: int = 2,
    ):
        """
        _generate_session function without RIR simulation.
        Generate a multispeaker audio session and corresponding label files.

        Args:
            idx (int): Index for current session (out of total number of sessions).
            basepath (str): Path to output directory.
            filename (str): Filename for output files.
            speaker_ids (list): List of speaker IDs that will be used in this session.
            speaker_wav_align_map (dict): Dictionary containing speaker IDs and their corresponding wav filepath and alignments.
            noise_samples (list): List of randomly sampled noise source files that will be used for generating this session.
            device (torch.device): Device to use for generating this session.
            enforce_counter (int): In enforcement mode, dominance is increased by a factor of enforce_counter for unrepresented speakers
        """
        self._device = device
        speaker_dominance = self._get_speaker_dominance()  # randomly determine speaker dominance
        base_speaker_dominance = np.copy(speaker_dominance)
        self._set_speaker_volume()

        running_len_samples, prev_len_samples = 0, 0
        prev_speaker = None
        rttm_list, json_list, ctm_list = [], [], []
        self._noise_samples = noise_samples
        self._furthest_sample = [0 for n in range(self._params.data_simulator.session_config.num_speakers)]
        self._missing_silence = 0

        # speaker enforcement is only triggered after this fraction of the session has been generated
        enforce_time = np.random.uniform(
            self._params.data_simulator.speaker_enforcement.enforce_time[0],
            self._params.data_simulator.speaker_enforcement.enforce_time[1],
        )
        enforce = self._params.data_simulator.speaker_enforcement.enforce_num_speakers

        session_len_samples = int(
            self._params.data_simulator.session_config.session_length * self._params.data_simulator.sr
        )
        array = torch.zeros(session_len_samples).to(self._device)
        is_speech = torch.zeros(session_len_samples).to(self._device)

        self._init_silence_params()
        self._init_overlap_params()
        self.sess_silence_mean = self._get_session_silence_mean()
        self.sess_overlap_mean = self._get_session_overlap_mean()

        while running_len_samples < session_len_samples or enforce:
            # enforce num_speakers by boosting the dominance of unrepresented speakers
            if running_len_samples > enforce_time * session_len_samples and enforce:
                speaker_dominance, enforce = self._increase_speaker_dominance(base_speaker_dominance, enforce_counter)
                if enforce:
                    enforce_counter += 1

            # select the next speaker
            speaker_turn = self._get_next_speaker(prev_speaker, speaker_dominance)

            # build sentence (only add if remaining length is greater than the end buffer)
            max_samples_in_sentence = session_len_samples - running_len_samples
            if enforce:
                max_samples_in_sentence = float('inf')
            elif (
                max_samples_in_sentence
                < self._params.data_simulator.session_params.end_buffer * self._params.data_simulator.sr
            ):
                break

            self._build_sentence(speaker_turn, speaker_ids, speaker_wav_align_map, max_samples_in_sentence)
            length = len(self._sentence)

            # shift the starting position according to the sampled silence or overlap
            start = self._add_silence_or_overlap(
                speaker_turn,
                prev_speaker,
                running_len_samples,
                length,
                session_len_samples,
                prev_len_samples,
                enforce,
            )

            # add the sentence to the session, padding the arrays if the session length is exceeded
            end = start + length
            if end > len(array):
                array = torch.nn.functional.pad(array, (0, end - len(array)))
                is_speech = torch.nn.functional.pad(is_speech, (0, end - len(is_speech)))
            array[start:end] += self._sentence
            is_speech[start:end] = 1

            # build entries for the output label files
            new_rttm_entries = self._create_new_rttm_entry(
                start / self._params.data_simulator.sr, end / self._params.data_simulator.sr, speaker_ids[speaker_turn]
            )

            for entry in new_rttm_entries:
                rttm_list.append(entry)

            new_json_entry = self._create_new_json_entry(
                os.path.join(basepath, filename + '.wav'),
                start / self._params.data_simulator.sr,
                length / self._params.data_simulator.sr,
                speaker_ids[speaker_turn],
                os.path.join(basepath, filename + '.rttm'),
                os.path.join(basepath, filename + '.ctm'),
            )
            json_list.append(new_json_entry)
| | new_ctm_entries = self._create_new_ctm_entry( |
| | filename, speaker_ids[speaker_turn], start / self._params.data_simulator.sr |
| | ) |
| | for entry in new_ctm_entries: |
| | ctm_list.append(entry) |
| |
|
| | running_len_samples = np.maximum(running_len_samples, end) |
| | self.running_speech_len_samples, self.running_silence_len_samples = self._get_session_silence_from_rttm( |
| | rttm_list, running_len_samples |
| | ) |
| |
|
| | self._furthest_sample[speaker_turn] = running_len_samples |
| | prev_speaker = speaker_turn |
| | prev_len_samples = length |
| |
|
| | |
| | if self._params.data_simulator.background_noise.add_bg: |
| | if len(self._noise_samples) > 0: |
| | avg_power_array = torch.mean(array[is_speech == 1] ** 2) |
| | bg = self._get_background(len(array), avg_power_array) |
| | array += bg |
| | else: |
| | raise ValueError('No background noise samples found in self._noise_samples.') |
| |
|
| | |
| | array = array / (1.0 * torch.max(torch.abs(array))) |
| | if torch.is_tensor(array): |
| | array = array.cpu().numpy() |
| | sf.write(os.path.join(basepath, filename + '.wav'), array, self._params.data_simulator.sr) |
| | labels_to_rttmfile(rttm_list, filename, self._params.data_simulator.outputs.output_dir) |
| | write_manifest(os.path.join(basepath, filename + '.json'), json_list) |
| | write_ctm(os.path.join(basepath, filename + '.ctm'), ctm_list) |
| | write_text(os.path.join(basepath, filename + '.txt'), ctm_list) |
| |
|
| | del array |
| | self.clean_up() |
| | return basepath, filename |
| |
|
| | def generate_sessions(self, random_seed: int = None): |
| | """ |
| | Generate several multispeaker audio sessions and corresponding list files. |
| | |
| | Args: |
| | random_seed (int): random seed for reproducibility |
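| | |
| | Example: |
| | Typical driver-level usage (illustrative; `cfg` is an OmegaConf |
| | configuration as described in the class docstring): |
| | |
| | >>> simulator = MultiSpeakerSimulator(cfg=cfg) |
| | >>> simulator.generate_sessions(random_seed=42) |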
| | """ |
| | logging.info(f"Generating Diarization Sessions") |
| | if random_seed is None: |
| | random_seed = self._params.data_simulator.random_seed |
| | np.random.seed(random_seed) |
| | output_dir = self._params.data_simulator.outputs.output_dir |
| |
|
| | |
| | if os.path.isdir(output_dir) and os.listdir(output_dir): |
| | if self._params.data_simulator.outputs.overwrite_output: |
| | if os.path.exists(output_dir): |
| | shutil.rmtree(output_dir) |
| | os.mkdir(output_dir) |
| | else: |
| | raise Exception("Output directory is nonempty and overwrite_output = false") |
| | elif not os.path.isdir(output_dir): |
| | os.mkdir(output_dir) |
| |
|
| | |
| | if not os.path.isabs(output_dir): |
| | ROOT = os.getcwd() |
| | basepath = os.path.join(ROOT, output_dir) |
| | else: |
| | basepath = output_dir |
| |
|
| | wavlist = open(os.path.join(basepath, "synthetic_wav.list"), "w") |
| | rttmlist = open(os.path.join(basepath, "synthetic_rttm.list"), "w") |
| | jsonlist = open(os.path.join(basepath, "synthetic_json.list"), "w") |
| | ctmlist = open(os.path.join(basepath, "synthetic_ctm.list"), "w") |
| | textlist = open(os.path.join(basepath, "synthetic_txt.list"), "w") |
| | num_workers = self._params.get("num_workers", 1) |
| | tp = concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) |
| | futures = [] |
| |
|
| | num_sessions = self._params.data_simulator.session_config.num_sessions |
| | source_noise_manifest = self._read_noise_manifest() |
| | queue = [] |
| |
|
| | |
| | for sess_idx in range(num_sessions): |
| | filename = self._params.data_simulator.outputs.output_filename + f"_{sess_idx}" |
| | speaker_ids = self._get_speaker_ids() |
| | speaker_wav_align_map = self._get_speaker_samples(speaker_ids) |
| | noise_samples = self._sample_noise_manifest(source_noise_manifest) |
| | if torch.cuda.is_available(): |
| | device = torch.device(f"cuda:{sess_idx % torch.cuda.device_count()}") |
| | else: |
| | device = self._device |
| | queue.append((sess_idx, basepath, filename, speaker_ids, speaker_wav_align_map, noise_samples, device)) |
| |
|
| | |
| | if num_workers > 1: |
| | self._manifest = None |
| | self._speaker_samples = None |
| |
|
| | for sess_idx in range(num_sessions): |
| | self._furthest_sample = [0 for n in range(self._params.data_simulator.session_config.num_speakers)] |
| | self._audio_read_buffer_dict = {} |
| | if num_workers > 1: |
| | futures.append(tp.submit(self._generate_session, *queue[sess_idx])) |
| | else: |
| | futures.append(queue[sess_idx]) |
| |
|
| | if num_workers > 1: |
| | generator = concurrent.futures.as_completed(futures) |
| | else: |
| | generator = futures |
| |
|
| | for future in tqdm(generator, desc="Waiting for generators to finish", unit="jobs", total=len(futures)): |
| | if num_workers > 1: |
| | basepath, filename = future.result() |
| | else: |
| | self._noise_samples = self._sample_noise_manifest(source_noise_manifest) |
| | basepath, filename = self._generate_session(*future) |
| |
|
| | wavlist.write(os.path.join(basepath, filename + '.wav') + '\n') |
| | rttmlist.write(os.path.join(basepath, filename + '.rttm') + '\n') |
| | jsonlist.write(os.path.join(basepath, filename + '.json') + '\n') |
| | ctmlist.write(os.path.join(basepath, filename + '.ctm') + '\n') |
| | textlist.write(os.path.join(basepath, filename + '.txt') + '\n') |
| |
|
| | |
| | num_missing = 0 |
| | for k in range(len(self._furthest_sample)): |
| | if self._furthest_sample[k] == 0: |
| | num_missing += 1 |
| | if num_missing != 0: |
| | warnings.warn( |
| | f"Only {self._params.data_simulator.session_config.num_speakers - num_missing} speakers were included " |
| | f"in the clip instead of the requested {self._params.data_simulator.session_config.num_speakers}" |
| | ) |
| |
|
| | tp.shutdown() |
| |
|
| | wavlist.close() |
| | rttmlist.close() |
| | jsonlist.close() |
| | ctmlist.close() |
| | textlist.close() |
| |
|
| | logging.info(f"Data simulation has been completed, results saved at: {basepath}") |
| |
|
| |
|
| | class RIRMultiSpeakerSimulator(MultiSpeakerSimulator): |
| | """ |
| | RIR Augmented Multispeaker Audio Session Simulator - simulates multispeaker audio sessions using single-speaker |
| | audio files and corresponding word alignments, as well as simulated RIRs for augmentation. |
| | |
| | Args: |
| | cfg: OmegaConf configuration loaded from yaml file. |
| | |
| | Parameters (in addition to the base MultiSpeakerSimulator parameters): |
| | rir_generation: |
| | use_rir (bool): Whether to generate synthetic RIR |
| | toolkit (str): Which toolkit to use ("pyroomacoustics", "gpuRIR") |
| | room_config: |
| | room_sz (list): Size of the shoebox room environment (1d array for specific, 2d array for random range to be |
| | sampled from) |
| | pos_src (list): Positions of the speakers in the simulated room environment (2d array for specific, 3d array |
| | for random ranges to be sampled from) |
| | noise_src_pos (list): Position in room for the ambient background noise source |
| | mic_config: |
| | num_channels (int): Number of output audio channels |
| | pos_rcv (list): Microphone positions in the simulated room environment (1d/2d array for specific, 2d/3d array |
| | for range assuming num_channels is 1/2+) |
| | orV_rcv (list or null): Microphone orientations (needed for non-omnidirectional microphones) |
| | mic_pattern (str): Microphone type ("omni" - omnidirectional) - currently only omnidirectional microphones are |
| | supported for pyroomacoustics |
| | absorbtion_params: (Note that only `T60` is used for pyroomacoustics simulations) |
| | abs_weights (list): Absorption coefficient ratios for each surface |
| | T60 (float): Room reverberation time (`T60` is the time it takes for the RIR to decay by 60 dB) |
| | att_diff (float): Starting attenuation (if this is different than att_max, the diffuse reverberation model is |
| | used by gpuRIR) |
| | att_max (float): End attenuation when using the diffuse reverberation model (gpuRIR) |
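| | |
| | Example: |
| | A minimal, illustrative `rir_generation` configuration sketch; all |
| | values below are made up for demonstration and are not tuned defaults: |
| | |
| | >>> rir_cfg = OmegaConf.create({ |
| | ... 'use_rir': True, |
| | ... 'toolkit': 'gpuRIR', |
| | ... 'room_config': { |
| | ... 'room_sz': [5.0, 4.0, 3.0], |
| | ... 'pos_src': [[1.0, 1.0, 1.5], [3.5, 2.5, 1.5]], |
| | ... 'noise_src_pos': [2.5, 0.5, 1.5], |
| | ... }, |
| | ... 'mic_config': {'num_channels': 1, 'pos_rcv': [[2.5, 2.0, 1.0]], 'orV_rcv': None, 'mic_pattern': 'omni'}, |
| | ... 'absorbtion_params': {'abs_weights': [0.9] * 6, 'T60': 0.3, 'att_diff': 15.0, 'att_max': 60.0}, |
| | ... }) |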
| | """ |
| |
|
| | def __init__(self, cfg): |
| | super().__init__(cfg) |
| | self._check_args_rir() |
| |
|
| | def _check_args_rir(self): |
| | """ |
| | Checks RIR YAML arguments to ensure they are within valid ranges |
| | """ |
| |
|
| | if self._params.data_simulator.rir_generation.toolkit not in ['pyroomacoustics', 'gpuRIR']: |
| | raise Exception("Toolkit must be pyroomacoustics or gpuRIR") |
| | if self._params.data_simulator.rir_generation.toolkit == 'pyroomacoustics' and not PRA: |
| | raise ImportError("pyroomacoustics should be installed to run this simulator with RIR augmentation") |
| |
|
| | if self._params.data_simulator.rir_generation.toolkit == 'gpuRIR' and not GPURIR: |
| | raise ImportError("gpuRIR should be installed to run this simulator with RIR augmentation") |
| |
|
| | if len(self._params.data_simulator.rir_generation.room_config.room_sz) != 3: |
| | raise Exception("Incorrect room dimensions provided") |
| | if self._params.data_simulator.rir_generation.mic_config.num_channels < 1: |
| | raise Exception("Number of channels should be greater or equal to 1") |
| | if len(self._params.data_simulator.rir_generation.room_config.pos_src) < 2: |
| | raise Exception("Fewer than two source positions provided") |
| | for sublist in self._params.data_simulator.rir_generation.room_config.pos_src: |
| | if len(sublist) != 3: |
| | raise Exception("Three coordinates must be provided for source positions") |
| | if len(self._params.data_simulator.rir_generation.mic_config.pos_rcv) == 0: |
| | raise Exception("No mic positions provided") |
| | for sublist in self._params.data_simulator.rir_generation.mic_config.pos_rcv: |
| | if len(sublist) != 3: |
| | raise Exception("Three coordinates must be provided for mic positions") |
| |
|
| | if self._params.data_simulator.session_config.num_speakers != len( |
| | self._params.data_simulator.rir_generation.room_config.pos_src |
| | ): |
| | raise Exception("Number of speakers is not equal to the number of provided source positions") |
| | if self._params.data_simulator.rir_generation.mic_config.num_channels != len( |
| | self._params.data_simulator.rir_generation.mic_config.pos_rcv |
| | ): |
| | raise Exception("Number of channels is not equal to the number of provided microphone positions") |
| |
|
| | if ( |
| | not self._params.data_simulator.rir_generation.mic_config.orV_rcv |
| | and self._params.data_simulator.rir_generation.mic_config.mic_pattern != 'omni' |
| | ): |
| | raise Exception("Microphone orientations must be provided if mic_pattern != omni") |
| | if self._params.data_simulator.rir_generation.mic_config.orV_rcv is not None: |
| | if len(self._params.data_simulator.rir_generation.mic_config.orV_rcv) != len( |
| | self._params.data_simulator.rir_generation.mic_config.pos_rcv |
| | ): |
| | raise Exception("A different number of microphone orientations and microphone positions were provided") |
| | for sublist in self._params.data_simulator.rir_generation.mic_config.orV_rcv: |
| | if len(sublist) != 3: |
| | raise Exception("Three coordinates must be provided for orientations") |
| |
|
| | def _generate_rir_gpuRIR(self): |
| | """ |
| | Create simulated RIR using the gpuRIR library |
| | |
| | Returns: |
| | RIR (tensor): Generated RIR |
| | RIR_pad (int): Length of padding added when convolving the RIR with an audio file |
| | """ |
| | room_sz_tmp = np.array(self._params.data_simulator.rir_generation.room_config.room_sz) |
| | if room_sz_tmp.ndim == 2: |
| | room_sz = np.zeros(room_sz_tmp.shape[0]) |
| | for i in range(room_sz_tmp.shape[0]): |
| | room_sz[i] = np.random.uniform(room_sz_tmp[i, 0], room_sz_tmp[i, 1]) |
| | else: |
| | room_sz = room_sz_tmp |
| |
|
| | pos_src_tmp = np.array(self._params.data_simulator.rir_generation.room_config.pos_src) |
| | if pos_src_tmp.ndim == 3: |
| | pos_src = np.zeros((pos_src_tmp.shape[0], pos_src_tmp.shape[1])) |
| | for i in range(pos_src_tmp.shape[0]): |
| | for j in range(pos_src_tmp.shape[1]): |
| | pos_src[i, j] = np.random.uniform(pos_src_tmp[i, j, 0], pos_src_tmp[i, j, 1]) |
| | else: |
| | pos_src = pos_src_tmp |
| |
|
| | if self._params.data_simulator.background_noise.add_bg: |
| | pos_src = np.vstack((pos_src, self._params.data_simulator.rir_generation.room_config.noise_src_pos)) |
| |
|
| | mic_pos_tmp = np.array(self._params.data_simulator.rir_generation.mic_config.pos_rcv) |
| | if mic_pos_tmp.ndim == 3: |
| | mic_pos = np.zeros((mic_pos_tmp.shape[0], mic_pos_tmp.shape[1])) |
| | for i in range(mic_pos_tmp.shape[0]): |
| | for j in range(mic_pos_tmp.shape[1]): |
| | mic_pos[i, j] = np.random.uniform(mic_pos_tmp[i, j, 0], mic_pos_tmp[i, j, 1]) |
| | else: |
| | mic_pos = mic_pos_tmp |
| |
|
| | orV_rcv = self._params.data_simulator.rir_generation.mic_config.orV_rcv |
| | if orV_rcv: |
| | orV_rcv = np.array(orV_rcv) |
| | mic_pattern = self._params.data_simulator.rir_generation.mic_config.mic_pattern |
| | abs_weights = self._params.data_simulator.rir_generation.absorbtion_params.abs_weights |
| | T60 = self._params.data_simulator.rir_generation.absorbtion_params.T60 |
| | att_diff = self._params.data_simulator.rir_generation.absorbtion_params.att_diff |
| | att_max = self._params.data_simulator.rir_generation.absorbtion_params.att_max |
| | sr = self._params.data_simulator.sr |
| |
|
| | beta = beta_SabineEstimation(room_sz, T60, abs_weights=abs_weights) |
| | Tdiff = att2t_SabineEstimator(att_diff, T60) |
| | Tmax = att2t_SabineEstimator(att_max, T60) |
| | nb_img = t2n(Tdiff, room_sz) |
| | RIR = simulateRIR( |
| | room_sz, beta, pos_src, mic_pos, nb_img, Tmax, sr, Tdiff=Tdiff, orV_rcv=orV_rcv, mic_pattern=mic_pattern |
| | ) |
| | RIR_pad = RIR.shape[2] - 1 |
| | return RIR, RIR_pad |
| |
|
| | def _generate_rir_pyroomacoustics(self) -> Tuple[torch.Tensor, int]: |
| | """ |
| | Create simulated RIR using the pyroomacoustics library |
| | |
| | Returns: |
| | RIR (tensor): Generated RIR |
| | RIR_pad (int): Length of padding added when convolving the RIR with an audio file |
| | """ |
| |
|
| | rt60 = self._params.data_simulator.rir_generation.absorbtion_params.T60 |
| | sr = self._params.data_simulator.sr |
| |
|
| | room_sz_tmp = np.array(self._params.data_simulator.rir_generation.room_config.room_sz) |
| | if room_sz_tmp.ndim == 2: |
| | room_sz = np.zeros(room_sz_tmp.shape[0]) |
| | for i in range(room_sz_tmp.shape[0]): |
| | room_sz[i] = np.random.uniform(room_sz_tmp[i, 0], room_sz_tmp[i, 1]) |
| | else: |
| | room_sz = room_sz_tmp |
| |
|
| | pos_src_tmp = np.array(self._params.data_simulator.rir_generation.room_config.pos_src) |
| | if pos_src_tmp.ndim == 3: |
| | pos_src = np.zeros((pos_src_tmp.shape[0], pos_src_tmp.shape[1])) |
| | for i in range(pos_src_tmp.shape[0]): |
| | for j in range(pos_src_tmp.shape[1]): |
| | pos_src[i, j] = np.random.uniform(pos_src_tmp[i, j, 0], pos_src_tmp[i, j, 1]) |
| | else: |
| | pos_src = pos_src_tmp |
| |
|
| | |
| | e_absorption, max_order = pra.inverse_sabine(rt60, room_sz) |
| | room = pra.ShoeBox(room_sz, fs=sr, materials=pra.Material(e_absorption), max_order=max_order) |
| |
|
| | if self._params.data_simulator.background_noise.add_bg: |
| | pos_src = np.vstack((pos_src, self._params.data_simulator.rir_generation.room_config.noise_src_pos)) |
| | for pos in pos_src: |
| | room.add_source(pos) |
| |
|
| | |
| | mic_pattern = self._params.data_simulator.rir_generation.mic_config.mic_pattern |
| | if self._params.data_simulator.rir_generation.mic_config.mic_pattern == 'omni': |
| | mic_pattern = DirectivityPattern.OMNI |
| | dir_vec = DirectionVector(azimuth=0, colatitude=90, degrees=True) |
| | dir_obj = CardioidFamily(orientation=dir_vec, pattern_enum=mic_pattern,) |
| |
|
| | mic_pos_tmp = np.array(self._params.data_simulator.rir_generation.mic_config.pos_rcv) |
| | if mic_pos_tmp.ndim == 3: |
| | mic_pos = np.zeros((mic_pos_tmp.shape[0], mic_pos_tmp.shape[1])) |
| | for i in range(mic_pos_tmp.shape[0]): |
| | for j in range(mic_pos_tmp.shape[1]): |
| | mic_pos[i, j] = np.random.uniform(mic_pos_tmp[i, j, 0], mic_pos_tmp[i, j, 1]) |
| | else: |
| | mic_pos = mic_pos_tmp |
| |
|
| | room.add_microphone_array(mic_pos.T, directivity=dir_obj) |
| |
|
| | room.compute_rir() |
| | rir_pad = 0 |
| | for channel in room.rir: |
| | for pos in channel: |
| | if pos.shape[0] - 1 > rir_pad: |
| | rir_pad = pos.shape[0] - 1 |
| | return room.rir, rir_pad |
| |
|
| | def _convolve_rir(self, input, speaker_turn: int, RIR: torch.Tensor) -> Tuple[list, int]: |
| | """ |
| | Augment one sentence (or background noise segment) using a synthetic RIR. |
| | |
| | Args: |
| | input (torch.tensor): Input audio. |
| | speaker_turn (int): Current speaker turn. |
| | RIR (torch.tensor): Room Impulse Response. |
| | Returns: |
| | output_sound (list): List of tensors containing augmented audio |
| | length (int): Length of output audio channels (or of the longest if they have different lengths) |
| | """ |
| | output_sound = [] |
| | length = 0 |
| | for channel in range(self._params.data_simulator.rir_generation.mic_config.num_channels): |
| | if self._params.data_simulator.rir_generation.toolkit == 'gpuRIR': |
| | out_channel = convolve(input, RIR[speaker_turn, channel, : len(input)]).tolist() |
| | elif self._params.data_simulator.rir_generation.toolkit == 'pyroomacoustics': |
| | out_channel = convolve(input, RIR[channel][speaker_turn][: len(input)]).tolist() |
| | if len(out_channel) > length: |
| | length = len(out_channel) |
| | output_sound.append(torch.tensor(out_channel)) |
| | return output_sound, length |
| |
|
| | def _generate_session( |
| | self, |
| | idx: int, |
| | basepath: str, |
| | filename: str, |
| | speaker_ids: list, |
| | speaker_wav_align_map: dict, |
| | noise_samples: list, |
| | device: torch.device, |
| | enforce_counter: int = 2, |
| | ): |
| | """ |
| | Generate a multispeaker audio session and corresponding label files. |
| | |
| | Args: |
| | idx (int): Index for current session (out of total number of sessions). |
| | basepath (str): Path to output directory. |
| | filename (str): Filename for output files. |
| | speaker_ids (list): List of speaker IDs that will be used in this session. |
| | speaker_wav_align_map (dict): Dictionary containing speaker IDs and their corresponding wav filepath and alignments. |
| | noise_samples (list): List of randomly sampled noise source files that will be used for generating this session. |
| | device (torch.device): Device to use for generating this session. |
| | enforce_counter (int): In enforcement mode, dominance is increased by a factor of enforce_counter for unrepresented speakers |
| | """ |
| | self._device = device |
| | speaker_dominance = self._get_speaker_dominance() |
| | base_speaker_dominance = np.copy(speaker_dominance) |
| | self._set_speaker_volume() |
| |
|
| | running_len_samples, prev_len_samples = 0, 0 |
| | prev_speaker = None |
| | rttm_list, json_list, ctm_list = [], [], [] |
| | self._noise_samples = noise_samples |
| | self._furthest_sample = [0 for n in range(self._params.data_simulator.session_config.num_speakers)] |
| |
|
| | |
| | if self._params.data_simulator.rir_generation.toolkit == 'gpuRIR': |
| | RIR, RIR_pad = self._generate_rir_gpuRIR() |
| | elif self._params.data_simulator.rir_generation.toolkit == 'pyroomacoustics': |
| | RIR, RIR_pad = self._generate_rir_pyroomacoustics() |
| | else: |
| | raise Exception("Toolkit must be pyroomacoustics or gpuRIR") |
| |
|
| | |
| | enforce_time = np.random.uniform( |
| | self._params.data_simulator.speaker_enforcement.enforce_time[0], |
| | self._params.data_simulator.speaker_enforcement.enforce_time[1], |
| | ) |
| | enforce = self._params.data_simulator.speaker_enforcement.enforce_num_speakers |
| |
|
| | session_len_samples = int( |
| | (self._params.data_simulator.session_config.session_length * self._params.data_simulator.sr) |
| | ) |
| | array = torch.zeros((session_len_samples, self._params.data_simulator.rir_generation.mic_config.num_channels)) |
| | is_speech = torch.zeros(session_len_samples) |
| |
|
| | while running_len_samples < session_len_samples or enforce: |
| | |
| | if running_len_samples > enforce_time * session_len_samples and enforce: |
| | speaker_dominance, enforce = self._increase_speaker_dominance(base_speaker_dominance, enforce_counter) |
| | if enforce: |
| | enforce_counter += 1 |
| |
|
| | |
| | speaker_turn = self._get_next_speaker(prev_speaker, speaker_dominance) |
| |
|
| | |
| | max_samples_in_sentence = ( |
| | session_len_samples - running_len_samples - RIR_pad |
| | ) |
| | if enforce: |
| | max_samples_in_sentence = float('inf') |
| | elif ( |
| | max_samples_in_sentence |
| | < self._params.data_simulator.session_params.end_buffer * self._params.data_simulator.sr |
| | ): |
| | break |
| |
|
| | |
| | self._build_sentence(speaker_turn, speaker_ids, speaker_wav_align_map, max_samples_in_sentence) |
| | augmented_sentence, length = self._convolve_rir(self._sentence, speaker_turn, RIR) |
| |
|
| | |
| | start = self._add_silence_or_overlap( |
| | speaker_turn, |
| | prev_speaker, |
| | running_len_samples, |
| | length, |
| | session_len_samples, |
| | prev_len_samples, |
| | enforce, |
| | ) |
| | end = start + length |
| | if end > len(array): |
| | array = torch.nn.functional.pad(array, (0, 0, 0, end - len(array))) |
| | is_speech = torch.nn.functional.pad(is_speech, (0, end - len(is_speech))) |
| |
|
| | is_speech[start:end] = 1 |
| |
|
| | for channel in range(self._params.data_simulator.rir_generation.mic_config.num_channels): |
| | len_ch = len(augmented_sentence[channel]) |
| | array[start : start + len_ch, channel] += augmented_sentence[channel] |
| |
|
| | |
| | new_rttm_entries = self._create_new_rttm_entry( |
| | start / self._params.data_simulator.sr, end / self._params.data_simulator.sr, speaker_ids[speaker_turn] |
| | ) |
| |
|
| | for entry in new_rttm_entries: |
| | rttm_list.append(entry) |
| | new_json_entry = self._create_new_json_entry( |
| | os.path.join(basepath, filename + '.wav'), |
| | start / self._params.data_simulator.sr, |
| | length / self._params.data_simulator.sr, |
| | speaker_ids[speaker_turn], |
| | os.path.join(basepath, filename + '.rttm'), |
| | os.path.join(basepath, filename + '.ctm'), |
| | ) |
| | json_list.append(new_json_entry) |
| | new_ctm_entries = self._create_new_ctm_entry( |
| | filename, speaker_ids[speaker_turn], start / self._params.data_simulator.sr |
| | ) |
| | for entry in new_ctm_entries: |
| | ctm_list.append(entry) |
| |
|
| | running_len_samples = np.maximum(running_len_samples, end) |
| | self._furthest_sample[speaker_turn] = running_len_samples |
| | prev_speaker = speaker_turn |
| | prev_len_samples = length |
| |
|
| | |
| | if self._params.data_simulator.background_noise.add_bg: |
| | avg_power_array = torch.mean(array[is_speech == 1] ** 2) |
| | length = array.shape[0] |
| | bg = self._get_background(length, avg_power_array) |
| | augmented_bg, _ = self._convolve_rir(bg, -1, RIR) |
| | for channel in range(self._params.data_simulator.rir_generation.mic_config.num_channels): |
| | array[:, channel] += augmented_bg[channel][:length] |
| |
|
| | array = array / (1.0 * torch.max(torch.abs(array))) |
| | if torch.is_tensor(array): |
| | array = array.cpu().numpy() |
| | sf.write(os.path.join(basepath, filename + '.wav'), array, self._params.data_simulator.sr) |
| | labels_to_rttmfile(rttm_list, filename, self._params.data_simulator.outputs.output_dir) |
| | write_manifest(os.path.join(basepath, filename + '.json'), json_list) |
| | write_ctm(os.path.join(basepath, filename + '.ctm'), ctm_list) |
| | write_text(os.path.join(basepath, filename + '.txt'), ctm_list) |
| | del array |
| | self.clean_up() |
| | return basepath, filename |
| |
|
| |
|
| | def check_angle(key: str, val: Union[float, Iterable[float]]) -> bool: |
| | """Check if the angle value is within the expected range. Input |
| | values are in degrees. |
| | |
| | Note: |
| | azimuth: angle between a projection on the horizontal (xy) plane and |
| | positive x axis. Increases counter-clockwise. Range: [-180, 180]. |
| | elevation: angle between a vector and its projection on the horizontal (xy) plane. |
| | Positive above, negative below, i.e., north=+90, south=-90. Range: [-90, 90]. |
| | yaw: rotation around the z axis. Defined according to the right-hand rule. |
| | Range: [-180, 180] |
| | pitch: rotation around the yʹ axis. Defined according to the right-hand rule. |
| | Range: [-90, 90] |
| | roll: rotation around the xʺ axis. Defined according to the right-hand rule. |
| | Range: [-180, 180] |
| | |
| | Args: |
| | key: angle type |
| | val: values in degrees |
| | |
| | Returns: |
| | True if all values are within the expected range. |
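| | |
| | Example: |
| | Illustrative checks; both scalars and iterables are supported: |
| | |
| | >>> check_angle('azimuth', [-90.0, 90.0]) |
| | True |
| | >>> check_angle('pitch', 45.0) |
| | True |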
| | """ |
| | if np.isscalar(val): |
| | min_val = max_val = val |
| | else: |
| | min_val = min(val) |
| | max_val = max(val) |
| |
|
| | if key == 'azimuth' and -180 <= min_val <= max_val <= 180: |
| | return True |
| | if key == 'elevation' and -90 <= min_val <= max_val <= 90: |
| | return True |
| | if key == 'yaw' and -180 <= min_val <= max_val <= 180: |
| | return True |
| | if key == 'pitch' and -90 <= min_val <= max_val <= 90: |
| | return True |
| | if key == 'roll' and -180 <= min_val <= max_val <= 180: |
| | return True |
| |
|
| | raise ValueError(f'Invalid value for angle {key} = {val}') |
| |
|
| |
|
| | def wrap_to_180(angle: float) -> float: |
| | """Wrap an angle to range ±180 degrees. |
| | |
| | Args: |
| | angle: angle in degrees |
| | |
| | Returns: |
| | Angle in degrees wrapped to ±180 degrees. |
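| | |
| | Example: |
| | >>> wrap_to_180(270.0) |
| | -90.0 |
| | >>> wrap_to_180(-181.0) |
| | 179.0 |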
| | """ |
| | return angle - np.floor(angle / 360 + 1 / 2) * 360 |
| |
|
| |
|
| | class ArrayGeometry(object): |
| | """A class to simplify handling of array geometry. |
| | |
| | Supports translation and rotation of the array and calculation of |
| | spherical coordinates of a given point relative to the internal |
| | coordinate system of the array. |
| | |
| | Args: |
| | mic_positions: 3D coordinates, with shape (num_mics, 3) |
| | center: optional position of the center of the array. Defaults to the average of the coordinates. |
| | internal_cs: internal coordinate system for the array relative to the global coordinate system. |
| | Defaults to (x, y, z), and is rotated with the array. |
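| | |
| | Example: |
| | A minimal usage sketch (coordinates in meters, illustrative only): |
| | |
| | >>> positions = [[0.0, 0.0, 0.0], [0.1, 0.0, 0.0]]  # two mics, 10 cm apart |
| | >>> array = ArrayGeometry(positions) |
| | >>> array.translate(to=np.array([2.0, 3.0, 1.5]))  # move the array center |
| | >>> array.rotate(yaw=90)  # rotate around the z axis |
| | >>> dist, az, el = array.spherical_relative_to_array(np.array([2.0, 4.0, 1.5])) |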
| | """ |
| |
|
| | def __init__( |
| | self, |
| | mic_positions: Union[np.ndarray, List], |
| | center: Optional[np.ndarray] = None, |
| | internal_cs: Optional[np.ndarray] = None, |
| | ): |
| | if isinstance(mic_positions, Iterable): |
| | mic_positions = np.array(mic_positions) |
| |
|
| | if not mic_positions.ndim == 2: |
| | raise ValueError( |
| | f'Expecting a 2D array specifying mic positions, but received {mic_positions.ndim}-dim array' |
| | ) |
| |
|
| | if not mic_positions.shape[1] == 3: |
| | raise ValueError(f'Expecting 3D positions, but received {mic_positions.shape[1]}-dim positions') |
| |
|
| | mic_positions_center = np.mean(mic_positions, axis=0) |
| | self.centered_positions = mic_positions - mic_positions_center |
| | self.center = mic_positions_center if center is None else center |
| |
|
| | |
| | if internal_cs is None: |
| | |
| | self.internal_cs = np.eye(3) |
| | else: |
| | self.internal_cs = internal_cs |
| |
|
| | @property |
| | def num_mics(self): |
| | """Return the number of microphones for the current array. |
| | """ |
| | return self.centered_positions.shape[0] |
| |
|
| | @property |
| | def positions(self): |
| | """Absolute positions of the microphones. |
| | """ |
| | return self.centered_positions + self.center |
| |
|
| | @property |
| | def internal_positions(self): |
| | """Positions in the internal coordinate system. |
| | """ |
| | return np.matmul(self.centered_positions, self.internal_cs.T) |
| |
|
| | @property |
| | def radius(self): |
| | """Radius of the array, relative to the center. |
| | """ |
| | return max(np.linalg.norm(self.centered_positions, axis=1)) |
| |
|
| | @staticmethod |
| | def get_rotation(yaw: float = 0, pitch: float = 0, roll: float = 0) -> Rotation: |
| | """Get a Rotation object for given angles. |
| | |
| | All angles are defined according to the right-hand rule. |
| | |
| | Args: |
| | yaw: rotation around the z axis |
| | pitch: rotation around the yʹ axis |
| | roll: rotation around the xʺ axis |
| | |
| | Returns: |
| | A rotation object constructed using the provided angles. |
| | """ |
| | check_angle('yaw', yaw) |
| | check_angle('pitch', pitch) |
| | check_angle('roll', roll) |
| |
|
| | return Rotation.from_euler('ZYX', [yaw, pitch, roll], degrees=True) |
| |
|
| | def translate(self, to: np.ndarray): |
| | """Translate the array center to a new point. |
| | |
| | Translation does not change the centered positions or the internal coordinate system. |
| | |
| | Args: |
| | to: 3D point, shape (3,) |
| | """ |
| | self.center = to |
| |
|
| | def rotate(self, yaw: float = 0, pitch: float = 0, roll: float = 0): |
| | """Apply rotation on the mic array. |
| | |
| | This rotates the centered microphone positions and the internal |
| | coordinate system, it doesn't change the center of the array. |
| | |
| | All angles are defined according to the right-hand rule. |
| | For example, this means that a positive pitch will result in a rotation from z |
| | to x axis, which will result in a reduced elevation with respect to the global |
| | horizontal plane. |
| | |
| | Args: |
| | yaw: rotation around the z axis |
| | pitch: rotation around the yʹ axis |
| | roll: rotation around the xʺ axis |
| | """ |
| | |
| | rotation = self.get_rotation(yaw=yaw, pitch=pitch, roll=roll) |
| |
|
| | |
| | self.centered_positions = rotation.apply(self.centered_positions) |
| |
|
| | |
| | self.internal_cs = rotation.apply(self.internal_cs) |
| |
|
| | def new_rotated_array(self, yaw: float = 0, pitch: float = 0, roll: float = 0): |
| | """Create a new array by rotating this array. |
| | |
| | Args: |
| | yaw: rotation around the z axis |
| | pitch: rotation around the yʹ axis |
| | roll: rotation around the xʺ axis |
| | |
| | Returns: |
| | A new ArrayGeometry object constructed using the provided angles. |
| | """ |
| | new_array = ArrayGeometry(mic_positions=self.positions, center=self.center, internal_cs=self.internal_cs) |
| | new_array.rotate(yaw=yaw, pitch=pitch, roll=roll) |
| | return new_array |
| |
|
| | def spherical_relative_to_array( |
| | self, point: np.ndarray, use_internal_cs: bool = True |
| | ) -> Tuple[float, float, float]: |
| | """Return spherical coordinates of a point relative to the internal coordinate system. |
| | |
| | Args: |
| | point: 3D coordinate, shape (3,) |
| | use_internal_cs: Calculate position relative to the internal coordinate system. |
| | If `False`, the positions will be calculated relative to the |
| | external coordinate system centered at `self.center`. |
| | |
| | Returns: |
| | A tuple (distance, azimuth, elevation) relative to the mic array. |
| | """ |
| | rel_position = point - self.center |
| | distance = np.linalg.norm(rel_position) |
| |
|
| | if use_internal_cs: |
| | |
| | rel_position = np.matmul(self.internal_cs, rel_position) |
| |
|
| | |
| | azimuth = np.arctan2(rel_position[1], rel_position[0]) / np.pi * 180 |
| | |
| | elevation = np.arcsin(rel_position[2] / distance) / np.pi * 180 |
| |
|
| | return distance, azimuth, elevation |
| |
|
| | def __str__(self): |
| | with np.printoptions(precision=3, suppress=True): |
| | desc = f"{type(self)}:\ncenter =\n{self.center}\ncentered positions =\n{self.centered_positions}\nradius = \n{self.radius:.3}\nabsolute positions =\n{self.positions}\ninternal coordinate system =\n{self.internal_cs}\n\n" |
| | return desc |
| |
|
| | def plot(self, elev=30, azim=-55, mic_size=25): |
| | """Plot microphone positions. |
| | |
| | Args: |
| | elev: elevation for the view of the plot |
| | azim: azimuth for the view of the plot |
| | mic_size: size of the microphone marker in the plot |
| | """ |
| | fig = plt.figure() |
| | ax = fig.add_subplot(projection='3d') |
| |
|
| | |
| | for m in range(self.num_mics): |
| | |
| | ax.scatter( |
| | self.positions[m, 0], |
| | self.positions[m, 1], |
| | self.positions[m, 2], |
| | marker='o', |
| | c='black', |
| | s=mic_size, |
| | depthshade=False, |
| | ) |
| | |
| | ax.text(self.positions[m, 0], self.positions[m, 1], self.positions[m, 2], str(m), c='red', zorder=10) |
| |
|
| | |
| | ax.quiver( |
| | self.center[0], |
| | self.center[1], |
| | self.center[2], |
| | self.internal_cs[:, 0], |
| | self.internal_cs[:, 1], |
| | self.internal_cs[:, 2], |
| | length=self.radius, |
| | label='internal cs', |
| | normalize=False, |
| | linestyle=':', |
| | linewidth=1.0, |
| | ) |
| | for dim, label in enumerate(['x′', 'y′', 'z′']): |
| | label_pos = self.center + self.radius * self.internal_cs[dim] |
| | ax.text(label_pos[0], label_pos[1], label_pos[2], label, tuple(self.internal_cs[dim]), c='blue') |
| | try: |
| | |
| | ax.set_aspect('equal') |
| | except NotImplementedError: |
| | logging.warning('Equal aspect ratio not supported by Axes3D') |
| | |
| | ax.view_init(elev=elev, azim=azim) |
| | |
| | ax.set_xlim([self.center[0] - self.radius, self.center[0] + self.radius]) |
| | ax.set_ylim([self.center[1] - self.radius, self.center[1] + self.radius]) |
| | ax.set_zlim([self.center[2] - self.radius, self.center[2] + self.radius]) |
| |
|
| | ax.set_xlabel('x/m') |
| | ax.set_ylabel('y/m') |
| | ax.set_zlabel('z/m') |
| | ax.set_title('Microphone positions') |
| | ax.legend() |
| | plt.show() |
| |
|
| |
|
| | def convert_placement_to_range( |
| | placement: Dict, room_dim: Iterable[float], object_radius: float = 0 |
| | ) -> List[List[float]]: |
| | """Given a placement dictionary, return ranges for each dimension. |
| | |
| | Args: |
| | placement: dictionary containing x, y, height, and min_to_wall |
| | room_dim: dimensions of the room, shape (3,) |
| | object_radius: radius of the object to be placed |
| | |
| | Returns: |
| | List with a range of values for each dimension. |
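| | |
| | Example: |
| | An illustrative sketch: a 5 x 4 x 3 m room, fixed x, ranged y, |
| | unconstrained height, 0.5 m wall margin and 0.1 m object radius: |
| | |
| | >>> placement = {'x': 2.5, 'y': [1.0, 3.0], 'height': None, 'min_to_wall': 0.5} |
| | >>> convert_placement_to_range(placement, room_dim=[5.0, 4.0, 3.0], object_radius=0.1) |
| | [[2.5, 2.5], [1.0, 3.0], [0.6, 2.4]] |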
| | """ |
| | if not np.all(np.array(room_dim) > 0): |
| | raise ValueError(f'Room dimensions must be positive: {room_dim}') |
| |
|
| | placement_range = [None] * 3 |
| | min_to_wall = placement.get('min_to_wall', 0) |
| |
|
| | if min_to_wall < 0: |
| | raise ValueError(f'Min distance to wall must be non-negative: {min_to_wall}') |
| |
|
| | for idx, key in enumerate(['x', 'y', 'height']): |
| | |
| | dim = room_dim[idx] |
| | |
| | val = placement.get(key) |
| | if val is None: |
| | |
| | min_val, max_val = 0, dim |
| | elif np.isscalar(val): |
| | min_val = max_val = val |
| | else: |
| | if len(val) != 2: |
| | raise ValueError(f'Invalid value for placement for dim {idx}/{key}: {str(placement)}') |
| | min_val, max_val = val |
| |
|
| | |
| | min_val = max(min_val, min_to_wall + object_radius) |
| | max_val = min(max_val, dim - min_to_wall - object_radius) |
| |
|
| | if min_val > max_val or min(min_val, max_val) < 0: |
| | raise ValueError(f'Invalid range dim {idx}/{key}: min={min_val}, max={max_val}') |
| |
|
| | placement_range[idx] = [min_val, max_val] |
| |
|
| | return placement_range |
| |
|
| |
|
| | class RIRCorpusGenerator(object): |
| | """Creates a corpus of RIRs based on a defined configuration of rooms and microphone array. |
| | |
| | RIRs are generated using the `generate` method. |
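| | |
| | Example: |
| | A minimal usage sketch; `rir_corpus.yaml` is a hypothetical config |
| | file following the structure validated in `check_cfg`: |
| | |
| | >>> cfg = OmegaConf.load('rir_corpus.yaml') |
| | >>> generator = RIRCorpusGenerator(cfg) |
| | >>> generator.generate() |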
| | """ |
| |
|
| | def __init__(self, cfg: DictConfig): |
| | """ |
| | Args: |
| | cfg: dictionary with parameters of the simulation |
| | """ |
| | logging.info("Initialize RIRCorpusGenerator") |
| | self._cfg = cfg |
| | self.check_cfg() |
| |
|
| | @property |
| | def cfg(self): |
| | """Property holding the internal config of the object. |
| | |
| | Note: |
| | Changes to this config are not reflected in the state of the object. |
| | Please create a new model with the updated config. |
| | """ |
| | return self._cfg |
| |
|
| | @property |
| | def sample_rate(self): |
| | """Sample rate used for the simulation. |
| | """ |
| | return self._cfg.sample_rate |
| |
|
| | @cfg.setter |
| | def cfg(self, cfg): |
| | """Property holding the internal config of the object. |
| | |
| | Note: |
| | Changes to this config are not reflected in the state of the object. |
| | Please create a new model with the updated config. |
| | """ |
| | self._cfg = cfg |
| |
|
| | def check_cfg(self): |
| | """ |
| | Checks the provided configuration to ensure that it has the minimal required |
| | parameters and that the values are in a reasonable range. |
| | """ |
| | |
| | sample_rate = self.cfg.get('sample_rate') |
| | if sample_rate is None: |
| | raise ValueError('Sample rate not provided.') |
| | elif sample_rate < 0: |
| | raise ValueError(f'Sample rate must be positive: {sample_rate}') |
| |
|
| | |
| | room_cfg = self.cfg.get('room') |
| | if room_cfg is None: |
| | raise ValueError('Room configuration not provided') |
| |
|
| | if room_cfg.get('num') is None: |
| | raise ValueError('Number of rooms per subset not provided') |
| |
|
| | if room_cfg.get('dim') is None: |
| | raise ValueError('Room dimensions not provided') |
| |
|
| | for idx, key in enumerate(['width', 'length', 'height']): |
| | dim = room_cfg.dim.get(key) |
| |
|
| | if dim is None: |
| | |
| | raise ValueError(f'Room {key} needs to be a scalar or a range, currently it is None') |
| | elif np.isscalar(dim): |
| | if dim <= 0: |
| | raise ValueError(f'A fixed dimension must be positive for {key}: {dim}') |
| | elif len(dim) != 2 or not 0 < dim[0] < dim[1]: |
| | raise ValueError(f'Range must be specified with two positive increasing elements for {key}: {dim}') |
| |
|
| | rt60 = room_cfg.get('rt60') |
| | if rt60 is None: |
| | |
| | raise ValueError('RT60 needs to be a scalar or a range, currently it is None') |
| | elif np.isscalar(rt60): |
| | if rt60 <= 0: |
| | raise ValueError(f'RT60 must be positive: {rt60}') |
| | elif len(rt60) != 2 or not 0 < rt60[0] < rt60[1]: |
| | raise ValueError(f'RT60 range must be specified with two positive increasing elements: {rt60}') |
| |
|
| | |
| | mic_cfg = self.cfg.get('mic_array') |
| | if mic_cfg is None: |
| | raise ValueError('Mic configuration not provided') |
| |
|
| | for key in ['positions', 'placement', 'orientation']: |
| | if key not in mic_cfg: |
| | raise ValueError(f'Mic array {key} not provided') |
| |
|
| | |
| | source_cfg = self.cfg.get('source') |
| | if source_cfg is None: |
| | raise ValueError('Source configuration not provided') |
| |
|
| | if source_cfg.get('num') is None: |
| | raise ValueError('Number of sources per room not provided') |
| | elif source_cfg.num <= 0: |
| | raise ValueError(f'Number of sources must be positive: {source_cfg.num}') |
| |
|
| | if 'placement' not in source_cfg: |
| | raise ValueError('Source placement dictionary not provided') |
| |
|
| | |
| | if self.cfg.get('anechoic') is None: |
| | raise ValueError('Anechoic configuration not provided.') |
| |
|
| | def generate_room_params(self) -> dict: |
| | """Generate randomized room parameters based on the provided |
| | configuration. |
| | """ |
| | |
| | if not PRA: |
| | raise ImportError('pyroomacoustics is required for room simulation') |
| |
|
| | room_cfg = self.cfg.room |
| |
|
| | |
| | room_dim = np.zeros(3) |
| |
|
| | |
| | for idx, key in enumerate(['width', 'length', 'height']): |
| | |
| | dim = room_cfg.dim[key] |
| |
|
| | |
| | if dim is None: |
| | raise ValueError(f'Room {key} needs to be a scalar or a range, currently it is None') |
| | elif np.isscalar(dim): |
| | assert dim > 0, f'Dimension should be positive for {key}: {dim}' |
| | room_dim[idx] = dim |
| | elif len(dim) == 2: |
| | assert 0 < dim[0] <= dim[1], f'Expecting two non-decreasing values for {key}, received {dim}' |
| | room_dim[idx] = self.random.uniform(low=dim[0], high=dim[1]) |
| | else: |
| | raise ValueError(f'Unexpected value for {key}: {dim}') |
| |
|
| | |
| | if room_cfg.rt60 is None: |
| | raise ValueError('Room RT60 needs to be a scalar or a range, currently it is None') |
| |
|
| | if np.isscalar(room_cfg.rt60): |
| | assert room_cfg.rt60 > 0, f'RT60 should be positive: {room_cfg.rt60}' |
| | rt60 = room_cfg.rt60 |
| | elif len(room_cfg.rt60) == 2: |
| | assert ( |
| | 0 < room_cfg.rt60[0] <= room_cfg.rt60[1] |
| | ), f'Expecting two non-decreasing values for RT60, received {room_cfg.rt60}' |
| | rt60 = self.random.uniform(low=room_cfg.rt60[0], high=room_cfg.rt60[1]) |
| | else: |
| | raise ValueError(f'Unexpected value for RT60: {room_cfg.rt60}') |
| |
|
| | |
| | room_absorption, room_max_order = pra.inverse_sabine(rt60, room_dim) |
| |
|
| | |
| | room_params = { |
| | 'dim': room_dim, |
| | 'absorption': room_absorption, |
| | 'max_order': room_max_order, |
| | 'rt60_theoretical': rt60, |
| | 'anechoic_absorption': self.cfg.anechoic.absorption, |
| | 'anechoic_max_order': self.cfg.anechoic.max_order, |
| | 'sample_rate': self.cfg.sample_rate, |
| | } |
| | return room_params |
| |
|
| | def generate_array(self, room_dim: Iterable[float]) -> ArrayGeometry: |
| | """Generate array placement for the current room and config. |
| | |
| | Args: |
| | room_dim: dimensions of the room, [width, length, height] |
| | |
| | Returns: |
| | Randomly placed microphone array. |
| | """ |
| | mic_cfg = self.cfg.mic_array |
| | mic_array = ArrayGeometry(mic_cfg.positions) |
| |
|
| | |
| | center = np.zeros(3) |
| | placement_range = convert_placement_to_range( |
| | placement=mic_cfg.placement, room_dim=room_dim, object_radius=mic_array.radius |
| | ) |
| |
|
| | for idx in range(len(center)): |
| | center[idx] = self.random.uniform(low=placement_range[idx][0], high=placement_range[idx][1]) |
| |
|
| | |
| | mic_array.translate(to=center) |
| |
|
| | |
| | orientation = dict() |
| | for key in ['yaw', 'roll', 'pitch']: |
| | |
| | angle = mic_cfg.orientation[key] |
| |
|
| | if angle is None: |
| | raise ValueError(f'Mic array {key} should be a scalar or a range, currently it is set to None.') |
| |
|
| | |
| | check_angle(key, angle) |
| |
|
| | if np.isscalar(angle): |
| | orientation[key] = angle |
| | elif len(angle) == 2: |
| | assert angle[0] <= angle[1], f"Expecting two non-decreasing values for {key}, received {angle}" |
| | |
| | orientation[key] = self.random.uniform(low=angle[0], high=angle[1]) |
| | else: |
| | raise ValueError(f'Unexpected value for orientation {key}: {angle}') |
| |
|
| | |
| | mic_array.rotate(**orientation) |
| |
|
| | return mic_array |
| |
|
| | def generate_source_position(self, room_dim: Iterable[float]) -> List[List[float]]: |
| | """Generate position for all sources in a room. |
| | |
| | Args: |
| | room_dim: dimensions of a 3D shoebox room |
| | |
| | Returns: |
| | List of source positions, with each position characterized with a 3D coordinate |
| | """ |
| | source_cfg = self.cfg.source |
| | placement_range = convert_placement_to_range(placement=source_cfg.placement, room_dim=room_dim) |
| | source_position = [] |
| |
|
| | for n in range(source_cfg.num): |
| | |
| | s_pos = [None] * 3 |
| | for idx in range(len(s_pos)): |
| | s_pos[idx] = self.random.uniform(low=placement_range[idx][0], high=placement_range[idx][1]) |
| | source_position.append(s_pos) |
| |
|
| | return source_position |
| |
|
| | def generate(self): |
| | """Generate RIR corpus. |
| | |
| | This method will prepare randomized examples based on the current configuration, |
| | run room simulations and save results to output_dir. |
| | """ |
| | logging.info("Generate RIR corpus") |
| |
|
| | |
| | self.random = default_rng(seed=self.cfg.random_seed) |
| |
|
| | |
| | output_dir = self.cfg.output_dir |
| | if output_dir.endswith('.yaml'): |
| | output_dir = output_dir[:-5] |
| |
|
| | |
| | logging.info('Output dir set to: %s', output_dir) |
| |
|
| | |
| | for subset, num_rooms in self.cfg.room.num.items(): |
| |
|
| | output_dir_subset = os.path.join(output_dir, subset) |
| | examples = [] |
| |
|
| | if not os.path.exists(output_dir_subset): |
| | logging.info('Creating output directory: %s', output_dir_subset) |
| | os.makedirs(output_dir_subset) |
| | elif os.path.isdir(output_dir_subset) and len(os.listdir(output_dir_subset)) > 0: |
| | raise RuntimeError(f'Output directory {output_dir_subset} is not empty.') |
| |
|
| | |
| | for n_room in range(num_rooms): |
| |
|
| | |
| | room_params = self.generate_room_params() |
| |
|
| | |
| | mic_array = self.generate_array(room_params['dim']) |
| |
|
| | |
| | source_position = self.generate_source_position(room_params['dim']) |
| |
|
| | |
| | room_filepath = os.path.join(output_dir_subset, f'{subset}_room_{n_room:06d}.h5') |
| |
|
| | |
| | example = { |
| | 'room_params': room_params, |
| | 'mic_array': mic_array, |
| | 'source_position': source_position, |
| | 'room_filepath': room_filepath, |
| | } |
| | examples.append(example) |
| |
|
| | |
| | num_workers = self.cfg.num_workers |
| | if num_workers is not None and num_workers > 1: |
| | logging.info(f'Simulate using {num_workers} workers') |
| | with multiprocessing.Pool(processes=num_workers) as pool: |
| | metadata = list(tqdm(pool.imap(simulate_room_kwargs, examples), total=len(examples))) |
| |
|
| | else: |
| | logging.info('Simulate using a single worker') |
| | metadata = [] |
| | for example in tqdm(examples, total=len(examples)): |
| | metadata.append(simulate_room(**example)) |
| |
|
| | |
| | manifest_filepath = os.path.join(output_dir, f'{subset}_manifest.json') |
| |
|
| | if os.path.exists(manifest_filepath) and os.path.isfile(manifest_filepath): |
| | raise RuntimeError(f'Manifest file exists: {manifest_filepath}') |
| |
|
| | |
| | for data in metadata: |
| | data['room_filepath'] = os.path.relpath(data['room_filepath'], start=output_dir) |
| |
|
| | write_manifest(manifest_filepath, metadata) |
| |
|
| | |
| | plot_filepath = os.path.join(output_dir, f'{subset}_info.png') |
| |
|
| | if os.path.exists(plot_filepath) and os.path.isfile(plot_filepath): |
| | raise RuntimeError(f'Plot file exists: {plot_filepath}') |
| |
|
| | plot_rir_manifest_info(manifest_filepath, plot_filepath=plot_filepath) |
| |
|
| | |
| | config_filepath = os.path.join(output_dir, 'config.yaml') |
| | if os.path.exists(config_filepath) and os.path.isfile(config_filepath): |
| | raise RuntimeError(f'Output config file exists: {config_filepath}') |
| |
|
| | OmegaConf.save(self.cfg, config_filepath, resolve=True) |
| |
|
| |
|
| | def simulate_room_kwargs(kwargs: dict) -> dict: |
| | """Wrapper around `simulate_room` to handle kwargs. |
| | |
| | `pool.map(simulate_room_kwargs, examples)` would be |
| | equivalent to `pool.starstarmap(simulate_room, examples)` |
| | if `starstarmap` existed. |
| | |
| | Args: |
| | kwargs: kwargs that are forwarded to `simulate_room` |
| | |
| | Returns: |
| | Dictionary with metadata, see `simulate_room` |
| | """ |
| | return simulate_room(**kwargs) |
| |
|
| |
|
| | def simulate_room( |
| | room_params: dict, mic_array: ArrayGeometry, source_position: Iterable[Iterable[float]], room_filepath: str, |
| | ) -> dict: |
| | """Simulate room |
| | |
| | Args: |
| | room_params: parameters of the room to be simulated |
| | mic_array: defines positions of the microphones |
| | source_position: positions for all sources to be simulated |
| | room_filepath: results are saved to this path |
| | |
| | Returns: |
| | Dictionary with metadata based on simulation setup |
| | and simulation results. Used to create the corresponding |
| | manifest file. |
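| | |
| | Example: |
| | An illustrative call, with inputs as prepared by `RIRCorpusGenerator.generate` |
| | and a made-up output path: |
| | |
| | >>> metadata = simulate_room( |
| | ... room_params=room_params,  # from generate_room_params() |
| | ... mic_array=mic_array,  # from generate_array() |
| | ... source_position=source_position,  # from generate_source_position() |
| | ... room_filepath='train_room_000000.h5', |
| | ... ) |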
| | """ |
| | |
| | room_sim = pra.ShoeBox( |
| | room_params['dim'], |
| | fs=room_params['sample_rate'], |
| | materials=pra.Material(room_params['absorption']), |
| | max_order=room_params['max_order'], |
| | ) |
| |
|
| | |
| | room_anechoic = pra.ShoeBox( |
| | room_params['dim'], |
| | fs=room_params['sample_rate'], |
| | materials=pra.Material(room_params['anechoic_absorption']), |
| | max_order=room_params['anechoic_max_order'], |
| | ) |
| |
|
| | |
| | for room in [room_sim, room_anechoic]: |
| | |
| | room.add_microphone_array(mic_array.positions.T) |
| |
|
| | |
| | for s_pos in source_position: |
| | room.add_source(s_pos) |
| |
|
| | |
| | room.compute_rir() |
| |
|
| | |
| | source_distance = [] |
| | source_azimuth = [] |
| | source_elevation = [] |
| | for s_pos in source_position: |
| | distance, azimuth, elevation = mic_array.spherical_relative_to_array(s_pos) |
| | source_distance.append(distance) |
| | source_azimuth.append(azimuth) |
| | source_elevation.append(elevation) |
| |
|
| | |
| | rir_dataset = { |
| | 'rir': convert_rir_to_multichannel(room_sim.rir), |
| | 'anechoic': convert_rir_to_multichannel(room_anechoic.rir), |
| | } |
| |
|
| | |
| | metadata = { |
| | 'room_filepath': room_filepath, |
| | 'sample_rate': room_params['sample_rate'], |
| | 'dim': room_params['dim'], |
| | 'rir_absorption': room_params['absorption'], |
| | 'rir_max_order': room_params['max_order'], |
| | 'rir_rt60_theory': room_sim.rt60_theory(), |
| | 'rir_rt60_measured': room_sim.measure_rt60().mean(axis=0), |
| | 'anechoic_rt60_theory': room_anechoic.rt60_theory(), |
| | 'anechoic_rt60_measured': room_anechoic.measure_rt60().mean(axis=0), |
| | 'anechoic_absorption': room_params['anechoic_absorption'], |
| | 'anechoic_max_order': room_params['anechoic_max_order'], |
| | 'mic_positions': mic_array.positions, |
| | 'mic_center': mic_array.center, |
| | 'source_position': source_position, |
| | 'source_distance': source_distance, |
| | 'source_azimuth': source_azimuth, |
| | 'source_elevation': source_elevation, |
| | 'num_sources': len(source_position), |
| | } |
| |
|
| | |
| | save_rir_simulation(room_filepath, rir_dataset, metadata) |
| |
|
| | return convert_numpy_to_serializable(metadata) |
| |
|
| |
|
| | def save_rir_simulation(filepath: str, rir_dataset: Dict[str, List[np.ndarray]], metadata: dict): |
| | """Save simulated RIRs and metadata. |
| | |
| | Args: |
| | filepath: Path to the file where the data will be saved. |
| | rir_dataset: Dictionary with RIR data. Each item is a set of multi-channel RIRs. |
| | metadata: Dictionary with related metadata. |
| | """ |
| | if os.path.exists(filepath): |
| | raise RuntimeError(f'Output file exists: {filepath}') |
| |
|
| | num_sources = metadata['num_sources'] |
| |
|
| | with h5py.File(filepath, 'w') as h5f: |
| | |
| | for rir_key, rir_value in rir_dataset.items(): |
| | if len(rir_value) != num_sources: |
| | raise ValueError( |
| | f'Each RIR dataset should have exactly {num_sources} elements. Current RIR {rir_key} has {len(rir_value)} elements' |
| | ) |
| |
|
| | rir_group = h5f.create_group(rir_key) |
| |
|
| | |
| | for idx, rir in enumerate(rir_value): |
| | rir_group.create_dataset(f'{idx}', data=rir_value[idx]) |
| |
|
| | |
| | metadata_group = h5f.create_group('metadata') |
| | for key, value in metadata.items(): |
| | metadata_group.create_dataset(key, data=value) |
| |
|
| |
|
| | def load_rir_simulation(filepath: str, source: int = 0, rir_key: str = 'rir') -> Tuple[np.ndarray, float]: |
| | """Load simulated RIRs and metadata. |
| | |
| | Args: |
| | filepath: Path to simulated RIR data |
| | source: Index of a source. |
| | rir_key: String to denote which RIR to load, if there are multiple available. |
| | |
| | Returns: |
| | Multichannel RIR as ndarray with shape (num_samples, num_channels) and scalar sample rate. |
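| | |
| | Example: |
| | Loading the RIR for the first source from a saved simulation |
| | (the file path is illustrative): |
| | |
| | >>> rir, sample_rate = load_rir_simulation('train_room_000000.h5', source=0) |
| | >>> rir.shape  # (num_samples, num_channels) |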
| | """ |
| | with h5py.File(filepath, 'r') as h5f: |
| | |
| | rir = h5f[rir_key][f'{source}'][:] |
| |
|
| | |
| | sample_rate = h5f['metadata']['sample_rate'][()] |
| |
|
| | return rir, sample_rate |
| |
|
| |
|
| | def convert_numpy_to_serializable(data: Union[dict, float, np.ndarray]) -> Union[dict, float, np.ndarray]: |
| | """Convert all numpy estries to list. |
| | Can be used to preprocess data before writing to a JSON file. |
| | |
| | Args: |
| | data: Dictionary, array or scalar. |
| | |
| | Returns: |
| | The same structure, but with numpy entries converted to lists |
| | so that `data` can be serialized. |
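| | |
| | Example: |
| | Nested structures are converted recursively: |
| | |
| | >>> convert_numpy_to_serializable({'a': np.array([1.0, 2.0]), 'b': np.float32(0.5)}) |
| | {'a': [1.0, 2.0], 'b': 0.5} |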
| | """ |
| | if isinstance(data, dict): |
| | for key, val in data.items(): |
| | data[key] = convert_numpy_to_serializable(val) |
| | elif isinstance(data, list): |
| | data = [convert_numpy_to_serializable(d) for d in data] |
| | elif isinstance(data, np.ndarray): |
| | data = data.tolist() |
| | elif isinstance(data, np.integer): |
| | data = int(data) |
| | elif isinstance(data, np.floating): |
| | data = float(data) |
| | elif isinstance(data, np.generic): |
| | data = data.item() |
| |
|
| | return data |
| |
|
| |
|
| | def convert_rir_to_multichannel(rir: List[List[np.ndarray]]) -> List[np.ndarray]: |
| | """Convert RIR to a list of arrays. |
| | |
| | Args: |
| | rir: list of lists, each element is a single-channel RIR |
| | |
| | Returns: |
| | List of multichannel RIRs |
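| | |
| | Example: |
| | Two mics and one source; RIRs of different lengths are zero-padded |
| | to a common length (illustrative): |
| | |
| | >>> rir = [[np.ones(3)], [np.ones(5)]]  # indexed as rir[mic][source] |
| | >>> mc_rir = convert_rir_to_multichannel(rir) |
| | >>> mc_rir[0].shape |
| | (5, 2) |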
| | """ |
| | num_mics = len(rir) |
| | num_sources = len(rir[0]) |
| |
|
| | mc_rir = [None] * num_sources |
| |
|
| | for n_source in range(num_sources): |
| | rir_len = [len(rir[m][n_source]) for m in range(num_mics)] |
| | max_len = max(rir_len) |
| | mc_rir[n_source] = np.zeros((max_len, num_mics)) |
| | for n_mic, len_mic in enumerate(rir_len): |
| | mc_rir[n_source][:len_mic, n_mic] = rir[n_mic][n_source] |
| |
|
| | return mc_rir |
| |
|
| |
|
| | def plot_rir_manifest_info(filepath: str, plot_filepath: str = None): |
| | """Plot distribution of parameters from manifest file. |
| | |
| | Args: |
| | filepath: path to a RIR corpus manifest file |
| | plot_filepath: path to save the plot at |
| | """ |
| | metadata = read_manifest(filepath) |
| |
|
| | |
| | source_distance = [] |
| | source_azimuth = [] |
| | source_elevation = [] |
| | source_height = [] |
| |
|
| | |
| | rir_rt60_theory = [] |
| | rir_rt60_measured = [] |
| | anechoic_rt60_theory = [] |
| | anechoic_rt60_measured = [] |
| |
|
| | |
| | for data in metadata: |
| | |
| | source_distance += data['source_distance'] |
| | source_azimuth += data['source_azimuth'] |
| | source_elevation += data['source_elevation'] |
| | source_height += [s_pos[2] for s_pos in data['source_position']] |
| |
|
| | |
| | rir_rt60_theory.append(data['rir_rt60_theory']) |
| | rir_rt60_measured += data['rir_rt60_measured'] |
| | anechoic_rt60_theory.append(data['anechoic_rt60_theory']) |
| | anechoic_rt60_measured += data['anechoic_rt60_measured'] |
| |
|
| | |
| | plt.figure(figsize=(12, 6)) |
| |
|
| | plt.subplot(2, 4, 1) |
| | plt.hist(source_distance, label='distance') |
| | plt.xlabel('distance / m') |
| | plt.ylabel('# examples') |
| | plt.title('Source-to-array center distance') |
| |
|
| | plt.subplot(2, 4, 2) |
| | plt.hist(source_azimuth, label='azimuth') |
| | plt.xlabel('azimuth / deg') |
| | plt.ylabel('# examples') |
| | plt.title('Source-to-array center azimuth') |
| |
|
| | plt.subplot(2, 4, 3) |
| | plt.hist(source_elevation, label='elevation') |
| | plt.xlabel('elevation / deg') |
| | plt.ylabel('# examples') |
| | plt.title('Source-to-array center elevation') |
| |
|
| | plt.subplot(2, 4, 4) |
| | plt.hist(source_height, label='source height') |
| | plt.xlabel('height / m') |
| | plt.ylabel('# examples') |
| | plt.title('Source height') |
| |
|
| | plt.subplot(2, 4, 5) |
| | plt.hist(rir_rt60_theory, label='theory') |
| | plt.xlabel('RT60 / s') |
| | plt.ylabel('# examples') |
| | plt.title('RT60 theory') |
| |
|
| | plt.subplot(2, 4, 6) |
| | plt.hist(rir_rt60_measured, label='measured') |
| | plt.xlabel('RT60 / s') |
| | plt.ylabel('# examples') |
| | plt.title('RT60 measured') |
| |
|
| | plt.subplot(2, 4, 7) |
| | plt.hist(anechoic_rt60_theory, label='theory') |
| | plt.xlabel('RT60 / s') |
| | plt.ylabel('# examples') |
| | plt.title('RT60 theory (anechoic)') |
| |
|
| | plt.subplot(2, 4, 8) |
| | plt.hist(anechoic_rt60_measured, label='measured') |
| | plt.xlabel('RT60 / s') |
| | plt.ylabel('# examples') |
| | plt.title('RT60 measured (anechoic)') |
| |
|
| | for n in range(8): |
| | plt.subplot(2, 4, n + 1) |
| | plt.grid() |
| | plt.legend(loc='lower left') |
| |
|
| | plt.tight_layout() |
| |
|
| | if plot_filepath is not None: |
| | plt.savefig(plot_filepath) |
| | plt.close() |
| | logging.info('Plot saved at %s', plot_filepath) |
| |
|
| |
|
| | class RIRMixGenerator(object): |
| | """Creates a dataset of mixed signals at the microphone |
| | by combining target speech, background noise and interference. |
| | |
| | Corresponding signals are generated and saved |
| | using the `generate` method. |
| | |
| | Input configuration is expected to have the following structure |
| | ``` |
| | sample_rate: sample rate used for simulation |
| | room: |
| | subset: manifest for RIR data |
| | target: |
| | subset: manifest for target source data |
| | noise: |
| | subset: manifest for noise data |
| | interference: |
| | subset: manifest for interference data |
| | interference_probability: probability that interference is present |
| | max_num_interferers: max number of interferers, randomly selected between 0 and max |
| | mix: |
| | subset: |
| | num: number of examples to generate |
| | rsnr: range of RSNR |
| | rsir: range of RSIR |
| | ref_mic: reference microphone |
| | ref_mic_rms: desired RMS at ref_mic |
| | ``` |
| | """ |
| |
|
| | def __init__(self, cfg: DictConfig): |
| | """ |
| | Instantiate a RIRMixGenerator object. |
| | |
| | Args: |
| | cfg: generator configuration defining data for room, |
| | target signal, noise, interference and mixture |
| | """ |
| | logging.info("Initialize RIRMixGenerator") |
| | self._cfg = cfg |
| | self.check_cfg() |
| |
|
| | self.subsets = self.cfg.room.keys() |
| | logging.info('Initialized with %d subsets: %s', len(self.subsets), str(self.subsets)) |
| |
|
| | |
| | self.metadata = dict() |
| | for subset in self.subsets: |
| | subset_data = dict() |
| |
|
| | logging.info('Loading data for %s', subset) |
| | for key in ['room', 'target', 'noise', 'interference']: |
| | try: |
| | subset_data[key] = read_manifest(self.cfg[key][subset]) |
| | logging.info('\t%-*s: \t%d files', 15, key, len(subset_data[key])) |
| | except Exception as e: |
| | subset_data[key] = None |
| | logging.info('\t%-*s: \t0 files', 15, key) |
| | logging.warning('\t\tManifest data not loaded. Exception: %s', str(e)) |
| |
|
| | self.metadata[subset] = subset_data |
| |
|
| | logging.info('Loaded all manifests') |
| |
|
| | self.num_retries = self.cfg.get('num_retries', 5) |
| |
|
| | @property |
| | def cfg(self): |
| | """Property holding the internal config of the object. |
| | |
| | Note: |
| | Changes to this config are not reflected in the state of the object. |
| | Please create a new object with the updated config. |
| | """ |
| | return self._cfg |
| |
|
| | @property |
| | def sample_rate(self): |
| | return self._cfg.sample_rate |
| |
|
| | @cfg.setter |
| | def cfg(self, cfg): |
| | """Property holding the internal config of the object. |
| | |
| | Note: |
| | Changes to this config are not reflected in the state of the object. |
| | Please create a new object with the updated config. |
| | """ |
| | self._cfg = cfg |
| |
|
| | def check_cfg(self): |
| | """ |
| | Checks the provided configuration to ensure it has the minimal required |
| | parameters and that the values are in a reasonable range. |
| | """ |
| | |
| | sample_rate = self.cfg.get('sample_rate') |
| | if sample_rate is None: |
| | raise ValueError('Sample rate not provided.') |
| | elif sample_rate <= 0: |
| | raise ValueError(f'Sample rate must be positive: {sample_rate}') |
| |
|
| | |
| | room_cfg = self.cfg.get('room') |
| | if not room_cfg: |
| | raise ValueError( |
| | 'Room configuration not provided. Expecting RIR manifests in format {subset: path_to_manifest}' |
| | ) |
| |
|
| | |
| | target_cfg = self.cfg.get('target') |
| | if not target_cfg: |
| | raise ValueError( |
| | 'Target configuration not provided. Expecting audio manifests in format {subset: path_to_manifest}' |
| | ) |
| |
|
| | for key in ['azimuth', 'elevation', 'distance']: |
| | value = target_cfg.get(key) |
| |
|
| | if value is None or np.isscalar(value): |
| | |
| | pass |
| | elif len(value) != 2 or not value[0] < value[1]: |
| | |
| | raise ValueError(f'Range must be specified with two increasing elements for {key}: {value}') |
| |
|
| | |
| | noise_cfg = self.cfg.get('noise') |
| | if not noise_cfg: |
| | raise ValueError( |
| | 'Noise configuration not provided. Expecting audio manifests in format {subset: path_to_manifest}' |
| | ) |
| |
|
| | |
| | interference_cfg = self.cfg.get('interference') |
| | if not interference_cfg: |
| | raise ValueError( |
| | 'Interference configuration not provided. Expecting audio manifests in format {subset: path_to_manifest}' |
| | ) |
| | interference_probability = interference_cfg.get('interference_probability', 0) |
| | max_num_interferers = interference_cfg.get('max_num_interferers', 0) |
| | min_azimuth_to_target = interference_cfg.get('min_azimuth_to_target', 0) |
| | if interference_probability is not None: |
| | if interference_probability < 0: |
| | raise ValueError(f'Interference probability must be non-negative. Current value: {interference_probability}') |
| | elif interference_probability > 0: |
| | assert ( |
| | max_num_interferers is not None and max_num_interferers > 0 |
| | ), f'Max number of interferers must be positive. Current value: {max_num_interferers}' |
| | assert ( |
| | min_azimuth_to_target is not None and min_azimuth_to_target >= 0 |
| | ), f'Min azimuth to target must be non-negative. Current value: {min_azimuth_to_target}' |
| |
|
| | |
| | mix_cfg = self.cfg.get('mix') |
| | if not mix_cfg: |
| | raise ValueError('Mix configuration not provided. Expecting configuration for each subset.') |
| | if 'ref_mic' not in mix_cfg: |
| | raise ValueError('Reference microphone not defined.') |
| | if 'ref_mic_rms' not in mix_cfg: |
| | raise ValueError('Reference microphone RMS not defined.') |
| |
|
| | def get_audio_list( |
| | self, metadata: List[dict], min_duration: float, manifest_filepath: str = None, duration_eps: float = 0.01 |
| | ) -> List[dict]: |
| | """Prepare a list of audio files with duration of at least min_duration. |
| | Audio files are randomly selected from manifest metadata. |
| | |
| | If a selected file is longer than the required duration, then a random offset is selected |
| | before taking a min_duration segment. |
| | If a selected file is shorter than the required duration, then the whole file is used |
| | and the next file is randomly selected. |
| | Needs manifest filepath to support relative path resolution. |
| | |
| | Args: |
| | metadata: metadata loaded from a manifest file |
| | min_duration: minimal duration for the output file |
| | manifest_filepath: path to the manifest file, used to resolve relative paths. |
| | For relative paths, the manifest parent directory is assumed to |
| | be the base directory. |
| | duration_eps: A small extra duration selected from each file. This is to make |
| | sure that the signal will be long enough even if it needs to be |
| | resampled, etc. |
| | |
| | Returns: |
| | List of audio files with some metadata (offset, duration). |
| | """ |
| | |
| | |
| | total_duration = additional_duration = 0 |
| |
|
| | audio_list = [] |
| |
|
| | while total_duration < min_duration + additional_duration: |
| |
|
| | data = self.random.choice(metadata) |
| | audio_filepath = data['audio_filepath'] |
| | if not os.path.isabs(audio_filepath) and manifest_filepath is not None: |
| | manifest_dir = os.path.dirname(manifest_filepath) |
| | audio_filepath = os.path.join(manifest_dir, audio_filepath) |
| |
|
| | remaining_duration = min_duration - total_duration + additional_duration |
| |
|
| | |
| | if data['duration'] <= remaining_duration: |
| | |
| | offset = 0 |
| | duration = data['duration'] |
| | additional_duration += duration_eps |
| | else: |
| | |
| | max_offset = data['duration'] - remaining_duration |
| | offset = self.random.uniform(low=0, high=max_offset) |
| | duration = remaining_duration |
| |
|
| | audio_example = { |
| | 'audio_filepath': audio_filepath, |
| | 'offset': offset, |
| | 'duration': duration, |
| | 'type': data.get('type'), |
| | } |
| |
|
| | audio_list.append(audio_example) |
| | total_duration += duration |
| |
|
| | return audio_list |
| |
|
| | def generate_target(self, subset: str) -> dict: |
| | """ |
| | Prepare a dictionary with target configuration. |
| | |
| | The output dictionary contains the following information |
| | ``` |
| | room_index: index of the selected room from the RIR corpus |
| | room_filepath: path to the room simulation file |
| | source: index of the selected source for the target |
| | rt60: reverberation time of the selected room |
| | num_mics: number of microphones |
| | azimuth: azimuth of the target source, relative to the microphone array |
| | elevation: elevation of the target source, relative to the microphone array |
| | distance: distance of the target source, relative to the microphone array |
| | audio_filepath: path to the audio file for the target source |
| | text: text for the target source audio signal, if available |
| | duration: duration of the target source audio signal |
| | ``` |
| | |
| | Args: |
| | subset: string denoting a subset which will be used to select target |
| | audio and room parameters. |
| | |
| | Returns: |
| | Dictionary with target configuration, including room, source index, and audio information. |
| | """ |
| | |
| | room_metadata = self.metadata[subset]['room'] |
| |
|
| | for _ in range(self.num_retries): |
| | |
| | room_index = self.random.integers(low=0, high=len(room_metadata)) |
| | room_data = room_metadata[room_index] |
| |
|
| | |
| | for _ in range(self.num_retries): |
| | |
| | source = self.random.integers(low=0, high=room_data['num_sources']) |
| | |
| | for constraint in ['azimuth', 'elevation', 'distance']: |
| | if self.cfg.target.get(constraint) is None: |
| | continue |
| | else: |
| | |
| | source_value = room_data[f'source_{constraint}'][source] |
| | if self.cfg.target[constraint][0] <= source_value <= self.cfg.target[constraint][1]: |
| | continue |
| | else: |
| | |
| | source = None |
| | break |
| |
|
| | if source is not None: |
| | |
| | break |
| |
|
| | if source is None: |
| | raise RuntimeError(f'Could not find a feasible source given target constraints {self.cfg.target}') |
| |
|
| | |
| | audio_data = self.random.choice(self.metadata[subset]['target']) |
| |
|
| | |
| | room_filepath = room_data['room_filepath'] |
| | if not os.path.isabs(room_filepath): |
| | manifest_dir = os.path.dirname(self.cfg.room[subset]) |
| | room_filepath = os.path.join(manifest_dir, room_filepath) |
| |
|
| | audio_filepath = audio_data['audio_filepath'] |
| | if not os.path.isabs(audio_filepath): |
| | manifest_dir = os.path.dirname(self.cfg.target[subset]) |
| | audio_filepath = os.path.join(manifest_dir, audio_filepath) |
| |
|
| | target_cfg = { |
| | 'room_index': int(room_index), |
| | 'room_filepath': room_filepath, |
| | 'source': source, |
| | 'rt60': room_data['rir_rt60_measured'][source], |
| | 'num_mics': len(room_data['mic_positions']), |
| | 'azimuth': room_data['source_azimuth'][source], |
| | 'elevation': room_data['source_elevation'][source], |
| | 'distance': room_data['source_distance'][source], |
| | 'audio_filepath': audio_filepath, |
| | 'text': audio_data.get('text'), |
| | 'duration': audio_data['duration'], |
| | } |
| |
|
| | return target_cfg |
| |
|
| | def generate_noise(self, subset: str, target_cfg: dict) -> Optional[List[dict]]: |
| | """ |
| | Prepare a list of dictionaries with noise configuration. |
| | |
| | Args: |
| | subset: string denoting a subset which will be used to select noise audio. |
| | target_cfg: dictionary with target configuration. This is used to determine |
| | the minimal required duration for the noise signal. |
| | |
| | Returns: |
| | List of dictionary with noise configuration, including audio information |
| | for one or more noise files. |
| | """ |
| | if (noise_metadata := self.metadata[subset]['noise']) is None: |
| | return None |
| |
|
| | noise_cfg = self.get_audio_list( |
| | noise_metadata, min_duration=target_cfg['duration'], manifest_filepath=self.cfg.noise[subset] |
| | ) |
| |
|
| | return noise_cfg |
| |
|
| | def generate_interference(self, subset: str, target_cfg: dict) -> Optional[List[dict]]: |
| | """ |
| | Prepare a list of dictionaries with interference configuration. |
| | |
| | Args: |
| | subset: string denoting a subset which will be used to select interference audio. |
| | target_cfg: dictionary with target configuration. This is used to determine |
| | the minimal required duration for the interference signal. |
| | |
| | Returns: |
| | List of dictionary with interference configuration, including source index and audio information |
| | for one or more interference sources. |
| | """ |
| | if (interference_metadata := self.metadata[subset]['interference']) is None: |
| | |
| | return None |
| |
|
| | |
| | max_num_sources = self.cfg.interference.get('max_num_interferers', 0) |
| | interference_probability = self.cfg.interference.get('interference_probability', 0) |
| |
|
| | if ( |
| | max_num_sources >= 1 |
| | and interference_probability > 0 |
| | and self.random.uniform(low=0.0, high=1.0) < interference_probability |
| | ): |
| | |
| | num_interferers = self.random.integers(low=1, high=max_num_sources + 1) |
| | else: |
| | |
| | return None |
| |
|
| | |
| | room_index = target_cfg['room_index'] |
| | room_data = self.metadata[subset]['room'][room_index] |
| | feasible_sources = list(range(room_data['num_sources'])) |
| | |
| | feasible_sources.remove(target_cfg['source']) |
| |
|
| | |
| | min_azimuth_to_target = self.cfg.interference.get('min_azimuth_to_target', 0) |
| |
|
| | |
| | interference_cfg = [] |
| | for n in range(num_interferers): |
| |
|
| | |
| | source = None |
| | while len(feasible_sources) > 0 and source is None: |
| |
|
| | |
| | source = self.random.choice(feasible_sources) |
| | feasible_sources.remove(source) |
| |
|
| | |
| | if min_azimuth_to_target > 0: |
| | source_azimuth = room_data['source_azimuth'][source] |
| | azimuth_diff = wrap_to_180(source_azimuth - target_cfg['azimuth']) |
| | if abs(azimuth_diff) < min_azimuth_to_target: |
| | |
| | source = None |
| | continue |
| |
|
| | if source is None: |
| | logging.warning('Could not select a feasible interference source %d of %d', n, num_interferers) |
| |
|
| | |
| | return interference_cfg if interference_cfg else None |
| |
|
| | |
| | interfering_source = { |
| | 'source': source, |
| | 'azimuth': room_data['source_azimuth'][source], |
| | 'elevation': room_data['source_elevation'][source], |
| | 'distance': room_data['source_distance'][source], |
| | 'audio': self.get_audio_list( |
| | interference_metadata, |
| | min_duration=target_cfg['duration'], |
| | manifest_filepath=self.cfg.interference[subset], |
| | ), |
| | } |
| |
|
| | |
| | interference_cfg.append(interfering_source) |
| |
|
| | return interference_cfg |
| |
|
| | def generate_mix(self, subset: str) -> dict: |
| | """Generate scaling parameters for mixing |
| | the target speech at the microphone, background noise |
| | and interference signal at the microphone. |
| | |
| | The output dictionary contains the following information |
| | ``` |
| | rsnr: reverberant signal-to-noise ratio |
| | rsir: reverberant signal-to-interference ratio |
| | ref_mic: reference microphone for calculating the metrics |
| | ref_mic_rms: RMS of the signal at the reference microphone |
| | ``` |
| | |
| | Args: |
| | subset: string denoting the subset of configuration |
| | |
| | Returns: |
| | Dictionary containing configured RSNR, RSIR, ref_mic |
| | and RMS on ref_mic. |
| | """ |
| | mix_cfg = dict() |
| |
|
| | for key in ['rsnr', 'rsir', 'ref_mic', 'ref_mic_rms']: |
| | if key in self.cfg.mix[subset]: |
| | |
| | value = self.cfg.mix[subset][key] |
| | else: |
| | |
| | value = self.cfg.mix[key] |
| |
|
| | if value is None: |
| | mix_cfg[key] = None |
| | elif np.isscalar(value): |
| | mix_cfg[key] = value |
| | elif len(value) == 2: |
| | |
| | mix_cfg[key] = self.random.integers(low=value[0], high=value[1] + 1) |
| | else: |
| | |
| | mix_cfg[key] = self.random.choice(value) |
| |
|
| | return mix_cfg |
| |
|
| | def generate(self): |
| | """Generate a corpus of microphone signals by mixing target, background noise |
| | and interference signals. |
| | |
| | This method will prepare randomized examples based on the current configuration, |
| | run simulations and save results to output_dir. |
| | """ |
| | logging.info('Generate mixed signals') |
| |
|
| | |
| | self.random = default_rng(seed=self.cfg.random_seed) |
| |
|
| | |
| | output_dir = self.cfg.output_dir |
| | if output_dir.endswith('.yaml'): |
| | output_dir = output_dir[:-5] |
| |
|
| | |
| | logging.info('Output dir set to: %s', output_dir) |
| |
|
| | |
| | for subset in self.subsets: |
| |
|
| | output_dir_subset = os.path.join(output_dir, subset) |
| | examples = [] |
| |
|
| | if not os.path.exists(output_dir_subset): |
| | logging.info('Creating output directory: %s', output_dir_subset) |
| | os.makedirs(output_dir_subset) |
| | elif os.path.isdir(output_dir_subset) and len(os.listdir(output_dir_subset)) > 0: |
| | raise RuntimeError(f'Output directory {output_dir_subset} is not empty.') |
| |
|
| | num_examples = self.cfg.mix[subset].num |
| | logging.info('Preparing %d examples for subset %s', num_examples, subset) |
| |
|
| | |
| | for n_example in tqdm(range(num_examples), total=num_examples, desc=f'Preparing {subset}'): |
| | |
| | target_cfg = self.generate_target(subset) |
| | noise_cfg = self.generate_noise(subset, target_cfg) |
| | interference_cfg = self.generate_interference(subset, target_cfg) |
| | mix_cfg = self.generate_mix(subset) |
| |
|
| | |
| | base_output_filepath = os.path.join(output_dir_subset, f'{subset}_example_{n_example:09d}') |
| |
|
| | |
| | example = { |
| | 'sample_rate': self.sample_rate, |
| | 'target_cfg': target_cfg, |
| | 'noise_cfg': noise_cfg, |
| | 'interference_cfg': interference_cfg, |
| | 'mix_cfg': mix_cfg, |
| | 'base_output_filepath': base_output_filepath, |
| | } |
| |
|
| | examples.append(example) |
| |
|
| | |
| | num_workers = self.cfg.num_workers |
| | if num_workers is not None and num_workers > 1: |
| | logging.info('Simulate using %d workers', num_workers) |
| | with multiprocessing.Pool(processes=num_workers) as pool: |
| | metadata = list( |
| | tqdm( |
| | pool.imap(simulate_room_mix_kwargs, examples), |
| | total=len(examples), |
| | desc=f'Simulating {subset}', |
| | ) |
| | ) |
| | else: |
| | logging.info('Simulate using a single worker') |
| | metadata = [] |
| | for example in tqdm(examples, total=len(examples), desc=f'Simulating {subset}'): |
| | metadata.append(simulate_room_mix(**example)) |
| |
|
| | |
| | manifest_filepath = os.path.join(output_dir, f'{subset}_manifest.json') |
| |
|
| | if os.path.exists(manifest_filepath) and os.path.isfile(manifest_filepath): |
| | raise RuntimeError(f'Manifest file exists: {manifest_filepath}') |
| |
|
| | |
| | for data in tqdm(metadata, total=len(metadata), desc=f'Making filepaths relative {subset}'): |
| | for key, val in data.items(): |
| | if key.endswith('_filepath') and val is not None: |
| | data[key] = os.path.relpath(val, start=output_dir) |
| |
|
| | write_manifest(manifest_filepath, metadata) |
| |
|
| | |
| | plot_filepath = os.path.join(output_dir, f'{subset}_info.png') |
| |
|
| | if os.path.exists(plot_filepath) and os.path.isfile(plot_filepath): |
| | raise RuntimeError(f'Plot file exists: {plot_filepath}') |
| |
|
| | plot_mix_manifest_info(manifest_filepath, plot_filepath=plot_filepath) |
| |
|
| | |
| | config_filepath = os.path.join(output_dir, 'config.yaml') |
| | if os.path.exists(config_filepath) and os.path.isfile(config_filepath): |
| | raise RuntimeError(f'Output config file exists: {config_filepath}') |
| |
|
| | OmegaConf.save(self.cfg, config_filepath, resolve=True) |
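| |
|
| | # Typical usage sketch (the config filename is an assumed example; its structure |
| | # is described in the class docstring above): |
| | # cfg = OmegaConf.load('rir_mix_config.yaml') |
| | # generator = RIRMixGenerator(cfg) |
| | # generator.generate()  # writes audio, manifests and summary plots to cfg.output_dir |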
| |
|
| |
|
| | def convolve_rir(signal: np.ndarray, rir: np.ndarray) -> np.ndarray: |
| | """Convolve signal with a possibly multichannel IR in rir, i.e., |
| | calculate the following for each channel m: |
| | |
| | signal_m = rir_m * signal |
| | |
| | Args: |
| | signal: single-channel signal (samples,) |
| | rir: single- or multi-channel IR, (samples,) or (samples, channels) |
| | |
| | Returns: |
| | out: same length as signal, same number of channels as rir, shape (samples, channels) |
| | """ |
| | num_samples = len(signal) |
| | if rir.ndim == 1: |
| | |
| | out = convolve(signal, rir)[:num_samples] |
| | elif rir.ndim == 2: |
| | num_channels = rir.shape[1] |
| | out = np.zeros((num_samples, num_channels)) |
| | for m in range(num_channels): |
| | out[:, m] = convolve(signal, rir[:, m])[:num_samples] |
| | else: |
| | raise RuntimeError(f'RIR with {rir.ndim} dimensions is not supported') |
| |
|
| | return out |
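| |
|
| | # Example with illustrative shapes: a 1 s signal at 16 kHz and a 2-channel RIR. |
| | # signal = np.random.randn(16000) |
| | # rir = np.random.randn(4000, 2) |
| | # out = convolve_rir(signal, rir)  # out.shape -> (16000, 2), truncated to len(signal) |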
| |
|
| |
|
| | def calculate_drr(rir: np.ndarray, sample_rate: float, n_direct: List[int], n_0_ms=2.5) -> List[float]: |
| | """Calculate direct-to-reverberant ratio (DRR) from the measured RIR. |
| | |
| | Calculation is done as in eq. (3) from [1]. |
| | |
| | Args: |
| | rir: room impulse response, shape (num_samples, num_channels) |
| | sample_rate: sample rate for the impulse response |
| | n_direct: direct path delay in samples, one value per channel |
| | n_0_ms: half-width in milliseconds of the window around n_direct used for calculating the direct path energy |
| | |
| | Returns: |
| | Calculated DRR for each channel of the input RIR. |
| | |
| | References: |
| | [1] Eaton et al, The ACE challenge: Corpus description and performance evaluation, WASPAA 2015 |
| | """ |
| | |
| | n_0 = int(n_0_ms * sample_rate / 1000) |
| |
|
| | len_rir, num_channels = rir.shape |
| | drr = [None] * num_channels |
| | for m in range(num_channels): |
| |
|
| | |
| | dir_start = max(n_direct[m] - n_0, 0) |
| | dir_end = n_direct[m] + n_0 |
| |
|
| | |
| | pow_dir = np.sum(np.abs(rir[dir_start:dir_end, m]) ** 2) / len_rir |
| |
|
| | |
| | pow_reverberant = (np.sum(np.abs(rir[0:dir_start, m]) ** 2) + np.sum(np.abs(rir[dir_end:, m]) ** 2)) / len_rir |
| |
|
| | |
| | drr[m] = pow2db(pow_dir / pow_reverberant) |
| |
|
| | return drr |
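| |
|
| | # Usage sketch: the per-channel direct path delay is typically estimated as the |
| | # argmax of the anechoic RIR, as done in simulate_room_mix below: |
| | # n_direct = np.argmax(rir_anechoic, axis=0) |
| | # drr = calculate_drr(rir, sample_rate, n_direct)  # one DRR value in dB per channel |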
| |
|
| |
|
| | def normalize_max(x: np.ndarray, max_db: float = 0, eps: float = 1e-16) -> np.ndarray: |
| | """Normalize max input value to max_db full scale (±1). |
| | |
| | Args: |
| | x: input signal |
| | max_db: desired max magnitude compared to full scale |
| | eps: small regularization constant |
| | |
| | Returns: |
| | Normalized signal with max absolute value max_db. |
| | """ |
| | max_val = db2mag(max_db) |
| | return max_val * x / (np.max(np.abs(x)) + eps) |
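| |
|
| | # Example: normalize_max(np.array([0.1, -0.5]), max_db=0) returns approximately |
| | # [0.2, -1.0], i.e., the peak magnitude is scaled to 0 dBFS. |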
| |
|
| |
|
| | def simultaneously_active_rms( |
| | x: np.ndarray, |
| | y: np.ndarray, |
| | sample_rate: float, |
| | rms_threshold_db: float = -40, |
| | window_len_ms: float = 200, |
| | min_active_duration: float = 0.5, |
| | ) -> Tuple[float, float]: |
| | """Calculate RMS over segments where both input signals are active. |
| | |
| | Args: |
| | x: first input signal |
| | y: second input signal |
| | sample_rate: sample rate for input signals in Hz |
| | rms_threshold_db: threshold for determining activity of the signal, relative |
| | to max absolute value |
| | window_len_ms: window length in milliseconds, used for calculating segmental RMS |
| | min_active_duration: minimal duration of the active segments |
| | |
| | Returns: |
| | RMS value over active segments for x and y. |
| | """ |
| | if len(x) != len(y): |
| | raise RuntimeError(f'Expecting signals of same length: len(x)={len(x)}, len(y)={len(y)}') |
| | window_len = int(window_len_ms * sample_rate / 1000) |
| | rms_threshold = db2mag(rms_threshold_db) |
| |
|
| | x_normalized = normalize_max(x) |
| | y_normalized = normalize_max(y) |
| |
|
| | x_active_power = y_active_power = active_len = 0 |
| | for start in range(0, len(x) - window_len, window_len): |
| | window = slice(start, start + window_len) |
| |
|
| | |
| | x_window_rms = rms(x_normalized[window]) |
| | y_window_rms = rms(y_normalized[window]) |
| |
|
| | if x_window_rms > rms_threshold and y_window_rms > rms_threshold: |
| | |
| | x_active_power += np.sum(np.abs(x[window]) ** 2) |
| | y_active_power += np.sum(np.abs(y[window]) ** 2) |
| | active_len += window_len |
| |
|
| | if active_len < int(min_active_duration * sample_rate): |
| | raise RuntimeError( |
| | f'Signals are simultaneously active less than {min_active_duration} s: only {active_len/sample_rate} s' |
| | ) |
| |
|
| | |
| | x_active_power /= active_len |
| | y_active_power /= active_len |
| |
|
| | return np.sqrt(x_active_power), np.sqrt(y_active_power) |
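| |
|
| | # Usage sketch (x and y are assumed to have the same length and sample rate): |
| | # x_rms, y_rms = simultaneously_active_rms(x, y, sample_rate=16000) |
| | # The returned RMS values are computed only over windows where both signals are active. |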
| |
|
| |
|
| | def scaled_disturbance( |
| | signal: np.ndarray, |
| | disturbance: np.ndarray, |
| | sdr: float, |
| | sample_rate: float = None, |
| | ref_channel: int = 0, |
| | eps: float = 1e-16, |
| | ) -> np.ndarray: |
| | """ |
| | Args: |
| | signal: numpy array, shape (num_samples, num_channels) |
| | disturbance: numpy array, same shape as signal |
| | sdr: desired signal-to-disturbance ratio |
| | sample_rate: sample rate of the input signals |
| | ref_channel: ref mic used to calculate RMS |
| | eps: regularization constant |
| | |
| | Returns: |
| | Scaled disturbance, so that signal-to-disturbance ratio at ref_channel |
| | is approximately equal to input SDR during simultaneously active |
| | segments of signal and disturbance. |
| | """ |
| | if signal.shape != disturbance.shape: |
| | raise ValueError(f'Signal and disturbance shapes do not match: {signal.shape} != {disturbance.shape}') |
| |
|
| | |
| | signal_rms, disturbance_rms = simultaneously_active_rms( |
| | signal[:, ref_channel], disturbance[:, ref_channel], sample_rate=sample_rate |
| | ) |
| | disturbance_gain = db2mag(-sdr) * signal_rms / (disturbance_rms + eps) |
| | |
| | scaled_disturbance = disturbance_gain * disturbance |
| | return scaled_disturbance |
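| |
|
| | # Usage sketch: scale a noise signal so that the SNR at the reference channel, |
| | # measured over co-active segments, is approximately 10 dB: |
| | # noise_scaled = scaled_disturbance(target, noise, sdr=10, sample_rate=16000, ref_channel=0) |
| | # mix = target + noise_scaled |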
| |
|
| |
|
| | def load_audio_from_multiple_files(items: Optional[List[Dict]], sample_rate: int, total_len: int) -> Optional[np.ndarray]: |
| | """Load an audio from multiple files and concatenate into a single signal. |
| | |
| | Args: |
| | items: list of dictionaries, each item has audio_filepath, offset, and duration |
| | sample_rate: desired sample rate of the signal |
| | total_len: total length in samples |
| | |
| | Returns: |
| | Numpy array, shape (total_len, num_channels) |
| | """ |
| | if items is None: |
| | |
| | return None |
| |
|
| | signal = None |
| | samples_to_load = total_len |
| | |
| | for item in items: |
| | check_min_sample_rate(item['audio_filepath'], sample_rate) |
| | |
| | segment = AudioSegment.from_file( |
| | item['audio_filepath'], target_sr=sample_rate, offset=item['offset'], duration=item['duration'], |
| | ) |
| | |
| | segment_samples = normalize_max(segment.samples) |
| | |
| | signal = np.concatenate((signal, segment_samples)) if signal is not None else segment_samples |
| | |
| | samples_to_load -= len(segment_samples) |
| |
|
| | if samples_to_load <= 0: |
| | break |
| | |
| | signal = signal[:total_len, ...] |
| |
|
| | return signal |
| |
|
| |
|
| | def check_min_sample_rate(filepath: str, sample_rate: float): |
| | """Make sure the file's sample rate is at least sample_rate. |
| | This will make sure that we have only downsampling if loading |
| | this file, while upsampling is not permitted. |
| | |
| | Args: |
| | filepath: path to a file |
| | sample_rate: desired sample rate |
| | """ |
| | file_sample_rate = librosa.get_samplerate(path=filepath) |
| | if file_sample_rate < sample_rate: |
| | raise RuntimeError( |
| | f'Sample rate ({file_sample_rate}) is lower than the desired sample rate ({sample_rate}). File: {filepath}.' |
| | ) |
| |
|
| |
|
| | def simulate_room_mix( |
| | sample_rate: int, |
| | target_cfg: dict, |
| | noise_cfg: List[dict], |
| | interference_cfg: dict, |
| | mix_cfg: dict, |
| | base_output_filepath: str, |
| | max_amplitude: float = 0.999, |
| | eps: float = 1e-16, |
| | ) -> dict: |
| | """Simulate mixture signal at the microphone, including target, noise and |
| | interference signals and mixed at specific RSNR and RSIR. |
| | |
| | Args: |
| | sample_rate: Sample rate for all signals |
| | target_cfg: Dictionary with configuration of the target. Includes |
| | room_filepath, source index, audio_filepath, duration |
| | noise_cfg: List of dictionaries, where each item includes audio_filepath, |
| | offset and duration. |
| | interference_cfg: List of dictionaries, where each item contains source |
| | index |
| | mix_cfg: Dictionary with the mixture configuration. Includes RSNR, RSIR, |
| | ref_mic and ref_mic_rms. |
| | base_output_filepath: All output audio files will be saved with this prefix by |
| | adding a different suffix for each component, e.g., _mic.wav. |
| | max_amplitude: Maximum amplitude of the mic signal, used to prevent clipping. |
| | eps: Small regularization constant. |
| | |
| | Returns: |
| | Dictionary with metadata based on the mixture setup and |
| | simulation results. This corresponds to a line of the |
| | output manifest file. |
| | """ |
| | |
| | def load_rir(room_filepath: str, source: int, sample_rate: float, rir_key: str = 'rir') -> np.ndarray: |
| | """Load a RIR and check that the sample rate is matching the desired sample rate |
| | |
| | Args: |
| | room_filepath: Path to a room simulation in an h5 file |
| | source: Index of the desired source |
| | sample_rate: Sample rate of the simulation |
| | rir_key: Key of the RIR to load from the simulation. |
| | |
| | Returns: |
| | Numpy array with shape (num_samples, num_channels) |
| | """ |
| | rir, rir_sample_rate = load_rir_simulation(room_filepath, source=source, rir_key=rir_key) |
| | if rir_sample_rate != sample_rate: |
| | raise RuntimeError( |
| | f'RIR sample rate ({rir_sample_rate}) does not match the expected sample rate ({sample_rate}). File: {room_filepath}' |
| | ) |
| | return rir |
| |
|
| | |
| | target_rir = load_rir(target_cfg['room_filepath'], source=target_cfg['source'], sample_rate=sample_rate) |
| | target_rir_anechoic = load_rir( |
| | target_cfg['room_filepath'], source=target_cfg['source'], sample_rate=sample_rate, rir_key='anechoic' |
| | ) |
| |
|
| | |
| | check_min_sample_rate(target_cfg['audio_filepath'], sample_rate) |
| | target_segment = AudioSegment.from_file( |
| | target_cfg['audio_filepath'], target_sr=sample_rate, duration=target_cfg['duration'] |
| | ) |
| | if target_segment.num_channels > 1: |
| | raise RuntimeError( |
| | f'Expecting single-channel source signal, but received {target_segment.num_channels}. File: {target_cfg["audio_filepath"]}' |
| | ) |
| | target_signal = normalize_max(target_segment.samples) |
| |
|
| | |
| | target_reverberant = convolve_rir(target_signal, target_rir) |
| | target_anechoic = convolve_rir(target_signal, target_rir_anechoic) |
| |
|
| | |
| | noise = load_audio_from_multiple_files(noise_cfg, sample_rate=sample_rate, total_len=len(target_reverberant)) |
| |
|
| | |
| | if interference_cfg is None: |
| | interference = None |
| | else: |
| | |
| | interference = 0 |
| | for i_cfg in interference_cfg: |
| | |
| | i_signal = load_audio_from_multiple_files( |
| | i_cfg['audio'], sample_rate=sample_rate, total_len=len(target_reverberant) |
| | ) |
| | |
| | i_rir = load_rir(target_cfg['room_filepath'], source=i_cfg['source'], sample_rate=sample_rate) |
| | |
| | i_reverberant = convolve_rir(i_signal, i_rir) |
| | |
| | interference += i_reverberant |
| |
|
| | |
| | mix = target_reverberant.copy() |
| |
|
| | if noise is not None: |
| | noise = scaled_disturbance( |
| | signal=target_reverberant, |
| | disturbance=noise, |
| | sdr=mix_cfg['rsnr'], |
| | sample_rate=sample_rate, |
| | ref_channel=mix_cfg['ref_mic'], |
| | ) |
| | |
| | mix += noise |
| |
|
| | if interference is not None: |
| | interference = scaled_disturbance( |
| | signal=target_reverberant, |
| | disturbance=interference, |
| | sdr=mix_cfg['rsir'], |
| | sample_rate=sample_rate, |
| | ref_channel=mix_cfg['ref_mic'], |
| | ) |
| | |
| | mix += interference |
| |
|
| | |
| | mix_rms = rms(mix[:, mix_cfg['ref_mic']]) |
| | global_gain = db2mag(mix_cfg['ref_mic_rms']) / (mix_rms + eps) |
| | mix_max = np.max(np.abs(mix)) |
| | if (clipped_max := mix_max * global_gain) > max_amplitude: |
| | |
| | clipping_prevention_gain = max_amplitude / clipped_max |
| | global_gain *= clipping_prevention_gain |
| | mix_cfg['ref_mic_rms'] += mag2db(clipping_prevention_gain) |
| |
|
| | logging.debug( |
| | 'Clipping prevented for example %s (protection gain: %.2f dB)', |
| | base_output_filepath, |
| | mag2db(clipping_prevention_gain), |
| | ) |
| |
|
| | |
| | mix *= global_gain |
| | target_reverberant *= global_gain |
| | target_anechoic *= global_gain |
| | if noise is not None: |
| | noise *= global_gain |
| | if interference is not None: |
| | interference *= global_gain |
| |
|
| | |
| | mic_filepath = base_output_filepath + '_mic.wav' |
| | sf.write(mic_filepath, mix, sample_rate, 'float') |
| |
|
| | target_reverberant_filepath = base_output_filepath + '_target_reverberant.wav' |
| | sf.write(target_reverberant_filepath, target_reverberant, sample_rate, 'float') |
| |
|
| | target_anechoic_filepath = base_output_filepath + '_target_anechoic.wav' |
| | sf.write(target_anechoic_filepath, target_anechoic, sample_rate, 'float') |
| |
|
| | if noise is not None: |
| | noise_filepath = base_output_filepath + '_noise.wav' |
| | sf.write(noise_filepath, noise, sample_rate, 'float') |
| | else: |
| | noise_filepath = None |
| |
|
| | if interference is not None: |
| | interference_filepath = base_output_filepath + '_interference.wav' |
| | sf.write(interference_filepath, interference, sample_rate, 'float') |
| | else: |
| | interference_filepath = None |
| |
|
| | |
| | direct_path_delay = np.argmax(target_rir_anechoic, axis=0) |
| | drr = calculate_drr(target_rir, sample_rate, direct_path_delay) |
| |
|
| | metadata = { |
| | 'audio_filepath': mic_filepath, |
| | 'target_reverberant_filepath': target_reverberant_filepath, |
| | 'target_anechoic_filepath': target_anechoic_filepath, |
| | 'noise_filepath': noise_filepath, |
| | 'interference_filepath': interference_filepath, |
| | 'text': target_cfg.get('text'), |
| | 'duration': target_cfg['duration'], |
| | 'target_cfg': target_cfg, |
| | 'noise_cfg': noise_cfg, |
| | 'interference_cfg': interference_cfg, |
| | 'mix_cfg': mix_cfg, |
| | 'rt60': target_cfg.get('rt60'), |
| | 'drr': drr, |
| | 'rsnr': None if noise_cfg is None else mix_cfg['rsnr'], |
| | 'rsir': None if interference_cfg is None else mix_cfg['rsir'], |
| | } |
| |
|
| | return convert_numpy_to_serializable(metadata) |
| |
|
| |
|
| | def simulate_room_mix_kwargs(kwargs: dict) -> dict: |
| | """Wrapper around `simulate_room_mix` to handle kwargs. |
| | |
| | `pool.map(simulate_room_mix_kwargs, examples)` would be |
| | equivalent to `pool.starstarmap(simulate_room_mix, examples)` |
| | if `starstarmap` existed. |
| | |
| | Args: |
| | kwargs: kwargs that are forwarded to `simulate_room_mix` |
| | |
| | Returns: |
| | Dictionary with metadata, see `simulate_room_mix` |
| | """ |
| | return simulate_room_mix(**kwargs) |
| |
|
| |
|
| | def plot_mix_manifest_info(filepath: str, plot_filepath: str = None): |
| | """Plot distribution of parameters from the manifest file. |
| | |
| | Args: |
| | filepath: path to a mix manifest file |
| | plot_filepath: path to save the plot at |
| | """ |
| | metadata = read_manifest(filepath) |
| |
|
| | |
| | target_distance = [] |
| | target_azimuth = [] |
| | target_elevation = [] |
| | target_duration = [] |
| |
|
| | |
| | rt60 = [] |
| | drr = [] |
| |
|
| | |
| | rsnr = [] |
| | rsir = [] |
| |
|
| | |
| | for data in metadata: |
| | |
| | target_distance.append(data['target_cfg']['distance']) |
| | target_azimuth.append(data['target_cfg']['azimuth']) |
| | target_elevation.append(data['target_cfg']['elevation']) |
| | target_duration.append(data['duration']) |
| |
|
| | |
| | rt60.append(data['rt60']) |
| | drr += data['drr'] |
| |
|
| | |
| | rsnr.append(data['rsnr']) |
| | rsir.append(data['rsir']) |
| |
|
| | |
| | plt.figure(figsize=(12, 6)) |
| |
|
| | plt.subplot(2, 4, 1) |
| | plt.hist(target_distance, label='distance') |
| | plt.xlabel('distance / m') |
| | plt.ylabel('# examples') |
| | plt.title('Target-to-array distance') |
| |
|
| | plt.subplot(2, 4, 2) |
| | plt.hist(target_azimuth, label='azimuth') |
| | plt.xlabel('azimuth / deg') |
| | plt.ylabel('# examples') |
| | plt.title('Target-to-array azimuth') |
| |
|
| | plt.subplot(2, 4, 3) |
| | plt.hist(target_elevation, label='elevation') |
| | plt.xlabel('elevation / deg') |
| | plt.ylabel('# examples') |
| | plt.title('Target-to-array elevation') |
| |
|
| | plt.subplot(2, 4, 4) |
| | plt.hist(target_duration, label='duration') |
| | plt.xlabel('time / s') |
| | plt.ylabel('# examples') |
| | plt.title('Target duration') |
| |
|
| | plt.subplot(2, 4, 5) |
| | plt.hist(rt60, label='RT60') |
| | plt.xlabel('RT60 / s') |
| | plt.ylabel('# examples') |
| | plt.title('RT60') |
| |
|
| | plt.subplot(2, 4, 6) |
| | plt.hist(drr, label='DRR') |
| | plt.xlabel('DRR / dB') |
| | plt.ylabel('# examples') |
| | plt.title('DRR (pooled over mics)') |
| |
|
| | if not any(val is None for val in rsnr): |
| | plt.subplot(2, 4, 7) |
| | plt.hist(rsnr, label='RSNR') |
| | plt.xlabel('RSNR / dB') |
| | plt.ylabel('# examples') |
| | plt.title('RSNR') |
| |
|
| | if not any(val is None for val in rsir): |
| | plt.subplot(2, 4, 8) |
| | plt.hist(rsir, label='RSIR') |
| | plt.xlabel('RSIR / dB') |
| | plt.ylabel('# examples') |
| | plt.title('RSIR') |
| |
|
| | for n in range(8): |
| | plt.subplot(2, 4, n + 1) |
| | plt.grid() |
| | plt.legend(loc='lower left') |
| |
|
| | plt.tight_layout() |
| |
|
| | if plot_filepath is not None: |
| | plt.savefig(plot_filepath) |
| | plt.close() |
| | logging.info('Plot saved at %s', plot_filepath) |
| |
|