| import audioread
|
| import librosa
|
| import numpy as np
|
| import soundfile as sf
|
| import math
|
| import platform
|
| import traceback
|
| from . import pyrb
|
| from scipy.signal import correlate, hilbert
|
| import io
|
|
|
# Host platform details, read once at import time.
OPERATING_SYSTEM = platform.system()
SYSTEM_ARCH = platform.platform()
SYSTEM_PROC = platform.processor()
ARM = 'arm'

# Phase-handling option labels.
AUTO_PHASE = "Automatic"
POSITIVE_PHASE = "Positive Phase"
NEGATIVE_PHASE = "Negative Phase"

# Phase-shift preset labels. Each one is a plain string; a trailing comma
# after the literal would silently turn the value into a 1-tuple.
NONE_P = "None"
LOW_P = "Shifts: Low"
MED_P = "Shifts: Medium"
HIGH_P = "Shifts: High"
VHIGH_P = "Shifts: Very High"
MAXIMUM_P = "Shifts: Maximum"

# Mutable module-level state.
progress_value = 0     # step counter reset and advanced by align_audio
last_update_time = 0
is_macos = False       # default; platform detection may override it
|
|
|
# Pick the pyrb module: the pyrubberband package on Windows, the module
# that lives next to this file on every other platform.
if OPERATING_SYSTEM != 'Windows':
    from . import pyrb
else:
    from pyrubberband import pyrb

# Resampler names passed as ``res_type`` to librosa.resample.
wav_resolution = "sinc_fastest"
wav_resolution_float_resampling = wav_resolution

if OPERATING_SYSTEM == 'Darwin':
    is_macos = True
    if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH:
        # ARM-based Macs get their own resampler choices.
        wav_resolution = "polyphase"
        wav_resolution_float_resampling = "kaiser_best"
|
|
|
# Labels of the available ensemble algorithms.
MAX_SPEC = 'Max Spec'
MIN_SPEC = 'Min Spec'
LIN_ENSE = 'Linear Ensemble'
AVERAGE = 'Average'

# The waveform variants share the spectrogram labels.
MAX_WAV, MIN_WAV = MAX_SPEC, MIN_SPEC
|
|
|
def crop_center(h1, h2):
    """Centre-crop ``h1`` along axis 3 to the length of ``h2``.

    Both arguments must provide ``.size()`` with at least four entries
    (presumably 4-D torch tensors - confirm against callers).

    Returns:
        ``h1`` itself when the axis-3 lengths already match, otherwise the
        centred slice of ``h1`` whose axis-3 length equals ``h2``'s.

    Raises:
        ValueError: if ``h1`` is shorter than ``h2`` along axis 3.
    """
    own_len = h1.size()[3]
    ref_len = h2.size()[3]

    if own_len < ref_len:
        raise ValueError('h1_shape[3] must be greater than h2_shape[3]')
    if own_len == ref_len:
        return h1

    first = (own_len - ref_len) // 2
    return h1[:, :, :, first:first + ref_len]
|
|
|
def preprocess(X_spec):
    """Split a complex spectrogram into its magnitude and phase.

    Returns:
        tuple: ``(np.abs(X_spec), np.angle(X_spec))``.
    """
    return np.abs(X_spec), np.angle(X_spec)
|
|
|
def make_padding(width, cropsize, offset):
    """Compute left/right padding and the region-of-interest size.

    Parameters:
        width: Length of the axis being padded.
        cropsize: Size of one crop window.
        offset: Margin on each side of a crop window.

    Returns:
        tuple: ``(left, right, roi_size)`` where ``roi_size`` is
        ``cropsize - 2 * offset`` (or ``cropsize`` when that is zero).
    """
    # A zero-sized region falls back to the full crop size.
    roi_size = (cropsize - offset * 2) or cropsize
    right = roi_size - (width % roi_size) + offset

    return offset, right, roi_size
|
|
|
def normalize(wave, is_normalize=False):
    """Scale audio down to a peak of 1.0 when requested.

    The array is only modified when its absolute peak exceeds 1.0 and
    ``is_normalize`` is true; the division happens in place, so the
    caller's array is changed as well.

    Returns:
        The (possibly rescaled) ``wave`` array.
    """
    peak = np.abs(wave).max()

    if peak > 1.0 and is_normalize:
        print("Above clipping threshold.")
        wave /= peak

    return wave
|
|
|
def auto_transpose(audio_array:np.ndarray):
    """
    Ensure that the audio array is in the (channels, samples) format.

    An array whose second axis has length 2 is treated as
    (samples, channels) and transposed. Arrays with fewer than two
    dimensions (e.g. mono audio) are returned unchanged instead of
    raising ``IndexError``.

    Parameters:
        audio_array (ndarray): Input audio array.

    Returns:
        ndarray: Transposed audio array if necessary.
    """

    if audio_array.ndim >= 2 and audio_array.shape[1] == 2:
        return audio_array.T
    return audio_array
|
|
|
def write_array_to_mem(audio_data, subtype):
    """Serialise an audio array to an in-memory WAV file.

    Parameters:
        audio_data: A numpy array to encode, or any other object, which is
            passed through untouched.
        subtype: soundfile subtype name used for encoding.

    Returns:
        A ``BytesIO`` rewound to position 0 holding 44100 Hz WAV data when
        ``audio_data`` is an ndarray, otherwise ``audio_data`` itself.
    """
    if not isinstance(audio_data, np.ndarray):
        return audio_data

    audio_buffer = io.BytesIO()
    sf.write(audio_buffer, audio_data, 44100, subtype=subtype, format='WAV')
    audio_buffer.seek(0)
    return audio_buffer
|
|
|
def spectrogram_to_image(spec, mode='magnitude'):
    """Render a spectrogram as an 8-bit image array.

    Parameters:
        spec: Complex or real spectrogram array.
        mode: ``'magnitude'`` (log power of the magnitude) or ``'phase'``
            (phase angle of complex input, raw values of real input).

    Returns:
        ndarray of dtype uint8 scaled to 0..255. For 3-D input the first
        axis is moved last and a leading plane holding the per-pixel
        maximum over that axis is prepended.

    Raises:
        ValueError: if ``mode`` is not one of the supported names.
    """
    if mode == 'magnitude':
        y = np.abs(spec) if np.iscomplexobj(spec) else spec
        y = np.log10(y ** 2 + 1e-8)
    elif mode == 'phase':
        if np.iscomplexobj(spec):
            y = np.angle(spec)
        else:
            # Work on a floating-point copy so the caller's array is not
            # modified by the in-place scaling below.
            spec_arr = np.asarray(spec)
            y = np.array(spec_arr, dtype=np.result_type(spec_arr.dtype, np.float32))
    else:
        raise ValueError(f"mode must be 'magnitude' or 'phase', got {mode!r}")

    y -= y.min()
    peak = y.max()
    if peak > 0:
        # A constant spectrogram has peak 0 after the shift; leave it all
        # zeros rather than dividing by zero.
        y *= 255 / peak
    img = np.uint8(y)

    if y.ndim == 3:
        img = img.transpose(1, 2, 0)
        img = np.concatenate([
            np.max(img, axis=2, keepdims=True), img
        ], axis=2)

    return img
|
|
|
def reduce_vocal_aggressively(X, y, softmask):
    """Attenuate ``y`` where the residual ``X - y`` is louder than ``y``.

    Parameters:
        X: Complex spectrogram of the mixture.
        y: Complex spectrogram to be reduced.
        softmask: Fraction of the residual magnitude that is subtracted.

    Returns:
        Complex array with ``y``'s phase and the reduced, non-negative
        magnitude.
    """
    residual_mag = np.abs(X - y)
    own_mag = np.abs(y)

    # Only bins where the residual dominates are reduced.
    dominated = residual_mag > own_mag
    reduced = np.clip(own_mag - residual_mag * dominated * softmask, 0, np.inf)

    return reduced * np.exp(1.j * np.angle(y))
