Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| import contextlib | |
| from pathlib import Path | |
| from huggingface_hub import HfApi | |
class HfShardUploader:
    """Upload local crawl shard files to a Hugging Face Hub repository.

    Each shard is uploaded in its own commit under ``path_prefix``. The local
    file is removed after a successful upload.

    Usage: ``await initialize()`` once, then ``await upload_and_delete(...)``
    for each shard. Blocking Hub calls are run via ``asyncio.to_thread``.
    """

    def __init__(
        self,
        *,
        repo_id: str,
        token: str,
        repo_type: str = "dataset",
        private_repo: bool = False,
        path_prefix: str = "crawl_shards",
    ) -> None:
        """Store normalized configuration; no network access happens here.

        Args:
            repo_id: Target repository id. Surrounding whitespace is stripped.
            token: Hub access token. Surrounding whitespace is stripped, and an
                empty token is later passed to ``HfApi`` as ``None``.
            repo_type: Repository type passed through to the Hub API calls.
            private_repo: Whether ``create_repo`` should create a private repo.
            path_prefix: Directory inside the repo where shards are placed.
                Leading and trailing slashes are stripped. An empty prefix
                puts shards at the repository root.
        """
        self.repo_id = repo_id.strip()
        self.token = token.strip()
        self.repo_type = repo_type
        self.private_repo = bool(private_repo)
        self.path_prefix = path_prefix.strip("/")
        # Populated by initialize(); None means "not ready to upload".
        self.api: HfApi | None = None

    async def initialize(self) -> None:
        """Create the Hub client and make sure the target repository exists.

        ``create_repo`` is called with ``exist_ok=True`` in a worker thread.
        The client is stored on the instance only after that call returns.
        If it raises, the uploader stays uninitialized, and
        ``upload_and_delete`` keeps refusing with ``RuntimeError`` rather than
        pushing to a repository that may not exist. Calling this again retries.

        Raises:
            Any exception raised by ``HfApi`` construction or ``create_repo``
            (propagated unchanged).
        """
        api = HfApi(token=self.token or None)
        await asyncio.to_thread(
            api.create_repo,
            repo_id=self.repo_id,
            repo_type=self.repo_type,
            private=self.private_repo,
            exist_ok=True,
        )
        # Publish the client only once the repository check succeeded.
        self.api = api

    async def upload_and_delete(self, shard_path: Path, rows: int) -> bool:
        """Upload one shard file, then delete the local copy on success.

        Args:
            shard_path: Local shard file. Its file name is reused as the file
                name inside the repository (under ``path_prefix`` if set).
            rows: Row count. It is used only in the commit message.

        Returns:
            ``True`` if the upload call completed. The local file is then
            unlinked, and a file that is already missing is ignored.
            ``False`` if the upload raised. In that case the error is logged
            with its traceback and the local file is left in place.

        Raises:
            RuntimeError: If ``initialize`` has not completed successfully.
            OSError: From ``unlink`` for errors other than FileNotFoundError.
                NOTE(review): this can happen after a successful upload.
        """
        if self.api is None:
            raise RuntimeError("Uploader was not initialized.")

        file_name = shard_path.name
        path_in_repo = (
            f"{self.path_prefix}/{file_name}" if self.path_prefix else file_name
        )

        try:
            await asyncio.to_thread(
                self.api.upload_file,
                path_or_fileobj=str(shard_path),
                path_in_repo=path_in_repo,
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                commit_message=f"Add crawl shard {shard_path.name} ({rows} rows)",
            )
        except Exception:
            # Best-effort contract: signal failure through the return value,
            # but record why instead of discarding the error silently.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to upload shard %s to %s %s",
                shard_path,
                self.repo_type,
                self.repo_id,
                exc_info=True,
            )
            return False

        with contextlib.suppress(FileNotFoundError):
            shard_path.unlink()
        return True