File size: 3,158 Bytes
aa9be1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
import json
from tqdm import tqdm
from my_tool import path_join, load_json
from concurrent.futures import ProcessPoolExecutor, as_completed

def _check_label(label:str, max_length:int=30) -> bool:
    """Decide whether a bracketed tag is usable as a segment label.

    A label is rejected when, after stripping surrounding whitespace, it is
    empty or longer than ``max_length`` characters (treated as lyrics rather
    than a tag), or when it contains both ':' and '.' (treated as a
    timestamp).

    Args:
        label: Text found between the square brackets.
        max_length: Maximum accepted length of the stripped label.

    Returns:
        True if the label passes every check, False otherwise.
    """
    stripped_size = len(label.strip())
    # Empty labels and over-long labels are both out of bounds.
    if not 0 < stripped_size <= max_length:
        return False
    # Both a colon and a dot together are considered a timestamp.
    return not (":" in label and "." in label)

def _convert_one(path:str):
    """Segment a song's metadata, remove redundant content.

    Loads the JSON file at ``path`` and groups its aligned words into
    labelled segments. A word that starts with ``[label]`` and whose label
    passes ``_check_label`` opens a new segment; every following successful
    word is appended to that segment until the next valid label appears.

    Args:
        path: Path of the song's metadata JSON. The file is expected to
            provide ``song_id``, ``track_index`` and
            ``timestamped_lyrics.alignedWords``, where every aligned word has
            the keys ``success``, ``word``, ``startS`` and ``endS``.

    Returns:
        A dict with:
            ``path``: ``<song_id>_<track_index>.mp3`` joined onto the
                directory of the JSON file,
            ``song_id``: copied from the metadata,
            ``segments``: list of dicts with ``start``, ``end``, ``label``
                and ``word`` (the concatenated text of the segment).
    """
    data = load_json(path)
    song_dir = os.path.dirname(path)
    audio_name = f"{data['song_id']}_{data['track_index']}.mp3"
    new_data = {
        "path": path_join(song_dir, audio_name),
        "song_id": data['song_id'],
        "segments": []
    }
    words_info = data['timestamped_lyrics']['alignedWords']  # Sentence-by-sentence information
    seg_info = None  # Segment currently being filled, None before the first label

    for word_info in words_info:
        if not word_info['success']:
            continue
        word:str = word_info['word']

        label = ""
        label_end = -1
        if word.startswith('['):
            label_end = word.find(']')
            # Without a closing bracket there is no label to extract; the
            # word is then handled as ordinary text.
            if label_end != -1:
                candidate = word[1:label_end]
                if _check_label(candidate):
                    label = candidate

        if label:
            # Close the previous segment only when a new one really starts,
            # so a rejected label can never cause the same segment to be
            # stored twice.
            if seg_info is not None:
                new_data['segments'].append(seg_info)
            seg_info = {
                "start": word_info['startS'],
                # The header word itself belongs to the segment, so the
                # segment ends no earlier than this word does.
                "end": word_info['endS'],
                "label": label,
                # Skip the ']' and the single character following it
                # (presumably a separator such as a newline - TODO confirm).
                "word": word[label_end+2:]
            }
        elif seg_info is not None:
            seg_info['end'] = word_info['endS']
            seg_info['word'] += word
        # Words appearing before the first valid label are dropped.

    # Store the segment that was still open when the words ran out.
    if seg_info is not None:
        new_data['segments'].append(seg_info)
    return new_data

# ===== External Interface =====

def get_convert_segments(data_dir:str, save_path:str, max_workers:int=10):
    """Convert every ``.json`` file in ``data_dir`` and save the results.

    Each file is handed to ``_convert_one`` in a process pool. Results are
    written to ``save_path`` as one JSON object per line, in the order the
    workers finish, and are also collected in memory.

    Args:
        data_dir: Directory that is listed for ``.json`` files.
        save_path: Output file; it is opened in write mode with UTF-8
            encoding, so existing content is replaced.
        max_workers: Number of worker processes for the pool.

    Returns:
        List of the converted records, in completion order.
    """
    json_paths = [
        path_join(data_dir, entry)
        for entry in tqdm(os.listdir(data_dir), desc="Getting the JSON Paths")
        if entry.endswith(".json")
    ]

    converted = []
    with open(save_path, 'w', encoding='utf-8') as sink, \
            ProcessPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(_convert_one, json_path) for json_path in json_paths]
        with tqdm(total=len(pending), desc="Converting Segments") as progress:
            for finished in as_completed(pending):
                record = finished.result()
                converted.append(record)
                # One record per line (JSON Lines layout).
                json.dump(record, sink, ensure_ascii=False)
                sink.write("\n")
                progress.update(1)
    return converted