|
|
|
def merge_artifacts(y_mask, thres=0.01, min_range=64, fade_size=32):
    """Push long above-threshold runs of a mask towards 1 with faded edges.

    Frames (axis 2) whose minimum over axes 0 and 1 exceeds ``thres`` are
    grouped into consecutive runs. Runs longer than ``min_range`` receive a
    weight of 1 with linear fades of ``fade_size`` frames at their edges,
    and ``weight * (1 - y_mask)`` is added to the mask.

    ``y_mask`` is modified in place. Any exception (including the
    ``min_range`` validation error) is caught, reported on stdout and the
    mask is returned in whatever state it is in.

    Parameters:
        y_mask: 3-D mask array; the third axis is the one scanned for runs.
        thres: Threshold a frame's minimum must exceed.
        min_range: Minimum run length (must be >= ``fade_size * 2``).
        fade_size: Number of frames in each fade ramp.

    Returns:
        The processed mask (the same object as ``y_mask``).
    """
    mask = y_mask

    try:
        if min_range < fade_size * 2:
            raise ValueError('min_range must be >= fade_size * 2')

        idx = np.where(y_mask.min(axis=(0, 1)) > thres)[0]
        if idx.size == 0:
            # No frame exceeds the threshold: nothing to merge. Returning
            # here avoids indexing an empty array, which used to surface
            # as a misleading "Post Process Failed" message.
            return mask

        # Positions where consecutive indices are not adjacent mark the
        # boundaries between runs.
        breaks = np.where(np.diff(idx) != 1)[0]
        start_idx = np.insert(idx[breaks + 1], 0, idx[0])
        end_idx = np.append(idx[breaks], idx[-1])
        artifact_idx = np.where(end_idx - start_idx > min_range)[0]
        weight = np.zeros_like(y_mask)
        if len(artifact_idx) > 0:
            start_idx = start_idx[artifact_idx]
            end_idx = end_idx[artifact_idx]
            old_e = None
            for s, e in zip(start_idx, end_idx):
                # Runs closer than one fade to the previous run are joined
                # to it by moving the start back.
                if old_e is not None and s - old_e < fade_size:
                    s = old_e - fade_size * 2

                if s != 0:
                    weight[:, :, s:s + fade_size] = np.linspace(0, 1, fade_size)
                else:
                    s -= fade_size

                if e != y_mask.shape[2]:
                    weight[:, :, e - fade_size:e] = np.linspace(1, 0, fade_size)
                else:
                    e += fade_size

                weight[:, :, s + fade_size:e - fade_size] = 1
                old_e = e

        v_mask = 1 - y_mask
        y_mask += weight * v_mask

        mask = y_mask
    except Exception as e:
        error_name = f'{type(e).__name__}'
        traceback_text = ''.join(traceback.format_tb(e.__traceback__))
        message = f'{error_name}: "{e}"\n{traceback_text}"'
        print('Post Process Failed: ', message)

    return mask
|
|
|
def align_wave_head_and_tail(a, b):
    """Trim two arrays to the shorter of their first rows' sizes.

    NOTE(review): the same limit is applied to both the first and the
    second axis of each array; for (channels, samples) input the
    first-axis slice is normally a no-op - confirm this is intended.

    Returns:
        tuple: the trimmed ``a`` and ``b``.
    """
    limit = min(a[0].size, b[0].size)
    window = (slice(None, limit), slice(None, limit))

    return a[window], b[window]
|
|
|
def convert_channels(spec, mp, band):
    """Re-encode a two-channel spectrogram per the band's settings.

    The mode is read from ``mp.param['band'][band]['convert_channels']``.
    Recognised modes are ``'mid_side_c'``, ``'mid_side'`` and
    ``'stereo_n'``; for any other value ``spec`` is returned untouched.

    Returns:
        Fortran-ordered array stacking the two converted channels, or
        ``spec`` itself when no conversion applies.
    """
    mode = mp.param['band'][band].get('convert_channels')

    if mode not in ('mid_side_c', 'mid_side', 'stereo_n'):
        return spec

    first, second = spec[0], spec[1]

    if mode == 'mid_side_c':
        converted = [np.add(first, second * .25), np.subtract(second, first * .25)]
    elif mode == 'mid_side':
        converted = [np.add(first, second) / 2, np.subtract(first, second)]
    else:
        converted = [np.add(first, second * .25) / 0.9375,
                     np.add(second, first * .25) / 0.9375]

    return np.asfortranarray(converted)
|
|
|
def combine_spectrograms(specs, mp, is_v51_model=False):
    """Stack the cropped bins of every band into one spectrogram.

    Parameters:
        specs: Mapping from band number (1-based) to that band's
            spectrogram; frames are taken from the last axis.
        mp: Object whose ``param`` dict provides ``'bins'``, ``'band'``
            (per-band ``crop_start``/``crop_stop``), ``'pre_filter_start'``
            and ``'pre_filter_stop'``.
        is_v51_model: Selects the mask-based low-pass when a pre-filter is
            configured.

    Returns:
        Fortran-ordered complex64 array of shape
        ``(2, bins + 1, shortest_frame_count)``.

    Raises:
        ValueError: if the bands together contribute more than ``bins``
            rows.
    """
    params = mp.param
    frames = min([specs[key].shape[2] for key in specs])
    combined = np.zeros(shape=(2, params['bins'] + 1, frames), dtype=np.complex64)
    band_count = len(params['band'])

    row = 0
    for d in range(1, band_count + 1):
        lo = params['band'][d]['crop_start']
        hi = params['band'][d]['crop_stop']
        height = hi - lo
        combined[:, row:row + height, :frames] = specs[d][:, lo:hi, :frames]
        row += height

    if row > params['bins']:
        raise ValueError('Too much bins')

    # Optional low-pass over the combined spectrogram.
    if params['pre_filter_start'] > 0:
        if is_v51_model:
            combined *= get_lp_filter_mask(combined.shape[1], params['pre_filter_start'], params['pre_filter_stop'])
        elif band_count == 1:
            combined = fft_lp_filter(combined, params['pre_filter_start'], params['pre_filter_stop'])
        else:
            # Each bin's gain depends on the previous bin's gain.
            prev_gain = 1
            for b in range(params['pre_filter_start'] + 1, params['pre_filter_stop']):
                gain = math.pow(10, -(b - params['pre_filter_start']) * (3.5 - prev_gain) / 20.0)
                prev_gain = gain
                combined[:, b, :] *= gain

    return np.asfortranarray(combined)
|
|
|
def wave_to_spectrogram(wave, hop_length, n_fft, mp, band, is_v51_model=False):
    """Compute a two-channel STFT of ``wave``.

    A 1-D ``wave`` is duplicated into two identical channels. For
    non-v5.1 models the channels are first transformed according to the
    first truthy flag among ``mp.param['reverse']``, ``['mid_side']`` and
    ``['mid_side_b2']``; for v5.1 models the raw channels are transformed
    and the result is passed through ``convert_channels``.

    ``n_fft`` is handed to ``librosa.stft`` by keyword so the call also
    works with librosa releases that accept only the signal positionally.

    Parameters:
        wave: Audio array indexed as ``wave[0]`` / ``wave[1]``.
        hop_length: STFT hop length.
        n_fft: STFT window size.
        mp: Object exposing the ``param`` dict described above.
        band: Band number forwarded to ``convert_channels``.
        is_v51_model: Selects the v5.1 channel handling.

    Returns:
        Fortran-ordered array stacking the two channel spectrograms.
    """
    if wave.ndim == 1:
        wave = np.asfortranarray([wave, wave])

    if not is_v51_model:
        if mp.param['reverse']:
            wave_left = np.flip(np.asfortranarray(wave[0]))
            wave_right = np.flip(np.asfortranarray(wave[1]))
        elif mp.param['mid_side']:
            wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
            wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
        elif mp.param['mid_side_b2']:
            wave_left = np.asfortranarray(np.add(wave[1], wave[0] * .5))
            wave_right = np.asfortranarray(np.subtract(wave[0], wave[1] * .5))
        else:
            wave_left = np.asfortranarray(wave[0])
            wave_right = np.asfortranarray(wave[1])
    else:
        wave_left = np.asfortranarray(wave[0])
        wave_right = np.asfortranarray(wave[1])

    spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
    spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)

    spec = np.asfortranarray([spec_left, spec_right])

    if is_v51_model:
        spec = convert_channels(spec, mp, band)

    return spec
|
|
|
def spectrogram_to_wave(spec, hop_length=1024, mp={}, band=0, is_v51_model=True):
    """Invert a two-channel spectrogram and undo the channel encoding.

    For v5.1 models the decoding is chosen by
    ``mp.param['band'][band]['convert_channels']``; otherwise by the first
    truthy flag among ``mp.param['reverse']``, ``['mid_side']`` and
    ``['mid_side_b2']``. With no matching mode the two inverted channels
    are returned as they are.

    NOTE(review): the default ``mp={}`` has no ``param`` attribute, so a
    real parameter object must always be supplied.

    Returns:
        Fortran-ordered array stacking the two output channels.
    """
    left = librosa.istft(np.asfortranarray(spec[0]), hop_length=hop_length)
    right = librosa.istft(np.asfortranarray(spec[1]), hop_length=hop_length)

    if is_v51_model:
        mode = mp.param['band'][band].get('convert_channels')
        if mode == 'mid_side_c':
            pair = [np.subtract(left / 1.0625, right / 4.25),
                    np.add(right / 1.0625, left / 4.25)]
        elif mode == 'mid_side':
            pair = [np.add(left, right / 2), np.subtract(left, right / 2)]
        elif mode == 'stereo_n':
            pair = [np.subtract(left, right * .25), np.subtract(right, left * .25)]
        else:
            pair = [left, right]
    elif mp.param['reverse']:
        pair = [np.flip(left), np.flip(right)]
    elif mp.param['mid_side']:
        pair = [np.add(left, right / 2), np.subtract(left, right / 2)]
    elif mp.param['mid_side_b2']:
        pair = [np.add(right / 1.25, .4 * left), np.subtract(left / 1.25, .4 * right)]
    else:
        pair = [left, right]

    return np.asfortranarray(pair)
