pjpjq committed on
Commit
abb751f
·
1 Parent(s): 303ab39

将 Hugging Face usage 快照改为轮转保存

Browse files
Files changed (2) hide show
  1. entrypoint.sh +13 -3
  2. hf_snapshot.py +213 -18
entrypoint.sh CHANGED
@@ -26,6 +26,8 @@ MC_CONFIG_DIR="${MC_CONFIG_DIR:-${AUTH_BASE}/.mc}"
26
  USAGE_HF_TOKEN="${USAGE_HF_TOKEN:-${HF_TOKEN:-}}"
27
  USAGE_HF_REPO_ID="${USAGE_HF_REPO_ID:-pjpjq/daili-usage-state}"
28
  USAGE_HF_PATH="${USAGE_HF_PATH:-usage-export.json}"
 
 
29
  MODEL_PRICES_SOURCE_URL="${MODEL_PRICES_SOURCE_URL:-https://zhanzhong.zeabur.app/api/pricing}"
30
  MODEL_PRICES_OUTPUT_PATH="${MODEL_PRICES_OUTPUT_PATH:-${AUTH_BASE}/usage-state/model-prices.json}"
31
  MODEL_PRICES_FETCH_TIMEOUT="${MODEL_PRICES_FETCH_TIMEOUT:-20}"
@@ -155,12 +157,16 @@ download_hf_snapshot() {
155
  }
156
 
157
  upload_hf_snapshot() {
 
158
  [ "$(snapshot_mode)" = "hf" ] || return 0
159
  [ -s "$USAGE_SNAPSHOT_PATH" ] || return 0
160
  env -u HTTP_PROXY -u HTTPS_PROXY -u ALL_PROXY -u http_proxy -u https_proxy -u all_proxy \
161
  USAGE_HF_TOKEN="$USAGE_HF_TOKEN" \
162
  USAGE_HF_REPO_ID="$USAGE_HF_REPO_ID" \
163
  USAGE_HF_PATH="$USAGE_HF_PATH" \
 
 
 
164
  USAGE_SNAPSHOT_PATH="$USAGE_SNAPSHOT_PATH" \
165
  /usr/local/bin/python3 /opt/daili/hf_snapshot.py upload >/dev/null 2>&1 || true
166
  }
@@ -226,6 +232,10 @@ upload_remote_snapshot() {
226
  }
227
 
