import os
import json
from tqdm import tqdm
from my_tool import path_join, load_json
from concurrent.futures import ProcessPoolExecutor, as_completed
def _check_label(label:str, max_length:int=30) -> bool:
"""Check if label is valid (non-empty, not timestamp, not long lyrics)"""
length = len(label.strip())
if length == 0:
# print("Error Label: Empty")
return False
if length > max_length:
# print(f"Error Label: Words - {label}")
return False
if label.find(":") != -1 and label.find(".") != -1:
# Considered as timestamp
# print(f"Error Label: Timestamp - {label}")
return False
return True
def _convert_one(path:str):
    """Segment one song's metadata JSON into labeled lyric segments.

    Reads the metadata file at *path*, derives the sibling mp3 path from
    ``song_id``/``track_index``, and walks the word-aligned lyrics.  A word
    starting with ``[`` opens a new segment when its bracketed label passes
    ``_check_label``; subsequent words extend the current segment's text and
    end time.  Returns a dict with keys ``path``, ``song_id``, ``segments``.

    Fixes over the previous version:
    - the final segment is now appended after the loop (it used to be
      silently dropped);
    - a segment is appended exactly once; previously a segment was appended
      on seeing an invalid ``[...]`` label and then appended again (aliased)
      when the next valid label arrived, duplicating entries.
    """
    data = load_json(path)
    dir_path = os.path.dirname(path)
    audio_name = f"{data['song_id']}_{data['track_index']}.mp3"
    audio_path = path_join(dir_path, audio_name)
    new_data = {
        "path": audio_path,
        "song_id": data['song_id'],
        "segments": []
    }
    words_info = data['timestamped_lyrics']['alignedWords']  # Word-by-word alignment entries
    seg_info = None      # Segment currently being built (None until first valid label)
    empty_head = False   # True if words appeared before any valid label
    for word_info in words_info:
        if not word_info['success']:
            # Skip words that failed alignment
            continue
        word: str = word_info['word']
        if word.startswith('['):
            label_end = word.find(']')
            label = word[1:label_end]
            if _check_label(label):
                # Close the current segment (if any) and open a new one.
                if seg_info is not None:
                    new_data['segments'].append(seg_info)
                seg_info = {
                    "start": word_info['startS'],
                    "end": 0,
                    "label": label,
                    # label_end+2 skips "] " — assumes a space follows ']';
                    # TODO confirm against the metadata format
                    "word": word[label_end+2:]
                }
            elif seg_info is not None:
                # Invalid label (timestamp/lyrics): fold the whole word
                # into the current segment instead of starting a new one.
                seg_info['end'] = word_info['endS']
                seg_info['word'] += word
            else:
                empty_head = True
        elif seg_info is not None:
            # Ordinary word: extend the current segment.
            seg_info['end'] = word_info['endS']
            seg_info['word'] += word
        else:
            # Word before any valid label was seen.
            empty_head = True
    # Append the trailing segment (previously lost).
    if seg_info is not None:
        new_data['segments'].append(seg_info)
    if empty_head:
        # print(f"Empty Head, segment: {len(new_data['segments'])}, path: {path}")
        pass
    return new_data
# ===== External Interface =====
def get_convert_segments(data_dir:str, save_path:str, max_workers:int=10):
    """Convert every metadata JSON under *data_dir* into segment records.

    Each JSON file is processed by ``_convert_one`` in a process pool.
    Results are streamed to *save_path* as JSON Lines (one record per line,
    in completion order) and also collected into the returned list.
    """
    json_paths = [
        path_join(data_dir, fname)
        for fname in tqdm(os.listdir(data_dir), desc="Getting the JSON Paths")
        if fname.endswith(".json")
    ]
    dataset = []
    with open(save_path, 'w', encoding='utf-8') as out_file, \
         ProcessPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(_convert_one, p) for p in json_paths]
        for done in tqdm(as_completed(pending), total=len(pending),
                         desc="Converting Segments"):
            record = done.result()
            dataset.append(record)
            json.dump(record, out_file, ensure_ascii=False)
            out_file.write("\n")
    return dataset