| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
| Description: |
| This script contains a collection of functions designed to handle various |
| audio processing. |
| """ |
|
|
| import random |
| import soxr |
| import soundfile |
| import torch |
| import torchaudio |
| import numpy as np |
|
|
| from pathlib import Path |
| from typing import Tuple |
| from numpy.lib.stride_tricks import sliding_window_view |
|
|
|
|
def audio_volume_normalize(audio: np.ndarray, coeff: float = 0.2) -> np.ndarray:
    """
    Normalize the volume of an audio signal.

    Parameters:
        audio (numpy array): Input audio signal array.
        coeff (float): Target coefficient for normalization, default is 0.2.

    Returns:
        numpy array: The volume-normalized audio signal.
    """
    # Sorted absolute amplitudes: used for the quiet-signal check and for
    # estimating loudness from the upper percentiles.
    sorted_abs = np.sort(np.abs(audio))

    # Very quiet signal: boost so the peak sits at 0.1, flooring the divisor
    # at 1e-3 to avoid a near-zero division.
    if sorted_abs[-1] < 0.1:
        audio = audio / max(sorted_abs[-1], 1e-3) * 0.1

    # Drop near-silent samples before estimating loudness.
    sorted_abs = sorted_abs[sorted_abs > 0.01]
    n_loud = sorted_abs.shape[0]

    # Too few non-silent samples for a reliable loudness estimate.
    if n_loud <= 10:
        return audio

    # Loudness estimate: mean amplitude over the 90th-99th percentile band.
    volume = np.mean(sorted_abs[int(0.9 * n_loud) : int(0.99 * n_loud)])

    # Rescale towards the target coefficient, limiting the gain to [0.1, 10].
    audio = audio * np.clip(coeff / volume, a_min=0.1, a_max=10)

    # Final safety rescale so samples stay within [-1, 1].
    peak = np.max(np.abs(audio))
    if peak > 1:
        audio = audio / peak

    return audio
|
|
|
|
def load_audio(
    adfile: Path,
    sampling_rate: int = None,
    length: int = None,
    volume_normalize: bool = False,
    segment_duration: int = None,
) -> np.ndarray:
    r"""Load an audio file, optionally resampling, segmenting, normalizing,
    and padding/truncating to a target length.

    Args:
        adfile (Path): path to audio file.
        sampling_rate (int, optional): target sampling rate. Defaults to None
            (keep the file's native rate).
        length (int, optional): target audio length in samples. Defaults to None.
        volume_normalize (bool, optional): whether to perform volume
            normalization. Defaults to False.
        segment_duration (int, optional): randomly select a segment with a
            duration of {segment_duration} seconds. Defaults to None, which
            means the whole audio will be used.

    Returns:
        audio (np.ndarray): mono audio samples (first channel only for
            multi-channel files).
    """
    audio, sr = soundfile.read(adfile)
    # Multi-channel input: keep only the first channel.
    if len(audio.shape) > 1:
        audio = audio[:, 0]

    # Resample only when the requested rate differs from the file's rate.
    if sampling_rate is not None and sr != sampling_rate:
        audio = soxr.resample(audio, sr, sampling_rate, quality="VHQ")
        sr = sampling_rate

    # Randomly pick a fixed-duration segment (zero-padded if the file is shorter).
    if segment_duration is not None:
        seg_length = int(sr * segment_duration)
        audio = random_select_audio_segment(audio, seg_length)

    if volume_normalize:
        audio = audio_volume_normalize(audio)

    if length is not None:
        # Only tolerate small (< 1000-sample) mismatches before
        # truncating or right-padding to the exact target length.
        assert abs(audio.shape[0] - length) < 1000
        if audio.shape[0] > length:
            audio = audio[:length]
        else:
            audio = np.pad(audio, (0, int(length - audio.shape[0])))
    return audio
|
|
|
|
def random_select_audio_segment(audio: np.ndarray, length: int) -> np.ndarray:
    """Randomly pick a contiguous segment of exactly `length` samples.

    Args:
        audio (np.ndarray): 1-D audio signal.
        length (int): audio length = sampling_rate * duration
    """
    deficit = length - audio.shape[0]
    if deficit > 0:
        # Audio is shorter than requested: right-pad with zeros first,
        # which makes the only valid start index 0.
        audio = np.pad(audio, (0, int(deficit)))
    start = random.randint(0, audio.shape[0] - length)
    return audio[start : int(start + length)]
|
|
|
|
def audio_highpass_filter(audio, sample_rate, highpass_cutoff_freq):
    """Apply a biquad high-pass filter to an audio signal.

    Args:
        audio (np.ndarray): input waveform.
        sample_rate (int): sampling rate in Hz.
        highpass_cutoff_freq (int): filter cutoff frequency in Hz.
    """
    waveform = torch.from_numpy(audio)
    filtered = torchaudio.functional.highpass_biquad(
        waveform, sample_rate, cutoff_freq=highpass_cutoff_freq
    )
    return filtered.numpy()
|
|
|
|
def stft(
    x: torch.Tensor,
    fft_size: int,
    hop_size: int,
    win_length: int,
    window: torch.Tensor,
    use_complex: bool = False,
) -> torch.Tensor:
    """Perform STFT and convert to a magnitude (or real/imag) spectrogram.

    Args:
        x (Tensor): Input signal tensor (B, T).
        fft_size (int): FFT size.
        hop_size (int): Hop size.
        win_length (int): Window length.
        window (Tensor): Window tensor of length ``win_length`` (e.g.
            ``torch.hann_window(win_length)``); it is moved to ``x``'s device.
        use_complex (bool): If True, return stacked real/imaginary parts
            instead of the magnitude.

    Returns:
        Tensor: Magnitude spectrogram (B, #frames, fft_size // 2 + 1), or,
        when ``use_complex`` is True, a tensor of shape
        (B, 2, #frames, fft_size // 2 + 1) holding real and imaginary parts.
    """
    x_stft = torch.stft(
        x, fft_size, hop_size, win_length, window.to(x.device), return_complex=True
    )

    if not use_complex:
        # Clamp the power spectrum: min avoids sqrt(0) (unstable gradients),
        # max caps runaway magnitudes. Then move frequency to the last axis.
        return torch.sqrt(
            torch.clamp(x_stft.real**2 + x_stft.imag**2, min=1e-7, max=1e3)
        ).transpose(2, 1)
    # Stack real and imaginary parts on a channel axis: (B, 2, #frames, #bins).
    res = torch.cat([x_stft.real.unsqueeze(1), x_stft.imag.unsqueeze(1)], dim=1)
    return res.transpose(2, 3)
|
|
|
|
def detect_speech_boundaries(
    wav: np.ndarray,
    sample_rate: int,
    window_duration: float = 0.1,
    energy_threshold: float = 0.01,
    margin_factor: int = 2
) -> Tuple[int, int]:
    """Detect the start and end points of speech in an audio signal using RMS energy.

    Args:
        wav: Input audio signal array with values in [-1, 1]
        sample_rate: Audio sample rate in Hz
        window_duration: Duration of detection window in seconds
        energy_threshold: RMS energy threshold for speech detection
        margin_factor: Factor to determine extra margin around detected boundaries

    Returns:
        tuple: (start_index, end_index) of speech segment

    Raises:
        ValueError: If the audio contains only silence
    """
    window_size = int(window_duration * sample_rate)
    margin = margin_factor * window_size
    # Slide in 1/10-window hops. Guard against a zero step when the window
    # is under 10 samples: slicing with step 0 raises ValueError.
    step_size = max(1, window_size // 10)

    # Zero-copy view of all length-`window_size` windows, one per step.
    windows = sliding_window_view(wav, window_size)[::step_size]

    # RMS energy per window; at/above threshold counts as speech.
    energy = np.sqrt(np.mean(windows ** 2, axis=1))
    speech_mask = energy >= energy_threshold

    if not np.any(speech_mask):
        raise ValueError("No speech detected in audio (only silence)")

    # First/last speech window start, widened by the margin and clamped to
    # the signal bounds.
    start = max(0, np.argmax(speech_mask) * step_size - margin)
    end = min(len(wav), (len(speech_mask) - 1 - np.argmax(speech_mask[::-1])) * step_size + margin)

    return start, end
|
|
|
|
def remove_silence_on_both_ends(
    wav: np.ndarray,
    sample_rate: int,
    window_duration: float = 0.1,
    volume_threshold: float = 0.01
) -> np.ndarray:
    """Remove silence from both ends of an audio signal.

    Args:
        wav: Input audio signal array
        sample_rate: Audio sample rate in Hz
        window_duration: Duration of detection window in seconds
        volume_threshold: Amplitude threshold for silence detection

    Returns:
        np.ndarray: Audio signal with silence removed from both ends

    Raises:
        ValueError: If the audio contains only silence
    """
    # Delegate boundary detection, then slice out the speech span.
    boundaries = detect_speech_boundaries(
        wav, sample_rate, window_duration, volume_threshold
    )
    return wav[boundaries[0]:boundaries[1]]
|
|
|
|
|
|
def hertz_to_mel(pitch: float) -> float:
    """
    Converts a frequency from the Hertz scale to the Mel scale.

    Parameters:
    - pitch: float or ndarray
        Frequency in Hertz.

    Returns:
    - mel: float or ndarray
        Frequency in Mel scale.
    """
    # Standard O'Shaughnessy mel formula; works elementwise on ndarrays too.
    return 2595 * np.log10(1 + pitch / 700)