228
  backup_usage() {
 
 
 
 
229
  [ -n "$GATEWAY_PID" ] || return 0
230
  kill -0 "$GATEWAY_PID" 2>/dev/null || return 0
231
  wait_for_management || return 0
@@ -235,7 +245,7 @@ backup_usage() {
235
  if curl -fsS -H "X-Management-Key: $MGMT_KEY_VALUE" "http://127.0.0.1:${APP_PORT}/v0/management/usage/export" >"$tmp_path"; then
236
  mv "$tmp_path" "$USAGE_SNAPSHOT_PATH"
237
  upload_r2_snapshot
238
- upload_hf_snapshot
239
  upload_remote_snapshot
240
  else
241
  rm -f "$tmp_path"
@@ -289,7 +299,7 @@ stop_background_jobs() {
289
 
290
  shutdown() {
291
  stop_background_jobs
292
- backup_usage
293
  if [ -n "$GATEWAY_PID" ] && kill -0 "$GATEWAY_PID" 2>/dev/null; then
294
  kill "$GATEWAY_PID" 2>/dev/null || true
295
  wait "$GATEWAY_PID" 2>/dev/null || true
@@ -311,5 +321,5 @@ wait "$GATEWAY_PID"
311
  status="$?"
312
  stop_background_jobs
313
  refresh_model_prices
314
- backup_usage
315
  exit "$status"
 
26
  USAGE_HF_TOKEN="${USAGE_HF_TOKEN:-${HF_TOKEN:-}}"
27
  USAGE_HF_REPO_ID="${USAGE_HF_REPO_ID:-pjpjq/daili-usage-state}"
28
  USAGE_HF_PATH="${USAGE_HF_PATH:-usage-export.json}"
29
+ USAGE_HF_ROTATE_INTERVAL="${USAGE_HF_ROTATE_INTERVAL:-3600}"
30
+ USAGE_HF_ROTATE_KEEP="${USAGE_HF_ROTATE_KEEP:-24}"
31
  MODEL_PRICES_SOURCE_URL="${MODEL_PRICES_SOURCE_URL:-https://zhanzhong.zeabur.app/api/pricing}"
32
  MODEL_PRICES_OUTPUT_PATH="${MODEL_PRICES_OUTPUT_PATH:-${AUTH_BASE}/usage-state/model-prices.json}"
33
  MODEL_PRICES_FETCH_TIMEOUT="${MODEL_PRICES_FETCH_TIMEOUT:-20}"
 
157
  }
158
 
159
  upload_hf_snapshot() {
160
+ force_upload="${1:-0}"
161
  [ "$(snapshot_mode)" = "hf" ] || return 0
162
  [ -s "$USAGE_SNAPSHOT_PATH" ] || return 0
163
  env -u HTTP_PROXY -u HTTPS_PROXY -u ALL_PROXY -u http_proxy -u https_proxy -u all_proxy \
164
  USAGE_HF_TOKEN="$USAGE_HF_TOKEN" \
165
  USAGE_HF_REPO_ID="$USAGE_HF_REPO_ID" \
166
  USAGE_HF_PATH="$USAGE_HF_PATH" \
167
+ USAGE_HF_ROTATE_INTERVAL="$USAGE_HF_ROTATE_INTERVAL" \
168
+ USAGE_HF_ROTATE_KEEP="$USAGE_HF_ROTATE_KEEP" \
169
+ USAGE_HF_FORCE_UPLOAD="$force_upload" \
170
  USAGE_SNAPSHOT_PATH="$USAGE_SNAPSHOT_PATH" \
171
  /usr/local/bin/python3 /opt/daili/hf_snapshot.py upload >/dev/null 2>&1 || true
172
  }
 
232
  }
233
 
234
  backup_usage() {
235
+ force_upload="0"
236
+ if [ "${1:-}" = "force" ]; then
237
+ force_upload="1"
238
+ fi
239
  [ -n "$GATEWAY_PID" ] || return 0
240
  kill -0 "$GATEWAY_PID" 2>/dev/null || return 0
241
  wait_for_management || return 0
 
245
  if curl -fsS -H "X-Management-Key: $MGMT_KEY_VALUE" "http://127.0.0.1:${APP_PORT}/v0/management/usage/export" >"$tmp_path"; then
246
  mv "$tmp_path" "$USAGE_SNAPSHOT_PATH"
247
  upload_r2_snapshot
248
+ upload_hf_snapshot "$force_upload"
249
  upload_remote_snapshot
250
  else
251
  rm -f "$tmp_path"
 
299
 
300
  shutdown() {
301
  stop_background_jobs
302
+ backup_usage force
303
  if [ -n "$GATEWAY_PID" ] && kill -0 "$GATEWAY_PID" 2>/dev/null; then
304
  kill "$GATEWAY_PID" 2>/dev/null || true
305
  wait "$GATEWAY_PID" 2>/dev/null || true
 
321
  status="$?"
322
  stop_background_jobs
323
  refresh_model_prices
324
+ backup_usage force
325
  exit "$status"
hf_snapshot.py CHANGED
@@ -1,14 +1,22 @@
1
  #!/usr/bin/env python3
2
 
 
3
  import os
4
  import shutil
5
  import sys
6
- from pathlib import Path
 
 
7
 
8
- from huggingface_hub import HfApi, hf_hub_download
9
  from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError
10
 
11
 
 
 
 
 
 
12
  def require_env(name: str) -> str:
13
  value = os.environ.get(name, "").strip()
14
  if not value:
@@ -16,6 +24,204 @@ def require_env(name: str) -> str:
16
  return value
17
 
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  def main() -> int:
20
  if len(sys.argv) != 2 or sys.argv[1] not in {"download", "upload"}:
21
  raise SystemExit("usage: hf_snapshot.py [download|upload]")
@@ -27,14 +233,10 @@ def main() -> int:
27
  local_path = Path(require_env("USAGE_SNAPSHOT_PATH"))
28
 
29
  if action == "download":
30
- try:
31
- downloaded = hf_hub_download(
32
- repo_id=repo_id,
33
- repo_type="dataset",
34
- filename=path_in_repo,
35
- token=token,
36
- )
37
- except (EntryNotFoundError, RepositoryNotFoundError):
38
  return 0
39
  local_path.parent.mkdir(parents=True, exist_ok=True)
40
  shutil.copyfile(downloaded, local_path)
@@ -43,14 +245,7 @@ def main() -> int:
43
  if not local_path.exists() or local_path.stat().st_size == 0:
44
  return 0
45
 
46
- api = HfApi(token=token)
47
- api.upload_file(
48
- path_or_fileobj=str(local_path),
49
- path_in_repo=path_in_repo,
50
- repo_id=repo_id,
51
- repo_type="dataset",
52
- commit_message="Update daili usage snapshot",
53
- )
54
  return 0
55
 
56
 
 
1
  #!/usr/bin/env python3
2
 
3
+ import json
4
  import os
5
  import shutil
6
  import sys
7
+ from datetime import UTC, datetime
8
+ from pathlib import Path, PurePosixPath
9
+ from typing import Any
10
 
11
+ from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download
12
  from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError
13
 
14
 
15
+ ROTATION_MANIFEST_TYPE = "daili_usage_rotation_v1"
16
+ DEFAULT_ROTATE_INTERVAL_SECONDS = 3600
17
+ DEFAULT_ROTATE_KEEP = 24
18
+
19
+
20
  def require_env(name: str) -> str:
21
  value = os.environ.get(name, "").strip()
22
  if not value:
 
24
  return value
25
 
26
 
27
def env_int(name: str, default: int) -> int:
    """Read a non-negative integer setting from the environment.

    Returns ``default`` when the variable is unset, blank, or not a valid
    integer literal. Negative values are clamped to ``0``.
    """
    text = os.environ.get(name, "").strip()
    if text:
        try:
            parsed = int(text)
        except ValueError:
            return default
        return parsed if parsed > 0 else 0
    return default
35
+
36
+
37
def env_bool(name: str) -> bool:
    """Return True when the environment variable holds a truthy flag.

    Accepted spellings (case-insensitive, surrounding whitespace ignored):
    ``1``, ``true``, ``yes``, ``on``. Anything else, including unset, is False.
    """
    truthy = ("1", "true", "yes", "on")
    return os.environ.get(name, "").strip().lower() in truthy
40
+
41
+
42
+ def isoformat_utc(value: datetime) -> str:
43
+ return value.astimezone(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
44
+
45
+
46
def manifest_path_for(path_in_repo: str) -> str:
    """Derive the rotation-manifest path that sits next to ``path_in_repo``.

    The manifest is named ``<stem>.manifest<suffix>``; ``.json`` is used when
    the snapshot path has no suffix. The parent directory is preserved.
    """
    target = PurePosixPath(path_in_repo)
    extension = target.suffix if target.suffix else ".json"
    manifest_name = target.stem + ".manifest" + extension
    folder = target.parent
    # A bare filename has parent "."; avoid emitting a "./" prefix in that case.
    return manifest_name if str(folder) == "." else str(folder / manifest_name)
53
+
54
+
55
def history_dir_for(path_in_repo: str) -> str:
    """Derive the directory that holds rotated copies of ``path_in_repo``.

    The directory is named ``<stem>.history`` and lives alongside the
    snapshot file.
    """
    target = PurePosixPath(path_in_repo)
    history_name = target.stem + ".history"
    folder = target.parent
    # A bare filename has parent "."; avoid emitting a "./" prefix in that case.
    return history_name if str(folder) == "." else str(folder / history_name)
61
+
62
+
63
def history_path_for(path_in_repo: str, bucket_time: datetime) -> str:
    """Build the repo path of the rotated snapshot for ``bucket_time``.

    The file is placed in the history directory for ``path_in_repo`` and named
    after the bucket timestamp (``YYYYMMDDTHHMMSSZ``) with the snapshot's
    suffix, or ``.json`` when it has none.
    """
    extension = PurePosixPath(path_in_repo).suffix or ".json"
    stamp = bucket_time.strftime("%Y%m%dT%H%M%SZ")
    return "/".join((history_dir_for(path_in_repo), stamp + extension))
68
+
69
+
70
def load_json_file(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 encoded JSON document stored at ``path``.

    Raises whatever opening, decoding, or JSON parsing raises; callers decide
    how to treat unreadable files.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
72
+
73
+
74
+ def parse_snapshot_time(payload: dict[str, Any]) -> datetime:
75
+ exported_at = payload.get("exported_at")
76
+ if isinstance(exported_at, str):
77
+ normalized = exported_at.replace("Z", "+00:00")
78
+ try:
79
+ return datetime.fromisoformat(normalized).astimezone(UTC)
80
+ except ValueError:
81
+ pass
82
+ return datetime.now(UTC)
83
+
84
+
85
+ def floor_time(value: datetime, interval_seconds: int) -> datetime:
86
+ if interval_seconds <= 0:
87
+ return value.astimezone(UTC)
88
+ timestamp = int(value.astimezone(UTC).timestamp())
89
+ bucket = timestamp - (timestamp % interval_seconds)
90
+ return datetime.fromtimestamp(bucket, UTC)
91
+
92
+
93
def is_rotation_manifest(payload: Any) -> bool:
    """Tell whether ``payload`` is a rotation manifest written by this script.

    True only for a dict whose ``type`` field equals ``ROTATION_MANIFEST_TYPE``.
    """
    if not isinstance(payload, dict):
        return False
    return payload.get("type") == ROTATION_MANIFEST_TYPE
95
+
96
+
97
def download_repo_file(token: str, repo_id: str, path_in_repo: str) -> Path | None:
    """Fetch one file from the dataset repo via ``hf_hub_download``.

    Returns the path reported by ``hf_hub_download`` wrapped in ``Path``, or
    ``None`` when the file or the repository does not exist. Any other error
    propagates to the caller.
    """
    request = {
        "repo_id": repo_id,
        "repo_type": "dataset",
        "filename": path_in_repo,
        "token": token,
    }
    try:
        fetched = hf_hub_download(**request)
    except (EntryNotFoundError, RepositoryNotFoundError):
        return None
    else:
        return Path(fetched)
108
+
109
+
110
def download_rotation_snapshot(token: str, repo_id: str, manifest_path: str) -> Path | None:
    """Download the snapshot that the rotation manifest marks as latest.

    Returns ``None`` when no usable rotation manifest exists (missing,
    unreadable, or of the wrong type), when it has no non-empty string
    ``latest_path``, or when the referenced file is absent from the repo.
    The caller can then fall back to another download path.
    """
    # Reuse load_manifest so manifest validation lives in exactly one place.
    manifest = load_manifest(token, repo_id, manifest_path)
    if manifest is None:
        return None

    latest_path = manifest.get("latest_path")
    if not isinstance(latest_path, str) or not latest_path:
        return None

    return download_repo_file(token, repo_id, latest_path)
128
+
129
+
130
def load_manifest(token: str, repo_id: str, manifest_path: str) -> dict[str, Any] | None:
    """Download and validate the rotation manifest stored at ``manifest_path``.

    Returns the parsed manifest dict, or ``None`` when the file is missing,
    cannot be read or decoded, is not valid JSON, or is not a rotation
    manifest.
    """
    manifest_file = download_repo_file(token, repo_id, manifest_path)
    if manifest_file is None:
        return None
    try:
        payload = load_json_file(manifest_file)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a manifest with invalid UTF-8 is treated as absent, not fatal.
        return None
    if not is_rotation_manifest(payload):
        return None
    return payload
141
+
142
+
143
def upload_legacy_snapshot(
    api: HfApi,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Upload the snapshot to the single fixed ``path_in_repo`` (no rotation)."""
    source_file = str(local_path)
    api.upload_file(
        repo_id=repo_id,
        repo_type="dataset",
        path_in_repo=path_in_repo,
        path_or_fileobj=source_file,
        commit_message="Update daili usage snapshot",
    )
156
+
157
+
158
def _plan_retention(
    previous: Any,
    rotated_path: str,
    keep: int,
) -> tuple[list[str], list[str]]:
    """Return ``(retained, deleted)`` history paths for the next manifest."""
    # A manifest whose "retained_paths" is not a list (null, a string, ...) is
    # treated as having no history instead of crashing or being iterated
    # character by character.
    candidates = previous if isinstance(previous, list) else []
    valid = (item for item in candidates if isinstance(item, str) and item)
    # dict.fromkeys drops duplicates while keeping newest-first order, so a
    # repeated entry neither wastes a retention slot nor is deleted twice.
    ordered = list(dict.fromkeys([rotated_path, *valid]))
    return ordered[:keep], ordered[keep:]


def upload_rotated_snapshot(
    token: str,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Upload the local snapshot as a timestamped, rotated history file.

    Settings are read from the environment:

    * ``USAGE_HF_ROTATE_INTERVAL`` - bucket size in seconds.
    * ``USAGE_HF_ROTATE_KEEP`` - number of history files to retain.
    * ``USAGE_HF_FORCE_UPLOAD`` - when truthy, upload even if the current
      bucket already has a snapshot, naming the file after the exact
      snapshot time instead of the bucket start.

    If either the interval or the retention count is non-positive, rotation
    is disabled and the snapshot is uploaded to ``path_in_repo`` directly.
    Otherwise a single commit adds the new history file, rewrites the
    manifest, and deletes history files that fall out of the retention window.

    Raises whatever reading/parsing the local snapshot or the Hub API raises.
    """
    rotate_interval = env_int("USAGE_HF_ROTATE_INTERVAL", DEFAULT_ROTATE_INTERVAL_SECONDS)
    rotate_keep = env_int("USAGE_HF_ROTATE_KEEP", DEFAULT_ROTATE_KEEP)
    force_upload = env_bool("USAGE_HF_FORCE_UPLOAD")

    if rotate_interval <= 0 or rotate_keep <= 0:
        upload_legacy_snapshot(HfApi(token=token), repo_id, path_in_repo, local_path)
        return

    snapshot = load_json_file(local_path)
    snapshot_time = parse_snapshot_time(snapshot)
    bucket_time = snapshot_time if force_upload else floor_time(snapshot_time, rotate_interval)
    rotated_path = history_path_for(path_in_repo, bucket_time)
    manifest_path = manifest_path_for(path_in_repo)
    manifest = load_manifest(token, repo_id, manifest_path) or {}

    # The current bucket already has its snapshot: nothing to do unless forced.
    if not force_upload and manifest.get("latest_path") == rotated_path:
        return

    retained_paths, delete_paths = _plan_retention(
        manifest.get("retained_paths"),
        rotated_path,
        rotate_keep,
    )

    next_manifest = {
        "type": ROTATION_MANIFEST_TYPE,
        "version": 1,
        "latest_path": rotated_path,
        "updated_at": isoformat_utc(datetime.now(UTC)),
        "latest_exported_at": snapshot.get("exported_at"),
        "rotation_interval_seconds": rotate_interval,
        "retention_count": rotate_keep,
        "retained_paths": retained_paths,
    }

    operations = [
        CommitOperationAdd(path_in_repo=rotated_path, path_or_fileobj=str(local_path)),
        CommitOperationAdd(
            path_in_repo=manifest_path,
            path_or_fileobj=json.dumps(
                next_manifest,
                ensure_ascii=False,
                indent=2,
                sort_keys=True,
            ).encode("utf-8"),
        ),
    ]
    operations.extend(CommitOperationDelete(path_in_repo=item) for item in delete_paths)

    # One commit keeps the history file, manifest, and deletions in step.
    api = HfApi(token=token)
    api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Rotate daili usage snapshot to {PurePosixPath(rotated_path).name}",
    )
223
+
224
+
225
  def main() -> int:
226
  if len(sys.argv) != 2 or sys.argv[1] not in {"download", "upload"}:
227
  raise SystemExit("usage: hf_snapshot.py [download|upload]")
 
233
  local_path = Path(require_env("USAGE_SNAPSHOT_PATH"))
234
 
235
  if action == "download":
236
+ downloaded = download_rotation_snapshot(token, repo_id, manifest_path_for(path_in_repo))
237
+ if downloaded is None:
238
+ downloaded = download_repo_file(token, repo_id, path_in_repo)
239
+ if downloaded is None:
 
 
 
 
240
  return 0
241
  local_path.parent.mkdir(parents=True, exist_ok=True)
242
  shutil.copyfile(downloaded, local_path)
 
245
  if not local_path.exists() or local_path.stat().st_size == 0:
246
  return 0
247
 
248
+ upload_rotated_snapshot(token, repo_id, path_in_repo, local_path)
 
 
 
 
 
 
 
249
  return 0
250
 
251