| import gradio as gr |
| import pyworld |
| import numpy as np |
| from scipy.io import wavfile |
| from wsola import WSOLA |
| from scipy.signal import firwin, lfilter, resample, filtfilt |
| from numpy.fft import fft, ifft |
| import librosa |
| import soundfile as sf |
|
|
| |
| |
| |
|
|
def shift_pitch(signal, fs, f_ratio):
    """Pitch-shift ``signal`` by locating its pitch marks and re-spacing them.

    Args:
        signal: 1-D array of audio samples.
        fs: Sampling rate in Hz, forwarded to ``find_peaks``.
        f_ratio: Factor forwarded to ``psola``, where it scales the number
            of pitch marks placed in the output.

    Returns:
        The array produced by ``psola`` (same length as ``signal``).
    """
    pitch_marks = find_peaks(signal, fs)
    return psola(signal, pitch_marks, f_ratio)
|
|
|
|
def find_peaks(signal, fs, max_hz=950, min_hz=75, analysis_win_ms=40, max_change=1.005, min_change=0.995):
    """Locate one peak per estimated pitch period across ``signal``.

    Args:
        signal: 1-D array of audio samples.
        fs: Sampling rate in Hz.
        max_hz: Upper frequency bound; the shortest period searched in the
            first pass is ``fs // max_hz`` samples.
        min_hz: Lower frequency bound; the longest period searched in the
            first pass is ``fs // min_hz`` samples.
        analysis_win_ms: Length in milliseconds of each analysis window.
        max_change: Factor on the local period giving the far end of the
            range searched for the next peak.
        min_change: Factor on the local period giving the near end of the
            range searched for the next peak.

    Returns:
        ``np.ndarray`` of sample indices, one per detected peak.
    """
    total = len(signal)
    window_len = int(analysis_win_ms / 1000 * fs)

    # First pass: estimate a period per window over the full allowed range.
    rough = compute_periods_per_sequence(
        signal, window_len, fs // max_hz, fs // min_hz
    )

    # Second pass: repeat the estimate restricted to +/-10% of the mean
    # first-pass period.
    centre = np.mean(rough)
    narrow_min = int(centre * 0.9)
    narrow_max = int(centre * 1.1)
    periods = compute_periods_per_sequence(
        signal, window_len, narrow_min, narrow_max
    )

    # Seed with the largest sample inside the first (slightly widened) period.
    marks = [np.argmax(signal[:int(periods[0] * 1.1)])]
    while True:
        last = marks[-1]
        local_period = periods[last // window_len]
        search_from = last + int(local_period * min_change)
        search_to = last + int(local_period * max_change)
        if search_to >= total:
            break
        # The next peak is the maximum inside the narrow band around one
        # local period after the previous peak.
        marks.append(search_from + np.argmax(signal[search_from:search_to]))
    return np.array(marks)
|
|
|
|
def compute_periods_per_sequence(signal, sequence, min_period, max_period):
    """Estimate one period (in samples) for each consecutive chunk of ``signal``.

    Each chunk of ``sequence`` samples is autocorrelated via the FFT
    (``ifft(F * conj(F))``) with the DC bin zeroed, and the lag with the
    largest autocorrelation inside ``[min_period, max_period)`` is taken as
    that chunk's period.

    Args:
        signal: 1-D sequence of audio samples.
        sequence: Chunk length in samples.
        min_period: Smallest lag considered (inclusive).
        max_period: Largest lag considered (exclusive).

    Returns:
        A list with one period estimate per chunk. The final chunk may be
        shorter than ``sequence``; if a chunk is too short to contain any
        lag in the requested range, the previous chunk's estimate is reused
        (or ``min_period`` when there is no previous chunk) instead of
        raising on an empty search range.
    """
    periods = []
    for offset in range(0, len(signal), sequence):
        spectrum = fft(signal[offset: offset + sequence])
        spectrum[0] = 0  # zero the DC bin so a constant offset is ignored
        autoc = ifft(spectrum * np.conj(spectrum)).real
        candidates = autoc[min_period: max_period]
        if candidates.size:
            periods.append(min_period + np.argmax(candidates))
        else:
            # Chunk shorter than min_period: np.argmax on the empty slice
            # would raise ValueError, so fall back to a neighbouring value.
            periods.append(periods[-1] if periods else min_period)
    return periods
|
|
|
|
def psola(signal, peaks, f_ratio):
    """Overlap-add windowed segments of ``signal`` at re-spaced peak positions.

    Args:
        signal: 1-D array of audio samples.
        peaks: Increasing sample indices of the peaks found in ``signal``.
        f_ratio: Scale on the number of peaks; ``int(len(peaks) * f_ratio)``
            output peaks are spread across the span of the original ones.

    Returns:
        A float array of the same length as ``signal``.
    """
    total = len(signal)
    output = np.zeros(total)
    marks = np.asarray(peaks)

    # Fractional indices into ``marks``; each new peak position is a linear
    # interpolation between the two neighbouring original peaks.
    ref = np.linspace(0, len(marks) - 1, int(len(marks) * f_ratio))
    frac = ref % 1
    lower = np.floor(ref).astype(int)
    upper = np.ceil(ref).astype(int)
    new_peaks = (marks[lower] * (1 - frac) + marks[upper] * frac).astype(int)

    last = len(new_peaks) - 1
    for j, centre in enumerate(new_peaks):
        # Original peak closest to this output peak supplies the samples.
        src = marks[np.argmin(np.abs(marks - centre))]

        # Distance to the neighbouring output peaks (or the signal edges).
        before = centre if j == 0 else centre - new_peaks[j - 1]
        after = total - 1 - centre if j == last else new_peaks[j + 1] - centre

        # Shrink the segment so the source slice stays inside the signal.
        if src - before < 0:
            before = src
        if src + after > total - 1:
            after = total - 1 - src

        # Linear ramp up over ``before`` samples, then down over ``after``.
        rise = np.linspace(0, 1, before + 1)[1:]
        fall = np.linspace(1, 0, after + 1)[1:]
        window = np.concatenate((rise, fall))

        output[centre - before: centre + after] += (
            window * signal[src - before: src + after]
        )
    return output
|
|
|
|
| |
| |
| |
|
|
|
|
| |
def low_cut_filter(x, fs, cutoff=70):
    """Apply a 255-tap FIR high-pass (low-cut) filter to ``x``.

    Args:
        x: 1-D array of audio samples.
        fs: Sampling rate in Hz.
        cutoff: Cutoff frequency in Hz.

    Returns:
        The filtered signal, same length as ``x``. ``lfilter`` is causal,
        so the output is not zero-phase.
    """
    # True division: ``fs // 2`` would truncate the Nyquist frequency for
    # odd sampling rates and shift the normalised cutoff.
    nyquist = fs / 2
    norm_cutoff = cutoff / nyquist

    # ``pass_zero=False`` makes firwin design a high-pass response.
    fil = firwin(255, norm_cutoff, pass_zero=False)
    return lfilter(fil, 1, x)
|
|
| |
def high_frequency_completion(x, transformed, f0rate, par):
    """Add high-pass-filtered, zero-F0 resynthesis of ``x`` onto ``transformed``.

    ``x`` is analysed with ``pyworld.harvest``, ``pyworld.cheaptrick`` and
    ``pyworld.d4c``, resynthesised with an all-zero F0 contour, high-pass
    filtered, and summed with ``transformed``.

    Args:
        x: Source audio samples (converted to float64 internally).
        transformed: Array the filtered resynthesis is added to. When the
            resynthesis is not longer than ``transformed``, this array is
            modified in place and returned.
        f0rate: Used directly as the ``firwin`` cutoff, i.e. as a fraction
            of the Nyquist frequency.
        par: Mapping with keys ``'fs'``, ``'minf0'``, ``'maxf0'``,
            ``'shiftms'`` and ``'fftl'``.

    Returns:
        ``transformed`` with the filtered resynthesis added.
    """
    samples = np.array(x, dtype=np.float64)
    rate = par['fs']
    frame_ms = par['shiftms']
    fft_len = par['fftl']

    # Analysis: F0 contour, spectral envelope, aperiodicity.
    f0, time_axis = pyworld.harvest(
        samples, rate,
        f0_floor=par['minf0'], f0_ceil=par['maxf0'], frame_period=frame_ms,
    )
    envelope = pyworld.cheaptrick(samples, f0, time_axis, rate, fft_size=fft_len)
    aperiodicity = pyworld.d4c(samples, f0, time_axis, rate, fft_size=fft_len)

    # Resynthesise with F0 forced to zero on every frame.
    silent_f0 = np.zeros(len(f0))
    noise = pyworld.synthesize(
        silent_f0, envelope, aperiodicity, rate, frame_period=frame_ms
    )

    # Zero-phase high-pass; the cutoff is f0rate relative to Nyquist.
    taps = firwin(255, f0rate, pass_zero=False)
    noise_hp = filtfilt(taps, 1, noise)

    n_out = len(transformed)
    n_noise = len(noise_hp)
    if n_noise > n_out:
        return transformed + noise_hp[:n_out]
    transformed[:n_noise] += noise_hp
    return transformed
|
|
def transform_f0(x, f0rate, config):
    """Transform ``x`` by time-stretching it and resampling to its original length.

    Steps: low-cut filter at 70 Hz, ``WSOLA`` duration modification
    constructed with ``1 / f0rate``, ``scipy.signal.resample`` back to the
    input length, and, only when ``f0rate < 1.0``, high-frequency
    completion.

    Args:
        x: 1-D array of audio samples.
        f0rate: Rate passed (inverted) to ``WSOLA`` and, when below 1.0,
            forwarded to ``high_frequency_completion``.
        config: Mapping holding at least ``"fs"``; passed on unchanged to
            ``high_frequency_completion``.

    Returns:
        The transformed signal, same length as ``x``.
    """
    needs_completion = f0rate < 1.0

    sample_rate = config["fs"]
    filtered = low_cut_filter(x, sample_rate, cutoff=70)

    # Change the duration first ...
    stretcher = WSOLA(config["fs"], 1 / f0rate, shiftms=10)
    stretched = stretcher.duration_modification(filtered)

    # ... then squeeze/stretch back to the original number of samples.
    transformed = resample(stretched, len(filtered))

    if needs_completion:
        transformed = high_frequency_completion(
            filtered, transformed, f0rate, config
        )
    return transformed
|
|
# NOTE(review): the original indentation was lost; the widget nesting below
# is reconstructed from statement order -- confirm the intended layout.
with gr.Blocks() as interface:
    with gr.Row():
        wav_path = gr.Audio(source='microphone', type='filepath')

        with gr.Column():
            minf0 = gr.Slider(50, 300, 70, step=10, label="minf0")
            turn_tune = gr.Slider(0.2, 3, 1.5, step=0.1, label="turn_tune")
        with gr.Column():
            maxf0 = gr.Slider(500, 1100, 700, step=10, label="maxf0")
            shiftms = gr.Slider(1, 50, 10, step=1, label="shiftms")
        with gr.Column():
            fr = gr.Slider(0.1, 15, 1, step=0.1, label="fr")

    with gr.Row():
        audio_output = gr.Audio(type='filepath')

        section_btn1 = gr.Button("change")

    def change(wav_path, turn_tune, minf0, maxf0, shiftms, fr):
        """Process the recorded WAV file and return the path of the result.

        Args:
            wav_path: Path of the input WAV file.
            turn_tune: Rate handed to ``transform_f0``.
            minf0: Stored as ``config["minf0"]``.
            maxf0: Stored as ``config["maxf0"]``.
            shiftms: Stored as ``config["shiftms"]``.
            fr: When not equal to 1, a second pass through ``shift_pitch``
                is run with ``f_ratio = fr ** (-2 / 12)``.

        Returns:
            Path of the written output file (input path with its extension
            replaced by ``-output.wav``).
        """
        import os

        fs, x = wavfile.read(wav_path)
        x = np.array(x, dtype=np.float64)
        # splitext strips only the final extension; splitting on the first
        # '.' would cut the path at any dot in a directory or file name.
        root, _ext = os.path.splitext(str(wav_path))
        outfile = root + '-output.wav'

        config = {
            "fs": fs,
            "minf0": minf0,
            "maxf0": maxf0,
            "shiftms": shiftms,
            "fftl": 1024,
        }

        wav_slow = transform_f0(x, turn_tune, config)
        # Clip before the cast: out-of-range floats would otherwise wrap
        # around when converted to int16.
        int16_range = np.iinfo(np.int16)
        clipped = np.clip(wav_slow, int16_range.min, int16_range.max)
        wavfile.write(outfile, fs, clipped.astype(np.int16))

        fr = float(fr)
        print('fr->', fr)
        if fr != 1:
            orig_signal, fs = librosa.load(outfile, sr=None)
            f_ratio = fr ** (-2 / 12)
            new_signal = shift_pitch(orig_signal, fs, f_ratio)
            sf.write(outfile, new_signal, fs)

        return outfile

    section_btn1.click(
        change,
        inputs=[wav_path, turn_tune, minf0, maxf0, shiftms, fr],
        outputs=[audio_output],
    )


interface.launch(show_api=False)