|
|
import csv
|
|
|
import json
|
|
|
import pathlib
|
|
|
from decimal import Decimal
|
|
|
from math import isclose
|
|
|
|
|
|
import click
|
|
|
import librosa
|
|
|
import numpy as np
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
from get_pitch import get_pitch
|
|
|
|
|
|
|
|
|
def try_resolve_note_slur_by_matching(ph_dur, ph_num, note_dur, tol):
    """Derive slur flags by matching note end positions to word end positions.

    Words are formed by grouping ``ph_dur`` according to ``ph_num``. The
    cumulative end positions of words and notes are walked in parallel:
    every note that ends inside a word is kept, and each note of a word
    after the first one is flagged as a slur. A note end that coincides
    with a word end (within ``tol``) is snapped to the word end.

    Args:
        ph_dur: Phoneme durations; sliced and summed per word.
        ph_num: List holding the number of phonemes of each word.
        note_dur: Note durations.
        tol: Absolute tolerance used when comparing positions.

    Returns:
        A ``(note_dur, note_slur)`` tuple: the durations recomputed from the
        aligned end positions and a parallel list of 0/1 slur flags.

    Raises:
        ValueError: If there are more words than notes, if a note runs past
            the end of the current word, or if notes are left over once
            every word has been matched.
    """
    if len(ph_num) > len(note_dur):
        raise ValueError("ph_num should not be longer than note_dur.")

    ph_num_cum = np.cumsum([0] + ph_num)
    word_pos = np.cumsum(
        [sum(ph_dur[start:stop]) for start, stop in zip(ph_num_cum[:-1], ph_num_cum[1:])]
    )
    note_pos = np.cumsum(note_dur)

    new_note_dur = []
    note_slur = []
    idx_word, idx_note = 0, 0
    slur = False
    while idx_word < len(word_pos) and idx_note < len(note_pos):
        if isclose(word_pos[idx_word], note_pos[idx_note], abs_tol=tol):
            # Note and word end together: close the word, snapping to its end.
            note_slur.append(1 if slur else 0)
            new_note_dur.append(word_pos[idx_word])
            idx_word += 1
            idx_note += 1
            slur = False
        elif note_pos[idx_note] > word_pos[idx_word]:
            # The note crosses a word boundary; matching cannot express that.
            raise ValueError("Cannot resolve note_slur by matching.")
        else:
            # The note ends inside the word; the next note in this word is a slur.
            note_slur.append(1 if slur else 0)
            new_note_dur.append(note_pos[idx_note])
            idx_note += 1
            slur = True

    if idx_note < len(note_pos):
        # The last word end matched a note that is not the last one (e.g. a
        # trailing note shorter than ``tol``). Returning here would silently
        # drop notes and yield fewer durations than there are notes.
        raise ValueError("Cannot resolve note_slur by matching.")

    # Convert the aligned end positions back into durations.
    ret_note_dur = np.diff(new_note_dur, prepend=Decimal("0.0")).tolist()
    assert len(ret_note_dur) == len(note_slur)
    return ret_note_dur, note_slur