|
|
|
def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None, is_v51_model=False):
    """Rebuild a waveform from a combined multi-band spectrogram.

    Each band's rows are copied from ``spec_m`` back to the band's
    ``crop_start:crop_stop`` range, filtered, inverted with
    ``spectrogram_to_wave`` and accumulated. After every band except the
    last, the running waveform is resampled from that band's ``sr`` to the
    next band's ``sr``.

    The per-band buffer is zero-initialised so that bins outside the crop
    range are silent rather than whatever happened to be in memory, and
    the sample rates are passed to ``librosa.resample`` by keyword.

    Parameters:
        spec_m: Combined spectrogram; rows are consumed band by band.
        mp: Object whose ``param['band']`` maps 1-based band numbers to
            dicts with ``n_fft``, ``crop_start``, ``crop_stop``, ``hl``,
            ``sr`` and the ``hpf_*`` / ``lpf_*`` settings.
        extra_bins_h: Number of high-end rows to overwrite in the last
            band, or a falsy value to skip.
        extra_bins: Source of those high-end rows.
        is_v51_model: Use mask-based filters instead of the in-place
            ``fft_*`` filters.

    Returns:
        The reconstructed waveform.
    """
    bands_n = len(mp.param['band'])
    offset = 0

    for d in range(1, bands_n + 1):
        bp = mp.param['band'][d]
        spec_s = np.zeros(shape=(2, bp['n_fft'] // 2 + 1, spec_m.shape[2]), dtype=complex)
        h = bp['crop_stop'] - bp['crop_start']
        spec_s[:, bp['crop_start']:bp['crop_stop'], :] = spec_m[:, offset:offset+h, :]

        offset += h
        if d == bands_n:
            # Highest band: optional high-end restore, then high-pass.
            if extra_bins_h:
                max_bin = bp['n_fft'] // 2
                spec_s[:, max_bin-extra_bins_h:max_bin, :] = extra_bins[:, :extra_bins_h, :]
            if bp['hpf_start'] > 0:
                if is_v51_model:
                    spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)
                else:
                    spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1)
            if bands_n == 1:
                wave = spectrogram_to_wave(spec_s, bp['hl'], mp, d, is_v51_model)
            else:
                wave = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp, d, is_v51_model))
        else:
            sr = mp.param['band'][d+1]['sr']
            if d == 1:
                # Lowest band: low-pass only.
                if is_v51_model:
                    spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])
                else:
                    spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop'])
                wave = librosa.resample(
                    spectrogram_to_wave(spec_s, bp['hl'], mp, d, is_v51_model),
                    orig_sr=bp['sr'], target_sr=sr, res_type=wav_resolution)
            else:
                # Middle band: band-pass (high-pass followed by low-pass).
                if is_v51_model:
                    spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)
                    spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])
                else:
                    spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1)
                    spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop'])

                wave2 = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp, d, is_v51_model))
                wave = librosa.resample(wave2, orig_sr=bp['sr'], target_sr=sr, res_type=wav_resolution)

    return wave
|
|
|
def get_lp_filter_mask(n_bins, bin_start, bin_stop):
    """Build a column-vector low-pass mask.

    The mask is ``bin_start - 1`` ones, a linear ramp from 1 to 0 of
    ``bin_stop - bin_start + 1`` values, then ``n_bins - bin_stop`` zeros.

    Returns:
        ndarray of shape ``(n_bins, 1)``.
    """
    passband = np.ones((bin_start - 1, 1))
    ramp = np.linspace(1, 0, bin_stop - bin_start + 1).reshape(-1, 1)
    stopband = np.zeros((n_bins - bin_stop, 1))

    return np.concatenate((passband, ramp, stopband), axis=0)
|
|
|
def get_hp_filter_mask(n_bins, bin_start, bin_stop):
    """Build a column-vector high-pass mask.

    The mask is ``bin_stop + 1`` zeros, a linear ramp from 0 to 1 of
    ``1 + bin_start - bin_stop`` values, then ``n_bins - bin_start - 2``
    ones (``bin_start`` is the higher of the two bin indices).

    Returns:
        ndarray of shape ``(n_bins, 1)``.
    """
    stopband = np.zeros((bin_stop + 1, 1))
    ramp = np.linspace(0, 1, 1 + bin_start - bin_stop).reshape(-1, 1)
    passband = np.ones((n_bins - bin_start - 2, 1))

    return np.concatenate((stopband, ramp, passband), axis=0)
|
|
|
def fft_lp_filter(spec, bin_start, bin_stop):
    """Apply a linear low-pass roll-off to ``spec`` in place.

    Rows ``bin_start`` .. ``bin_stop - 1`` of axis 1 are scaled by a gain
    that drops in equal steps from 1 towards 0; rows from ``bin_stop`` on
    are multiplied by zero.

    Returns:
        The same ``spec`` array, modified.
    """
    gain = 1.0
    for row in range(bin_start, bin_stop):
        gain = gain - 1 / (bin_stop - bin_start)
        spec[:, row, :] = spec[:, row, :] * gain

    spec[:, bin_stop:, :] *= 0

    return spec
|
|
|
def fft_hp_filter(spec, bin_start, bin_stop):
    """Apply a linear high-pass roll-off to ``spec`` in place.

    Walking down from row ``bin_start`` to ``bin_stop + 1`` of axis 1, each
    row is scaled by a gain that drops in equal steps from 1 towards 0;
    rows ``0 .. bin_stop`` are multiplied by zero.

    Returns:
        The same ``spec`` array, modified.
    """
    gain = 1.0
    for row in range(bin_start, bin_stop, -1):
        gain = gain - 1 / (bin_start - bin_stop)
        spec[:, row, :] = spec[:, row, :] * gain

    spec[:, 0:bin_stop + 1, :] *= 0

    return spec
|
|
|
def spectrogram_to_wave_old(spec, hop_length=1024):
    """Invert a 2-D (single) or 3-D (two-channel) spectrogram.

    Returns:
        The inverse STFT for 2-D input, or a Fortran-ordered array
        stacking the inverse STFTs of ``spec[0]`` and ``spec[1]`` for 3-D
        input. Other ranks are not handled.
    """
    if spec.ndim == 2:
        wave = librosa.istft(spec, hop_length=hop_length)
    elif spec.ndim == 3:
        channels = [
            librosa.istft(np.asfortranarray(spec[ch]), hop_length=hop_length)
            for ch in (0, 1)
        ]
        wave = np.asfortranarray(channels)

    return wave
|
|
|
def wave_to_spectrogram_old(wave, hop_length, n_fft):
    """Compute the STFT of the first two rows of ``wave``.

    ``n_fft`` is passed to ``librosa.stft`` by keyword so the call also
    works with librosa releases that accept only the signal positionally.

    Returns:
        Fortran-ordered array stacking the two channel spectrograms.
    """
    wave_left = np.asfortranarray(wave[0])
    wave_right = np.asfortranarray(wave[1])

    spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
    spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)

    spec = np.asfortranarray([spec_left, spec_right])

    return spec
|
|
|
def mirroring(a, spec_m, input_high_end, mp):
    """Limit a high-end spectrogram using mirrored lower bins.

    A block of rows as tall as ``input_high_end`` ending 10 rows below
    ``mp.param['pre_filter_start']`` is taken from ``spec_m``, reduced to
    magnitudes and flipped along axis 1. For ``'mirroring'`` it is given
    the phase of ``input_high_end``; for ``'mirroring2'`` it is multiplied
    by ``input_high_end * 1.7``. The result keeps, per element, whichever
    of ``input_high_end`` and that candidate has the smaller magnitude.

    Returns:
        The combined array, or ``None`` when ``a`` names neither mode.
    """
    if a not in ('mirroring', 'mirroring2'):
        return None

    stop = mp.param['pre_filter_start'] - 10
    start = stop - input_high_end.shape[1]
    mirror = np.flip(np.abs(spec_m[:, start:stop, :]), 1)

    if a == 'mirroring':
        candidate = mirror * np.exp(1.j * np.angle(input_high_end))
    else:
        candidate = np.multiply(mirror, input_high_end * 1.7)

    return np.where(np.abs(input_high_end) <= np.abs(candidate), input_high_end, candidate)
