"""Resumable, chunked uploader for a local Common Voice 24 (Persian) dataset
to a Hugging Face Hub dataset repo.

Clips are committed in chunks and sharded into numbered bucket subfolders
(``clips/<bucket>/<file>``) to keep any single repo directory small. Progress
is persisted to a JSON checkpoint so an interrupted run can resume. Existing
clips sitting directly under ``clips/`` can be migrated into buckets.
"""

import json
import os
import re
import time
from pathlib import Path

from huggingface_hub import (
    CommitOperationAdd,
    CommitOperationCopy,
    CommitOperationDelete,
    HfApi,
)

# --- Configuration (every knob is overridable via environment variables) ---
DATASET_DIR = Path(os.getenv("COMMONVOICE_DIR", "CommonVoice24-FA"))
CHECKPOINT_FILE = Path(
    os.getenv("COMMONVOICE_UPLOAD_CHECKPOINT", ".commonvoice_upload_checkpoint.json")
)
REPO_OVERRIDE = os.getenv("COMMONVOICE_REPO")
# Clip filenames look like common_voice_fa_12345678.mp3; group(1) is the id.
PREFIX_RE = re.compile(r"^common_voice_fa_(\d+)\.mp3$")
CHUNK_SIZE = int(os.getenv("COMMONVOICE_CHUNK_SIZE", "2000"))
MAX_CHUNKS = int(os.getenv("COMMONVOICE_MAX_CHUNKS", "0"))  # 0 = unlimited
BUCKET_COUNT = int(os.getenv("COMMONVOICE_BUCKETS", "100"))
# Zero-pad bucket names so they sort lexicographically; at least two digits.
BUCKET_WIDTH = max(2, len(str(max(BUCKET_COUNT - 1, 0))))
MOVE_BATCH_SIZE = int(os.getenv("COMMONVOICE_MOVE_BATCH", "100"))
MIGRATE_EXISTING = os.getenv("COMMONVOICE_MIGRATE", "1") == "1"
COMMIT_RETRIES = int(os.getenv("COMMONVOICE_COMMIT_RETRIES", "3"))
COMMIT_SLEEP = float(os.getenv("COMMONVOICE_COMMIT_SLEEP", "5"))


def load_env(path: Path) -> dict:
    """Parse a minimal ``KEY=VALUE`` .env file into a dict.

    Blank lines, ``#`` comments, and lines without ``=`` are skipped; values
    are stripped of surrounding single or double quotes. Returns an empty
    dict when the file does not exist.
    """
    data: dict = {}
    if not path.exists():
        return data
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        data[key] = value
    return data


def load_checkpoint(path: Path) -> dict:
    """Load upload progress from *path*, supplying defaults for missing keys.

    Keys: ``metadata_uploaded`` (bool), ``prefixes`` (list, legacy),
    ``clip_index`` (next clip offset), ``bucketed`` (migration done),
    ``bucket_count`` (bucket count the repo was sharded with).
    """
    if not path.exists():
        return {
            "metadata_uploaded": False,
            "prefixes": [],
            "clip_index": 0,
            "bucketed": False,
            "bucket_count": BUCKET_COUNT,
        }
    data = json.loads(path.read_text(encoding="utf-8"))
    # Older checkpoints may predate some keys; fill in defaults.
    data.setdefault("metadata_uploaded", False)
    data.setdefault("prefixes", [])
    data.setdefault("clip_index", 0)
    data.setdefault("bucketed", False)
    data.setdefault("bucket_count", BUCKET_COUNT)
    return data


def save_checkpoint(path: Path, data: dict) -> None:
    """Persist checkpoint *data* to *path* as pretty-printed JSON."""
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")


def get_clip_files(clip_dir: Path) -> list[Path]:
    """Return the clip files in *clip_dir* matching PREFIX_RE, sorted by name.

    Sorting gives a deterministic order so the integer checkpoint index
    refers to the same files across runs.
    """
    files = [
        entry
        for entry in clip_dir.iterdir()
        if entry.is_file() and PREFIX_RE.match(entry.name)
    ]
    return sorted(files)


def bucket_for_filename(filename: str) -> str:
    """Return the zero-padded bucket name for *filename*.

    The bucket is the numeric clip id modulo BUCKET_COUNT; filenames that do
    not match PREFIX_RE fall into the "misc" bucket.
    """
    match = PREFIX_RE.match(filename)
    if not match:
        return "misc"
    clip_id = int(match.group(1))
    return f"{clip_id % BUCKET_COUNT:0{BUCKET_WIDTH}d}"


def bucketed_repo_path(filename: str) -> str:
    """Return the in-repo path for *filename*, nested under its bucket."""
    bucket = bucket_for_filename(filename)
    # BUG FIX: previously this interpolated a literal placeholder instead of
    # the filename, so every clip mapped to the same repo path (collisions on
    # upload, and the migration would have overwritten clips onto one file).
    return f"clips/{bucket}/{filename}"