|
|
|
|
|
|
|
|
|
def try_resolve_slur_by_slicing(ph_dur, ph_num, note_seq, note_dur, tol):
    """Align notes to words by cutting notes at word boundaries.

    Words are formed by grouping ``ph_dur`` according to ``ph_num``. For each
    word, a note that outlasts the word is cut at the word end (the same
    pitch is emitted again for the following word). Notes ending inside the
    word are emitted one by one, each after the first flagged as a slur.

    Args:
        ph_dur: Phoneme durations; sliced and summed per word.
        ph_num: List holding the number of phonemes of each word.
        note_seq: Note names, parallel to ``note_dur``.
        note_dur: Note durations.
        tol: Absolute tolerance used when comparing positions.

    Returns:
        A ``(note_seq, note_dur, note_slur)`` tuple of equally long lists.
    """

    def near(left, right):
        # Positions closer than ``tol`` are treated as the same boundary.
        return isclose(left, right, abs_tol=tol)

    bounds = np.cumsum([0] + ph_num)
    word_ends = np.cumsum([sum(ph_dur[lo:hi]) for lo, hi in zip(bounds[:-1], bounds[1:])])
    note_ends = np.cumsum(note_dur)
    note_count = len(note_ends)

    out_seq, out_ends, out_slur = [], [], []
    cursor = 0  # index of the note currently being consumed
    for word_end in word_ends:
        if note_ends[cursor] > word_end and not near(note_ends[cursor], word_end):
            # The current note outlasts this word: emit the part up to the
            # word end and keep the note for the next word.
            out_seq.append(note_seq[cursor])
            out_ends.append(word_end)
            out_slur.append(0)
            continue

        in_slur = False
        # Emit every note that ends within (or at) this word.
        while cursor < note_count and (
            note_ends[cursor] < word_end or near(note_ends[cursor], word_end)
        ):
            out_seq.append(note_seq[cursor])
            out_ends.append(note_ends[cursor])
            out_slur.append(int(in_slur))
            in_slur = True
            cursor += 1

        if out_ends[-1] < word_end:
            if near(out_ends[-1], word_end):
                # Close enough: snap the last note to the word end.
                out_ends[-1] = word_end
            else:
                # The next note starts inside this word; emit its head here.
                out_seq.append(note_seq[cursor])
                out_ends.append(word_end)
                out_slur.append(int(in_slur))

    # Convert the end positions back into durations.
    out_dur = np.diff(out_ends, prepend=Decimal("0.0")).tolist()
    assert len(out_seq) == len(out_dur) == len(out_slur)
    return out_seq, out_dur, out_slur
|
|
|
|
|
|
|
|
|
# Root command group of this script; the converters defined below are
# registered on it with ``cli.add_command`` at the bottom of the file.
# No docstring is added on purpose: click would display it as help text.
@click.group()
def cli():
    pass