|
|
|
def adjust_aggr(mask, is_non_accom_stem, aggressiveness):
    """Sharpen a two-channel mask by raising it to a power, in place.

    Parameters:
        mask: Array indexed as ``mask[channel, bin, ...]``.
        is_non_accom_stem: When true the strength is replaced by
            ``1 - strength``.
        aggressiveness: Dict with ``'value'`` (strength is twice this),
            ``'split_bin'`` and ``'aggr_correction'`` (``None`` or a dict
            with per-channel ``'left'`` / ``'right'`` offsets).

    Returns:
        ``mask``; untouched when ``aggressiveness['value']`` is zero.
    """
    strength = aggressiveness['value'] * 2

    if strength == 0:
        return mask

    if is_non_accom_stem:
        strength = 1 - strength

    per_channel = [strength, strength]

    correction = aggressiveness['aggr_correction']
    if correction is not None:
        per_channel[0] += correction['left']
        per_channel[1] += correction['right']

    # Bins below the split get a third of the extra exponent.
    split = aggressiveness['split_bin']
    for ch, amount in enumerate(per_channel):
        mask[ch, :split] = np.power(mask[ch, :split], 1 + amount / 3)
        mask[ch, split:] = np.power(mask[ch, split:], 1 + amount)

    return mask
|
|
|
def stft(wave, nfft, hl):
    """Compute the STFT of the first two rows of ``wave``.

    ``nfft`` is passed to ``librosa.stft`` as the ``n_fft`` keyword so the
    call also works with librosa releases that accept only the signal
    positionally.

    Parameters:
        wave: Audio array indexed as ``wave[0]`` / ``wave[1]``.
        nfft: STFT window size.
        hl: STFT hop length.

    Returns:
        Fortran-ordered array stacking the two channel spectrograms.
    """
    wave_left = np.asfortranarray(wave[0])
    wave_right = np.asfortranarray(wave[1])
    spec_left = librosa.stft(wave_left, n_fft=nfft, hop_length=hl)
    spec_right = librosa.stft(wave_right, n_fft=nfft, hop_length=hl)
    spec = np.asfortranarray([spec_left, spec_right])

    return spec
|
|
|
def istft(spec, hl):
    """Invert the first two entries of ``spec`` with hop length ``hl``.

    Returns:
        Fortran-ordered array stacking the two inverted channels.
    """
    channels = [
        librosa.istft(np.asfortranarray(spec[ch]), hop_length=hl)
        for ch in (0, 1)
    ]

    return np.asfortranarray(channels)
|
|
|
def spec_effects(wave, algorithm='Default', value=None):
    """Combine two waveforms with the selected algorithm.

    Parameters:
        wave: Pair ``(wave[0], wave[1])`` of two-channel waveforms.
        algorithm: ``'Min_Mag'`` / ``'Max_Mag'`` keep, per bin, the
            spectrogram value with the smaller / larger magnitude;
            ``'Default'`` is a weighted sum in the time domain;
            ``'Invert_p'`` subtracts the larger magnitude (with the first
            input's phase) from the second spectrogram.
        value: Weight of ``wave[1]`` for ``'Default'``.

    Returns:
        The combined waveform; ``wave`` is returned unchanged for an
        unrecognised algorithm.
    """
    # Both spectrograms are computed up front (2048-point window, hop 1024).
    first_spec = stft(wave[0], 2048, 1024)
    second_spec = stft(wave[1], 2048, 1024)

    if algorithm == 'Min_Mag':
        picked = np.where(np.abs(second_spec) <= np.abs(first_spec), second_spec, first_spec)
        wave = istft(picked, 1024)
    elif algorithm == 'Max_Mag':
        picked = np.where(np.abs(second_spec) >= np.abs(first_spec), second_spec, first_spec)
        wave = istft(picked, 1024)
    elif algorithm == 'Default':
        wave = (wave[1] * value) + (wave[0] * (1 - value))
    elif algorithm == 'Invert_p':
        first_mag = np.abs(first_spec)
        second_mag = np.abs(second_spec)
        larger_mag = np.where(first_mag >= second_mag, first_mag, second_mag)
        difference = second_spec - larger_mag * np.exp(1.j * np.angle(first_spec))
        wave = istft(difference, 1024)

    return wave
|
|
|
def spectrogram_to_wave_no_mp(spec, n_fft=2048, hop_length=1024):
    """Invert a spectrogram without any model parameters.

    Returns:
        The inverse STFT; a 1-D result is duplicated into a Fortran-ordered
        array of two identical rows.
    """
    wave = librosa.istft(spec, n_fft=n_fft, hop_length=hop_length)

    if wave.ndim != 1:
        return wave

    return np.asfortranarray([wave, wave])
|
|
|
def wave_to_spectrogram_no_mp(wave):
    """Compute an STFT (2048-point window, hop 1024) without model parameters.

    A spectrogram with only two dimensions (the result for a 1-D, mono
    waveform) is duplicated into two identical channels. The previous
    check for a 1-D spectrogram could never match, because an STFT result
    always has at least two dimensions, so mono input was passed on
    without a channel axis.

    Returns:
        Spectrogram with a leading channel axis.
    """
    spec = librosa.stft(wave, n_fft=2048, hop_length=1024)

    if spec.ndim == 2:
        spec = np.asfortranarray([spec, spec])

    return spec
|
|
|
def invert_audio(specs, invert_p=True):
    """Subtract one spectrogram from another.

    Both entries of ``specs`` are first trimmed (in the list itself) to
    the shorter length along axis 2.

    Parameters:
        specs: Two-element list ``[first, second]`` of 3-D spectrograms.
        invert_p: When true, return ``second`` minus the element-wise
            larger magnitude carrying ``first``'s phase. Otherwise
            ``second`` is replaced by its aggressively reduced version
            (softmask 0.2) and ``first - second`` is returned.

    Returns:
        The difference spectrogram.
    """
    frames = min([specs[0].shape[2], specs[1].shape[2]])
    specs[0] = specs[0][:, :, :frames]
    specs[1] = specs[1][:, :, :frames]

    if not invert_p:
        specs[1] = reduce_vocal_aggressively(specs[0], specs[1], 0.2)
        return specs[0] - specs[1]

    first_mag = np.abs(specs[0])
    second_mag = np.abs(specs[1])
    larger_mag = np.where(first_mag >= second_mag, first_mag, second_mag)

    return specs[1] - larger_mag * np.exp(1.j * np.angle(specs[0]))
|
|
|
def invert_stem(mixture, stem):
    """Remove ``stem`` from ``mixture`` in the spectral domain.

    Both inputs are converted to spectrograms, combined with
    ``invert_audio`` and converted back to a waveform.

    Returns:
        The negated, transposed result waveform.
    """
    mixture_spec = wave_to_spectrogram_no_mp(mixture)
    stem_spec = wave_to_spectrogram_no_mp(stem)
    difference = invert_audio([mixture_spec, stem_spec])

    return -spectrogram_to_wave_no_mp(difference).T
|
|
|
def ensembling(a, inputs, is_wavs=False):
    """Fold several arrays into one by element-wise min/max magnitude.

    Starting from ``inputs[0]``, every further entry is trimmed together
    with the running result to their common length (axis 1 when
    ``is_wavs`` is true, otherwise axis 2) and merged: ``MIN_SPEC`` keeps
    the value with the smaller magnitude, ``MAX_SPEC`` the larger. Any
    other ``a`` only trims. Trimmed entries are written back into
    ``inputs``.

    A single-element ``inputs`` now returns that element instead of
    failing on an unassigned local.

    Parameters:
        a: Algorithm label.
        inputs: List of arrays.
        is_wavs: Whether the arrays are waveforms rather than 3-D
            spectrograms.

    Returns:
        The merged array.

    Raises:
        ValueError: if ``inputs`` is empty.
    """
    if len(inputs) == 0:
        raise ValueError('ensembling requires at least one input')

    merged = inputs[0]

    for i in range(1, len(inputs)):
        if is_wavs:
            ln = min(merged.shape[1], inputs[i].shape[1])
            merged = merged[:, :ln]
            inputs[i] = inputs[i][:, :ln]
        else:
            ln = min(merged.shape[2], inputs[i].shape[2])
            merged = merged[:, :, :ln]
            inputs[i] = inputs[i][:, :, :ln]

        if MIN_SPEC == a:
            merged = np.where(np.abs(inputs[i]) <= np.abs(merged), inputs[i], merged)
        if MAX_SPEC == a:
            merged = np.where(np.abs(inputs[i]) >= np.abs(merged), inputs[i], merged)

    return merged
