File size: 7,323 Bytes
3051899
 
 
67ba0d5
3051899
 
67ba0d5
 
 
 
 
 
3051899
 
 
 
 
 
 
 
 
29d3797
67ba0d5
 
 
 
 
 
3051899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67ba0d5
 
 
 
 
 
 
 
 
 
 
 
 
 
3051899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67ba0d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3051899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67ba0d5
 
 
 
 
 
 
3051899
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67ba0d5
 
3051899
 
 
 
 
29d3797
3051899
29d3797
 
3051899
 
 
 
67ba0d5
3051899
 
 
 
67ba0d5
 
3051899
 
 
 
 
 
 
29d3797
3051899
29d3797
 
 
 
 
 
 
 
 
 
3051899
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import json
import os
import re
import time
from pathlib import Path

from huggingface_hub import (
    CommitOperationAdd,
    CommitOperationCopy,
    CommitOperationDelete,
    HfApi,
)


# --- Local paths ---------------------------------------------------------
# Root of the local dataset folder and the JSON file that records progress.
DATASET_DIR = Path(os.environ.get("COMMONVOICE_DIR", "CommonVoice24-FA"))
CHECKPOINT_FILE = Path(
    os.environ.get(
        "COMMONVOICE_UPLOAD_CHECKPOINT",
        ".commonvoice_upload_checkpoint.json",
    )
)

# --- Target repo ---------------------------------------------------------
# Explicit repo id; None when COMMONVOICE_REPO is not set.
REPO_OVERRIDE = os.environ.get("COMMONVOICE_REPO")

# --- Clip naming and bucketing -------------------------------------------
# Clip file names carry a numeric id, captured as group 1.
PREFIX_RE = re.compile(r"^common_voice_fa_(\d+)\.mp3$")
# Number of clips added per commit.
CHUNK_SIZE = int(os.environ.get("COMMONVOICE_CHUNK_SIZE", "2000"))
# Upper bound on commits per run; 0 is treated as "no limit" by the caller.
MAX_CHUNKS = int(os.environ.get("COMMONVOICE_MAX_CHUNKS", "0"))
# Number of bucket subfolders the clips are spread across.
BUCKET_COUNT = int(os.environ.get("COMMONVOICE_BUCKETS", "100"))
# Zero-pad width for bucket names: at least two digits, and wide enough to
# hold the largest bucket index (BUCKET_COUNT - 1).
BUCKET_WIDTH = max(2, len(str(max(BUCKET_COUNT - 1, 0))))

# --- Migration of clips already stored directly under clips/ --------------
# Number of files moved per migration commit.
MOVE_BATCH_SIZE = int(os.environ.get("COMMONVOICE_MOVE_BATCH", "100"))
# Only the exact string "1" enables migration.
MIGRATE_EXISTING = os.environ.get("COMMONVOICE_MIGRATE", "1") == "1"

# --- Commit retry policy -------------------------------------------------
COMMIT_RETRIES = int(os.environ.get("COMMONVOICE_COMMIT_RETRIES", "3"))
# Seconds to wait between commit attempts.
COMMIT_SLEEP = float(os.environ.get("COMMONVOICE_COMMIT_SLEEP", "5"))


def load_env(path: Path) -> dict:
    """Parse a simple ``KEY=value`` file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without an ``=`` are
    skipped. Keys and values are whitespace-stripped, and surrounding double
    and then single quote characters are removed from values. Only the first
    ``=`` on a line separates key from value.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of parsed keys to string values; empty when ``path`` does
        not exist.
    """
    if not path.exists():
        return {}

    parsed = {}
    for entry in map(str.strip, path.read_text().splitlines()):
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        name, _, raw_value = entry.partition("=")
        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
    return parsed


def load_checkpoint(path: Path) -> dict:
    """Load the upload checkpoint, filling in any missing fields.

    Args:
        path: Location of the checkpoint JSON file.

    Returns:
        The stored checkpoint with default values added for absent keys, or
        a fresh default checkpoint when ``path`` does not exist.
    """
    defaults = {
        "metadata_uploaded": False,
        "prefixes": [],
        "clip_index": 0,
        "bucketed": False,
        "bucket_count": BUCKET_COUNT,
    }
    if not path.exists():
        return defaults

    state = json.loads(path.read_text())
    # Older checkpoint files may lack newer fields; add them without
    # overwriting anything that was stored.
    for key, fallback in defaults.items():
        state.setdefault(key, fallback)
    return state