|
|
|
|
|
|
|
|
|
@click.command(help="Convert a transcription file to DS files")
@click.argument(
    "transcription_file",
    type=click.Path(
        dir_okay=False,
        resolve_path=True,
        path_type=pathlib.Path,
        exists=True,
        readable=True,
    ),
    metavar="TRANSCRIPTIONS",
)
@click.argument(
    "wavs_folder",
    type=click.Path(file_okay=False, resolve_path=True, path_type=pathlib.Path),
    metavar="FOLDER",
)
@click.option(
    "--tolerance",
    "-t",
    type=float,
    default=0.005,
    help="Tolerance for ph_dur/note_dur mismatch",
    metavar="FLOAT",
)
@click.option(
    "--hop_size", "-h", type=int, default=512, help="Hop size for f0_seq", metavar="INT"
)
@click.option(
    "--sample_rate",
    "-s",
    type=int,
    default=44100,
    help="Sample rate of audio",
    metavar="INT",
)
@click.option(
    "--pe",
    type=str,
    default="parselmouth",
    help="Pitch extractor (parselmouth, rmvpe)",
    metavar="ALGORITHM",
)
def csv2ds(transcription_file, wavs_folder, tolerance, hop_size, sample_rate, pe):
    """Convert every row of a transcription CSV into a ``.ds`` file.

    For each row the matching ``<name>.wav`` in ``wavs_folder`` is loaded,
    its pitch curve is extracted with ``get_pitch``, and a single-segment DS
    document is prepared for ``<name>.ds`` in the same folder. When the row
    has no ``note_slur`` value, slur flags are resolved by matching and, if
    that fails, by slicing. All documents are written at the end; if any
    target file already exists the user is asked to confirm overwriting.

    Args:
        transcription_file: Path of the transcription CSV.
        wavs_folder: Folder holding the wav files; also the output folder.
        tolerance: Absolute tolerance for ph_dur/note_dur comparisons.
        hop_size: Hop size passed to the pitch extractor.
        sample_rate: Sample rate used for loading audio and pitch extraction.
        pe: Name of the pitch extractor passed to ``get_pitch``.

    Raises:
        AssertionError: If the wavs folder or a wav file is missing, or a
            row's sequences are inconsistent with each other.
    """
    # Validation failures are raised explicitly instead of via ``assert`` so
    # they are still enforced under ``python -O``; the exception type and
    # messages are the same as before.
    if not wavs_folder.is_dir():
        raise AssertionError("wavs folder not found.")

    out_ds = {}
    out_exists = []
    # newline="" is what the csv module expects for files handed to a reader.
    with open(transcription_file, "r", newline="", encoding="utf-8") as f:
        for trans_line in tqdm(csv.DictReader(f)):
            item_name = trans_line["name"]
            wav_fn = wavs_folder / f"{item_name}.wav"
            ds_fn = wavs_folder / f"{item_name}.ds"
            ph_dur = list(map(Decimal, trans_line["ph_dur"].strip().split()))
            ph_num = list(map(int, trans_line["ph_num"].strip().split()))
            note_seq = trans_line["note_seq"].strip().split()
            note_dur = list(map(Decimal, trans_line["note_dur"].strip().split()))
            note_glide = trans_line["note_glide"].strip().split() if "note_glide" in trans_line else None

            if not wav_fn.is_file():
                raise AssertionError(f"{item_name}.wav not found.")
            if len(ph_dur) != sum(ph_num):
                raise AssertionError("ph_dur and ph_num mismatch.")
            if len(note_seq) != len(note_dur):
                raise AssertionError("note_seq and note_dur should have the same length.")
            if note_glide and len(note_glide) != len(note_seq):
                raise AssertionError("note_glide and note_seq should have the same length.")
            ph_total = sum(ph_dur)
            note_total = sum(note_dur)
            if not isclose(ph_total, note_total, abs_tol=tolerance):
                raise AssertionError(
                    f"[{item_name}] ERROR: mismatch total duration: {ph_total - note_total}"
                )

            if "note_slur" in trans_line and trans_line["note_slur"]:
                # Slur flags are given explicitly in the transcription.
                note_slur = list(map(int, trans_line["note_slur"].strip().split()))
            else:
                try:
                    note_dur, note_slur = try_resolve_note_slur_by_matching(
                        ph_dur, ph_num, note_dur, tolerance
                    )
                except ValueError:
                    # Matching failed: fall back to cutting notes at word ends.
                    note_seq, note_dur, note_slur = try_resolve_slur_by_slicing(
                        ph_dur, ph_num, note_seq, note_dur, tolerance
                    )

            wav, _ = librosa.load(wav_fn, sr=sample_rate, mono=True)
            f0_timestep, f0, _ = get_pitch(pe, wav, hop_size, sample_rate)

            ds_content = [
                {
                    "offset": 0.0,
                    "text": trans_line["ph_seq"],
                    "ph_seq": trans_line["ph_seq"],
                    "ph_dur": " ".join(str(round(d, 6)) for d in ph_dur),
                    "ph_num": trans_line["ph_num"],
                    "note_seq": " ".join(note_seq),
                    "note_dur": " ".join(str(round(d, 6)) for d in note_dur),
                    "note_slur": " ".join(map(str, note_slur)),
                    "f0_seq": " ".join(map("{:.1f}".format, f0)),
                    "f0_timestep": str(f0_timestep),
                }
            ]
            if note_glide:
                ds_content[0]["note_glide"] = " ".join(note_glide)

            out_ds[ds_fn] = ds_content
            if ds_fn.exists():
                out_exists.append(ds_fn)

    # Nothing is written until every row has been processed, so a failure
    # above leaves the existing DS files untouched.
    if not out_exists or click.confirm(f"Overwrite {len(out_exists)} existing DS files?", abort=False):
        for ds_fn, ds_content in out_ds.items():
            with open(ds_fn, "w", encoding="utf-8") as f:
                json.dump(ds_content, f, ensure_ascii=False, indent=4)
    else:
        click.echo("Aborted.")