|
|
|
def ensemble_for_align(waves):
    """Min-magnitude ensemble of waveforms, sized like ``waves[1]``.

    Each waveform is transposed, converted to a spectrogram and merged
    with the ``MIN_SPEC`` rule; the result is converted back, transposed
    and trimmed or zero-padded to the length of ``waves[1]``.

    Returns:
        The aligned ensemble waveform.
    """
    specs = [wave_to_spectrogram_no_mp(wav.T) for wav in waves]
    merged = ensembling(MIN_SPEC, specs)
    wav_aligned = spectrogram_to_wave_no_mp(merged).T

    return match_array_shapes(wav_aligned, waves[1], is_swap=True)
|
|
|
def ensemble_inputs(audio_input, algorithm, is_normalization, wav_type_set, save_path, is_wave=False, is_array=False):
    """Ensemble several audio files and write the result to ``save_path``.

    Parameters:
        audio_input: Sequence of inputs accepted by ``librosa.load``.
        algorithm: ``AVERAGE`` for a plain average, otherwise the label
            forwarded to ``ensembling``.
        is_normalization: Forwarded to ``normalize`` before writing.
        wav_type_set: soundfile subtype for the output file.
        save_path: Destination path.
        is_wave: Ensemble raw waveforms instead of spectrograms.
        is_array: Unused.
    """
    if algorithm == AVERAGE:
        output = average_audio(audio_input)
        samplerate = 44100
    else:
        loaded = []
        specs = []

        for source in audio_input:
            wave, samplerate = librosa.load(source, mono=False, sr=44100)
            loaded.append(wave)
            specs.append(wave if is_wave else wave_to_spectrogram_no_mp(wave))

        # The longest input defines the shape of the output.
        lengths = [w.shape[1] for w in loaded]
        longest = loaded[lengths.index(max(lengths))]

        if is_wave:
            output = ensembling(algorithm, specs, is_wavs=True)
        else:
            output = spectrogram_to_wave_no_mp(ensembling(algorithm, specs))

        output = to_shape(output, longest.shape)

    sf.write(save_path, normalize(output.T, is_normalization), samplerate, subtype=wav_type_set)
|
|
|
def to_shape(x, target_shape):
    """Zero-pad ``x`` at the end of each axis up to ``target_shape``.

    Parameters:
        x: Array to pad.
        target_shape: Desired size per axis; each entry must be at least
            the matching size of ``x``.

    Returns:
        The padded array.
    """
    pad_widths = tuple(
        (0, wanted - current)
        for current, wanted in zip(x.shape, target_shape)
    )

    return np.pad(x, pad_widths, mode='constant')
|
|
|
def to_shape_minimize(x: np.ndarray, target_shape):
    """Zero-pad ``x`` at the end of each axis up to ``target_shape``.

    Parameters:
        x: Array to pad.
        target_shape: Desired size per axis; each entry must be at least
            the matching size of ``x``.

    Returns:
        The padded array.
    """
    pad_widths = tuple(
        (0, wanted - current)
        for current, wanted in zip(x.shape, target_shape)
    )

    return np.pad(x, pad_widths, mode='constant')
|
|
|
def detect_leading_silence(audio, sr, silence_threshold=0.007, frame_length=1024):
    """
    Measure how long an audio signal stays silent at its start.

    For 2-D input the row with the largest summed absolute amplitude is
    analysed. The signal is scanned in frames; the first frame whose peak
    exceeds the threshold ends the silence.

    :param audio: np.array, audio signal
    :param sr: int, sample rate
    :param silence_threshold: float, magnitude threshold below which is considered silence
    :param frame_length: int, the number of samples to consider for each check

    :return: float, duration of the leading silence in milliseconds
    """
    if len(audio.shape) == 2:
        loudest = np.argmax(np.sum(np.abs(audio), axis=1))
        audio = audio[loudest]

    total = len(audio)
    onset = next(
        (start for start in range(0, total, frame_length)
         if np.max(np.abs(audio[start:start + frame_length])) > silence_threshold),
        total,  # entirely silent
    )

    return (onset / sr) * 1000
|
|
|
def adjust_leading_silence(target_audio, reference_audio, silence_threshold=0.01, frame_length=1024, sr=44100):
    """
    Adjust the leading silence of the target_audio to match the leading silence of the reference_audio.

    Silence is measured in whole frames. When the reference has more
    leading silence, zeros are prepended to the target; when it has less,
    samples are dropped from the start of the target.

    :param target_audio: np.array, audio signal that will have its silence adjusted
    :param reference_audio: np.array, audio signal used as a reference
    :param silence_threshold: float, magnitude threshold below which is considered silence
    :param frame_length: int, the number of samples to consider for each check
    :param sr: int, sample rate; only used to express the printed difference in milliseconds

    :return: np.array, target_audio adjusted to have the same leading silence as reference_audio
    """

    def find_silence_end(audio):
        # For 2-D input analyse the row with the most energy.
        if len(audio.shape) == 2:
            channel = np.argmax(np.sum(np.abs(audio), axis=1))
            audio_mono = audio[channel]
        else:
            audio_mono = audio

        for i in range(0, len(audio_mono), frame_length):
            if np.max(np.abs(audio_mono[i:i+frame_length])) > silence_threshold:
                return i
        return len(audio_mono)

    ref_silence_end = find_silence_end(reference_audio)
    target_silence_end = find_silence_end(target_audio)
    silence_difference = ref_silence_end - target_silence_end

    # Diagnostic output: the difference expressed in milliseconds.
    ref_silence_end_p = (ref_silence_end / sr) * 1000
    target_silence_end_p = (target_silence_end / sr) * 1000
    silence_difference_p = ref_silence_end_p - target_silence_end_p
    print("silence_difference: ", silence_difference_p)

    if silence_difference > 0:
        if len(target_audio.shape) == 2:
            silence_to_add = np.zeros((target_audio.shape[0], silence_difference))
        else:
            silence_to_add = np.zeros(silence_difference)
        return np.hstack((silence_to_add, target_audio))
    elif silence_difference < 0:
        if len(target_audio.shape) == 2:
            return target_audio[:, -silence_difference:]
        else:
            return target_audio[-silence_difference:]
    else:
        return target_audio
|
|
|
def match_array_shapes(array_1:np.ndarray, array_2:np.ndarray, is_swap=False):
    """Trim or zero-pad ``array_1`` so its length matches ``array_2``.

    The length is taken along axis 1, or along axis 0 when ``is_swap`` is
    true (both arrays are transposed for the comparison and the result is
    transposed back).

    Returns:
        The resized ``array_1``.
    """
    if is_swap:
        array_1, array_2 = array_1.T, array_2.T

    current_len = array_1.shape[1]
    wanted_len = array_2.shape[1]

    if current_len > wanted_len:
        array_1 = array_1[:, :wanted_len]
    elif current_len < wanted_len:
        array_1 = np.pad(array_1, ((0, 0), (0, wanted_len - current_len)),
                         'constant', constant_values=0)

    return array_1.T if is_swap else array_1
|
|
|
def match_mono_array_shapes(array_1: np.ndarray, array_2: np.ndarray):
    """Trim or zero-pad 1-D ``array_1`` to the length of ``array_2``.

    Returns:
        The resized ``array_1`` (the same object when the lengths match).
    """
    missing = len(array_2) - len(array_1)

    if missing < 0:
        array_1 = array_1[:len(array_2)]
    elif missing > 0:
        array_1 = np.pad(array_1, (0, missing), 'constant', constant_values=0)

    return array_1
|
|
|
def change_pitch_semitones(y, sr, semitone_shift):
    """Resample every channel by the frequency ratio of a semitone shift.

    The ratio is ``2 ** (semitone_shift / 12)``. Sample rates are passed
    to ``librosa.resample`` by keyword so the call also works with librosa
    releases that accept only the signal positionally.

    Parameters:
        y: Iterable of per-channel sample arrays.
        sr: Sample rate of ``y``.
        semitone_shift: Number of semitones (may be fractional or
            negative).

    Returns:
        tuple: ``(resampled_channels, sr * ratio)``.
    """
    factor = 2 ** (semitone_shift / 12)
    new_sr = sr * factor
    y_pitch_tuned = np.array([
        librosa.resample(y_channel, orig_sr=sr, target_sr=new_sr,
                         res_type=wav_resolution_float_resampling)
        for y_channel in y
    ])
    return y_pitch_tuned, new_sr
