Spaces:
Paused
Paused
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| import librosa | |
| from scipy.io.wavfile import write | |
| import torch | |
# Module-wide epsilon guarding against division by zero / log(0);
# used by the (deprecated) VAE scaling helpers below.
k = 1e-16
def np_log10(x):
    """Numerically safe base-10 logarithm: a tiny epsilon is added to the
    argument so that an exact zero does not produce -inf."""
    shifted = np.log(x + 1e-16)
    return shifted / np.log(10)
def sigmoid(x):
    """Logistic sigmoid: 1 / (1 + exp(-x))."""
    exp_neg = np.exp(-x)
    return 1.0 / (1.0 + exp_neg)
def inv_sigmoid(s):
    """Numerically safe inverse of the logistic sigmoid (logit): a tiny
    epsilon keeps the log argument strictly positive when s == 0."""
    odds = s / (1 - s)
    return np.log(odds + 1e-16)
def spc_to_VAE_input(spc):
    """Squash spectrogram values from [0, inf) into [0, 1). (deprecated)"""
    return spc / (1 + spc)
def VAE_out_put_to_spc(o, eps=1e-16):
    """Inverse transform of 'spc_to_VAE_input': map [0, 1) back to [0, inf). (deprecated)

    Parameters:
    - o: network output value(s) in [0, 1).
    - eps: small constant preventing division by zero when o == 1
      (previously the cryptic module global `k`; same default value,
      so behavior is unchanged).
    """
    return o / (1 - o + eps)
def np_power_to_db(S, amin=1e-16, top_db=80.0):
    """Convert a power spectrogram to dB, referenced to its maximum and
    floored at (max - top_db). (deprecated)"""
    reference = S.max()
    log_spec = 10.0 * np_log10(np.maximum(amin, S))
    log_spec = log_spec - 10.0 * np_log10(np.maximum(amin, reference))
    floor = log_spec.max() - top_db
    return np.maximum(log_spec, floor)
def show_spc(spc):
    """Display a spectrogram (log-power, low frequencies at the bottom). (deprecated)"""
    shape = np.shape(spc)
    flat = np.reshape(spc, (shape[0], shape[1]))
    db = np_power_to_db(np.abs(flat))
    plt.imshow(np.flipud(db))
    plt.show()
def save_results(spectrogram, spectrogram_image_path, waveform_path):
    """Save 'spectrogram' as a log-power image at 'spectrogram_image_path'
    and as a Griffin-Lim reconstructed 16 kHz waveform at 'waveform_path'.

    NOTE(review): assumes a (512, 256) power spectrogram — TODO confirm.
    """
    # Image of the log-power spectrum, clipped to [-100, 0] dB.
    db_image = np.reshape(np_power_to_db(np.abs(spectrogram)), (512, 256))
    matplotlib.pyplot.imsave(spectrogram_image_path, db_image, vmin=-100, vmax=0,
                             origin='lower')
    # Waveform: sqrt turns power into magnitude; row 512 stays zero so the
    # matrix has the 513 frequency bins Griffin-Lim expects for n_fft=1024.
    full_spec = np.zeros((513, 256))
    full_spec[:512, :] = np.sqrt(np.reshape(spectrogram, (512, 256)))
    rec_signal = librosa.griffinlim(full_spec, n_iter=32, hop_length=256, win_length=1024)
    write(waveform_path, 16000, rec_signal)
def plot_log_spectrogram(signal: np.ndarray,
                         path: str,
                         n_fft=2048,
                         frame_length=1024,
                         frame_step=256):
    """Compute the STFT power spectrogram of 'signal' and save it as a
    log-scaled image at 'path'."""
    stft = librosa.stft(signal, n_fft=n_fft, hop_length=frame_step, win_length=frame_length)
    # |STFT|^2 via real and imaginary parts.
    power = np.square(np.real(stft)) + np.square(np.imag(stft))
    db = np_power_to_db(np.abs(power))
    matplotlib.pyplot.imsave(path, db, vmin=-100, vmax=0, origin='lower')
