import pathlib
import re
from typing import Dict, List

import click
import librosa
import natsort
import numpy
import soundfile
import textgrid
import tqdm


def remove_suffix(string, suffix_pattern):
    """Strip a trailing substring matching a regular expression.

    Args:
        string: The text to strip (here: a file stem).
        suffix_pattern: Regular expression describing the suffix.

    Returns:
        ``string`` without its longest trailing part that fully matches
        ``suffix_pattern``; ``string`` unchanged when no trailing part matches
        (or when the only match is empty).
    """
    pattern = re.compile(suffix_pattern)
    # Try every start offset from left to right and require the pattern to
    # match everything from there to the end.  Compared with searching for
    # f'{suffix_pattern}$' this
    #   * keeps top-level alternations intact ('a|b' is not read as 'a|b$'),
    #   * leaves patterns that begin with inline flags such as '(?i)' valid,
    #   * cannot turn an empty match into string[:-0] == ''.
    for start in range(len(string) + 1):
        if pattern.fullmatch(string, start):
            return string[:start]
    return string


def _append_shifted_intervals(source_tier, target_tier, start, sentence_end):
    """Copy the intervals of one segment tier onto the combined timeline.

    Intervals are laid out back to back beginning at ``start`` using their own
    durations; the last interval is stretched or shrunk to stop exactly at
    ``sentence_end`` so the tier lines up with the audio length of the segment.
    """
    last_index = len(source_tier) - 1
    for index, interval in enumerate(source_tier):
        end = sentence_end if index == last_index else start + interval.duration()
        target_tier.add(minTime=start, maxTime=end, mark=interval.mark)
        start = end


@click.command(help='Combine segmented 2-tier TextGrids and wavs into 3-tier TextGrids and long wavs')
@click.option(
    '--wavs',
    required=True,
    help='Directory containing the segmented wav files'
)
@click.option(
    '--tg',
    required=False,
    help='Directory containing the segmented TextGrid files (defaults to wav directory)'
)
@click.option(
    '--out',
    required=True,
    help='Path to output directory for combined files'
)
@click.option(
    '--suffix',
    required=False,
    default=r'_\d+',
    help='Filename suffix pattern for file combination'
)
@click.option(
    '--wav_subtype',
    required=False,
    default='PCM_16',
    help='Wav subtype (defaults to PCM_16)'
)
@click.option(
    '--overwrite',
    is_flag=True,
    help='Overwrite existing files'
)
def combine_tg(wavs, tg, out, suffix, wav_subtype, overwrite):
    """Merge segmented TextGrid/wav pairs into one TextGrid and wav per group.

    Segments are grouped by their file stem with ``suffix`` removed.  For each
    group the wavs are concatenated in natural sort order and a TextGrid with
    the tiers ``sentences``, ``words`` and ``phones`` is written next to the
    combined wav in ``out``.

    Args:
        wavs: Directory holding the segmented wav files.
        tg: Directory holding the segmented TextGrid files; the wav directory
            is used when this is ``None``.
        out: Output directory; created if missing.
        suffix: Regular expression for the per-segment filename suffix.
        wav_subtype: Subtype passed to ``soundfile.write``.
        overwrite: Allow replacing existing output files.

    Raises:
        FileExistsError: An output file exists and ``overwrite`` is not set.
        ValueError: Segments of one group have different sample rates.
    """
    wav_path_in = pathlib.Path(wavs)
    tg_path_in = wav_path_in if tg is None else pathlib.Path(tg)
    combined_path_out = pathlib.Path(out)
    combined_path_out.mkdir(parents=True, exist_ok=True)

    # Group the segment TextGrids by their stem without the numbering suffix.
    filelist: Dict[str, List[pathlib.Path]] = {}
    for tg_file in tg_path_in.glob('*.TextGrid'):
        stem = remove_suffix(tg_file.stem, suffix)
        filelist.setdefault(stem, []).append(tg_file)

    for name, files in tqdm.tqdm(sorted(filelist.items())):
        tg_file_out = combined_path_out / f'{name}.TextGrid'
        wav_file_out = tg_file_out.with_suffix('.wav')
        # Refuse to clobber outputs before spending time on decoding audio.
        if not overwrite:
            if wav_file_out.exists():
                raise FileExistsError(str(wav_file_out))
            if tg_file_out.exists():
                raise FileExistsError(str(tg_file_out))

        wav_segments = []
        combined_tg = textgrid.TextGrid()
        sentences_tier = textgrid.IntervalTier(name='sentences')
        words_tier = textgrid.IntervalTier(name='words')
        phones_tier = textgrid.IntervalTier(name='phones')

        sentence_start = 0.
        sr = None
        for tg_file in natsort.natsorted(files):
            wav_file = (wav_path_in / tg_file.name).with_suffix('.wav')
            # sr=None: load at the file's own sample rate instead of resampling.
            waveform, sr_ = librosa.load(wav_file, sr=None)
            if sr is None:
                sr = sr_
            elif sr_ != sr:
                # Explicit raise rather than assert: asserts vanish under -O.
                raise ValueError(
                    f"Cannot combine '{tg_file.stem}': incompatible samplerate ({sr_} != {sr})"
                )
            sentence_end = waveform.shape[0] / sr + sentence_start
            wav_segments.append(waveform)
            sentences_tier.add(minTime=sentence_start, maxTime=sentence_end, mark=wav_file.stem)

            sentence_tg = textgrid.TextGrid()
            sentence_tg.read(tg_file)
            # Tier 0 of a segment holds the words, tier 1 the phones.
            _append_shifted_intervals(sentence_tg[0], words_tier, sentence_start, sentence_end)
            _append_shifted_intervals(sentence_tg[1], phones_tier, sentence_start, sentence_end)

            sentence_start = sentence_end

        combined_tg.append(sentences_tier)
        combined_tg.append(words_tier)
        combined_tg.append(phones_tier)

        combined_tg.write(tg_file_out)
        full_wav = numpy.concatenate(wav_segments)
        soundfile.write(wav_file_out, full_wav, samplerate=sr, subtype=wav_subtype)


if __name__ == '__main__':
    combine_tg()