Spaces:
Running
on
Zero
Running
on
Zero
| import json | |
| import os | |
| import re | |
| import time | |
| from pathlib import Path | |
| from huggingface_hub import ( | |
| CommitOperationAdd, | |
| CommitOperationCopy, | |
| CommitOperationDelete, | |
| HfApi, | |
| ) | |
# --- Configuration (every knob is overridable via environment variables) ---

# Local dataset root expected to contain a clips/ subfolder plus metadata files.
DATASET_DIR = Path(os.getenv("COMMONVOICE_DIR", "CommonVoice24-FA"))
# JSON file tracking upload progress so interrupted runs can resume.
CHECKPOINT_FILE = Path(
    os.getenv("COMMONVOICE_UPLOAD_CHECKPOINT", ".commonvoice_upload_checkpoint.json")
)
# Explicit target repo id; when unset, main() builds "<username>/commonvoice-24-fa".
REPO_OVERRIDE = os.getenv("COMMONVOICE_REPO")
# Matches Common Voice Persian clip filenames; group 1 is the numeric clip id.
PREFIX_RE = re.compile(r"^common_voice_fa_(\d+)\.mp3$")
# Number of clip files uploaded per commit.
CHUNK_SIZE = int(os.getenv("COMMONVOICE_CHUNK_SIZE", "2000"))
# Stop after this many chunks in one run (0 = no limit).
MAX_CHUNKS = int(os.getenv("COMMONVOICE_MAX_CHUNKS", "0"))
# Clips are sharded into this many clips/<bucket>/ subfolders (by id modulo).
BUCKET_COUNT = int(os.getenv("COMMONVOICE_BUCKETS", "100"))
# Zero-padded width of bucket folder names (at least 2 digits).
BUCKET_WIDTH = max(2, len(str(max(BUCKET_COUNT - 1, 0))))
# Number of move (copy + delete) operations grouped per migration commit.
MOVE_BATCH_SIZE = int(os.getenv("COMMONVOICE_MOVE_BATCH", "100"))
# Whether to relocate clips previously uploaded at the clips/ root into buckets.
MIGRATE_EXISTING = os.getenv("COMMONVOICE_MIGRATE", "1") == "1"
# Retry policy for create_commit calls: attempt count and sleep between tries.
COMMIT_RETRIES = int(os.getenv("COMMONVOICE_COMMIT_RETRIES", "3"))
COMMIT_SLEEP = float(os.getenv("COMMONVOICE_COMMIT_SLEEP", "5"))
def load_env(path: Path) -> dict:
    """Parse a minimal KEY=VALUE .env file into a dict.

    Blank lines, ``#`` comment lines, and lines without ``=`` are skipped.
    Keys and values are whitespace-stripped and values lose one layer of
    surrounding double and single quotes. Returns ``{}`` when *path*
    does not exist.
    """
    entries: dict = {}
    if not path.exists():
        return entries
    for raw_line in path.read_text().splitlines():
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        name, _, rest = stripped.partition("=")
        entries[name.strip()] = rest.strip().strip('"').strip("'")
    return entries
def load_checkpoint(path: Path) -> dict:
    """Load resumable upload state from *path*.

    Returns a fresh default checkpoint when the file does not exist;
    otherwise parses it as JSON and fills in any missing keys with
    defaults so older checkpoint files keep working.
    """
    defaults = {
        "metadata_uploaded": False,
        "prefixes": [],
        "clip_index": 0,
        "bucketed": False,
        "bucket_count": BUCKET_COUNT,
    }
    if not path.exists():
        return defaults
    state = json.loads(path.read_text())
    for key, fallback in defaults.items():
        state.setdefault(key, fallback)
    return state
def save_checkpoint(path: Path, data: dict) -> None:
    """Persist checkpoint state to *path* as pretty-printed JSON."""
    serialized = json.dumps(data, indent=2)
    path.write_text(serialized)