def visualize_feature_maps(device, model, inputs, channel_indices=(0, 3)):
    """
    Visualize encoder feature maps before and after vector quantization.

    Parameters:
    - device: torch device the model runs on.
    - model: VQ-VAE model exposing `_encoder` and `_vq_vae`.
    - inputs: batch of input tensors; assumed shape
      [batch_size, channels, height, width] — TODO confirm.
    - channel_indices: feature-map channels to visualize. Default is now an
      immutable tuple: the original list default was a mutable default
      argument (shared across calls), a classic Python pitfall.
    """
    model.eval()
    inputs = inputs.to(device)
    with torch.no_grad():
        z_e = model._encoder(inputs)
        z_q, loss, (perplexity, min_encodings, min_encoding_indices) = model._vq_vae(z_e)
    batch_size = z_e.size(0)
    for idx in range(batch_size):
        fig, axs = plt.subplots(1, len(channel_indices) * 2, figsize=(15, 5))
        for i, channel_idx in enumerate(channel_indices):
            # Encoder output (pre-quantization).
            axs[2 * i].imshow(z_e[idx][channel_idx].cpu().numpy(), cmap='viridis')
            axs[2 * i].set_title(f"Encoder Output - Channel {channel_idx}")
            # Quantized output (post-quantization).
            axs[2 * i + 1].imshow(z_q[idx][channel_idx].cpu().numpy(), cmap='viridis')
            axs[2 * i + 1].set_title(f"Quantized Output - Channel {channel_idx}")
        plt.show()
def adjust_audio_length(audio, desired_length, original_sample_rate, target_sample_rate):
    """
    Adjust audio to exactly 'desired_length' samples, resampling first if needed.

    Parameters:
    - audio (np.array): the input audio signal.
    - desired_length (int): desired number of output samples.
    - original_sample_rate (int): sample rate of 'audio'.
    - target_sample_rate (int): target sample rate for the output.

    Returns:
    - np.array: the audio resampled to 'target_sample_rate' and cropped or
      zero-padded (at the end) to 'desired_length' samples.
    """
    # Resample only when the rates actually differ (idiomatic `!=` instead
    # of the original `not (a == b)`).
    if original_sample_rate != target_sample_rate:
        audio = librosa.core.resample(audio, orig_sr=original_sample_rate, target_sr=target_sample_rate)
    if len(audio) > desired_length:
        return audio[:desired_length]
    if len(audio) < desired_length:
        padded_audio = np.zeros(desired_length)
        padded_audio[:len(audio)] = audio
        return padded_audio
    return audio
def safe_int(s, default=0):
    """Convert 's' to int, returning 'default' when conversion fails.

    Also catches TypeError so non-string, non-numeric inputs (e.g. None)
    fall back to 'default' instead of raising.
    """
    try:
        return int(s)
    except (ValueError, TypeError):
        return default
def pad_spectrogram(D):
    """Drop the first frequency bin and zero-pad time frames to width 256,
    yielding a (rows-1, 256) matrix. (deprecated)"""
    trimmed = D[1:, :]
    missing = 256 - trimmed.shape[1]
    return np.pad(trimmed, ((0, 0), (0, missing)), 'constant')
def pad_STFT(D, time_resolution=256):
    """Drop the first frequency bin and zero-pad time frames up to
    'time_resolution' columns (pass None to skip padding).

    NOTE(review): inputs already wider than 'time_resolution' are returned
    un-cropped, despite the original summary mentioning cropping — confirm.
    """
    trimmed = D[1:, :]
    if time_resolution is None:
        return trimmed
    missing = time_resolution - trimmed.shape[1]
    if missing <= 0:
        return trimmed
    return np.pad(trimmed, ((0, 0), (0, missing)), 'constant')
def depad_STFT(D_padded):
    """Inverse of 'pad_STFT': re-insert an all-zero first frequency bin."""
    zero_bin = np.zeros((1, D_padded.shape[1]))
    return np.vstack([zero_bin, D_padded])
def nnData2Audio(spectrogram_batch, resolution=(512, 256), squared=False):
    """Convert a batch of network-output spectrograms into audio signals.

    Each item is mapped back through 'VAE_out_put_to_spc', a zero first
    frequency row is prepended, and Griffin-Lim reconstructs the waveform.
    If 'squared' is True the values are treated as power and square-rooted.
    """
    # Todo: remove resolution hard-coding
    freq_res, time_res = resolution
    if isinstance(spectrogram_batch, torch.Tensor):
        spectrogram_batch = spectrogram_batch.to("cpu").detach().numpy()
    origin_signals = []
    for spectrogram in spectrogram_batch:
        spc = np.reshape(VAE_out_put_to_spc(spectrogram), (freq_res, time_res))
        if squared:
            spc = np.sqrt(spc)
        abs_spec = np.zeros((freq_res + 1, time_res))
        abs_spec[1:, :] = spc
        origin_signals.append(
            librosa.griffinlim(abs_spec, n_iter=32, hop_length=256, win_length=1024))
    return origin_signals
def amp_to_audio(amp, n_iter=50):
    """Reconstruct a waveform from a magnitude spectrogram with the
    Griffin-Lim algorithm."""
    return librosa.griffinlim(amp, n_iter=n_iter, hop_length=256, win_length=1024)
