# representation-chizzler / scripts / upload_commonvoice_chunks.py
# NOTE: the lines below were a Hugging Face page banner captured with the
# file (author Reza2kn, commit 67ba0d5 "Update app for GPU-aware model
# loading and dataset fixes"); kept as a comment so the module parses.
import json
import os
import re
import time
from pathlib import Path
from huggingface_hub import (
CommitOperationAdd,
CommitOperationCopy,
CommitOperationDelete,
HfApi,
)
# Local directory containing the Common Voice dataset to upload.
DATASET_DIR = Path(os.getenv("COMMONVOICE_DIR", "CommonVoice24-FA"))
# JSON file recording upload progress so an interrupted run can resume.
CHECKPOINT_FILE = Path(
    os.getenv("COMMONVOICE_UPLOAD_CHECKPOINT", ".commonvoice_upload_checkpoint.json")
)
# Optional explicit "user/name" dataset repo id; otherwise derived from whoami().
REPO_OVERRIDE = os.getenv("COMMONVOICE_REPO")
# Matches Common Voice Persian clip filenames; group 1 is the numeric clip id.
PREFIX_RE = re.compile(r"^common_voice_fa_(\d+)\.mp3$")
# Number of clip files added per commit.
CHUNK_SIZE = int(os.getenv("COMMONVOICE_CHUNK_SIZE", "2000"))
# Stop after this many chunks per run; 0 means no limit.
MAX_CHUNKS = int(os.getenv("COMMONVOICE_MAX_CHUNKS", "0"))
# Clips are sharded into this many clips/NN/ subfolders by clip-id modulo.
BUCKET_COUNT = int(os.getenv("COMMONVOICE_BUCKETS", "100"))
# Zero-padding width for bucket folder names (minimum two digits).
BUCKET_WIDTH = max(2, len(str(max(BUCKET_COUNT - 1, 0))))
# How many existing root-level clips to move per migration commit.
MOVE_BATCH_SIZE = int(os.getenv("COMMONVOICE_MOVE_BATCH", "100"))
# Whether to migrate clips already uploaded at clips/ root into buckets.
MIGRATE_EXISTING = os.getenv("COMMONVOICE_MIGRATE", "1") == "1"
# Retry policy for create_commit calls (attempt count and sleep between tries).
COMMIT_RETRIES = int(os.getenv("COMMONVOICE_COMMIT_RETRIES", "3"))
COMMIT_SLEEP = float(os.getenv("COMMONVOICE_COMMIT_SLEEP", "5"))
def load_env(path: Path) -> dict:
    """Parse a minimal KEY=VALUE .env file at *path* into a dict.

    Returns an empty dict when the file does not exist. Blank lines,
    comment lines, and lines without '=' are skipped; surrounding
    whitespace and one layer of quotes are stripped from values.
    """
    parsed: dict = {}
    if not path.exists():
        return parsed
    for entry in path.read_text().splitlines():
        stripped = entry.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        # partition("=") splits on the first '=' only, like split("=", 1).
        name, _, rest = stripped.partition("=")
        parsed[name.strip()] = rest.strip().strip('"').strip("'")
    return parsed
def load_checkpoint(path: Path) -> dict:
    """Load the upload checkpoint from *path*, filling in missing defaults.

    When the file does not exist, a fresh default checkpoint is returned.
    """
    defaults = {
        "metadata_uploaded": False,
        "prefixes": [],
        "clip_index": 0,
        "bucketed": False,
        "bucket_count": BUCKET_COUNT,
    }
    if not path.exists():
        return defaults
    state = json.loads(path.read_text())
    # Backfill any keys that older checkpoint files did not record.
    for key, fallback in defaults.items():
        state.setdefault(key, fallback)
    return state
def save_checkpoint(path: Path, data: dict) -> None:
    """Write checkpoint *data* to *path* as pretty-printed JSON."""
    serialized = json.dumps(data, indent=2)
    path.write_text(serialized)
def get_clip_files(clip_dir: Path) -> list[Path]:
    """Return the Common Voice clip files in *clip_dir*, sorted by path.

    Only regular files whose names match PREFIX_RE are included.
    """
    return sorted(
        entry
        for entry in clip_dir.iterdir()
        if entry.is_file() and PREFIX_RE.match(entry.name)
    )
def bucket_for_filename(filename: str) -> str:
    """Map a clip filename to its zero-padded bucket label.

    Files that do not match PREFIX_RE fall into the "misc" bucket.
    """
    matched = PREFIX_RE.match(filename)
    if matched is None:
        return "misc"
    # Shard by clip id modulo the bucket count, padded to BUCKET_WIDTH digits.
    clip_number = int(matched.group(1))
    return format(clip_number % BUCKET_COUNT, f"0{BUCKET_WIDTH}d")
def bucketed_repo_path(filename: str) -> str:
    """Return the in-repo path for *filename*, nested under its bucket folder.

    Bug fix: the path previously ended with the literal text "(unknown)"
    instead of the clip filename, so every clip in a bucket collided on
    the same repo path (uploads and the copy/delete migration would all
    overwrite one file per bucket).
    """
    bucket = bucket_for_filename(filename)
    return f"clips/{bucket}/{filename}"