|
|
|
|
|
|
|
|
|
@click.command(help="Convert DS files to a transcription and curve files")
@click.argument(
    "ds_folder",
    type=click.Path(file_okay=False, resolve_path=True, exists=True, path_type=pathlib.Path),
    metavar="FOLDER",
)
@click.argument(
    "transcription_file",
    type=click.Path(file_okay=True, dir_okay=False, resolve_path=True, path_type=pathlib.Path),
    metavar="TRANSCRIPTIONS",
)
@click.option(
    "--overwrite",
    "-f",
    is_flag=True,
    default=False,
    help="Overwrite existing transcription file",
)
def ds2csv(ds_folder, transcription_file, overwrite):
    """Collect the ``*.ds`` files of a folder into one transcription CSV.

    DS files that have a wav file of the same stem contribute one row built
    from their first segment. DS files without such a wav contribute one row
    per segment, named ``<stem>#<index>`` when there are several segments.
    Rows of wav-backed files come first. If any segment carries
    ``note_glide``, the column is written for all rows and missing values
    are filled with ``none`` for every note.

    Raises:
        FileExistsError: If the transcription file exists and ``overwrite``
            is not set.
    """
    if not overwrite and transcription_file.exists():
        raise FileExistsError(f"{transcription_file} already exist.")

    def rounded(durations):
        # Re-emit a space-separated duration string rounded to 6 decimals.
        return " ".join(str(round(Decimal(value), 6)) for value in durations.split())

    def build_row(name, segment):
        # Build one CSV row from a DS segment; note_glide only when present.
        row = {
            "name": name,
            "ph_seq": segment["ph_seq"],
            "ph_dur": rounded(segment["ph_dur"]),
            "ph_num": segment["ph_num"],
            "note_seq": segment["note_seq"],
            "note_dur": rounded(segment["note_dur"]),
        }
        if "note_glide" in segment:
            row["note_glide"] = segment["note_glide"]
        return row

    transcriptions = []

    # First pass: DS files that have a wav next to them (first segment only).
    for ds_path in tqdm(ds_folder.glob("*.ds"), ncols=80):
        if not ds_path.with_suffix(".wav").exists():
            continue
        with open(ds_path, "r", encoding="utf-8") as f:
            segments = json.load(f)
        transcriptions.append(build_row(ds_path.stem, segments[0]))

    # Second pass: DS files without a wav, one row per segment.
    for ds_path in tqdm(ds_folder.glob("*.ds"), ncols=80):
        if ds_path.with_suffix(".wav").exists():
            continue
        with open(ds_path, "r", encoding="utf-8") as f:
            segments = json.load(f)
        for index, segment in enumerate(segments):
            if len(segments) > 1:
                row_name = f"{ds_path.stem}#{index}"
            else:
                row_name = ds_path.stem
            transcriptions.append(build_row(row_name, segment))

    # The glide column is needed as soon as a single row carries it.
    has_glide = any("note_glide" in row for row in transcriptions)
    if has_glide:
        for row in transcriptions:
            if "note_glide" not in row:
                row["note_glide"] = " ".join(["none"] * len(row["note_seq"].split()))

    columns = ["name", "ph_seq", "ph_dur", "ph_num", "note_seq", "note_dur"]
    if has_glide:
        columns.append("note_glide")

    with open(transcription_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(transcriptions)
|
|
|
|
|
|
|
|
|
# Register both converters as sub-commands of the root group.
cli.add_command(csv2ds)
cli.add_command(ds2csv)


if __name__ == "__main__":
    # Run the command-line interface when the file is executed as a script.
    cli()
|
|
|
|