|
|
|
def augment_audio(export_path, audio_file, rate, is_normalization, wav_type_set, save_format=None, is_pitch=False, is_time_correction=True):
    """Pitch-shift or time-stretch an audio file and write the result.

    Parameters:
        export_path: Destination path of the processed audio.
        audio_file: Input accepted by ``librosa.load`` (loaded at 44100 Hz).
        rate: Amount passed to the pitch-shift / time-stretch routines.
        is_normalization: Forwarded to ``normalize`` before writing.
        wav_type_set: soundfile subtype for the output file.
        save_format: Optional callable invoked with ``export_path`` after
            writing. ``None`` (the default) is now skipped instead of
            being called.
        is_pitch: With time correction, shift pitch instead of stretching.
        is_time_correction: When false, the pitch is changed by plain
            resampling via ``change_pitch_semitones`` with ``-rate``.
    """
    wav, sr = librosa.load(audio_file, sr=44100, mono=False)

    if wav.ndim == 1:
        wav = np.asfortranarray([wav, wav])

    if not is_time_correction:
        wav_mix = change_pitch_semitones(wav, 44100, semitone_shift=-rate)[0]
    else:
        transform = pyrb.pitch_shift if is_pitch else pyrb.time_stretch
        wav_1 = transform(wav[0], sr, rate, rbargs=None)
        wav_2 = transform(wav[1], sr, rate, rbargs=None)

        # Pad the shorter channel so both can be stacked.
        if wav_1.shape > wav_2.shape:
            wav_2 = to_shape(wav_2, wav_1.shape)
        if wav_1.shape < wav_2.shape:
            wav_1 = to_shape(wav_1, wav_2.shape)

        wav_mix = np.asfortranarray([wav_1, wav_2])

    sf.write(export_path, normalize(wav_mix.T, is_normalization), sr, subtype=wav_type_set)
    if save_format is not None:
        save_format(export_path)
|
|
|
def average_audio(audio):
    """Average several audio files sample by sample.

    Every input is loaded at 44100 Hz without down-mixing; shorter
    waveforms are zero-padded to the shape of the longest one (length
    taken from axis 1) before summing.

    Parameters:
        audio: Sequence of inputs accepted by ``librosa.load``.

    Returns:
        The sum of all padded waveforms divided by ``len(audio)``.
    """
    loaded = []
    lengths = []

    for source in audio:
        samples, _ = librosa.load(source, sr=44100, mono=False)
        loaded.append(samples)
        lengths.append(samples.shape[1])

    # The longest waveform goes first and sets the target shape.
    longest = loaded.pop(lengths.index(max(lengths)))
    padded = [longest]
    padded.extend(to_shape(samples, longest.shape) for samples in loaded)

    return sum(padded) / len(audio)
|
|
|
def average_dual_sources(wav_1, wav_2, value):
    """Blend two waveforms as ``wav_1 * value + wav_2 * (1 - value)``.

    Whichever array has the smaller shape (compared as tuples) is
    zero-padded to the other's shape first.

    Returns:
        The weighted sum.
    """
    shape_1, shape_2 = wav_1.shape, wav_2.shape

    if shape_1 > shape_2:
        wav_2 = to_shape(wav_2, shape_1)
    if wav_1.shape < wav_2.shape:
        wav_1 = to_shape(wav_1, wav_2.shape)

    return (wav_1 * value) + (wav_2 * (1 - value))
|
|
|
def reshape_sources(wav_1: np.ndarray, wav_2: np.ndarray):
    """Resize ``wav_2`` to the length of ``wav_1`` along axis 1.

    ``wav_2`` is zero-padded when its shape compares smaller than
    ``wav_1``'s and trimmed when it is longer.

    Returns:
        The resized ``wav_2``.
    """
    if wav_1.shape > wav_2.shape:
        wav_2 = to_shape(wav_2, wav_1.shape)

    common = min([wav_1.shape[1], wav_2.shape[1]])

    return wav_2[:, :common]
|
|
|
def reshape_sources_ref(wav_1_shape, wav_2: np.ndarray):
    """Zero-pad ``wav_2`` up to ``wav_1_shape`` when that shape is larger.

    Shapes are compared as tuples; ``wav_2`` is returned untouched when
    the reference shape is not greater.
    """
    if not wav_1_shape > wav_2.shape:
        return wav_2

    return to_shape(wav_2, wav_1_shape)
|
|
|
def combine_arrarys(audio_sources, is_swap=False):
    """Sum several audio arrays after matching them to the largest one.

    The accumulator is shaped like the source with the most elements;
    every source is trimmed or zero-padded to it with
    ``match_array_shapes`` before being added.

    Returns:
        The summed array.
    """
    total = np.zeros_like(max(audio_sources, key=np.size))

    for item in audio_sources:
        total += match_array_shapes(item, total, is_swap=is_swap)

    return total
|
|
|
def combine_audio(paths: list, audio_file_base=None, wav_type_set='FLOAT', save_format=None):
    """Sum several audio files and write ``<audio_file_base>_combined.wav``.

    Parameters:
        paths: Inputs passed one by one to ``load_audio``.
        audio_file_base: Prefix of the output file name.
        wav_type_set: soundfile subtype for the output file.
        save_format: Optional callable invoked with the output path after
            writing. ``None`` (the default) is now skipped instead of
            being called.
    """
    source = combine_arrarys([load_audio(i) for i in paths])
    save_path = f"{audio_file_base}_combined.wav"
    sf.write(save_path, source.T, 44100, subtype=wav_type_set)
    if save_format is not None:
        save_format(save_path)
|
|
|
def reduce_mix_bv(inst_source, voc_source, reduction_rate=0.9):
    """Mix a scaled-down instrumental with a vocal source.

    The instrumental is multiplied by ``1 - reduction_rate`` and summed
    with the vocal via ``combine_arrarys`` (with ``is_swap=True``).

    Returns:
        The combined array.
    """
    quieter_inst = inst_source * (1 - reduction_rate)

    return combine_arrarys([quieter_inst, voc_source], is_swap=True)
|
|
|
def organize_inputs(inputs):
    """Sort file names into roles based on naming patterns.

    Rules, checked in this order for each name:
      * ends with ``_(Vocals).wav``        -> ``"reference"``
      * contains ``_RVC_``                 -> ``"target"``
      * ends with ``reverbed_stem.wav``    -> ``"reverb"``
      * ends with ``_(Instrumental).wav``  -> ``"inst"``

    A later match for the same role replaces an earlier one.

    Returns:
        dict with the keys ``target``, ``reference``, ``reverb`` and
        ``inst``; roles without a match stay ``None``.
    """
    roles = dict.fromkeys(("target", "reference", "reverb", "inst"))

    for name in inputs:
        if name.endswith("_(Vocals).wav"):
            role = "reference"
        elif "_RVC_" in name:
            role = "target"
        elif name.endswith("reverbed_stem.wav"):
            role = "reverb"
        elif name.endswith("_(Instrumental).wav"):
            role = "inst"
        else:
            continue
        roles[role] = name

    return roles
|
|
|
def check_if_phase_inverted(wav1, wav2, is_mono=False):
    """Report whether two signals are negatively correlated at their start.

    Unless ``is_mono`` is set, each input is first averaged over axis 0.
    Only the first 1000 samples are compared.

    Returns:
        True when the correlation coefficient of the two excerpts is
        negative.
    """
    if not is_mono:
        wav1 = np.mean(wav1, axis=0)
        wav2 = np.mean(wav2, axis=0)

    head_1, head_2 = wav1[:1000], wav2[:1000]
    coefficient = np.corrcoef(head_1, head_2)[0, 1]

    return coefficient < 0