def create_commit_with_retry(api: HfApi, **kwargs) -> None:
    """Call ``api.create_commit(**kwargs)``, retrying on any exception.

    Up to COMMIT_RETRIES attempts are made, sleeping COMMIT_SLEEP seconds
    between them; the final failure is re-raised to the caller.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            api.create_commit(**kwargs)
        except Exception as exc:
            if attempt >= COMMIT_RETRIES:
                raise
            print(
                "Commit failed, retrying "
                f"({attempt}/{COMMIT_RETRIES}): {exc}"
            )
            time.sleep(COMMIT_SLEEP)
        else:
            return
def migrate_root_clips(
    api: HfApi, repo_id: str, checkpoint: dict
) -> None:
    """Move legacy clips sitting directly under clips/ into bucket folders.

    Runs once per checkpoint: skipped when already marked bucketed or when
    migration is disabled. Files are moved in MOVE_BATCH_SIZE commits of
    copy+delete operations, then the checkpoint is marked bucketed.
    """
    if checkpoint.get("bucketed") or not MIGRATE_EXISTING:
        return
    all_files = api.list_repo_files(repo_id, repo_type="dataset")
    # Clips at clips/<name> (exactly one slash) that match the clip pattern.
    stale = []
    for remote_path in all_files:
        if (
            remote_path.startswith("clips/")
            and remote_path.count("/") == 1
            and PREFIX_RE.match(Path(remote_path).name)
        ):
            stale.append(remote_path)
    if not stale:
        checkpoint["bucketed"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        return
    for offset in range(0, len(stale), MOVE_BATCH_SIZE):
        ops = []
        for old_path in stale[offset:offset + MOVE_BATCH_SIZE]:
            target = bucketed_repo_path(Path(old_path).name)
            # The hub has no rename: copy into the bucket, delete the original.
            ops.append(
                CommitOperationCopy(
                    src_path_in_repo=old_path,
                    path_in_repo=target,
                )
            )
            ops.append(CommitOperationDelete(path_in_repo=old_path))
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=ops,
            commit_message=(
                "Move Common Voice clips into bucketed subfolders"
            ),
        )
    checkpoint["bucketed"] = True
    checkpoint["bucket_count"] = BUCKET_COUNT
    save_checkpoint(CHECKPOINT_FILE, checkpoint)
def main() -> None:
    """Upload the Common Voice dataset to the Hugging Face Hub, resumably.

    Order of operations: resolve the API token and repo id, upload the
    metadata folder once, migrate any legacy root-level clips into bucket
    folders, then upload clips in CHUNK_SIZE commits — saving the
    checkpoint after each commit so an interrupted run resumes where it
    stopped.

    Raises SystemExit when no token is found, the dataset directory is
    missing, or the checkpoint's bucket count disagrees with the env.
    """
    env = load_env(Path(".env"))
    # Accept the token from the process env or any common .env key name.
    token = (
        os.getenv("HF_TOKEN")
        or env.get("HF_TOKEN")
        or env.get("HUGGINGFACEHUB_API_TOKEN")
        or env.get("HF_API_TOKEN")
    )
    if not token:
        raise SystemExit("HF token not found in .env (HF_TOKEN)")
    if not DATASET_DIR.exists():
        raise SystemExit(f"Dataset dir not found: {DATASET_DIR}")
    api = HfApi(token=token)
    username = api.whoami()["name"]
    repo_id = REPO_OVERRIDE or f"{username}/commonvoice-24-fa"
    api.create_repo(repo_id, repo_type="dataset", exist_ok=True)
    checkpoint = load_checkpoint(CHECKPOINT_FILE)
    # Refuse to mix bucket layouts: a different bucket count would route new
    # clips into different folders than the clips already uploaded.
    if int(checkpoint.get("bucket_count", BUCKET_COUNT)) != BUCKET_COUNT:
        raise SystemExit(
            "Bucket count mismatch. "
            f"Checkpoint has {checkpoint.get('bucket_count')}, "
            f"env has {BUCKET_COUNT}. "
            "Set COMMONVOICE_BUCKETS to match the existing upload."
        )
    # One-time upload of everything except the clips themselves.
    if not checkpoint.get("metadata_uploaded"):
        api.upload_folder(
            repo_id=repo_id,
            repo_type="dataset",
            folder_path=str(DATASET_DIR),
            ignore_patterns=[
                "clips/**",
                ".DS_Store",
                "**/.DS_Store",
            ],
        )
        checkpoint["metadata_uploaded"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
    migrate_root_clips(api, repo_id, checkpoint)
    clip_dir = DATASET_DIR / "clips"
    clip_files = get_clip_files(clip_dir)
    total = len(clip_files)
    # Resume from the last committed clip index recorded in the checkpoint.
    start_index = int(checkpoint.get("clip_index", 0))
    chunks_done = 0
    for start in range(start_index, total, CHUNK_SIZE):
        # MAX_CHUNKS == 0 means "no per-run limit".
        if MAX_CHUNKS and chunks_done >= MAX_CHUNKS:
            break
        end = min(total, start + CHUNK_SIZE)
        batch = clip_files[start:end]
        operations = [
            CommitOperationAdd(
                path_in_repo=bucketed_repo_path(path.name),
                path_or_fileobj=str(path),
            )
            for path in batch
        ]
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add clips {start + 1}-{end} of {total}",
        )
        # Persist progress after every commit so a crash resumes cleanly.
        checkpoint["clip_index"] = end
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        chunks_done += 1
    uploaded = int(checkpoint.get("clip_index", 0))
    if uploaded >= total:
        print(
            f"Dataset upload complete: https://huggingface.co/datasets/{repo_id}"
        )
    else:
        print(
            f"Uploaded {uploaded}/{total} clips so far: "
            f"https://huggingface.co/datasets/{repo_id}"
        )
if __name__ == "__main__":
main()