| import os |
| import pdb |
|
|
| import numpy as np |
| import sox |
| from scipy.io import wavfile |
|
|
| from .logs import get_logger_from_arg |
|
|
|
|
def reformat_and_trim_wav_file(wav_file, fs, bit_depth, nb_channels, overwrite=True, out_path=None,
                               silence_threshold=0.1, min_silence_duration=0.01, silence_pad=True, logger=None):
    """ Format a WAV file with the specified parameters using SoX

    The SoX effect chain is built in this order: optional silence trimming at the
    beginning and at the end, resampling, channel/bit depth conversion, optional padding.

    :param wav_file: WAV file to format (full path)
    :param fs: desired sampling frequency of WAV file
    :param bit_depth: desired bit depth of WAV file
    :param nb_channels: desired number of channels of WAV file
    :param overwrite: replace the existing WAV file with its new version
                      if not, the new file is written to out_path
    :param out_path: path to save reformatted WAV file
                     only used when overwrite is set to False
                     if not given, a "processed_<fs>" folder is created next to the input file
    :param silence_threshold: threshold to detect silences
                              silences are only trimmed when it is superior to 0.
    :param min_silence_duration: min silence duration to remove
                                 only used when silence_threshold is superior to 0.
    :param silence_pad: pad audio with 0.01s of silence at the beginning and the end
    :param logger: arg to create logger object
    """
    logger = get_logger_from_arg(logger)

    initial_path = os.path.normpath(wav_file).strip()

    if overwrite:
        # the result is rendered to a sibling temporary file and swapped in afterwards
        # splitext guarantees the temporary path differs from the input, whatever the extension
        root, ext = os.path.splitext(initial_path)
        out_path = f'{root}_tmp{ext}'
    elif out_path:
        out_path = os.path.normpath(out_path).strip()
    else:
        # default output location: "processed_<fs>" folder next to the input file
        out_dir = os.path.join(os.path.dirname(initial_path), f'processed_{fs}')
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, os.path.basename(initial_path))

    tfm = sox.Transformer()

    if silence_threshold > 0.:
        # trim silence at the beginning of the file
        tfm.silence(location=1, silence_threshold=silence_threshold,
                    min_silence_duration=min_silence_duration, buffer_around_silence=True)
        # trim silence at the end of the file
        tfm.silence(location=-1, silence_threshold=silence_threshold,
                    min_silence_duration=min_silence_duration, buffer_around_silence=True)

    # resample
    tfm.rate(samplerate=fs, quality='h')

    # set number of channels and bit depth
    tfm.convert(samplerate=None, n_channels=nb_channels, bitdepth=bit_depth)

    if silence_pad:
        tfm.pad(start_duration=0.01, end_duration=0.01)

    logger.info(f'SoX transformer effects: {tfm.effects_log}')
    logger.info(f'SoX input: {initial_path} -- output: {out_path}')

    tfm.build(initial_path, out_path)

    if overwrite:
        # single-step swap: the original file is never deleted before its replacement is in place
        os.replace(out_path, initial_path)
|
|
|
|
def _dtype_bit_depth(dtype):
    """ Return the number of bits per sample of a numpy dtype (or dtype name) """
    return np.dtype(dtype).itemsize * 8


def read_wavfile(file_path, rescale=False, desired_fs=None, desired_nb_channels=None, out_type='float32', logger=None):
    """ Read a WAV file and return the samples in a numpy array of type out_type

    When the file cannot be read directly, or does not match desired_fs/desired_nb_channels,
    it is converted with SoX into a temporary file that is read and then deleted.

    :param file_path: path to the file to read
    :param rescale: rescale the file to get amplitudes in the range between -1 and +1
                    only the range is rescaled, not the amplitude
    :param desired_fs: frequency expected from the WAV file
                       if not specified, the original WAV file sampling frequency is used
    :param desired_nb_channels: number of channels expected from the WAV file
                                if not specified, the original WAV number of channels is used
    :param out_type: desired output type of the audio waveform
    :param logger: arg to create logger object

    :return: sampling frequency and samples
    :raises AssertionError: when the arguments are inconsistent (kept for backward compatibility)
    """
    logger = get_logger_from_arg(logger)

    # argument validation is raised explicitly so it is not stripped when running with -O
    if 'int' not in out_type and 'float' not in out_type:
        msg = f'Inconsistent argument: only output of type "int" or "float" are supported, not "{out_type}"'
        logger.error(msg)
        raise AssertionError(msg)
    if rescale and 'float' not in out_type:
        msg = f'Inconsistent arguments: cannot rescale if out_type={out_type}'
        logger.error(msg)
        raise AssertionError(msg)

    file_path = os.path.normpath(file_path).strip()

    # reason why a SoX conversion is required, None when the file can be used as is
    conversion_reason = None
    fs, x = None, None
    try:
        fs, x = wavfile.read(file_path)
    except ValueError as e:
        conversion_reason = f'{e}'
    else:
        # scipy returns (nb_samples,) for mono and (nb_samples, nb_channels) otherwise
        current_nb_channels = 1 if x.ndim == 1 else x.shape[1]
        current_bit_depth = _dtype_bit_depth(x.dtype)
        if desired_fs and fs != desired_fs or desired_nb_channels and current_nb_channels != desired_nb_channels:
            conversion_reason = f'Format readable but requirements not met -- currently is ' \
                                f'{fs}Hz/{current_bit_depth} bits/{current_nb_channels} channels'

    if conversion_reason is not None:
        # splitext guarantees the temporary path differs from file_path, so the source is never deleted
        root, ext = os.path.splitext(file_path)
        tmp_wav = f'{root}_tmp{ext}'

        # default format when nothing is requested
        desired_fs = desired_fs if desired_fs else 22050
        desired_nb_channels = desired_nb_channels if desired_nb_channels else 1
        desired_bit_depth = _dtype_bit_depth(out_type)

        logger.info(f'{file_path} -- {conversion_reason}')
        logger.info(f'converting to {desired_fs}Hz/{desired_bit_depth} bits/{desired_nb_channels} channels')
        try:
            reformat_and_trim_wav_file(file_path, fs=desired_fs, bit_depth=desired_bit_depth,
                                       nb_channels=desired_nb_channels, overwrite=False, out_path=tmp_wav,
                                       silence_threshold=-1., silence_pad=False, logger=logger)
            fs, x = wavfile.read(tmp_wav)
        finally:
            # never leave the temporary file behind, even when conversion or reading fails
            if os.path.exists(tmp_wav):
                os.remove(tmp_wav)

    if rescale:
        x = _rescale_wav_to_float32(x)

    current_dtype = str(x.dtype)
    if 'int' in current_dtype and 'float' in out_type:
        logger.warning(f'Waveform is "{current_dtype}", converting to "{out_type}" but values will not be in '
                       f'[-1., 1.] -- Use rescale=True to have samples in [-1., 1.]')
    if 'float' in current_dtype and 'int' in out_type:
        msg = f'Waveform is "{current_dtype}", cannot convert to "{out_type}"'
        logger.error(msg)
        raise AssertionError(msg)

    x = np.asarray(x).astype(out_type)

    return fs, x