def save_checkpoint(path: Path, data: dict) -> None:
    """Persist the checkpoint as indented JSON, replacing the file atomically.

    The JSON is written to a sibling ``<name>.tmp`` file first and then moved
    over ``path``. Writing in place would truncate the existing checkpoint
    before the new content is on disk, so an interruption mid-write could
    leave an empty or partial file that can no longer be parsed on resume.

    Args:
        path: Destination checkpoint file.
        data: JSON-serializable checkpoint state.

    Raises:
        TypeError: If ``data`` contains values that ``json`` cannot encode;
            the existing checkpoint file is left untouched in that case.
        OSError: If the temporary file cannot be written or moved.
    """
    # Serialize before touching the filesystem so encoding errors cannot
    # affect the existing checkpoint.
    payload = json.dumps(data, indent=2)
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(payload)
        tmp_path.replace(path)
    finally:
        # No-op after a successful replace; removes the leftover otherwise.
        tmp_path.unlink(missing_ok=True)


def get_clip_files(clip_dir: Path) -> list[Path]:
    """Return the clip files in ``clip_dir`` in sorted order.

    Only regular files whose names match ``PREFIX_RE`` are included; the
    directory is not searched recursively.

    Args:
        clip_dir: Directory containing the clip files.

    Returns:
        Sorted list of matching paths.
    """
    return sorted(
        entry
        for entry in clip_dir.iterdir()
        if entry.is_file() and PREFIX_RE.match(entry.name)
    )


def bucket_for_filename(filename: str) -> str:
    """Return the bucket folder name for a clip file name.

    The numeric id captured by ``PREFIX_RE`` is reduced modulo
    ``BUCKET_COUNT`` and zero-padded to ``BUCKET_WIDTH`` digits.

    Args:
        filename: Bare file name of the clip (no directory part).

    Returns:
        The zero-padded bucket name, or ``"misc"`` when the name does not
        match the clip pattern.
    """
    found = PREFIX_RE.match(filename)
    if found is None:
        return "misc"
    bucket_index = int(found.group(1)) % BUCKET_COUNT
    return format(bucket_index, f"0{BUCKET_WIDTH}d")


def bucketed_repo_path(filename: str) -> str:
    """Return the in-repo path ``clips/<bucket>/<filename>`` for a clip.

    Args:
        filename: Bare file name of the clip.

    Returns:
        Repository path placing the file inside its bucket subfolder.
    """
    return "/".join(("clips", bucket_for_filename(filename), filename))


def create_commit_with_retry(api: HfApi, **kwargs) -> None:
    """Call ``api.create_commit`` and retry on failure.

    Up to ``COMMIT_RETRIES`` attempts are made, sleeping ``COMMIT_SLEEP``
    seconds between them. At least one attempt is always made: with a retry
    setting of zero or less, an empty loop would otherwise return without
    committing anything, and callers would record progress for data that was
    never uploaded.

    Args:
        api: Hub client used to create the commit.
        **kwargs: Forwarded unchanged to ``api.create_commit``.

    Raises:
        Exception: Whatever the final failed attempt raised.
    """
    attempts = max(1, COMMIT_RETRIES)
    for attempt in range(1, attempts + 1):
        try:
            api.create_commit(**kwargs)
        # Deliberately broad: this is the retry boundary, and the last
        # failure is re-raised rather than swallowed.
        except Exception as exc:
            if attempt >= attempts:
                raise
            print(
                "Commit failed, retrying "
                f"({attempt}/{attempts}): {exc}"
            )
            time.sleep(COMMIT_SLEEP)
        else:
            return


def migrate_root_clips(
    api: HfApi, repo_id: str, checkpoint: dict
) -> None:
    """Move clips stored directly under ``clips/`` into bucket subfolders.

    Does nothing when the checkpoint is already marked as bucketed or when
    migration is disabled through ``MIGRATE_EXISTING``. Otherwise the repo is
    listed, matching clip files sitting directly in ``clips/`` are copied to
    their bucketed path and deleted from the old one in batches of
    ``MOVE_BATCH_SIZE``, and the checkpoint is marked as bucketed and saved.

    Args:
        api: Hub client used to list files and create commits.
        repo_id: Target dataset repository id.
        checkpoint: Mutable checkpoint state; updated in place.
    """
    if checkpoint.get("bucketed") or not MIGRATE_EXISTING:
        return

    legacy_paths = []
    for repo_path in api.list_repo_files(repo_id, repo_type="dataset"):
        # Exactly one slash means the file sits directly in clips/ rather
        # than inside a bucket subfolder.
        sits_in_root = (
            repo_path.startswith("clips/")
            and repo_path.count("/") == 1
            and PREFIX_RE.match(Path(repo_path).name)
        )
        if sits_in_root:
            legacy_paths.append(repo_path)

    if not legacy_paths:
        checkpoint["bucketed"] = True
    else:
        for offset in range(0, len(legacy_paths), MOVE_BATCH_SIZE):
            operations = []
            for old_path in legacy_paths[offset:offset + MOVE_BATCH_SIZE]:
                target_path = bucketed_repo_path(Path(old_path).name)
                # A move is expressed as copy-then-delete within one commit.
                operations.extend(
                    (
                        CommitOperationCopy(
                            src_path_in_repo=old_path,
                            path_in_repo=target_path,
                        ),
                        CommitOperationDelete(path_in_repo=old_path),
                    )
                )
            create_commit_with_retry(
                api,
                repo_id=repo_id,
                repo_type="dataset",
                operations=operations,
                commit_message=(
                    "Move Common Voice clips into bucketed subfolders"
                ),
            )
        checkpoint.update(bucketed=True, bucket_count=BUCKET_COUNT)

    save_checkpoint(CHECKPOINT_FILE, checkpoint)