def get_clip_files(clip_dir: Path) -> list[Path]:
    """Return the Common Voice mp3 files in *clip_dir*, sorted by path.

    Only regular files whose names match PREFIX_RE are included;
    subdirectories and unrelated files are ignored.
    """
    matches = [
        entry
        for entry in clip_dir.iterdir()
        if entry.is_file() and PREFIX_RE.match(entry.name)
    ]
    return sorted(matches)
def bucket_for_filename(filename: str) -> str:
    """Map a clip filename to its bucket folder name.

    Filenames that do not match PREFIX_RE fall into the literal "misc"
    bucket; matching ones use the numeric clip id modulo BUCKET_COUNT,
    zero-padded to BUCKET_WIDTH digits.
    """
    parsed = PREFIX_RE.match(filename)
    if parsed is None:
        return "misc"
    bucket_index = int(parsed.group(1)) % BUCKET_COUNT
    return f"{bucket_index:0{BUCKET_WIDTH}d}"
def bucketed_repo_path(filename: str) -> str:
    """Return the in-repo destination path for *filename* under its bucket.

    FIX: the final path component was the literal text "(unknown)" (an
    extraction placeholder) rather than the clip's own filename, so every
    clip mapped to the same bucket would collide on one repo path and
    overwrite each other. The path must end with the actual filename.
    """
    bucket = bucket_for_filename(filename)
    return f"clips/{bucket}/{filename}"
def create_commit_with_retry(api: HfApi, **kwargs) -> None:
    """Invoke ``api.create_commit(**kwargs)``, retrying transient failures.

    Makes up to COMMIT_RETRIES attempts, sleeping COMMIT_SLEEP seconds
    between them; the last failure is re-raised unchanged.
    """
    for attempt in range(1, COMMIT_RETRIES + 1):
        try:
            api.create_commit(**kwargs)
        except Exception as exc:
            # Out of retries: surface the original exception to the caller.
            if attempt >= COMMIT_RETRIES:
                raise
            print(f"Commit failed, retrying ({attempt}/{COMMIT_RETRIES}): {exc}")
            time.sleep(COMMIT_SLEEP)
        else:
            return
def migrate_root_clips(
    api: HfApi, repo_id: str, checkpoint: dict
) -> None:
    """One-time remote migration: move clips sitting directly under clips/
    in the dataset repo into bucketed clips/<bucket>/ subfolders.

    No-op when the checkpoint already marks the repo as bucketed or when
    migration is disabled (COMMONVOICE_MIGRATE != "1"). Moves happen in
    MOVE_BATCH_SIZE-sized commits; the "bucketed" flag is only persisted
    after every batch has been committed, so an interrupted migration is
    re-scanned (and re-attempted) on the next run.
    """
    if checkpoint.get("bucketed"):
        return
    if not MIGRATE_EXISTING:
        return
    repo_files = api.list_repo_files(repo_id, repo_type="dataset")
    # Depth-1 paths ("clips/<file>.mp3") are the un-bucketed clips;
    # already-bucketed files contain two slashes and are skipped.
    root_clips = [
        path
        for path in repo_files
        if path.startswith("clips/")
        and path.count("/") == 1
        and PREFIX_RE.match(Path(path).name)
    ]
    if not root_clips:
        # Nothing to move: record completion so future runs skip the listing.
        checkpoint["bucketed"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        return
    for start in range(0, len(root_clips), MOVE_BATCH_SIZE):
        batch = root_clips[start:start + MOVE_BATCH_SIZE]
        operations = []
        for path in batch:
            new_path = bucketed_repo_path(Path(path).name)
            # Server-side copy followed by delete acts as a rename
            # without re-uploading the audio bytes.
            operations.append(
                CommitOperationCopy(
                    src_path_in_repo=path,
                    path_in_repo=new_path,
                )
            )
            operations.append(CommitOperationDelete(path_in_repo=path))
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=(
                "Move Common Voice clips into bucketed subfolders"
            ),
        )
    checkpoint["bucketed"] = True
    checkpoint["bucket_count"] = BUCKET_COUNT
    save_checkpoint(CHECKPOINT_FILE, checkpoint)
