|
|
from pathlib import Path |
|
|
import json |
|
|
|
|
|
from transformers.models.mamba2.modeling_mamba2 import segment_sum |
|
|
|
|
|
from lib.utils import cmd |
|
|
from environment import TEST_DATA |
|
|
|
|
|
|
|
|
def read_recording(folder: Path=Path("./recordings"), count_limit=None): |
|
|
pass |
|
|
|
|
|
def read_dataset(file: Path=Path("dataset/dataset.txt"), count_limit=None): |
|
|
"""line sample: {"audio": {"path": "dataset/audio/data_aishell/wav/test/S0916/BAC009S0916W0158.wav"}, "sentence": "顾客体验的核心是真善美", "duration": 3.22, "sentences": [{"start": 0, "end": 3.22, "text": "顾客体验的核心是真善美"}]}""" |
|
|
with open(file) as f: |
|
|
lines =f.readlines() |
|
|
count = 0 |
|
|
for line in lines: |
|
|
if count_limit and count > count_limit: |
|
|
break |
|
|
count += 1 |
|
|
line = line.strip() |
|
|
if not line: |
|
|
continue |
|
|
data = json.loads(line) |
|
|
|
|
|
yield data["audio"]["path"], data["sentence"], data["duration"] |
|
|
|
|
|
def read_emilia(folder: Path=TEST_DATA/"ZH-B000000", count_limit=None): |
|
|
"""读取 emilia 数据集,返回音频路径、文本、时长, |
|
|
json 文件样例: |
|
|
{"id": "ZH_B00000_S00110_W000000", "wav": "ZH_B00000/ZH_B00000_S00110/mp3/ZH_B00000_S00110_W000000.mp3", "text": "\u628a\u63e1\u6700\u524d\u6cbf\u7684\u91d1\u878d\u9886\u57df\u548c\u533a\u5757\u94fe\u6700\u65b0\u8d44\u8baf\u3002\u6211\u4eec\u4e00\u8d77\u6765\u4e86\u89e3\u4e00\u4e0b\u4eca\u5929\u5e02\u573a\u4e0a\u6709\u53d1\u751f\u54ea\u4e9b\u91cd\u8981\u4e8b\u4ef6\u3002", "duration": 7.963, "speaker": "ZH_B00000_S00110", "language": "zh", "dnsmos": 3.3808}""" |
|
|
count = 0 |
|
|
for json_file in sorted(folder.glob("*.json")): |
|
|
count += 1 |
|
|
if count_limit and count > count_limit: |
|
|
break |
|
|
with open(json_file, encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
text = data["text"] |
|
|
duration = data["duration"] |
|
|
wav_path = folder /f'{json_file.stem}.wav' |
|
|
if not wav_path.exists(): |
|
|
mp3_path = folder / f'{json_file.stem}.mp3' |
|
|
command=f"ffmpeg -i {mp3_path} -ac 1 -ar 16000 {wav_path}" |
|
|
cmd(command) |
|
|
yield wav_path, text, duration |
|
|
|
|
|
def read_st(folder: Path=TEST_DATA/"ST-CMDS-20170001_1-OS", count_limit=None): |
|
|
"""读取 st 数据集,返回音频路径、文本、时长, |
|
|
""" |
|
|
count = 0 |
|
|
for wav in sorted(folder.glob("*.wav")): |
|
|
count += 1 |
|
|
if count_limit and count > count_limit: |
|
|
break |
|
|
txt = wav.with_suffix(".txt") |
|
|
with open(txt, encoding="utf-8") as f: |
|
|
text = f.read() |
|
|
|
|
|
yield wav, text |
|
|
|
|
|
def read_wenet(folder: Path=TEST_DATA/"wenet", json_file="WenetSpeech_TEST_NET.json", count_limit=None): |
|
|
"""读取 wenet 数据集,返回音频路径、文本、时长, |
|
|
""" |
|
|
count = 0 |
|
|
with open(folder/json_file, encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
audios = data["audios"] |
|
|
for a in audios: |
|
|
audio_file = Path(folder/a['path']) |
|
|
if len(a["segments"])>=100: |
|
|
continue |
|
|
for seg in a["segments"]: |
|
|
if count > count_limit: |
|
|
break |
|
|
seg_file = audio_file.parent / (seg["sid"]+".wav") |
|
|
if not seg_file.exists(): |
|
|
command = f"ffmpeg -i {audio_file} -ar 16000 -ac 1 -ss {seg['begin_time']} -to {seg['end_time']} {seg_file}" |
|
|
cmd(command) |
|
|
count +=1 |
|
|
yield seg_file, seg["text"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
read_wenet() |