noblebarkrr's picture
Upload folder using huggingface_hub
9470309 verified
import audioread
import librosa
import numpy as np
import soundfile as sf
import math
import platform
import traceback
from scipy.signal import correlate, hilbert
import io
OPERATING_SYSTEM = platform.system()
SYSTEM_ARCH = platform.platform()
SYSTEM_PROC = platform.processor()
ARM = "arm"
AUTO_PHASE = "Automatic"
POSITIVE_PHASE = "Positive Phase"
NEGATIVE_PHASE = "Negative Phase"
NONE_P = ("None",)
LOW_P = ("Shifts: Low",)
MED_P = ("Shifts: Medium",)
HIGH_P = ("Shifts: High",)
VHIGH_P = "Shifts: Very High"
MAXIMUM_P = "Shifts: Maximum"
progress_value = 0
last_update_time = 0
is_macos = False
if OPERATING_SYSTEM == "Darwin":
wav_resolution = (
"polyphase" if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else "sinc_fastest"
)
wav_resolution_float_resampling = (
"kaiser_best" if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else wav_resolution
)
is_macos = True
else:
wav_resolution = "sinc_fastest"
wav_resolution_float_resampling = wav_resolution
MAX_SPEC = "Max Spec"
MIN_SPEC = "Min Spec"
LIN_ENSE = "Linear Ensemble"
MAX_WAV = MAX_SPEC
MIN_WAV = MIN_SPEC
AVERAGE = "Average"
def crop_center(h1, h2):
h1_shape = h1.size()
h2_shape = h2.size()
if h1_shape[3] == h2_shape[3]:
return h1
elif h1_shape[3] < h2_shape[3]:
raise ValueError("h1_shape[3] must be greater than h2_shape[3]")
s_time = (h1_shape[3] - h2_shape[3]) // 2
e_time = s_time + h2_shape[3]
h1 = h1[:, :, :, s_time:e_time]
return h1
def preprocess(X_spec):
X_mag = np.abs(X_spec)
X_phase = np.angle(X_spec)
return X_mag, X_phase
def make_padding(width, cropsize, offset):
left = offset
roi_size = cropsize - offset * 2
if roi_size == 0:
roi_size = cropsize
right = roi_size - (width % roi_size) + left
return left, right, roi_size
def normalize(wave, max_peak=1.0, min_peak=None):
maxv = np.abs(wave).max()
if maxv > max_peak:
wave *= max_peak / maxv
elif min_peak is not None and maxv < min_peak:
wave *= min_peak / maxv
return wave
def auto_transpose(audio_array: np.ndarray):
if audio_array.shape[1] == 2:
return audio_array.T
return audio_array
def write_array_to_mem(audio_data, subtype):
if isinstance(audio_data, np.ndarray):
audio_buffer = io.BytesIO()
sf.write(audio_buffer, audio_data, 44100, subtype=subtype, format="WAV")
audio_buffer.seek(0)
return audio_buffer
else:
return audio_data
def spectrogram_to_image(spec, mode="magnitude"):
if mode == "magnitude":
if np.iscomplexobj(spec):
y = np.abs(spec)
else:
y = spec
y = np.log10(y**2 + 1e-8)
elif mode == "phase":
if np.iscomplexobj(spec):
y = np.angle(spec)
else:
y = spec
y -= y.min()
y *= 255 / y.max()
img = np.uint8(y)
if y.ndim == 3:
img = img.transpose(1, 2, 0)
img = np.concatenate([np.max(img, axis=2, keepdims=True), img], axis=2)
return img
def reduce_vocal_aggressively(X, y, softmask):
v = X - y
y_mag_tmp = np.abs(y)
v_mag_tmp = np.abs(v)
v_mask = v_mag_tmp > y_mag_tmp
y_mag = np.clip(y_mag_tmp - v_mag_tmp * v_mask * softmask, 0, np.inf)
return y_mag * np.exp(1.0j * np.angle(y))
def merge_artifacts(y_mask, thres=0.01, min_range=64, fade_size=32):
mask = y_mask
try:
if min_range < fade_size * 2:
raise ValueError("min_range must be >= fade_size * 2")
idx = np.where(y_mask.min(axis=(0, 1)) > thres)[0]
start_idx = np.insert(idx[np.where(np.diff(idx) != 1)[0] + 1], 0, idx[0])
end_idx = np.append(idx[np.where(np.diff(idx) != 1)[0]], idx[-1])
artifact_idx = np.where(end_idx - start_idx > min_range)[0]
weight = np.zeros_like(y_mask)
if len(artifact_idx) > 0:
start_idx = start_idx[artifact_idx]
end_idx = end_idx[artifact_idx]
old_e = None
for s, e in zip(start_idx, end_idx):
if old_e is not None and s - old_e < fade_size:
s = old_e - fade_size * 2
if s != 0:
weight[:, :, s : s + fade_size] = np.linspace(0, 1, fade_size)
else:
s -= fade_size
if e != y_mask.shape[2]:
weight[:, :, e - fade_size : e] = np.linspace(1, 0, fade_size)
else:
e += fade_size
weight[:, :, s + fade_size : e - fade_size] = 1
old_e = e
v_mask = 1 - y_mask
y_mask += weight * v_mask
mask = y_mask
except Exception as e:
error_name = f"{type(e).__name__}"
traceback_text = "".join(traceback.format_tb(e.__traceback__))
message = f'{error_name}: "{e}"\n{traceback_text}"'
print("Post Process Failed: ", message)
return mask
def align_wave_head_and_tail(a, b):
l = min([a[0].size, b[0].size])
return a[:l, :l], b[:l, :l]
def convert_channels(spec, mp, band):
cc = mp.param["band"][band].get("convert_channels")
if "mid_side_c" == cc:
spec_left = np.add(spec[0], spec[1] * 0.25)
spec_right = np.subtract(spec[1], spec[0] * 0.25)
elif "mid_side" == cc:
spec_left = np.add(spec[0], spec[1]) / 2
spec_right = np.subtract(spec[0], spec[1])
elif "stereo_n" == cc:
spec_left = np.add(spec[0], spec[1] * 0.25) / 0.9375
spec_right = np.add(spec[1], spec[0] * 0.25) / 0.9375
else:
return spec
return np.asfortranarray([spec_left, spec_right])
def combine_spectrograms(specs, mp, is_v51_model=False):
l = min([specs[i].shape[2] for i in specs])
spec_c = np.zeros(shape=(2, mp.param["bins"] + 1, l), dtype=np.complex64)
offset = 0
bands_n = len(mp.param["band"])
for d in range(1, bands_n + 1):
h = mp.param["band"][d]["crop_stop"] - mp.param["band"][d]["crop_start"]
spec_c[:, offset : offset + h, :l] = specs[d][
:, mp.param["band"][d]["crop_start"] : mp.param["band"][d]["crop_stop"], :l
]
offset += h
if offset > mp.param["bins"]:
raise ValueError("Too much bins")
if mp.param["pre_filter_start"] > 0:
if is_v51_model:
spec_c *= get_lp_filter_mask(
spec_c.shape[1],
mp.param["pre_filter_start"],
mp.param["pre_filter_stop"],
)
else:
if bands_n == 1:
spec_c = fft_lp_filter(
spec_c, mp.param["pre_filter_start"], mp.param["pre_filter_stop"]
)
else:
gp = 1
for b in range(
mp.param["pre_filter_start"] + 1, mp.param["pre_filter_stop"]
):
g = math.pow(
10, -(b - mp.param["pre_filter_start"]) * (3.5 - gp) / 20.0
)
gp = g
spec_c[:, b, :] *= g
return np.asfortranarray(spec_c)
def wave_to_spectrogram(wave, hop_length, n_fft, mp, band, is_v51_model=False):
if wave.ndim == 1:
wave = np.asfortranarray([wave, wave])
if not is_v51_model:
if mp.param["reverse"]:
wave_left = np.flip(np.asfortranarray(wave[0]))
wave_right = np.flip(np.asfortranarray(wave[1]))
elif mp.param["mid_side"]:
wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
elif mp.param["mid_side_b2"]:
wave_left = np.asfortranarray(np.add(wave[1], wave[0] * 0.5))
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1] * 0.5))
else:
wave_left = np.asfortranarray(wave[0])
wave_right = np.asfortranarray(wave[1])
else:
wave_left = np.asfortranarray(wave[0])
wave_right = np.asfortranarray(wave[1])
spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
spec = np.asfortranarray([spec_left, spec_right])
if is_v51_model:
spec = convert_channels(spec, mp, band)
return spec
def spectrogram_to_wave(spec, hop_length=1024, mp={}, band=0, is_v51_model=True):
spec_left = np.asfortranarray(spec[0])
spec_right = np.asfortranarray(spec[1])
wave_left = librosa.istft(spec_left, hop_length=hop_length)
wave_right = librosa.istft(spec_right, hop_length=hop_length)
if is_v51_model:
cc = mp.param["band"][band].get("convert_channels")
if "mid_side_c" == cc:
return np.asfortranarray(
[
np.subtract(wave_left / 1.0625, wave_right / 4.25),
np.add(wave_right / 1.0625, wave_left / 4.25),
]
)
elif "mid_side" == cc:
return np.asfortranarray(
[
np.add(wave_left, wave_right / 2),
np.subtract(wave_left, wave_right / 2),
]
)
elif "stereo_n" == cc:
return np.asfortranarray(
[
np.subtract(wave_left, wave_right * 0.25),
np.subtract(wave_right, wave_left * 0.25),
]
)
else:
if mp.param["reverse"]:
return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
elif mp.param["mid_side"]:
return np.asfortranarray(
[
np.add(wave_left, wave_right / 2),
np.subtract(wave_left, wave_right / 2),
]
)
elif mp.param["mid_side_b2"]:
return np.asfortranarray(
[
np.add(wave_right / 1.25, 0.4 * wave_left),
np.subtract(wave_left / 1.25, 0.4 * wave_right),
]
)
return np.asfortranarray([wave_left, wave_right])
def cmb_spectrogram_to_wave(
spec_m, mp, extra_bins_h=None, extra_bins=None, is_v51_model=False
):
bands_n = len(mp.param["band"])
offset = 0
for d in range(1, bands_n + 1):
bp = mp.param["band"][d]
spec_s = np.zeros(
shape=(2, bp["n_fft"] // 2 + 1, spec_m.shape[2]), dtype=complex
)
h = bp["crop_stop"] - bp["crop_start"]
spec_s[:, bp["crop_start"] : bp["crop_stop"], :] = spec_m[
:, offset : offset + h, :
]
offset += h
if d == bands_n:
if extra_bins_h:
max_bin = bp["n_fft"] // 2
spec_s[:, max_bin - extra_bins_h : max_bin, :] = extra_bins[
:, :extra_bins_h, :
]
if bp["hpf_start"] > 0:
if is_v51_model:
spec_s *= get_hp_filter_mask(
spec_s.shape[1], bp["hpf_start"], bp["hpf_stop"] - 1
)
else:
spec_s = fft_hp_filter(spec_s, bp["hpf_start"], bp["hpf_stop"] - 1)
if bands_n == 1:
wave = spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model)
else:
wave = np.add(
wave, spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model)
)
else:
sr = mp.param["band"][d + 1]["sr"]
if d == 1:
if is_v51_model:
spec_s *= get_lp_filter_mask(
spec_s.shape[1], bp["lpf_start"], bp["lpf_stop"]
)
else:
spec_s = fft_lp_filter(spec_s, bp["lpf_start"], bp["lpf_stop"])
try:
wave = librosa.resample(
spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model),
orig_sr=bp["sr"],
target_sr=sr,
res_type=wav_resolution,
)
except ValueError as e:
print(f"Error during resampling: {e}")
print(
f"Spec_s shape: {spec_s.shape}, SR: {sr}, Res type: {wav_resolution}"
)
else:
if is_v51_model:
spec_s *= get_hp_filter_mask(
spec_s.shape[1], bp["hpf_start"], bp["hpf_stop"] - 1
)
spec_s *= get_lp_filter_mask(
spec_s.shape[1], bp["lpf_start"], bp["lpf_stop"]
)
else:
spec_s = fft_hp_filter(spec_s, bp["hpf_start"], bp["hpf_stop"] - 1)
spec_s = fft_lp_filter(spec_s, bp["lpf_start"], bp["lpf_stop"])
wave2 = np.add(
wave, spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model)
)
try:
wave = librosa.resample(
wave2, orig_sr=bp["sr"], target_sr=sr, res_type=wav_resolution
)
except ValueError as e:
print(f"Error during resampling: {e}")
print(
f"Spec_s shape: {spec_s.shape}, SR: {sr}, Res type: {wav_resolution}"
)
return wave
def get_lp_filter_mask(n_bins, bin_start, bin_stop):
mask = np.concatenate(
[
np.ones((bin_start - 1, 1)),
np.linspace(1, 0, bin_stop - bin_start + 1)[:, None],
np.zeros((n_bins - bin_stop, 1)),
],
axis=0,
)
return mask
def get_hp_filter_mask(n_bins, bin_start, bin_stop):
mask = np.concatenate(
[
np.zeros((bin_stop + 1, 1)),
np.linspace(0, 1, 1 + bin_start - bin_stop)[:, None],
np.ones((n_bins - bin_start - 2, 1)),
],
axis=0,
)
return mask
def fft_lp_filter(spec, bin_start, bin_stop):
g = 1.0
for b in range(bin_start, bin_stop):
g -= 1 / (bin_stop - bin_start)
spec[:, b, :] = g * spec[:, b, :]
spec[:, bin_stop:, :] *= 0
return spec
def fft_hp_filter(spec, bin_start, bin_stop):
g = 1.0
for b in range(bin_start, bin_stop, -1):
g -= 1 / (bin_start - bin_stop)
spec[:, b, :] = g * spec[:, b, :]
spec[:, 0 : bin_stop + 1, :] *= 0
return spec
def spectrogram_to_wave_old(spec, hop_length=1024):
if spec.ndim == 2:
wave = librosa.istft(spec, hop_length=hop_length)
elif spec.ndim == 3:
spec_left = np.asfortranarray(spec[0])
spec_right = np.asfortranarray(spec[1])
wave_left = librosa.istft(spec_left, hop_length=hop_length)
wave_right = librosa.istft(spec_right, hop_length=hop_length)
wave = np.asfortranarray([wave_left, wave_right])
return wave
def wave_to_spectrogram_old(wave, hop_length, n_fft):
wave_left = np.asfortranarray(wave[0])
wave_right = np.asfortranarray(wave[1])
spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
spec = np.asfortranarray([spec_left, spec_right])
return spec
def mirroring(a, spec_m, input_high_end, mp):
if "mirroring" == a:
mirror = np.flip(
np.abs(
spec_m[
:,
mp.param["pre_filter_start"]
- 10
- input_high_end.shape[1] : mp.param["pre_filter_start"]
- 10,
:,
]
),
1,
)
mirror = mirror * np.exp(1.0j * np.angle(input_high_end))
return np.where(
np.abs(input_high_end) <= np.abs(mirror), input_high_end, mirror
)
if "mirroring2" == a:
mirror = np.flip(
np.abs(
spec_m[
:,
mp.param["pre_filter_start"]
- 10
- input_high_end.shape[1] : mp.param["pre_filter_start"]
- 10,
:,
]
),
1,
)
mi = np.multiply(mirror, input_high_end * 1.7)
return np.where(np.abs(input_high_end) <= np.abs(mi), input_high_end, mi)
def adjust_aggr(mask, is_non_accom_stem, aggressiveness):
aggr = aggressiveness["value"] * 2
if aggr != 0:
if is_non_accom_stem:
aggr = 1 - aggr
if np.any(aggr > 10) or np.any(aggr < -10):
print(f"Warning: Extreme aggressiveness values detected: {aggr}")
aggr = [aggr, aggr]
if aggressiveness["aggr_correction"] is not None:
aggr[0] += aggressiveness["aggr_correction"]["left"]
aggr[1] += aggressiveness["aggr_correction"]["right"]
for ch in range(2):
mask[ch, : aggressiveness["split_bin"]] = np.power(
mask[ch, : aggressiveness["split_bin"]], 1 + aggr[ch] / 3
)
mask[ch, aggressiveness["split_bin"] :] = np.power(
mask[ch, aggressiveness["split_bin"] :], 1 + aggr[ch]
)
return mask
def stft(wave, nfft, hl):
wave_left = np.asfortranarray(wave[0])
wave_right = np.asfortranarray(wave[1])
spec_left = librosa.stft(wave_left, n_fft=nfft, hop_length=hl)
spec_right = librosa.stft(wave_right, n_fft=nfft, hop_length=hl)
spec = np.asfortranarray([spec_left, spec_right])
return spec
def istft(spec, hl):
spec_left = np.asfortranarray(spec[0])
spec_right = np.asfortranarray(spec[1])
wave_left = librosa.istft(spec_left, hop_length=hl)
wave_right = librosa.istft(spec_right, hop_length=hl)
wave = np.asfortranarray([wave_left, wave_right])
return wave
def spec_effects(wave, algorithm="Default", value=None):
if np.isnan(wave).any() or np.isinf(wave).any():
print(
f"Warning: Detected NaN or infinite values in wave input. Shape: {wave.shape}"
)
spec = [stft(wave[0], 2048, 1024), stft(wave[1], 2048, 1024)]
if algorithm == "Min_Mag":
v_spec_m = np.where(np.abs(spec[1]) <= np.abs(spec[0]), spec[1], spec[0])
wave = istft(v_spec_m, 1024)
elif algorithm == "Max_Mag":
v_spec_m = np.where(np.abs(spec[1]) >= np.abs(spec[0]), spec[1], spec[0])
wave = istft(v_spec_m, 1024)
elif algorithm == "Default":
wave = (wave[1] * value) + (wave[0] * (1 - value))
elif algorithm == "Invert_p":
X_mag = np.abs(spec[0])
y_mag = np.abs(spec[1])
max_mag = np.where(X_mag >= y_mag, X_mag, y_mag)
v_spec = spec[1] - max_mag * np.exp(1.0j * np.angle(spec[0]))
wave = istft(v_spec, 1024)
return wave
def spectrogram_to_wave_no_mp(spec, n_fft=2048, hop_length=1024):
wave = librosa.istft(spec, n_fft=n_fft, hop_length=hop_length)
if wave.ndim == 1:
wave = np.asfortranarray([wave, wave])
return wave
def wave_to_spectrogram_no_mp(wave):
spec = librosa.stft(wave, n_fft=2048, hop_length=1024)
if spec.ndim == 1:
spec = np.asfortranarray([spec, spec])
return spec
def invert_audio(specs, invert_p=True):
ln = min([specs[0].shape[2], specs[1].shape[2]])
specs[0] = specs[0][:, :, :ln]
specs[1] = specs[1][:, :, :ln]
if invert_p:
X_mag = np.abs(specs[0])
y_mag = np.abs(specs[1])
max_mag = np.where(X_mag >= y_mag, X_mag, y_mag)
v_spec = specs[1] - max_mag * np.exp(1.0j * np.angle(specs[0]))
else:
specs[1] = reduce_vocal_aggressively(specs[0], specs[1], 0.2)
v_spec = specs[0] - specs[1]
return v_spec
def invert_stem(mixture, stem):
mixture = wave_to_spectrogram_no_mp(mixture)
stem = wave_to_spectrogram_no_mp(stem)
output = spectrogram_to_wave_no_mp(invert_audio([mixture, stem]))
return -output.T
def ensembling(a, inputs, is_wavs=False):
for i in range(1, len(inputs)):
if i == 1:
input = inputs[0]
if is_wavs:
ln = min([input.shape[1], inputs[i].shape[1]])
input = input[:, :ln]
inputs[i] = inputs[i][:, :ln]
else:
ln = min([input.shape[2], inputs[i].shape[2]])
input = input[:, :, :ln]
inputs[i] = inputs[i][:, :, :ln]
if MIN_SPEC == a:
input = np.where(np.abs(inputs[i]) <= np.abs(input), inputs[i], input)
if MAX_SPEC == a:
input = np.where(np.abs(inputs[i]) >= np.abs(input), inputs[i], input)
return input
def ensemble_for_align(waves):
specs = []
for wav in waves:
spec = wave_to_spectrogram_no_mp(wav.T)
specs.append(spec)
wav_aligned = spectrogram_to_wave_no_mp(ensembling(MIN_SPEC, specs)).T
wav_aligned = match_array_shapes(wav_aligned, waves[1], is_swap=True)
return wav_aligned
def ensemble_inputs(
audio_input,
algorithm,
is_normalization,
wav_type_set,
save_path,
is_wave=False,
is_array=False,
):
wavs_ = []
if algorithm == AVERAGE:
output = average_audio(audio_input)
samplerate = 44100
else:
specs = []
for i in range(len(audio_input)):
wave, samplerate = librosa.load(audio_input[i], mono=False, sr=44100)
wavs_.append(wave)
spec = wave if is_wave else wave_to_spectrogram_no_mp(wave)
specs.append(spec)
wave_shapes = [w.shape[1] for w in wavs_]
target_shape = wavs_[wave_shapes.index(max(wave_shapes))]
if is_wave:
output = ensembling(algorithm, specs, is_wavs=True)
else:
output = spectrogram_to_wave_no_mp(ensembling(algorithm, specs))
output = to_shape(output, target_shape.shape)
sf.write(
save_path,
normalize(output.T, is_normalization),
samplerate,
subtype=wav_type_set,
)
def to_shape(x, target_shape):
padding_list = []
for x_dim, target_dim in zip(x.shape, target_shape):
pad_value = target_dim - x_dim
pad_tuple = (0, pad_value)
padding_list.append(pad_tuple)
return np.pad(x, tuple(padding_list), mode="constant")
def to_shape_minimize(x: np.ndarray, target_shape):
padding_list = []
for x_dim, target_dim in zip(x.shape, target_shape):
pad_value = target_dim - x_dim
pad_tuple = (0, pad_value)
padding_list.append(pad_tuple)
return np.pad(x, tuple(padding_list), mode="constant")
def detect_leading_silence(audio, sr, silence_threshold=0.007, frame_length=1024):
if len(audio.shape) == 2:
channel = np.argmax(np.sum(np.abs(audio), axis=1))
audio = audio[channel]
for i in range(0, len(audio), frame_length):
if np.max(np.abs(audio[i : i + frame_length])) > silence_threshold:
return (i / sr) * 1000
return (len(audio) / sr) * 1000
def adjust_leading_silence(
target_audio, reference_audio, silence_threshold=0.01, frame_length=1024
):
def find_silence_end(audio):
if len(audio.shape) == 2:
channel = np.argmax(np.sum(np.abs(audio), axis=1))
audio_mono = audio[channel]
else:
audio_mono = audio
for i in range(0, len(audio_mono), frame_length):
if np.max(np.abs(audio_mono[i : i + frame_length])) > silence_threshold:
return i
return len(audio_mono)
ref_silence_end = find_silence_end(reference_audio)
target_silence_end = find_silence_end(target_audio)
silence_difference = ref_silence_end - target_silence_end
try:
ref_silence_end_p = (ref_silence_end / 44100) * 1000
target_silence_end_p = (target_silence_end / 44100) * 1000
silence_difference_p = ref_silence_end_p - target_silence_end_p
print("silence_difference: ", silence_difference_p)
except Exception as e:
pass
if silence_difference > 0:
if len(target_audio.shape) == 2:
silence_to_add = np.zeros((target_audio.shape[0], silence_difference))
else:
silence_to_add = np.zeros(silence_difference)
return np.hstack((silence_to_add, target_audio))
elif silence_difference < 0:
if len(target_audio.shape) == 2:
return target_audio[:, -silence_difference:]
else:
return target_audio[-silence_difference:]
else:
return target_audio
def match_array_shapes(array_1: np.ndarray, array_2: np.ndarray, is_swap=False):
if is_swap:
array_1, array_2 = array_1.T, array_2.T
if array_1.shape[1] > array_2.shape[1]:
array_1 = array_1[:, : array_2.shape[1]]
elif array_1.shape[1] < array_2.shape[1]:
padding = array_2.shape[1] - array_1.shape[1]
array_1 = np.pad(array_1, ((0, 0), (0, padding)), "constant", constant_values=0)
if is_swap:
array_1, array_2 = array_1.T, array_2.T
return array_1
def match_mono_array_shapes(array_1: np.ndarray, array_2: np.ndarray):
if len(array_1) > len(array_2):
array_1 = array_1[: len(array_2)]
elif len(array_1) < len(array_2):
padding = len(array_2) - len(array_1)
array_1 = np.pad(array_1, (0, padding), "constant", constant_values=0)
return array_1
def change_pitch_semitones(y, sr, semitone_shift):
factor = 2 ** (semitone_shift / 12)
y_pitch_tuned = []
for y_channel in y:
y_pitch_tuned.append(
librosa.resample(
y_channel,
orig_sr=sr,
target_sr=sr * factor,
res_type=wav_resolution_float_resampling,
)
)
y_pitch_tuned = np.array(y_pitch_tuned)
new_sr = sr * factor
return y_pitch_tuned, new_sr
def average_audio(audio):
waves = []
wave_shapes = []
final_waves = []
for i in range(len(audio)):
wave = librosa.load(audio[i], sr=44100, mono=False)
waves.append(wave[0])
wave_shapes.append(wave[0].shape[1])
wave_shapes_index = wave_shapes.index(max(wave_shapes))
target_shape = waves[wave_shapes_index]
waves.pop(wave_shapes_index)
final_waves.append(target_shape)
for n_array in waves:
wav_target = to_shape(n_array, target_shape.shape)
final_waves.append(wav_target)
waves = sum(final_waves)
waves = waves / len(audio)
return waves
def average_dual_sources(wav_1, wav_2, value):
if wav_1.shape > wav_2.shape:
wav_2 = to_shape(wav_2, wav_1.shape)
if wav_1.shape < wav_2.shape:
wav_1 = to_shape(wav_1, wav_2.shape)
wave = (wav_1 * value) + (wav_2 * (1 - value))
return wave
def reshape_sources(wav_1: np.ndarray, wav_2: np.ndarray):
if wav_1.shape > wav_2.shape:
wav_2 = to_shape(wav_2, wav_1.shape)
if wav_1.shape < wav_2.shape:
ln = min([wav_1.shape[1], wav_2.shape[1]])
wav_2 = wav_2[:, :ln]
ln = min([wav_1.shape[1], wav_2.shape[1]])
wav_1 = wav_1[:, :ln]
wav_2 = wav_2[:, :ln]
return wav_2
def reshape_sources_ref(wav_1_shape, wav_2: np.ndarray):
if wav_1_shape > wav_2.shape:
wav_2 = to_shape(wav_2, wav_1_shape)
return wav_2
def combine_arrarys(audio_sources, is_swap=False):
source = np.zeros_like(max(audio_sources, key=np.size))
for v in audio_sources:
v = match_array_shapes(v, source, is_swap=is_swap)
source += v
return source
def combine_audio(
paths: list, audio_file_base=None, wav_type_set="FLOAT", save_format=None
):
source = combine_arrarys([load_audio(i) for i in paths])
save_path = f"{audio_file_base}_combined.wav"
sf.write(save_path, source.T, 44100, subtype=wav_type_set)
save_format(save_path)
def reduce_mix_bv(inst_source, voc_source, reduction_rate=0.9):
inst_source = inst_source * (1 - reduction_rate)
mix_reduced = combine_arrarys([inst_source, voc_source], is_swap=True)
return mix_reduced
def organize_inputs(inputs):
input_list = {"target": None, "reference": None, "reverb": None, "inst": None}
for i in inputs:
if i.endswith("_(Vocals).wav"):
input_list["reference"] = i
elif "_RVC_" in i:
input_list["target"] = i
elif i.endswith("reverbed_stem.wav"):
input_list["reverb"] = i
elif i.endswith("_(Instrumental).wav"):
input_list["inst"] = i
return input_list
def check_if_phase_inverted(wav1, wav2, is_mono=False):
if not is_mono:
wav1 = np.mean(wav1, axis=0)
wav2 = np.mean(wav2, axis=0)
correlation = np.corrcoef(wav1[:1000], wav2[:1000])
return correlation[0, 1] < 0
def align_audio(
file1,
file2,
file2_aligned,
file_subtracted,
wav_type_set,
is_save_aligned,
command_Text,
save_format,
align_window: list,
align_intro_val: list,
db_analysis: tuple,
set_progress_bar,
phase_option,
phase_shifts,
is_match_silence,
is_spec_match,
):
global progress_value
progress_value = 0
is_mono = False
def get_diff(a, b):
corr = np.correlate(a, b, "full")
diff = corr.argmax() - (b.shape[0] - 1)
return diff
def progress_bar(length):
global progress_value
progress_value += 1
if (0.90 / length * progress_value) >= 0.9:
length = progress_value + 1
set_progress_bar(0.1, (0.9 / length * progress_value))
if file1.endswith(".mp3") and is_macos:
length1 = rerun_mp3(file1)
wav1, sr1 = librosa.load(file1, duration=length1, sr=44100, mono=False)
else:
wav1, sr1 = librosa.load(file1, sr=44100, mono=False)
if file2.endswith(".mp3") and is_macos:
length2 = rerun_mp3(file2)
wav2, sr2 = librosa.load(file2, duration=length2, sr=44100, mono=False)
else:
wav2, sr2 = librosa.load(file2, sr=44100, mono=False)
if wav1.ndim == 1 and wav2.ndim == 1:
is_mono = True
elif wav1.ndim == 1:
wav1 = np.asfortranarray([wav1, wav1])
elif wav2.ndim == 1:
wav2 = np.asfortranarray([wav2, wav2])
if phase_option == AUTO_PHASE:
if check_if_phase_inverted(wav1, wav2, is_mono=is_mono):
wav2 = -wav2
elif phase_option == POSITIVE_PHASE:
wav2 = +wav2
elif phase_option == NEGATIVE_PHASE:
wav2 = -wav2
if is_match_silence:
wav2 = adjust_leading_silence(wav2, wav1)
wav1_length = int(librosa.get_duration(y=wav1, sr=44100))
wav2_length = int(librosa.get_duration(y=wav2, sr=44100))
if not is_mono:
wav1 = wav1.transpose()
wav2 = wav2.transpose()
wav2_org = wav2.copy()
command_Text("Processing files... \n")
seconds_length = min(wav1_length, wav2_length)
wav2_aligned_sources = []
for sec_len in align_intro_val:
sec_seg = 1 if sec_len == 1 else int(seconds_length // sec_len)
index = sr1 * sec_seg
if is_mono:
samp1, samp2 = wav1[index : index + sr1], wav2[index : index + sr1]
diff = get_diff(samp1, samp2)
else:
index = sr1 * sec_seg
samp1, samp2 = wav1[index : index + sr1, 0], wav2[index : index + sr1, 0]
samp1_r, samp2_r = (
wav1[index : index + sr1, 1],
wav2[index : index + sr1, 1],
)
diff, diff_r = get_diff(samp1, samp2), get_diff(samp1_r, samp2_r)
if diff > 0:
zeros_to_append = np.zeros(diff) if is_mono else np.zeros((diff, 2))
wav2_aligned = np.append(zeros_to_append, wav2_org, axis=0)
elif diff < 0:
wav2_aligned = wav2_org[-diff:]
else:
wav2_aligned = wav2_org
if not any(
np.array_equal(wav2_aligned, source) for source in wav2_aligned_sources
):
wav2_aligned_sources.append(wav2_aligned)
unique_sources = len(wav2_aligned_sources)
sub_mapper_big_mapper = {}
for s in wav2_aligned_sources:
wav2_aligned = (
match_mono_array_shapes(s, wav1)
if is_mono
else match_array_shapes(s, wav1, is_swap=True)
)
if align_window:
wav_sub = time_correction(
wav1,
wav2_aligned,
seconds_length,
align_window=align_window,
db_analysis=db_analysis,
progress_bar=progress_bar,
unique_sources=unique_sources,
phase_shifts=phase_shifts,
)
wav_sub_size = np.abs(wav_sub).mean()
sub_mapper_big_mapper = {**sub_mapper_big_mapper, **{wav_sub_size: wav_sub}}
else:
wav2_aligned = wav2_aligned * np.power(10, db_analysis[0] / 20)
db_range = db_analysis[1]
for db_adjustment in db_range:
s_adjusted = wav2_aligned * (10 ** (db_adjustment / 20))
wav_sub = wav1 - s_adjusted
wav_sub_size = np.abs(wav_sub).mean()
sub_mapper_big_mapper = {
**sub_mapper_big_mapper,
**{wav_sub_size: wav_sub},
}
sub_mapper_value_list = list(sub_mapper_big_mapper.values())
if is_spec_match and len(sub_mapper_value_list) >= 2:
wav_sub = ensemble_for_align(list(sub_mapper_big_mapper.values()))
else:
wav_sub = ensemble_wav(list(sub_mapper_big_mapper.values()))
wav_sub = np.clip(wav_sub, -1, +1)
command_Text(f"Saving inverted track... ")
if is_save_aligned or is_spec_match:
wav1 = (
match_mono_array_shapes(wav1, wav_sub)
if is_mono
else match_array_shapes(wav1, wav_sub, is_swap=True)
)
wav2_aligned = wav1 - wav_sub
if is_spec_match:
if wav1.ndim == 1 and wav2.ndim == 1:
wav2_aligned = np.asfortranarray([wav2_aligned, wav2_aligned]).T
wav1 = np.asfortranarray([wav1, wav1]).T
wav2_aligned = ensemble_for_align([wav2_aligned, wav1])
wav_sub = wav1 - wav2_aligned
if is_save_aligned:
sf.write(file2_aligned, wav2_aligned, sr1, subtype=wav_type_set)
save_format(file2_aligned)
sf.write(file_subtracted, wav_sub, sr1, subtype=wav_type_set)
save_format(file_subtracted)
def phase_shift_hilbert(signal, degree):
analytic_signal = hilbert(signal)
return (
np.cos(np.radians(degree)) * analytic_signal.real
- np.sin(np.radians(degree)) * analytic_signal.imag
)
def get_phase_shifted_tracks(track, phase_shift):
if phase_shift == 180:
return [track, -track]
step = phase_shift
end = 180 - (180 % step) if 180 % step == 0 else 181
phase_range = range(step, end, step)
flipped_list = [track, -track]
for i in phase_range:
flipped_list.extend(
[phase_shift_hilbert(track, i), phase_shift_hilbert(track, -i)]
)
return flipped_list
def time_correction(
mix: np.ndarray,
instrumental: np.ndarray,
seconds_length,
align_window,
db_analysis,
sr=44100,
progress_bar=None,
unique_sources=None,
phase_shifts=NONE_P,
):
def align_tracks(track1, track2):
shifted_tracks = {}
track2 = track2 * np.power(10, db_analysis[0] / 20)
db_range = db_analysis[1]
if phase_shifts == 190:
track2_flipped = [track2]
else:
track2_flipped = get_phase_shifted_tracks(track2, phase_shifts)
for db_adjustment in db_range:
for t in track2_flipped:
track2_adjusted = t * (10 ** (db_adjustment / 20))
corr = correlate(track1, track2_adjusted)
delay = np.argmax(np.abs(corr)) - (len(track1) - 1)
track2_shifted = np.roll(track2_adjusted, shift=delay)
track2_shifted_sub = track1 - track2_shifted
mean_abs_value = np.abs(track2_shifted_sub).mean()
shifted_tracks[mean_abs_value] = track2_shifted
return shifted_tracks[min(shifted_tracks.keys())]
assert (
mix.shape == instrumental.shape
), f"Audio files must have the same shape - Mix: {mix.shape}, Inst: {instrumental.shape}"
seconds_length = seconds_length // 2
sub_mapper = {}
progress_update_interval = 120
total_iterations = 0
if len(align_window) > 2:
progress_update_interval = 320
for secs in align_window:
step = secs / 2
window_size = int(sr * secs)
step_size = int(sr * step)
if len(mix.shape) == 1:
total_mono = (
len(range(0, len(mix) - window_size, step_size))
// progress_update_interval
) * unique_sources
total_iterations += total_mono
else:
total_stereo_ = len(range(0, len(mix[:, 0]) - window_size, step_size)) * 2
total_stereo = (total_stereo_ // progress_update_interval) * unique_sources
total_iterations += total_stereo
for secs in align_window:
sub = np.zeros_like(mix)
divider = np.zeros_like(mix)
step = secs / 2
window_size = int(sr * secs)
step_size = int(sr * step)
window = np.hanning(window_size)
if len(mix.shape) == 1:
counter = 0
for i in range(0, len(mix) - window_size, step_size):
counter += 1
if counter % progress_update_interval == 0:
progress_bar(total_iterations)
window_mix = mix[i : i + window_size] * window
window_instrumental = instrumental[i : i + window_size] * window
window_instrumental_aligned = align_tracks(
window_mix, window_instrumental
)
sub[i : i + window_size] += window_mix - window_instrumental_aligned
divider[i : i + window_size] += window
else:
counter = 0
for ch in range(mix.shape[1]):
for i in range(0, len(mix[:, ch]) - window_size, step_size):
counter += 1
if counter % progress_update_interval == 0:
progress_bar(total_iterations)
window_mix = mix[i : i + window_size, ch] * window
window_instrumental = instrumental[i : i + window_size, ch] * window
window_instrumental_aligned = align_tracks(
window_mix, window_instrumental
)
sub[i : i + window_size, ch] += (
window_mix - window_instrumental_aligned
)
divider[i : i + window_size, ch] += window
sub = np.where(divider > 1e-6, sub / divider, sub)
sub_size = np.abs(sub).mean()
sub_mapper = {**sub_mapper, **{sub_size: sub}}
sub = ensemble_wav(list(sub_mapper.values()), split_size=12)
return sub
def ensemble_wav(waveforms, split_size=240):
waveform_thirds = {
i: np.array_split(waveform, split_size) for i, waveform in enumerate(waveforms)
}
final_waveform = []
for third_idx in range(split_size):
means = [
np.abs(waveform_thirds[i][third_idx]).mean() for i in range(len(waveforms))
]
min_index = np.argmin(means)
final_waveform.append(waveform_thirds[min_index][third_idx])
final_waveform = np.concatenate(final_waveform)
return final_waveform
def ensemble_wav_min(waveforms):
for i in range(1, len(waveforms)):
if i == 1:
wave = waveforms[0]
ln = min(len(wave), len(waveforms[i]))
wave = wave[:ln]
waveforms[i] = waveforms[i][:ln]
wave = np.where(np.abs(waveforms[i]) <= np.abs(wave), waveforms[i], wave)
return wave
def align_audio_test(wav1, wav2, sr1=44100):
def get_diff(a, b):
corr = np.correlate(a, b, "full")
diff = corr.argmax() - (b.shape[0] - 1)
return diff
wav1 = wav1.transpose()
wav2 = wav2.transpose()
wav2_org = wav2.copy()
index = sr1
samp1 = wav1[index : index + sr1, 0]
samp2 = wav2[index : index + sr1, 0]
diff = get_diff(samp1, samp2)
if diff > 0:
wav2_aligned = np.append(np.zeros((diff, 1)), wav2_org, axis=0)
elif diff < 0:
wav2_aligned = wav2_org[-diff:]
else:
wav2_aligned = wav2_org
return wav2_aligned
def load_audio(audio_file):
wav, sr = librosa.load(audio_file, sr=44100, mono=False)
if wav.ndim == 1:
wav = np.asfortranarray([wav, wav])
return wav
def rerun_mp3(audio_file):
with audioread.audio_open(audio_file) as f:
track_length = int(f.duration)
return track_length