| | import os, traceback, sys, parselmouth
|
| | import librosa
|
| | import pyworld
|
| | from scipy.io import wavfile
|
| | import numpy as np, logging
|
| |
|
| | logging.getLogger("numba").setLevel(logging.WARNING)
|
| | from multiprocessing import Process
|
| |
|
| | exp_dir = sys.argv[1]
|
| | f = open("%s/extract_f0_feature.log" % exp_dir, "a+")
|
| |
|
| |
|
| | def printt(strr):
|
| | print(strr)
|
| | f.write("%s\n" % strr)
|
| | f.flush()
|
| |
|
| |
|
| | n_p = int(sys.argv[2])
|
| | f0method = sys.argv[3]
|
| |
|
| |
|
| | class FeatureInput(object):
|
| | def __init__(self, samplerate=16000, hop_size=160):
|
| | self.fs = samplerate
|
| | self.hop = hop_size
|
| |
|
| | self.f0_bin = 256
|
| | self.f0_max = 1100.0
|
| | self.f0_min = 50.0
|
| | self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700)
|
| | self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700)
|
| |
|
| | def compute_f0(self, path, f0_method):
|
| |
|
| |
|
| | x, sr = librosa.load(path, self.fs)
|
| | p_len = x.shape[0] // self.hop
|
| | assert sr == self.fs
|
| | if f0_method == "pm":
|
| | time_step = 160 / 16000 * 1000
|
| | f0_min = 50
|
| | f0_max = 1100
|
| | f0 = (
|
| | parselmouth.Sound(x, sr)
|
| | .to_pitch_ac(
|
| | time_step=time_step / 1000,
|
| | voicing_threshold=0.6,
|
| | pitch_floor=f0_min,
|
| | pitch_ceiling=f0_max,
|
| | )
|
| | .selected_array["frequency"]
|
| | )
|
| | pad_size = (p_len - len(f0) + 1) // 2
|
| | if pad_size > 0 or p_len - len(f0) - pad_size > 0:
|
| | f0 = np.pad(
|
| | f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
|
| | )
|
| | elif f0_method == "harvest":
|
| | f0, t = pyworld.harvest(
|
| | x.astype(np.double),
|
| | fs=sr,
|
| | f0_ceil=self.f0_max,
|
| | f0_floor=self.f0_min,
|
| | frame_period=1000 * self.hop / sr,
|
| | )
|
| | f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.fs)
|
| | elif f0_method == "dio":
|
| | f0, t = pyworld.dio(
|
| | x.astype(np.double),
|
| | fs=sr,
|
| | f0_ceil=self.f0_max,
|
| | f0_floor=self.f0_min,
|
| | frame_period=1000 * self.hop / sr,
|
| | )
|
| | f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.fs)
|
| | return f0
|
| |
|
| | def coarse_f0(self, f0):
|
| | f0_mel = 1127 * np.log(1 + f0 / 700)
|
| | f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - self.f0_mel_min) * (
|
| | self.f0_bin - 2
|
| | ) / (self.f0_mel_max - self.f0_mel_min) + 1
|
| |
|
| |
|
| | f0_mel[f0_mel <= 1] = 1
|
| | f0_mel[f0_mel > self.f0_bin - 1] = self.f0_bin - 1
|
| | f0_coarse = np.rint(f0_mel).astype(np.int)
|
| | assert f0_coarse.max() <= 255 and f0_coarse.min() >= 1, (
|
| | f0_coarse.max(),
|
| | f0_coarse.min(),
|
| | )
|
| | return f0_coarse
|
| |
|
| | def go(self, paths, f0_method):
|
| | if len(paths) == 0:
|
| | printt("no-f0-todo")
|
| | else:
|
| | printt("todo-f0-%s" % len(paths))
|
| | n = max(len(paths) // 5, 1)
|
| | for idx, (inp_path, opt_path1, opt_path2) in enumerate(paths):
|
| | try:
|
| | if idx % n == 0:
|
| | printt("f0ing,now-%s,all-%s,-%s" % (idx, len(paths), inp_path))
|
| | if (
|
| | os.path.exists(opt_path1 + ".npy") == True
|
| | and os.path.exists(opt_path2 + ".npy") == True
|
| | ):
|
| | continue
|
| | featur_pit = self.compute_f0(inp_path, f0_method)
|
| | np.save(
|
| | opt_path2,
|
| | featur_pit,
|
| | allow_pickle=False,
|
| | )
|
| | coarse_pit = self.coarse_f0(featur_pit)
|
| | np.save(
|
| | opt_path1,
|
| | coarse_pit,
|
| | allow_pickle=False,
|
| | )
|
| | except:
|
| | printt("f0fail-%s-%s-%s" % (idx, inp_path, traceback.format_exc()))
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| |
|
| |
|
| |
|
| | printt(sys.argv)
|
| | featureInput = FeatureInput()
|
| | paths = []
|
| | inp_root = "%s/1_16k_wavs" % (exp_dir)
|
| | opt_root1 = "%s/2a_f0" % (exp_dir)
|
| | opt_root2 = "%s/2b-f0nsf" % (exp_dir)
|
| |
|
| | os.makedirs(opt_root1, exist_ok=True)
|
| | os.makedirs(opt_root2, exist_ok=True)
|
| | for name in sorted(list(os.listdir(inp_root))):
|
| | inp_path = "%s/%s" % (inp_root, name)
|
| | if "spec" in inp_path:
|
| | continue
|
| | opt_path1 = "%s/%s" % (opt_root1, name)
|
| | opt_path2 = "%s/%s" % (opt_root2, name)
|
| | paths.append([inp_path, opt_path1, opt_path2])
|
| |
|
| | ps = []
|
| | for i in range(n_p):
|
| | p = Process(
|
| | target=featureInput.go,
|
| | args=(
|
| | paths[i::n_p],
|
| | f0method,
|
| | ),
|
| | )
|
| | p.start()
|
| | ps.append(p)
|
| | for p in ps:
|
| | p.join()
|
| |
|