|
|
|
|
def write_wavefile(fileName, pcmData, sampling_rate, out_type='int16'):
    """ Write a WAV file from a numpy array

    Floating-point data written as int16 is scaled by 2**15 and clipped to the int16 range,
    so that a full-scale sample of +1.0 saturates to 32767 instead of overflowing.
    In every other case the data is only cast to out_type.

    :param fileName: path and file name to write to
    :param pcmData: The numpy array containing the PCM data
    :param sampling_rate: the sampling rate of the data
    :param out_type: desired output type of the audio waveform
    """
    data = np.asarray(pcmData)

    if 'float' in str(data.dtype) and out_type == 'int16':
        int16_range = np.iinfo('int16')
        # +1.0 * 2**15 == 32768 does not fit in int16: saturate before casting
        data = np.clip(data * 2 ** 15, int16_range.min, int16_range.max)

    data = data.astype(out_type)
    wavfile.write(fileName, sampling_rate, data)
|
|
|
|
def rescale_wav_array(x, desired_dtype='float32'):
    """ Rescale WAV array to a specified dtype

    The samples are first brought from the range of their current dtype to [-1.0, 1.0],
    then mapped to the range of the specified dtype. See ranges by type below...

    Floating-point samples are assumed to be in the range [-1.0, 1.0], otherwise an exception is raised.

    =====================  ===========  ===========  =============
         WAV format            Min          Max       NumPy dtype
    =====================  ===========  ===========  =============
    32-bit floating-point  -1.0         +1.0         float32
    32-bit PCM             -2147483648  +2147483647  int32
    16-bit PCM             -32768       +32767       int16
    8-bit PCM              0            255          uint8
    =====================  ===========  ===========  =============

    :param x: audio array
    :param desired_dtype: numpy dtype to rescale to

    :return: the rescaled audio array in desired_dtype
    """
    return _rescale_wav_from_float32(_rescale_wav_to_float32(x), desired_dtype)
|
|
|
|
| def _rescale_wav_to_float32(x): |
| """ Rescale WAV array between -1.f and 1.f based on the current format |
| |
| :param x: audio array |
| |
| :return: the rescaled audio array in float32 |
| """ |
|
|
| |
| y = np.zeros(x.shape, dtype='float32') |
| if x.dtype == 'int16': |
| y = x / 32768.0 |
| elif x.dtype == 'int32': |
| y = x / 2147483648.0 |
| elif x.dtype == 'float32' or x.dtype == 'float64': |
| max_ampl = np.max(np.abs(x)) |
| if max_ampl > 1.0: |
| raise ValueError(f'float32 wav contains samples not in the range [-1., 1.] -- ' |
| f'max amplitude: {max_ampl}') |
| y = x.astype('float32') |
| elif x.dtype == 'uint8': |
| y = ((x / 255.0) - 0.5) * 2 |
| else: |
| raise TypeError(f"could not normalize wav, unsupported sample type {x.dtype}") |
|
|
| return y |
|
|
|
|
| def _rescale_wav_from_float32(x, dtype): |
| """ Rescale WAV array from between -1.f and 1.f to the provided format/dtype |
| |
| :param x: audio array |
| :param dtype: numpy dtype to scale to |
| |
| :return: the rescaled audio array in specified format/dtype |
| """ |
|
|
| max_ampl = np.max(np.abs(x)) |
| if max_ampl > 1.0: |
| raise ValueError(f'float32 wav contains samples not in the range [-1., 1.] -- ' \ |
| f'max amplitude: {max_ampl}') |
|
|
| |
| y = np.zeros(x.shape, dtype=dtype) |
| if dtype == 'int16': |
| y = x * 32768.0 |
| elif dtype == 'int32': |
| y = x * 2147483648.0 |
| elif dtype == 'float32' or dtype == 'float64': |
| y = x |
| elif dtype == 'uint8': |
| y = 255.0 * ((x / 2.0) + 0.5) |
| else: |
| raise TypeError(f"could not normalize wav, unsupported sample type {x.dtype}") |
|
|
| |
| y = y.astype(dtype) |
|
|
| return y |
|
|
|
|
class BadSamplingFrequencyError(Exception):
    """ Raised when a readable WAV file does not meet the requested format requirements """

    def __init__(self, message):
        super().__init__(message)
        # kept as an attribute so the text can be read from the exception directly
        self.message = message
|
|