def rescale(amp, method="log1p"):
    """Compress amplitude values with the chosen method.

    Supported methods: "log1p" and "NormalizedLogisticCompression".
    Raises NotImplementedError for anything else.
    """
    if method == "log1p":
        return np.log1p(amp)
    if method == "NormalizedLogisticCompression":
        return amp / (1.0 + amp)
    raise NotImplementedError()
def unrescale(scaled_amp, method="NormalizedLogisticCompression"):
    """Invert 'rescale' for the chosen method.

    NOTE(review): the default method here ("NormalizedLogisticCompression")
    differs from rescale's default ("log1p"), so calling both with defaults
    does not round-trip — confirm callers intend this.
    """
    if method == "log1p":
        return np.expm1(scaled_amp)
    if method == "NormalizedLogisticCompression":
        return scaled_amp / (1.0 - scaled_amp + 1e-10)
    raise NotImplementedError()
def create_key(attributes):
    """Build a unique multi-label key "<source>_<family>_<qualities>" from
    the attribute dict's 'instrument_source_str', 'instrument_family_str'
    and 'qualities' entries."""
    qualities = ''.join(str(q) for q in attributes["qualities"])
    source = attributes["instrument_source_str"]
    family = attributes["instrument_family_str"]
    return f"{source}_{family}_{qualities}"
def merge_dictionaries(dicts):
    """Merge an iterable of dictionaries; values under duplicate keys are
    combined with '+=' (sum for numbers, concatenation for sequences)."""
    combined = {}
    for mapping in dicts:
        for key, value in mapping.items():
            if key not in combined:
                combined[key] = value
            else:
                # Keep '+=' (not 'a = a + b') to preserve in-place
                # semantics for mutable values such as lists.
                combined[key] += value
    return combined
def adsr_envelope(signal, sample_rate, duration, attack_time, decay_time, sustain_level, release_time):
    """
    Apply an ADSR (attack-decay-sustain-release) envelope to an audio signal.

    :param signal: the original audio signal (numpy array).
    :param sample_rate: sample rate of the audio signal.
    :param duration: seconds covered by attack + decay + sustain combined.
    :param attack_time: attack time in seconds.
    :param decay_time: decay time in seconds.
    :param sustain_level: sustain level as a fraction of the peak (0 to 1).
    :param release_time: release time in seconds (must be <= 1.0).
    :return: the signal with the ADSR envelope applied; its length equals
             duration*sample_rate + 1 second worth of samples.
    """
    # Phase lengths in samples.
    n_total = int(duration * sample_rate)
    assert release_time <= 1.0, "release_time > 1.0"
    n_attack = int(attack_time * sample_rate)
    n_decay = int(decay_time * sample_rate)
    n_release = int(release_time * sample_rate)
    n_sustain = max(0, n_total - n_attack - n_decay)
    # Build each phase; the release ramp occupies the head of a fixed
    # one-second zero tail.
    attack = np.linspace(0, 1, n_attack)
    decay = np.linspace(1, sustain_level, n_decay)
    sustain = np.full(n_sustain, sustain_level)
    tail = np.zeros(int(1.0 * sample_rate))
    tail[:n_release] = np.linspace(sustain_level, 0, n_release)
    envelope = np.concatenate([attack, decay, sustain, tail])
    # Multiply; zero-pad the signal when it is shorter than the envelope.
    if len(envelope) <= len(signal):
        return signal[:len(envelope)] * envelope
    padded = np.zeros(len(envelope))
    padded[:len(signal)] = signal
    return padded * envelope
def rms_normalize(audio, target_rms=0.1):
    """Scale 'audio' so its root-mean-square level equals 'target_rms'.

    Parameters:
    - audio (np.array): input signal.
    - target_rms (float): desired RMS level (default 0.1).

    Returns:
    - np.array: scaled signal. A silent (all-zero) input is returned
      unchanged instead of producing NaN/inf via division by zero, which
      the original code did.
    """
    current_rms = np.sqrt(np.mean(audio ** 2))
    if current_rms == 0:
        # Silent input: nothing to normalize.
        return audio
    return audio * (target_rms / current_rms)
def encode_stft(D):
    """'STFT+' transform: complex spectral matrix -> real 3-channel stack
    [log1p(|D|), cos(phase), sin(phase)] along a new leading axis."""
    magnitude = np.abs(D)
    angle = np.angle(D)
    channels = [np.log1p(magnitude), np.cos(angle), np.sin(angle)]
    return np.stack(channels, axis=0)
def decode_stft(encoded_D):
    """'ISTFT+' inverse: rebuild the complex spectral matrix from the
    [log-magnitude, cos-phase, sin-phase] representation of 'encode_stft'."""
    magnitude = np.expm1(encoded_D[0, ...])
    phase = np.arctan2(encoded_D[2, ...], encoded_D[1, ...])
    return magnitude * (np.cos(phase) + 1j * np.sin(phase))