def main() -> None:
    """Upload the local dataset folder to a Hugging Face dataset repo.

    Steps, in order:

    1. Resolve the API token and check that the dataset folder exists.
    2. Create the dataset repo if needed and load the checkpoint.
    3. Upload everything except ``clips/`` once (tracked in the checkpoint).
    4. Migrate clips already stored directly under ``clips/`` into buckets.
    5. Upload local clips in chunks of ``CHUNK_SIZE``, saving the checkpoint
       after every successful commit so a later run resumes where this one
       stopped.

    Raises:
        SystemExit: If no token is found, the dataset folder is missing, or
            the checkpoint's bucket count differs from ``BUCKET_COUNT``.
    """
    env = load_env(Path(".env"))
    # The process environment's HF_TOKEN wins; the remaining names are only
    # looked up in the parsed .env file.
    token = (
        os.getenv("HF_TOKEN")
        or env.get("HF_TOKEN")
        or env.get("HUGGINGFACEHUB_API_TOKEN")
        or env.get("HF_API_TOKEN")
    )
    if not token:
        raise SystemExit("HF token not found in .env (HF_TOKEN)")

    if not DATASET_DIR.exists():
        raise SystemExit(f"Dataset dir not found: {DATASET_DIR}")

    api = HfApi(token=token)
    username = api.whoami()["name"]
    # Default to a repo under the authenticated account unless overridden.
    repo_id = REPO_OVERRIDE or f"{username}/commonvoice-24-fa"

    api.create_repo(repo_id, repo_type="dataset", exist_ok=True)

    checkpoint = load_checkpoint(CHECKPOINT_FILE)
    # The bucket count determines each clip's repo path, so continuing with
    # a different value would place clips under different bucket folders.
    if int(checkpoint.get("bucket_count", BUCKET_COUNT)) != BUCKET_COUNT:
        raise SystemExit(
            "Bucket count mismatch. "
            f"Checkpoint has {checkpoint.get('bucket_count')}, "
            f"env has {BUCKET_COUNT}. "
            "Set COMMONVOICE_BUCKETS to match the existing upload."
        )

    # One-time upload of everything outside clips/ (clips are uploaded
    # separately in chunks below).
    if not checkpoint.get("metadata_uploaded"):
        api.upload_folder(
            repo_id=repo_id,
            repo_type="dataset",
            folder_path=str(DATASET_DIR),
            ignore_patterns=[
                "clips/**",
                ".DS_Store",
                "**/.DS_Store",
            ],
        )
        checkpoint["metadata_uploaded"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)

    migrate_root_clips(api, repo_id, checkpoint)

    clip_dir = DATASET_DIR / "clips"
    clip_files = get_clip_files(clip_dir)
    total = len(clip_files)
    # Resume from the index recorded by the last successful commit.
    start_index = int(checkpoint.get("clip_index", 0))

    chunks_done = 0
    for start in range(start_index, total, CHUNK_SIZE):
        # MAX_CHUNKS == 0 is falsy, which means "no per-run limit".
        if MAX_CHUNKS and chunks_done >= MAX_CHUNKS:
            break
        end = min(total, start + CHUNK_SIZE)
        batch = clip_files[start:end]
        operations = [
            CommitOperationAdd(
                path_in_repo=bucketed_repo_path(path.name),
                path_or_fileobj=str(path),
            )
            for path in batch
        ]
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add clips {start + 1}-{end} of {total}",
        )
        # Record progress only after the commit went through.
        checkpoint["clip_index"] = end
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        chunks_done += 1

    uploaded = int(checkpoint.get("clip_index", 0))
    if uploaded >= total:
        print(
            f"Dataset upload complete: https://huggingface.co/datasets/{repo_id}"
        )
    else:
        print(
            f"Uploaded {uploaded}/{total} clips so far: "
            f"https://huggingface.co/datasets/{repo_id}"
        )


# Run the upload only when this file is executed as a script.
if __name__ == "__main__":
    main()