def main() -> None:
    """Resumable upload of the Common Voice FA dataset to a HF dataset repo.

    Steps: resolve the HF token (env var first, then .env fallbacks),
    create/reuse the target repo, upload metadata once, migrate any
    un-bucketed remote clips, then upload local clips in CHUNK_SIZE
    commits — persisting progress to CHECKPOINT_FILE after each commit so
    an interrupted run resumes where it left off.
    """
    env = load_env(Path(".env"))
    # Environment variable wins; otherwise accept any of the common
    # token key names from the .env file.
    token = (
        os.getenv("HF_TOKEN")
        or env.get("HF_TOKEN")
        or env.get("HUGGINGFACEHUB_API_TOKEN")
        or env.get("HF_API_TOKEN")
    )
    if not token:
        raise SystemExit("HF token not found in .env (HF_TOKEN)")
    if not DATASET_DIR.exists():
        raise SystemExit(f"Dataset dir not found: {DATASET_DIR}")
    api = HfApi(token=token)
    username = api.whoami()["name"]
    repo_id = REPO_OVERRIDE or f"{username}/commonvoice-24-fa"
    api.create_repo(repo_id, repo_type="dataset", exist_ok=True)
    checkpoint = load_checkpoint(CHECKPOINT_FILE)
    # Changing the bucket count mid-upload would scatter clips across two
    # incompatible layouts — refuse to continue.
    if int(checkpoint.get("bucket_count", BUCKET_COUNT)) != BUCKET_COUNT:
        raise SystemExit(
            "Bucket count mismatch. "
            f"Checkpoint has {checkpoint.get('bucket_count')}, "
            f"env has {BUCKET_COUNT}. "
            "Set COMMONVOICE_BUCKETS to match the existing upload."
        )
    # Upload everything except the clips folder exactly once.
    if not checkpoint.get("metadata_uploaded"):
        api.upload_folder(
            repo_id=repo_id,
            repo_type="dataset",
            folder_path=str(DATASET_DIR),
            ignore_patterns=[
                "clips/**",
                ".DS_Store",
                "**/.DS_Store",
            ],
        )
        checkpoint["metadata_uploaded"] = True
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
    migrate_root_clips(api, repo_id, checkpoint)
    clip_dir = DATASET_DIR / "clips"
    clip_files = get_clip_files(clip_dir)
    total = len(clip_files)
    # Resume from the index recorded after the last successful commit.
    start_index = int(checkpoint.get("clip_index", 0))
    chunks_done = 0
    for start in range(start_index, total, CHUNK_SIZE):
        if MAX_CHUNKS and chunks_done >= MAX_CHUNKS:
            break
        end = min(total, start + CHUNK_SIZE)
        batch = clip_files[start:end]
        operations = [
            CommitOperationAdd(
                path_in_repo=bucketed_repo_path(path.name),
                path_or_fileobj=str(path),
            )
            for path in batch
        ]
        create_commit_with_retry(
            api,
            repo_id=repo_id,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Add clips {start + 1}-{end} of {total}",
        )
        # Only advance the checkpoint after the commit has landed.
        checkpoint["clip_index"] = end
        save_checkpoint(CHECKPOINT_FILE, checkpoint)
        chunks_done += 1
    uploaded = int(checkpoint.get("clip_index", 0))
    if uploaded >= total:
        print(
            f"Dataset upload complete: https://huggingface.co/datasets/{repo_id}"
        )
    else:
        print(
            f"Uploaded {uploaded}/{total} clips so far: "
            f"https://huggingface.co/datasets/{repo_id}"
        )
# Script entry point.
if __name__ == "__main__":
    main()