fromozu committed on
Commit
9e3e1ac
·
verified ·
1 Parent(s): 92b449a

Upload hf_backend/hub_store.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. hf_backend/hub_store.py +133 -0
hf_backend/hub_store.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import io
4
+ import json
5
+ import time
6
+ from pathlib import Path
7
+
8
+ from huggingface_hub import HfApi, hf_hub_download
9
+ from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
10
+
11
+ from hf_backend.config import AppConfig
12
+ from hf_backend.manifest import build_download_url, empty_manifest, normalize_manifest
13
+
14
+
15
class HubStore:
    """Wrapper around ``HfApi`` / ``hf_hub_download`` for one dataset repo.

    The repo id, access token and manifest path are all read from the
    ``AppConfig`` instance handed to the constructor.
    """

    # HTTP status codes for which a failed commit operation is tried again.
    _retryable_status_codes = {429, 500, 502, 503, 504}
    # Upper bound on how many times a single commit operation is invoked.
    _commit_retry_attempts = 5
    # Base delay in seconds; the wait after the k-th failure is k times this.
    _commit_retry_delay_seconds = 1.0

    def __init__(self, config: AppConfig) -> None:
        """Keep ``config`` and build an ``HfApi`` client from its token."""
        self.config = config
        self.api = HfApi(token=config.hf_token)

    def _is_retryable_hub_error(self, exc: HfHubHTTPError) -> bool:
        """Return True when ``exc`` carries a response with a retryable status.

        An exception without a ``response`` attribute, or whose response is
        ``None``, is never considered retryable.
        """
        http_response = getattr(exc, "response", None)
        if http_response is not None:
            return http_response.status_code in self._retryable_status_codes
        return False

    def _run_with_commit_retry(self, operation):
        """Call ``operation()`` and return its result, retrying on Hub errors.

        Only ``HfHubHTTPError`` is intercepted. The error is re-raised once
        ``_commit_retry_attempts`` failures have accumulated or when the error
        is not retryable; otherwise the loop sleeps for the base delay times
        the number of failures so far and calls ``operation`` again.
        """
        failures = 0
        while True:
            try:
                return operation()
            except HfHubHTTPError as exc:
                failures += 1
                exhausted = failures >= self._commit_retry_attempts
                if exhausted or not self._is_retryable_hub_error(exc):
                    raise
            # Reached only after a retryable failure with attempts remaining.
            time.sleep(self._commit_retry_delay_seconds * failures)

    def ensure_dataset_repo(self) -> None:
        """Create the configured dataset repo unless ``repo_info`` finds it.

        ``EntryNotFoundError`` and HTTP 404 responses from the lookup lead to
        repo creation; any other ``HfHubHTTPError`` propagates to the caller.
        """
        try:
            self.api.repo_info(
                repo_id=self.config.dataset_repo_id,
                repo_type="dataset",
                token=self.config.hf_token,
            )
        except EntryNotFoundError:
            pass
        except HfHubHTTPError as exc:
            is_404 = exc.response is not None and exc.response.status_code == 404
            if not is_404:
                raise
        else:
            # Lookup succeeded, so there is nothing to create.
            return

        self.api.create_repo(
            self.config.dataset_repo_id,
            repo_type="dataset",
            private=False,
            exist_ok=True,
        )

    def load_manifest(self) -> dict:
        """Download the manifest file and return it via ``normalize_manifest``.

        ``empty_manifest()`` is returned instead when the download raises
        ``EntryNotFoundError`` / ``FileNotFoundError`` or an ``HfHubHTTPError``
        whose response status is 404. Other Hub HTTP errors are re-raised.
        """
        try:
            local_copy = hf_hub_download(
                repo_id=self.config.dataset_repo_id,
                repo_type="dataset",
                filename=self.config.manifest_path,
                token=self.config.hf_token,
                force_download=True,
            )
            with open(local_copy, encoding="utf-8") as stream:
                raw = json.load(stream)
                return normalize_manifest(raw)
        except (EntryNotFoundError, FileNotFoundError):
            return empty_manifest()
        except HfHubHTTPError as exc:
            if exc.response is None or exc.response.status_code != 404:
                raise
            return empty_manifest()

    def save_manifest(self, manifest: dict, *, message: str = "Update job manifest") -> dict:
        """Normalize ``manifest``, upload it as UTF-8 JSON, and return it.

        The returned value is the output of ``normalize_manifest``, i.e. the
        same object that was serialized and uploaded to the manifest path.
        """
        cleaned = normalize_manifest(manifest)
        serialized = json.dumps(cleaned, ensure_ascii=False, indent=2)
        self.upload_bytes(
            serialized.encode("utf-8"),
            self.config.manifest_path,
            commit_message=message,
        )
        return cleaned

    def upload_bytes(self, content: bytes, repo_path: str, *, commit_message: str) -> str:
        """Upload ``content`` to ``repo_path`` with commit retry.

        Returns the result of ``build_download_url`` for the repo id and path.
        """

        def push():
            # A new BytesIO per call so each retry starts from offset 0.
            return self.api.upload_file(
                path_or_fileobj=io.BytesIO(content),
                path_in_repo=repo_path,
                repo_id=self.config.dataset_repo_id,
                repo_type="dataset",
                token=self.config.hf_token,
                commit_message=commit_message,
            )

        self._run_with_commit_retry(push)
        return build_download_url(self.config.dataset_repo_id, repo_path)

    def upload_local_file(self, local_path: Path, repo_path: str, *, commit_message: str) -> str:
        """Upload the file at ``local_path`` to ``repo_path`` with commit retry.

        Returns the result of ``build_download_url`` for the repo id and path.
        """

        def push():
            return self.api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=repo_path,
                repo_id=self.config.dataset_repo_id,
                repo_type="dataset",
                token=self.config.hf_token,
                commit_message=commit_message,
            )

        self._run_with_commit_retry(push)
        return build_download_url(self.config.dataset_repo_id, repo_path)

    def delete_repo_file(self, repo_path: str, *, commit_message: str) -> None:
        """Delete ``repo_path`` from the dataset repo with commit retry."""

        def remove():
            return self.api.delete_file(
                path_in_repo=repo_path,
                repo_id=self.config.dataset_repo_id,
                repo_type="dataset",
                token=self.config.hf_token,
                commit_message=commit_message,
            )

        self._run_with_commit_retry(remove)

    def download_to_directory(self, repo_path: str, directory: Path) -> Path:
        """Download ``repo_path`` into ``directory`` and return the local path.

        ``directory`` is created (with parents) if missing, and the download
        is always forced rather than served from an existing copy.
        """
        directory.mkdir(parents=True, exist_ok=True)
        local_file = hf_hub_download(
            repo_id=self.config.dataset_repo_id,
            repo_type="dataset",
            filename=repo_path,
            token=self.config.hf_token,
            local_dir=directory,
            force_download=True,
        )
        return Path(local_file)