def create_commit_with_retry(api: HfApi, **kwargs) -> None:
    """Call ``api.create_commit(**kwargs)``, retrying transient failures.

    Retries up to COMMIT_RETRIES times with a fixed COMMIT_SLEEP pause;
    re-raises the last exception once retries are exhausted.
    """
    for attempt in range(1, COMMIT_RETRIES + 1):
        try:
            api.create_commit(**kwargs)
            return
        except Exception as exc:  # noqa: BLE001 - hub errors vary widely
            if attempt >= COMMIT_RETRIES:
                raise
            print(
                "Commit failed, retrying "
                f"({attempt}/{COMMIT_RETRIES}): {exc}"
            )
            time.sleep(COMMIT_SLEEP)


def migrate_root_clips(
    api: HfApi, repo_id: str, checkpoint: dict
) -> None:
    """Move clips sitting directly under ``clips/`` into bucket subfolders.

    No-op when the checkpoint says migration is done or MIGRATE_EXISTING is
    off. Moves are server-side copy+delete commits in MOVE_BATCH_SIZE batches;
    on success the checkpoint is marked ``bucketed``.
    """
    if checkpoint.get("bucketed"):
        return
    if not MIGRATE_EXISTING:
        return
    repo_files = api.list_repo_files(repo_id, repo_type="dataset")
    # "clips/<file>" has exactly one slash; bucketed paths have two.
    root_clips = [
        path
        for path in repo_files
        if path.startswith("clips/")
        and path.count("/") == 1
        and PREFIX_RE.match(Path(path).name)
    ]
    if not root_clips:
        checkpoint["bucketed"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        return
    for start in range(0, len(root_clips), MOVE_BATCH_SIZE):
        batch = root_clips[start:start + MOVE_BATCH_SIZE]
        operations = []
        for path in batch:
            new_path = bucketed_repo_path(Path(path).name)
            operations.append(
                CommitOperationCopy(
                    src_path_in_repo=path,
                    path_in_repo=new_path,
                )
            )
            operations.append(CommitOperationDelete(path_in_repo=path))
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=(
                "Move Common Voice clips into bucketed subfolders"
            ),
        )
    checkpoint["bucketed"] = True
    checkpoint["bucket_count"] = BUCKET_COUNT
    save_checkpoint(CHECKPOINT_FILE, checkpoint)


def main() -> None:
    """Upload metadata, migrate legacy clips, then upload clips in chunks.

    Raises SystemExit on missing token, missing dataset dir, or a bucket
    count that disagrees with the checkpoint (which would scatter files
    across incompatible bucket layouts).
    """
    env = load_env(Path(".env"))
    token = (
        os.getenv("HF_TOKEN")
        or env.get("HF_TOKEN")
        or env.get("HUGGINGFACEHUB_API_TOKEN")
        or env.get("HF_API_TOKEN")
    )
    if not token:
        raise SystemExit("HF token not found in .env (HF_TOKEN)")
    if not DATASET_DIR.exists():
        raise SystemExit(f"Dataset dir not found: {DATASET_DIR}")
    api = HfApi(token=token)
    username = api.whoami()["name"]
    repo_id = REPO_OVERRIDE or f"{username}/commonvoice-24-fa"
    api.create_repo(repo_id, repo_type="dataset", exist_ok=True)
    checkpoint = load_checkpoint(CHECKPOINT_FILE)
    if int(checkpoint.get("bucket_count", BUCKET_COUNT)) != BUCKET_COUNT:
        raise SystemExit(
            "Bucket count mismatch. "
            f"Checkpoint has {checkpoint.get('bucket_count')}, "
            f"env has {BUCKET_COUNT}. "
            "Set COMMONVOICE_BUCKETS to match the existing upload."
        )
    # Metadata (TSVs etc.) goes up once; clips are excluded and streamed below.
    if not checkpoint.get("metadata_uploaded"):
        api.upload_folder(
            repo_id=repo_id,
            repo_type="dataset",
            folder_path=str(DATASET_DIR),
            ignore_patterns=[
                "clips/**",
                ".DS_Store",
                "**/.DS_Store",
            ],
        )
        checkpoint["metadata_uploaded"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
    migrate_root_clips(api, repo_id, checkpoint)
    clip_dir = DATASET_DIR / "clips"
    clip_files = get_clip_files(clip_dir)
    total = len(clip_files)
    start_index = int(checkpoint.get("clip_index", 0))
    chunks_done = 0
    for start in range(start_index, total, CHUNK_SIZE):
        if MAX_CHUNKS and chunks_done >= MAX_CHUNKS:
            break
        end = min(total, start + CHUNK_SIZE)
        batch = clip_files[start:end]
        operations = [
            CommitOperationAdd(
                path_in_repo=bucketed_repo_path(path.name),
                path_or_fileobj=str(path),
            )
            for path in batch
        ]
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add clips {start + 1}-{end} of {total}",
        )
        # Checkpoint after every successful commit so a crash resumes here.
        checkpoint["clip_index"] = end
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        chunks_done += 1
    uploaded = int(checkpoint.get("clip_index", 0))
    if uploaded >= total:
        print(
            f"Dataset upload complete: https://huggingface.co/datasets/{repo_id}"
        )
    else:
        print(
            f"Uploaded {uploaded}/{total} clips so far: "
            f"https://huggingface.co/datasets/{repo_id}"
        )


if __name__ == "__main__":
    main()