|
|
|
def align_audio(file1,
                file2,
                file2_aligned,
                file_subtracted,
                wav_type_set,
                is_save_aligned,
                command_Text,
                save_format,
                align_window:list,
                align_intro_val:list,
                db_analysis:tuple,
                set_progress_bar,
                phase_option,
                phase_shifts,
                is_match_silence,
                is_spec_match):
    """Align ``file2`` to ``file1`` and write their difference.

    Both files are loaded at 44100 Hz. ``file2`` is optionally
    phase-flipped and silence-matched, then shifted by offsets estimated
    from one-second excerpts. Each distinct shifted candidate is
    subtracted from ``file1`` (through ``time_correction`` when
    ``align_window`` is truthy, otherwise directly at several dB
    adjustments); the candidates are combined and the clipped result is
    written to ``file_subtracted``.

    Parameters:
        file1: Path of the reference track.
        file2: Path of the track to align.
        file2_aligned: Output path for the aligned second track.
        file_subtracted: Output path for ``file1`` minus aligned ``file2``.
        wav_type_set: soundfile subtype for written files.
        is_save_aligned: Also write the aligned second track.
        command_Text: Callable receiving status strings.
        save_format: Callable invoked with each written path.
        align_window: Forwarded to ``time_correction``; a falsy value
            selects the direct subtraction.
        align_intro_val: Values selecting the excerpt positions; ``1``
            means one second in, any other value ``n`` means
            ``int(common_length_in_seconds // n)`` seconds in.
        db_analysis: ``(base_db, db_range)``; the second track is scaled by
            ``base_db`` and then by every adjustment in ``db_range``.
        set_progress_bar: Callable receiving ``(0.1, fraction)``.
        phase_option: ``AUTO_PHASE``, ``POSITIVE_PHASE`` or
            ``NEGATIVE_PHASE``; anything else leaves the phase untouched.
        phase_shifts: Forwarded to ``time_correction``.
        is_match_silence: Match leading silence before aligning.
        is_spec_match: Combine candidates spectrally and refine the
            aligned track with a second spectral ensemble.

    Side effects:
        Resets the module-level ``progress_value`` and advances it through
        the progress callback handed to ``time_correction``.
    """
    global progress_value
    progress_value = 0
    is_mono = False

    def get_diff(a, b):
        # Lag at which the full cross-correlation of a and b peaks.
        corr = np.correlate(a, b, "full")
        return corr.argmax() - (b.shape[0] - 1)

    def progress_bar(length):
        global progress_value
        progress_value += 1

        # Once the step count reaches the expected length, stretch the
        # length so the reported fraction stays below 0.9.
        if (0.90/length*progress_value) >= 0.9:
            length = progress_value + 1

        set_progress_bar(0.1, (0.9/length*progress_value))

    def load_track(path):
        # On macOS the duration of mp3 files is determined by rerun_mp3.
        if path.endswith(".mp3") and is_macos:
            return librosa.load(path, duration=rerun_mp3(path), sr=44100, mono=False)
        return librosa.load(path, sr=44100, mono=False)

    wav1, sr1 = load_track(file1)
    wav2, _ = load_track(file2)

    # Work in mono only when both inputs are mono; otherwise duplicate the
    # mono input into two channels.
    if wav1.ndim == 1 and wav2.ndim == 1:
        is_mono = True
    elif wav1.ndim == 1:
        wav1 = np.asfortranarray([wav1, wav1])
    elif wav2.ndim == 1:
        wav2 = np.asfortranarray([wav2, wav2])

    if phase_option == AUTO_PHASE:
        if check_if_phase_inverted(wav1, wav2, is_mono=is_mono):
            wav2 = -wav2
    elif phase_option == POSITIVE_PHASE:
        wav2 = +wav2
    elif phase_option == NEGATIVE_PHASE:
        wav2 = -wav2

    if is_match_silence:
        wav2 = adjust_leading_silence(wav2, wav1)

    wav1_length = int(librosa.get_duration(y=wav1, sr=44100))
    wav2_length = int(librosa.get_duration(y=wav2, sr=44100))

    if not is_mono:
        wav1 = wav1.transpose()
        wav2 = wav2.transpose()

    wav2_org = wav2.copy()

    command_Text("Processing files... \n")
    seconds_length = min(wav1_length, wav2_length)

    wav2_aligned_sources = []

    for sec_len in align_intro_val:
        sec_seg = 1 if sec_len == 1 else int(seconds_length // sec_len)
        index = sr1*sec_seg

        # The offset is estimated from a one-second excerpt; for
        # multi-channel audio only the first channel is used.
        if is_mono:
            samp1, samp2 = wav1[index : index + sr1], wav2[index : index + sr1]
        else:
            samp1, samp2 = wav1[index : index + sr1, 0], wav2[index : index + sr1, 0]
        diff = get_diff(samp1, samp2)

        if diff > 0:
            zeros_to_append = np.zeros(diff) if is_mono else np.zeros((diff, 2))
            wav2_aligned = np.append(zeros_to_append, wav2_org, axis=0)
        elif diff < 0:
            wav2_aligned = wav2_org[-diff:]
        else:
            wav2_aligned = wav2_org

        # Keep each distinct alignment only once.
        if not any(np.array_equal(wav2_aligned, source) for source in wav2_aligned_sources):
            wav2_aligned_sources.append(wav2_aligned)

    unique_sources = len(wav2_aligned_sources)

    # Candidate differences keyed by their mean absolute amplitude.
    sub_mapper_big_mapper = {}

    for s in wav2_aligned_sources:
        wav2_aligned = match_mono_array_shapes(s, wav1) if is_mono else match_array_shapes(s, wav1, is_swap=True)

        if align_window:
            wav_sub = time_correction(wav1, wav2_aligned, seconds_length, align_window=align_window, db_analysis=db_analysis, progress_bar=progress_bar, unique_sources=unique_sources, phase_shifts=phase_shifts)
            sub_mapper_big_mapper[np.abs(wav_sub).mean()] = wav_sub
        else:
            wav2_aligned = wav2_aligned * np.power(10, db_analysis[0] / 20)
            db_range = db_analysis[1]

            for db_adjustment in db_range:
                s_adjusted = wav2_aligned * (10 ** (db_adjustment / 20))
                wav_sub = wav1 - s_adjusted
                sub_mapper_big_mapper[np.abs(wav_sub).mean()] = wav_sub

    sub_mapper_value_list = list(sub_mapper_big_mapper.values())

    if is_spec_match and len(sub_mapper_value_list) >= 2:
        wav_sub = ensemble_for_align(sub_mapper_value_list)
    else:
        wav_sub = ensemble_wav(sub_mapper_value_list)

    wav_sub = np.clip(wav_sub, -1, +1)

    command_Text(f"Saving inverted track... ")

    if is_save_aligned or is_spec_match:
        wav1 = match_mono_array_shapes(wav1, wav_sub) if is_mono else match_array_shapes(wav1, wav_sub, is_swap=True)
        wav2_aligned = wav1 - wav_sub

        if is_spec_match:
            if wav1.ndim == 1 and wav2.ndim == 1:
                wav2_aligned = np.asfortranarray([wav2_aligned, wav2_aligned]).T
                wav1 = np.asfortranarray([wav1, wav1]).T

            wav2_aligned = ensemble_for_align([wav2_aligned, wav1])
            wav_sub = wav1 - wav2_aligned

        if is_save_aligned:
            sf.write(file2_aligned, wav2_aligned, sr1, subtype=wav_type_set)
            save_format(file2_aligned)

    sf.write(file_subtracted, wav_sub, sr1, subtype=wav_type_set)
    save_format(file_subtracted)
|
|
|
def phase_shift_hilbert(signal, degree):
    """Rotate the phase of ``signal`` by ``degree`` degrees.

    The analytic signal is obtained with ``scipy.signal.hilbert``; its real
    and imaginary parts are then mixed with the cosine and sine of the
    requested angle.

    Args:
        signal: Real-valued input passed straight to ``hilbert``.
        degree: Rotation angle in degrees (converted with ``np.radians``).

    Returns:
        ``cos(angle) * real - sin(angle) * imag`` of the analytic signal.
    """
    analytic = hilbert(signal)
    theta = np.radians(degree)
    in_phase = np.cos(theta) * analytic.real
    quadrature = np.sin(theta) * analytic.imag
    return in_phase - quadrature
|
|
|
def get_phase_shifted_tracks(track, phase_shift):
    """Build the list of phase-rotated variants of ``track``.

    The list always starts with ``track`` and ``-track``. Unless
    ``phase_shift`` is 180, it is followed by one pair
    ``phase_shift_hilbert(track, +a)`` / ``phase_shift_hilbert(track, -a)``
    for every angle ``a`` in ``range(phase_shift, stop, phase_shift)``.

    Args:
        track: Signal to rotate; must support unary minus.
        phase_shift: Angle step handed to ``range`` and, through it, to
            ``phase_shift_hilbert`` as its ``degree`` argument.

    Returns:
        A list of variants of ``track``.
    """
    if phase_shift == 180:
        return [track, -track]

    # When the step divides 180 evenly the range stops short of 180 itself
    # (``-track`` is already in the list); otherwise it may run up to 180.
    stop = 180 if 180 % phase_shift == 0 else 181

    variants = [track, -track]
    for angle in range(phase_shift, stop, phase_shift):
        variants.append(phase_shift_hilbert(track, angle))
        variants.append(phase_shift_hilbert(track, -angle))

    return variants
|
|
|
def time_correction(mix:np.ndarray, instrumental:np.ndarray, seconds_length, align_window, db_analysis, sr=44100, progress_bar=None, unique_sources=None, phase_shifts=NONE_P):
    """Subtract ``instrumental`` from ``mix`` after aligning it window by window.

    For every window length in ``align_window`` both signals are cut into
    Hann-weighted windows that overlap by half a window. Each instrumental
    window is aligned to the matching mix window (see ``align_tracks``) and
    subtracted; the overlapping results are accumulated and divided by the
    accumulated window weights. The residuals obtained for the different
    window lengths are finally merged with ``ensemble_wav(..., split_size=12)``.

    Args:
        mix: 1-D array, or 2-D array indexed as ``[sample, channel]``.
        instrumental: Array with exactly the same shape as ``mix``.
        seconds_length: Not used by this function; kept so existing callers
            keep working.
        align_window: Sequence of window lengths in seconds.
        db_analysis: Indexable pair; ``db_analysis[0]`` is a base gain in dB
            applied to the instrumental and ``db_analysis[1]`` is an iterable
            of additional dB offsets that are all tried.
        sr: Sample rate used to convert seconds into samples.
        progress_bar: Optional callable, called as
            ``progress_bar(total_iterations)`` every ``progress_update_interval``
            processed windows. ``None`` disables progress reporting.
        unique_sources: Multiplier applied to the progress total; ``None`` is
            treated as 1.
        phase_shifts: Angle step forwarded to ``get_phase_shifted_tracks``.
            The value 190, or any non-numeric value (including the default),
            disables phase-shifted candidates.

    Returns:
        The residual array produced by ``ensemble_wav``.

    Raises:
        AssertionError: If ``mix`` and ``instrumental`` differ in shape.
    """
    # A non-numeric phase_shifts (e.g. the label used as default) cannot serve as
    # a range step, so it is handled like the explicit "no shifts" value 190.
    numeric_types = (int, float, np.integer, np.floating)
    skip_phase_shifts = phase_shifts == 190 or not isinstance(phase_shifts, numeric_types)
    source_count = 1 if unique_sources is None else unique_sources

    def align_tracks(track1, track2):
        """Return the gain/phase/shift variant of ``track2`` that best cancels ``track1``."""
        # Maps mean absolute residual -> shifted candidate that produced it.
        shifted_tracks = {}

        track2 = track2 * np.power(10, db_analysis[0] / 20)

        if skip_phase_shifts:
            track2_flipped = [track2]
        else:
            track2_flipped = get_phase_shifted_tracks(track2, phase_shifts)

        for db_adjustment in db_analysis[1]:
            for t in track2_flipped:
                track2_adjusted = t * (10 ** (db_adjustment / 20))
                # The peak of the cross-correlation gives the sample delay.
                corr = correlate(track1, track2_adjusted)
                delay = np.argmax(np.abs(corr)) - (len(track1) - 1)
                track2_shifted = np.roll(track2_adjusted, shift=delay)

                mean_abs_value = np.abs(track1 - track2_shifted).mean()
                shifted_tracks[mean_abs_value] = track2_shifted

        return shifted_tracks[min(shifted_tracks)]

    assert mix.shape == instrumental.shape, f"Audio files must have the same shape - Mix: {mix.shape}, Inst: {instrumental.shape}"

    is_mono = len(mix.shape) == 1
    channel_count = 1 if is_mono else mix.shape[1]

    progress_update_interval = 320 if len(align_window) > 2 else 120

    # Number of progress callbacks expected over the whole run; this value is
    # what gets handed to progress_bar on every update.
    total_iterations = 0
    for secs in align_window:
        window_size = int(sr * secs)
        step_size = int(sr * (secs / 2))
        window_count = len(range(0, len(mix) - window_size, step_size)) * channel_count
        total_iterations += (window_count // progress_update_interval) * source_count

    sub_mapper = {}

    for secs in align_window:
        sub = np.zeros_like(mix)
        divider = np.zeros_like(mix)
        window_size = int(sr * secs)
        step_size = int(sr * (secs / 2))
        window = np.hanning(window_size)

        # One tuple of 1-D views per channel; writing through the views
        # updates ``sub`` and ``divider`` in place.
        if is_mono:
            channels = [(mix, instrumental, sub, divider)]
        else:
            channels = [
                (mix[:, ch], instrumental[:, ch], sub[:, ch], divider[:, ch])
                for ch in range(channel_count)
            ]

        # The counter runs across all channels of this window length.
        counter = 0
        for mix_ch, inst_ch, sub_ch, divider_ch in channels:
            for i in range(0, len(mix_ch) - window_size, step_size):
                counter += 1
                if progress_bar is not None and counter % progress_update_interval == 0:
                    progress_bar(total_iterations)
                window_mix = mix_ch[i:i + window_size] * window
                window_instrumental = inst_ch[i:i + window_size] * window
                window_instrumental_aligned = align_tracks(window_mix, window_instrumental)
                sub_ch[i:i + window_size] += window_mix - window_instrumental_aligned
                divider_ch[i:i + window_size] += window

        # Normalise by the accumulated window weight. Samples with (almost) no
        # weight keep their raw value; the division is still evaluated for
        # them, so the resulting floating-point warnings are silenced.
        with np.errstate(divide="ignore", invalid="ignore"):
            sub = np.where(divider > 1e-6, sub / divider, sub)

        sub_mapper[np.abs(sub).mean()] = sub

    return ensemble_wav(list(sub_mapper.values()), split_size=12)
|
|
|
def ensemble_wav(waveforms, split_size=240):
    """Stitch together the quietest segment of each waveform, piece by piece.

    Every waveform is cut into ``split_size`` segments with
    ``np.array_split``. For each segment position the candidate with the
    smallest mean absolute value is kept, and the kept segments are
    concatenated in order.

    Args:
        waveforms: Sequence of arrays to choose from.
        split_size: Number of segments each waveform is divided into.

    Returns:
        The concatenation of the selected segments.
    """
    segmented = [np.array_split(wav, split_size) for wav in waveforms]

    picked = []
    for seg_idx in range(split_size):
        candidates = [segments[seg_idx] for segments in segmented]
        levels = [np.abs(candidate).mean() for candidate in candidates]
        quietest = np.argmin(levels)
        picked.append(candidates[quietest])

    return np.concatenate(picked)
|
|
|
def ensemble_wav_min(waveforms):
    """Combine waveforms by keeping, per sample, the value of smallest magnitude.

    The waveforms are folded left to right. At each step both operands are
    truncated to the shorter length, and wherever the new waveform's absolute
    value is less than or equal to the running result's, the new value wins.

    The input sequence is not modified. A single waveform is returned as is.

    Args:
        waveforms: Non-empty sequence of arrays.

    Returns:
        The element-wise minimum-magnitude combination, as long as the
        shortest input.

    Raises:
        ValueError: If ``waveforms`` is empty.
    """
    if len(waveforms) == 0:
        raise ValueError("ensemble_wav_min requires at least one waveform")

    wave = waveforms[0]

    for candidate in waveforms[1:]:
        ln = min(len(wave), len(candidate))
        wave = wave[:ln]
        # Truncate a local view only, so the caller's list stays untouched.
        candidate = candidate[:ln]
        wave = np.where(np.abs(candidate) <= np.abs(wave), candidate, wave)

    return wave
|
|
|
def align_audio_test(wav1, wav2, sr1=44100):
    """Shift ``wav2`` in time so that it lines up with ``wav1``.

    Both inputs are transposed first; afterwards axis 0 is sliced by sample
    index and column 0 is used for the analysis, so the inputs are expected
    to be 2-D (presumably ``(channels, samples)`` as returned by
    ``load_audio`` - confirm against callers). The delay is estimated by
    cross-correlating the samples ``sr1 .. 2 * sr1`` of column 0 of each
    signal.

    Args:
        wav1: Reference signal.
        wav2: Signal to align.
        sr1: Offset and length, in samples, of the analysed excerpt.

    Returns:
        The transposed copy of ``wav2``: preceded by ``diff`` rows of zeros
        when the delay is positive, with its first ``-diff`` rows dropped
        when it is negative, or unchanged when it is zero.
    """
    def get_diff(a, b):
        # Index of the correlation peak, re-centred so that 0 means "aligned".
        corr = np.correlate(a, b, "full")
        return corr.argmax() - (b.shape[0] - 1)

    wav1 = wav1.transpose()
    wav2_org = wav2.transpose().copy()

    samp1 = wav1[sr1:2 * sr1, 0]
    samp2 = wav2_org[sr1:2 * sr1, 0]
    diff = get_diff(samp1, samp2)

    if diff > 0:
        # The padding must have as many columns as the audio, otherwise
        # np.append along axis 0 rejects multi-channel input.
        padding = np.zeros((diff, wav2_org.shape[1]))
        return np.append(padding, wav2_org, axis=0)
    if diff < 0:
        return wav2_org[-diff:]
    return wav2_org
|
|
|
def load_audio(audio_file):
    """Load ``audio_file`` with librosa at 44100 Hz without downmixing.

    When the loaded array is one-dimensional it is stacked with itself so the
    result has two identical rows.

    Args:
        audio_file: Path (or other source accepted by ``librosa.load``).

    Returns:
        The loaded samples; the sample rate reported by librosa is discarded.
    """
    samples, _sample_rate = librosa.load(audio_file, sr=44100, mono=False)

    is_single_channel = samples.ndim == 1
    return np.asfortranarray([samples, samples]) if is_single_channel else samples
|
|
|
def rerun_mp3(audio_file):
    """Return the duration of ``audio_file`` truncated to an ``int``.

    The file is opened with ``audioread.audio_open`` and closed again before
    returning.

    Args:
        audio_file: Path handed to ``audioread.audio_open``.

    Returns:
        ``int(duration)`` as reported by audioread.
    """
    with audioread.audio_open(audio_file) as handle:
        return int(handle.duration)
|
|
|