File size: 2,204 Bytes
4853fdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import json
import re
from typing import Union, Dict
from pathlib import Path
import os

# Default maximum number of characters kept by `sanitize_filename`.
MAX_FILE_NAME_LENGTH = 100


def read_jsonl_to_mapping(
    jsonl_file: Union[str, Path],
    key_col: str,
    value_col: str,
    base_path: Union[str, Path, None] = None,
    overwrite: bool = True,
) -> Dict[str, str]:
    """
    Read two columns, indicated by `key_col` and `value_col`, from the
    given jsonl file to return the mapping dict.

    Each non-blank line of the file is parsed as one JSON object. Blank or
    whitespace-only lines (such as extra trailing newlines) are skipped.

    Args:
        jsonl_file: Path of the JSON Lines file, read as UTF-8.
        key_col: Field whose value becomes the dict key.
        value_col: Field whose value becomes the dict value.
        base_path: If truthy, every value is joined onto it with
            `os.path.join(base_path, value)`.
        overwrite: Policy for duplicate keys. When True the last
            occurrence in the file wins; when False the first one is kept.

    Returns:
        Dict mapping each `key_col` value to its `value_col` value.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks `key_col` or `value_col`.
    """
    mapping = {}
    # JSON text is UTF-8; do not depend on the platform default encoding.
    with open(jsonl_file, 'r', encoding='utf-8') as file:
        # Iterate lazily instead of loading every line with readlines().
        for line in file:
            line = line.strip()
            if not line:
                # json.loads('') raises, so tolerate empty lines explicitly.
                continue
            data = json.loads(line)
            key = data[key_col]
            value = data[value_col]
            if base_path:
                value = os.path.join(base_path, value)
            if overwrite or key not in mapping:
                mapping[key] = value
    return mapping


def sanitize_filename(name: str, max_len: int = MAX_FILE_NAME_LENGTH) -> str:
    """
    Return `name` with filename-unsafe characters replaced, cut to `max_len`.

    Every backslash, slash, asterisk, question mark, colon, double quote,
    angle bracket and pipe character is replaced by an underscore. The
    result is then sliced so that it keeps at most `max_len` leading
    characters.
    """
    # The character class already includes '/', so one substitution pass
    # covers every replaced character.
    cleaned = re.sub(r'[\\/*?:"<>|]', '_', name)
    keep = min(len(cleaned), max_len)
    return cleaned[:keep]


def transform_gen_fn_to_id(audio_file: Path, task: str) -> str:
    """
    Build an id string from the stem of `audio_file` according to `task`.

    Rules, by task name:
        "svs": the part of the stem before the first underscore.
        "tta", "sta_test", "tta_test": first 12 stem characters + ".wav".
        "ttm": first 11 stem characters.
        "v2a": the stem up to its last underscore + ".mp4".
        "sta_base": "Y" + first 11 stem characters + ".wav".
        anything else (including "sr"): the stem unchanged.
    """
    stem = audio_file.stem

    if task == "svs":
        return stem.split("_")[0]
    if task in ("tta", "sta_test", "tta_test"):
        return stem[:12] + '.wav'
    if task == "ttm":
        return stem[:11]
    if task == "v2a":
        return stem.rsplit("_", 1)[0] + ".mp4"
    if task == "sta_base":
        return 'Y' + stem[:11] + '.wav'

    # "sr" and every unrecognised task use the stem as-is.
    return stem


def audio_dir_to_mapping(audio_dir: str | Path, task: str) -> dict:
    """
    Map the id of every entry in `audio_dir` to its resolved path string.

    Entries are visited in sorted order and their ids are produced by
    `transform_gen_fn_to_id(entry, task)`. When two entries yield the same
    id, the one visited later replaces the earlier one.
    """
    return {
        transform_gen_fn_to_id(entry, task): str(entry.resolve())
        for entry in sorted(Path(audio_dir).iterdir())
    }