# Spaces: OpenVideo (Space status at scrape time: Runtime error)
import hashlib
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone

import pyarrow.parquet as pq
import toml
from huggingface_hub import HfApi, hf_hub_download, upload_file
# Directory layout: this file lives one level below the project root.
_ROOT = os.path.dirname(os.path.abspath(__file__))
ROOT = os.path.dirname(_ROOT)

# Gradio UI configuration: window title plus the static design assets.
_ASSETS = f"{ROOT}/assets"
GR_CONF = {
    "title": "OpenVideo",
    "css": f"{_ASSETS}/design.css",
    "js": f"{_ASSETS}/design.js",
    "theme": f"{_ASSETS}/theme.json",
}
@dataclass
class HFUser:
    """A Hugging Face user session for listing, fetching and tagging
    video datasets on the Hub.

    Attributes:
        token:  HF access token used for all API calls (None when unauthenticated).
        name:   account name reported by ``whoami``.
        avatar: absolute URL of the account's avatar image.
        hf_api: configured ``HfApi`` client (None when unauthenticated).
        repo:   dataset repo id most recently passed to ``list_dataset``.
    """

    token: str
    name: str
    avatar: str
    # Quoted annotation: avoids evaluating the HfApi name at class-definition time.
    hf_api: "HfApi"
    repo: str = None

    @classmethod
    def from_token(cls, _token: str) -> "HFUser":
        """Build an authenticated user from an access token.

        Raises whatever ``HfApi.whoami`` raises on an invalid token.
        """
        _api = HfApi(token=_token, endpoint="https://huggingface.co")
        raw = _api.whoami()
        _name = raw["name"]
        # whoami returns a site-relative avatar URL; make it absolute.
        # (Fix: raw["avatarUrl"] nested inside a "-delimited f-string is a
        # SyntaxError before Python 3.12.)
        _avatar = f"https://huggingface.co{raw['avatarUrl']}"
        return cls(token=_token, name=_name, avatar=_avatar, hf_api=_api)

    @classmethod
    def empty(cls) -> "HFUser":
        """Placeholder user with no credentials (pre-login state)."""
        return cls(token=None, name=None, avatar=None, hf_api=None)

    def auth(self, _token: str) -> str:
        """Authenticate this user in place with a new token.

        Returns the account name on success; raises on a bad token.
        (Return annotation fixed: the method returns ``self.name``, a str.)
        """
        _api = HfApi(token=_token, endpoint="https://huggingface.co")
        raw = _api.whoami()
        self.token = _token
        self.hf_api = _api
        self.name = raw["name"]
        # Fix: was assigned to self._avatar, leaving the public attribute stale.
        self.avatar = f"https://huggingface.co{raw['avatarUrl']}"
        return self.name

    def ping(self, name) -> str:
        """Return today's UTC date (YYYY-MM-DD) when *name* matches the
        authenticated user, otherwise the string "Unauthorized OP"."""
        if name == self.name:
            _utc = datetime.now(timezone.utc)
            return _utc.strftime('%Y-%m-%d')
        return "Unauthorized OP"

    def list_dataset(self, repo, path="data"):
        """List the relevant files of dataset *repo* and remember it as the
        current repo for subsequent ``fetch_file`` calls.

        The sample repo is filtered by .mp4 extension; all other repos are
        filtered to files under *path*/.
        """
        self.repo = repo
        _raw = self.hf_api.list_repo_files(repo, repo_type="dataset")
        if repo == "OpenVideo/Sample-2k":
            files = filter(lambda f: f.endswith(".mp4"), _raw)
        else:
            # Fix: Hub repo paths always use "/" — os.sep broke this on Windows.
            files = filter(lambda f: f.startswith(path + "/"), _raw)
        return list(files)

    def fetch_file(self, fname, local_dir="/dev/shm"):
        """Download *fname* from the current repo and return the local path.

        Enables hf_transfer with 8 tokio workers for faster downloads.
        """
        os.environ["TOKIO_WORKER_THREADS"] = "8"
        os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
        return hf_hub_download(
            repo_id=self.repo,
            filename=fname,
            repo_type="dataset",
            local_dir=local_dir,
            token=self.token,
        )

    def split_parquet(self, pq_path, batch_size=1):
        """Split a parquet file of videos into batches of temp .mp4 paths.

        Assumes the parquet has a binary "video" column — TODO confirm schema.
        Returns a list of batches, each a list of at most *batch_size* file
        paths holding the raw video bytes. Callers own the temp files.
        """
        tasks = []
        pf = pq.ParquetFile(pq_path)
        for batch in pf.iter_batches(batch_size):
            _chunk = []
            df = batch.to_pandas()
            for binary in df["video"]:
                # Skip null/empty payloads.
                if binary:
                    # delete=False: the path must outlive this handle.
                    _v = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    with open(_v.name, "wb") as f:
                        _ = f.write(binary)
                    _chunk.append(_v.name)
            tasks.append(_chunk)
            del df  # release the batch frame before decoding the next one
        return tasks

    def fetch_file_slice(self, fname, batch_size):
        """Download *fname* to /tmp and split it into video batches."""
        # Fix: self.repo was wrongly passed as the filename positional —
        # fetch_file's signature is (fname, local_dir).
        pq_path = self.fetch_file(fname, local_dir="/tmp")
        return self.split_parquet(pq_path, batch_size)

    def dump_video(self, binary):
        """Write video bytes to a temp .mp4 file and return its path.

        Fix: delete=False — with the default delete=True the file vanished
        as soon as the handle was garbage-collected, so callers received a
        path to a file that no longer existed.
        """
        vt = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        with open(vt.name, "wb") as f:
            _ = f.write(binary)
        return vt.name

    def bytes_md5(self, binary):
        """Hex MD5 digest of *binary* (used as a content-addressed video id)."""
        return hashlib.md5(binary).hexdigest()

    def tag(self, binary, tags):
        """Build the tag record stored alongside a video."""
        return {
            "md5": self.bytes_md5(binary),
            "tags": tags,
            "operator_name": self.name,
        }

    def push_tag(self, info, target_repo):
        """Serialize *info* to index/<md5>/<md5>.toml locally and upload it
        to *target_repo*; returns the upload result from the Hub."""
        md5 = info["md5"]
        idx_path = f"index/{md5}/{md5}.toml"
        # Fix: create the local index/<md5>/ directory before writing —
        # open(..., "w") fails if the directory does not exist.
        os.makedirs(os.path.dirname(idx_path), exist_ok=True)
        with open(idx_path, "w") as f:
            toml.dump(info, f)
        return upload_file(
            path_or_fileobj=idx_path,
            path_in_repo=idx_path,
            repo_id=target_repo,
            token=self.token,
            repo_type="dataset",
        )
if __name__ == "__main__":
    # Manual smoke test: authenticate, then slice a local parquet of videos.
    user = HFUser.from_token("hf_xxxxx")
    _patch = user.split_parquet("/opt/000000.parquet", batch_size=4)