|
|
import os |
|
|
import os.path as osp |
|
|
import soundfile as sf |
|
|
import argparse |
|
|
from tqdm import tqdm |
|
|
import re |
|
|
|
|
|
# Sample rate in Hz that every input audio file must have. SRT timestamps are
# converted to sample offsets at this rate and clips are written out at it.
SAMPLE_RATE = 16000
|
|
|
|
|
def time_str_to_frame(time_str, sample_rate=16000):
    """Convert an ``HH:MM:SS,mmm`` timestamp into a sample offset.

    Args:
        time_str: Timestamp made of colon-separated hours, minutes and
            seconds. The fractional part of the seconds may follow a comma
            (SRT style) or a dot, and may be absent.
        sample_rate: Samples per second of the audio the timestamp refers to.

    Returns:
        A ``(frame, tail)`` tuple. ``frame`` is the sample offset, truncated
        to a whole sample. ``tail`` is the text after the last comma of
        ``time_str`` (the millisecond digits of an SRT timestamp), or the
        whole string when it contains no comma.

    Raises:
        ValueError: If ``time_str`` does not consist of exactly three
            colon-separated fields, or a field is not made of digits.
    """
    hours, minutes, seconds = time_str.replace(',', '.').split(':')
    whole_seconds, _, fraction = seconds.partition('.')

    # Count time in integer ticks of 10**-len(fraction) seconds. Most
    # millisecond values have no exact binary-float representation, so
    # multiplying a float by the sample rate and truncating can land one
    # sample short; integer arithmetic keeps the result exact.
    scale = 10 ** len(fraction)
    ticks = (
        (int(hours) * 60 + int(minutes)) * 60 + int(whole_seconds or 0)
    ) * scale + int(fraction or 0)

    return int(ticks * sample_rate // scale), time_str.split(',')[-1]
|
|
|
|
|
def get_segments_by_srt(srt_file, sample_rate=16000):
    """Parse an SRT subtitle file into sample-indexed transcript segments.

    A cue is a run of non-blank lines; cues are separated by one or more
    blank lines. Inside a cue, the first line that holds two
    ``HH:MM:SS,mmm`` timestamps supplies the start and end times, and every
    line after it is transcript text (several text lines are joined with a
    single space). Cues without such a timing line are ignored.

    Args:
        srt_file: Path of the SRT file to read.
        sample_rate: Samples per second used to turn timestamps into sample
            offsets.

    Returns:
        A ``(segments, ms_set)`` tuple. ``segments`` is a list of
        ``(start_frame, end_frame, transcript)`` tuples in file order.
        ``ms_set`` is the set of millisecond strings taken from the cues'
        end timestamps.
    """
    timestamp_re = re.compile(r'\d{2}:\d{2}:\d{2},\d{3}')
    segments = []
    ms_set = set()

    with open(srt_file, 'r') as f:
        lines = [line.strip() for line in f]

    # Group the stripped lines into cues. Splitting on blank lines instead of
    # assuming a fixed number of lines per cue keeps multi-line captions,
    # repeated blank lines and a missing final blank line from shifting or
    # dropping cues.
    cues = []
    current = []
    for line in lines:
        if line:
            current.append(line)
        elif current:
            cues.append(current)
            current = []
    if current:
        cues.append(current)

    for cue in cues:
        for position, line in enumerate(cue):
            stamps = timestamp_re.findall(line)
            if len(stamps) >= 2:
                break
        else:
            # No line with both a start and an end timestamp: not a usable cue.
            continue

        start_frame, _ = time_str_to_frame(stamps[0], sample_rate)
        end_frame, ms = time_str_to_frame(stamps[1], sample_rate)
        # Only the end timestamp's milliseconds are recorded.
        # NOTE(review): confirm whether the start milliseconds should count too.
        ms_set.add(ms)
        segments.append((start_frame, end_frame, ' '.join(cue[position + 1:])))

    return segments, ms_set
|
|
|
|
|
def segment_audio(audio_dir, srt_dir, output_dir):
    """Cut every audio file in ``audio_dir`` into per-cue clips using its SRT.

    For each ``.mp3``/``.wav``/``.flac`` file in ``audio_dir``, the SRT file
    with the same base name is read from ``srt_dir``. Every cue whose end lies
    inside the audio is written to ``output_dir/<base name>/<index>.flac``,
    with its transcript in ``<index>.txt`` beside it. ``<index>`` is the
    zero-padded position of the cue in the SRT, so skipped cues leave gaps.

    Audio files that are skipped as a whole are listed in ``unsuccesful.log``
    in the parent directory of ``audio_dir``; the log is rewritten on each run.

    Args:
        audio_dir: Directory holding the input audio files.
        srt_dir: Directory holding the matching ``.srt`` files.
        output_dir: Directory that receives one sub-directory per audio file;
            created if missing.

    Raises:
        AssertionError: If an audio file's sample rate differs from
            ``SAMPLE_RATE``.
    """
    os.makedirs(output_dir, exist_ok=True)

    audio_files = [
        name for name in os.listdir(audio_dir)
        if name.endswith(('.mp3', '.wav', '.flac'))
    ]

    # The context manager guarantees the log is closed even if a file fails.
    with open(osp.join(audio_dir, '../unsuccesful.log'), 'w') as unsuccesful_logfw:
        for audio_file in tqdm(audio_files, total=len(audio_files)):
            tqdm.write(f"Processing {audio_file}")

            # Strip only the final extension; replacing the extension text
            # anywhere in the name would corrupt names such as "a.wav.wav".
            base_name = audio_file.rsplit('.', 1)[0]
            input_file = osp.join(audio_dir, audio_file)
            srt_file = osp.join(srt_dir, base_name + '.srt')
            output_subdir = osp.join(output_dir, base_name)

            try:
                segments, ms_set = get_segments_by_srt(srt_file, sample_rate=SAMPLE_RATE)
            except FileNotFoundError:
                # One missing subtitle file should not abort the whole batch.
                message = f"srt file {srt_file} not found, skip {audio_file}"
                tqdm.write(message)
                print(message, file=unsuccesful_logfw, flush=True)
                continue

            # Skip files whose cues show fewer than five distinct millisecond
            # values. NOTE(review): presumably a sign of coarse or synthetic
            # timestamps - confirm the intent of this threshold.
            if len(ms_set) < 5:
                message = f"ms_set: {ms_set} too small, skip {audio_file}"
                tqdm.write(message)
                print(message, file=unsuccesful_logfw, flush=True)
                continue

            os.makedirs(output_subdir, exist_ok=True)

            audio, sr = sf.read(input_file)
            length = len(audio)
            assert sr == SAMPLE_RATE, f"Sample rate mismatch: {sr} != {SAMPLE_RATE}"

            for i, (start_frame, end_frame, trans) in enumerate(segments):
                if end_frame > length:
                    # The cue runs past the end of the audio; leave it out.
                    tqdm.write(f"end_frame {end_frame} > length {length}")
                    continue

                clip = audio[start_frame:end_frame]
                sf.write(osp.join(output_subdir, f"{i:05d}.flac"), clip, SAMPLE_RATE)
                with open(osp.join(output_subdir, f"{i:05d}.txt"), 'w') as fw:
                    print(trans, file=fw)
|
|
|
|
|
def main(args):
    """Echo the parsed arguments, then run the segmentation they describe.

    Args:
        args: Object exposing ``audio_dir``, ``srt_dir`` and ``output_dir``
            attributes, such as the namespace returned by ``parse_args``.
    """
    print(args)
    segment_audio(args.audio_dir, args.srt_dir, args.output_dir)
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: collect the three directories and hand them
    # to main().
    parser = argparse.ArgumentParser()
    # An empty audio or output path can be neither listed nor created, so
    # these two must be given explicitly; argparse then reports a clear usage
    # error instead of a later FileNotFoundError.
    parser.add_argument(
        "--audio_dir",
        required=True,
        help="directory containing audio files",
    )
    # An empty value makes the SRT paths relative to the current directory.
    parser.add_argument(
        "--srt_dir",
        default="",
        help="directory containing srt files",
    )
    parser.add_argument(
        "--output_dir",
        required=True,
        help="directory to save the segmented audio files",
    )
    args = parser.parse_args()

    main(args)