daili / hf_snapshot.py
pjpjq's picture
将 Hugging Face usage 快照改为轮转保存
abb751f
#!/usr/bin/env python3
import json
import os
import shutil
import sys
from datetime import UTC, datetime
from pathlib import Path, PurePosixPath
from typing import Any
from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download
from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError
# Value stored in a manifest's "type" field; is_rotation_manifest() compares
# against it to tell a rotation manifest apart from any other JSON document.
ROTATION_MANIFEST_TYPE = "daili_usage_rotation_v1"
# Default rotation bucket width in seconds (one hour).
DEFAULT_ROTATE_INTERVAL_SECONDS = 60 * 60
# Default number of rotated history files retained in the repo.
DEFAULT_ROTATE_KEEP = 24
def require_env(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        SystemExit: if the variable is unset or contains only whitespace.
    """
    value = os.environ.get(name, "").strip()
    if value:
        return value
    raise SystemExit(f"missing environment variable: {name}")
def env_int(name: str, default: int) -> int:
    """Read a non-negative integer from environment variable *name*.

    Returns *default* when the variable is unset, blank, or not a valid
    integer literal. Negative values are clamped to 0.
    """
    text = os.environ.get(name, "").strip()
    if text:
        try:
            parsed = int(text)
        except ValueError:
            return default
        return parsed if parsed > 0 else 0
    return default
def env_bool(name: str) -> bool:
    """Return True when env var *name* is 1/true/yes/on (case-insensitive, stripped)."""
    truthy = ("1", "true", "yes", "on")
    return os.environ.get(name, "").strip().lower() in truthy
def isoformat_utc(value: datetime) -> str:
return value.astimezone(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
def manifest_path_for(path_in_repo: str) -> str:
    """Return the repo path of the rotation manifest that sits next to *path_in_repo*.

    ``data/usage.json`` maps to ``data/usage.manifest.json``. A path without
    an extension gets ``.json``.
    """
    source = PurePosixPath(path_in_repo)
    manifest_name = "{}.manifest{}".format(source.stem, source.suffix or ".json")
    folder = source.parent
    return manifest_name if str(folder) == "." else str(folder / manifest_name)
def history_dir_for(path_in_repo: str) -> str:
    """Return the repo directory holding rotated copies of *path_in_repo*.

    ``data/usage.json`` maps to ``data/usage.history``.
    """
    source = PurePosixPath(path_in_repo)
    folder_name = source.stem + ".history"
    if str(source.parent) != ".":
        return str(source.parent / folder_name)
    return folder_name
def history_path_for(path_in_repo: str, bucket_time: datetime) -> str:
    """Return the repo path of the rotated snapshot for *bucket_time*.

    The file lives in ``history_dir_for(path_in_repo)``. Its name is the
    bucket's UTC timestamp (``YYYYMMDDTHHMMSSZ``) followed by the extension
    of *path_in_repo* (``.json`` when it has none).
    """
    suffix = PurePosixPath(path_in_repo).suffix or ".json"
    # The name carries a literal "Z", so normalise to UTC first (as
    # isoformat_utc does); an aware non-UTC datetime would otherwise produce
    # a mislabelled file name.
    stamp = bucket_time.astimezone(UTC).strftime("%Y%m%dT%H%M%SZ")
    return f"{history_dir_for(path_in_repo)}/{stamp}{suffix}"
def load_json_file(path: Path) -> dict[str, Any]:
    """Decode the UTF-8 JSON document stored at *path*.

    The decoded top-level value is returned as-is, without checking that it
    is really a dict. Callers must verify its shape.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
def parse_snapshot_time(payload: dict[str, Any]) -> datetime:
exported_at = payload.get("exported_at")
if isinstance(exported_at, str):
normalized = exported_at.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized).astimezone(UTC)
except ValueError:
pass
return datetime.now(UTC)
def floor_time(value: datetime, interval_seconds: int) -> datetime:
if interval_seconds <= 0:
return value.astimezone(UTC)
timestamp = int(value.astimezone(UTC).timestamp())
bucket = timestamp - (timestamp % interval_seconds)
return datetime.fromtimestamp(bucket, UTC)
def is_rotation_manifest(payload: Any) -> bool:
    """Return True if *payload* is a dict whose ``type`` equals ROTATION_MANIFEST_TYPE."""
    if not isinstance(payload, dict):
        return False
    return payload.get("type") == ROTATION_MANIFEST_TYPE
def download_repo_file(token: str, repo_id: str, path_in_repo: str) -> Path | None:
    """Fetch *path_in_repo* from the dataset repo *repo_id* with hf_hub_download.

    Returns the local path reported by hf_hub_download. Returns None when
    hf_hub_download raises EntryNotFoundError or RepositoryNotFoundError
    (including subclasses). Any other error propagates to the caller.
    """
    # NOTE(review): huggingface_hub's LocalEntryNotFoundError (no network and
    # no cached copy) subclasses EntryNotFoundError, so an outage may also be
    # reported as None here — confirm callers are fine with that.
    try:
        local_file = hf_hub_download(
            filename=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        return None
    else:
        return Path(local_file)
def download_rotation_snapshot(token: str, repo_id: str, manifest_path: str) -> Path | None:
    """Download the newest available rotated snapshot listed in the manifest.

    The manifest at *manifest_path* is read with load_manifest, so a missing,
    unreadable, or non-rotation manifest yields None. The manifest's
    ``latest_path`` is tried first. After that, the entries of
    ``retained_paths`` are tried in the order listed (upload_rotated_snapshot
    writes them newest first). If one history file has gone missing, the
    next-newest rotated copy is used instead of the caller falling back to the
    older legacy snapshot.

    Returns:
        The local path of the first file that could be downloaded, or None.
    """
    manifest = load_manifest(token, repo_id, manifest_path)
    if manifest is None:
        return None

    candidates: list[str] = []
    latest_path = manifest.get("latest_path")
    if isinstance(latest_path, str) and latest_path:
        candidates.append(latest_path)
    retained = manifest.get("retained_paths")
    if isinstance(retained, list):
        # Guard against non-list values: iterating a string would yield characters.
        candidates.extend(item for item in retained if isinstance(item, str) and item)

    # dict.fromkeys drops duplicates (latest_path is normally retained_paths[0])
    # while preserving order.
    for candidate in dict.fromkeys(candidates):
        downloaded = download_repo_file(token, repo_id, candidate)
        if downloaded is not None:
            return downloaded
    return None
def load_manifest(token: str, repo_id: str, manifest_path: str) -> dict[str, Any] | None:
    """Download and decode the rotation manifest stored at *manifest_path*.

    Returns None in three cases:

    * download_repo_file returned None.
    * The file cannot be read or is not valid JSON.
    * The decoded value is not a rotation manifest (per is_rotation_manifest).
    """
    local_copy = download_repo_file(token, repo_id, manifest_path)
    if local_copy is not None:
        try:
            payload = load_json_file(local_copy)
        except (OSError, json.JSONDecodeError):
            payload = None
        if is_rotation_manifest(payload):
            return payload
    return None
def upload_legacy_snapshot(
    api: HfApi,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Upload *local_path* to *path_in_repo* in the dataset repo with upload_file.

    This is the non-rotating mode. No manifest or history file is written.
    """
    upload_kwargs = dict(
        path_or_fileobj=str(local_path),
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message="Update daili usage snapshot",
    )
    api.upload_file(**upload_kwargs)
def upload_rotated_snapshot(
    token: str,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Upload *local_path* as a time-bucketed history file and update the manifest.

    Settings are read from the environment:

    * ``USAGE_HF_ROTATE_INTERVAL``: bucket width in seconds (default 3600).
    * ``USAGE_HF_ROTATE_KEEP``: number of history files to keep (default 24).
    * ``USAGE_HF_FORCE_UPLOAD``: when truthy, the history file is named after
      the exact snapshot time, and the upload happens even if the manifest
      already points at the current bucket.

    If the interval or the keep count is not positive, the snapshot is
    uploaded straight to *path_in_repo* (legacy mode). Otherwise a single
    commit does three things:

    * writes the snapshot to ``history_path_for(path_in_repo, bucket)``;
    * rewrites the manifest next to *path_in_repo* to point at it;
    * deletes managed history files beyond the keep count.

    Without force mode, nothing is uploaded when the manifest already points
    at the current bucket.
    """
    rotate_interval = env_int("USAGE_HF_ROTATE_INTERVAL", DEFAULT_ROTATE_INTERVAL_SECONDS)
    rotate_keep = env_int("USAGE_HF_ROTATE_KEEP", DEFAULT_ROTATE_KEEP)
    force_upload = env_bool("USAGE_HF_FORCE_UPLOAD")
    if rotate_interval <= 0 or rotate_keep <= 0:
        # Rotation disabled: overwrite the single legacy file instead.
        upload_legacy_snapshot(HfApi(token=token), repo_id, path_in_repo, local_path)
        return

    snapshot = load_json_file(local_path)
    # The file itself is uploaded byte-for-byte. Only the metadata lookups
    # below need a dict, so a non-object JSON document (e.g. a top-level
    # list) is treated as having no ``exported_at`` instead of raising
    # AttributeError.
    snapshot_meta: dict[str, Any] = snapshot if isinstance(snapshot, dict) else {}
    snapshot_time = parse_snapshot_time(snapshot_meta)
    # Force mode keeps the exact snapshot time, so a forced upload gets its
    # own history file rather than sharing the bucket's.
    bucket_time = snapshot_time if force_upload else floor_time(snapshot_time, rotate_interval)
    rotated_path = history_path_for(path_in_repo, bucket_time)
    manifest_path = manifest_path_for(path_in_repo)
    manifest = load_manifest(token, repo_id, manifest_path) or {}
    if not force_upload and manifest.get("latest_path") == rotated_path:
        # This bucket is already published; at most one commit per bucket.
        return

    history_dir = history_dir_for(path_in_repo)

    def is_managed_history_file(item: object) -> bool:
        # Accept only plain file paths directly inside our history directory.
        # This rejects:
        #   - the characters of a string-valued ``retained_paths``;
        #   - a trailing-slash directory path, which CommitOperationDelete
        #     treats as a whole folder;
        #   - anything else a corrupt or hand-edited manifest might list, such
        #     as the manifest itself or the legacy snapshot.
        return (
            isinstance(item, str)
            and item.startswith(f"{history_dir}/")
            and str(PurePosixPath(item).parent) == history_dir
        )

    listed = manifest.get("retained_paths")
    # dict.fromkeys removes duplicates while keeping the manifest's
    # newest-first order, so no path is ever scheduled for deletion twice.
    previous_paths = list(
        dict.fromkeys(
            item
            for item in (listed if isinstance(listed, list) else [])
            if is_managed_history_file(item)
        )
    )
    retained_paths = [
        rotated_path,
        *(item for item in previous_paths if item != rotated_path),
    ][:rotate_keep]
    # rotated_path is always retained, so it can never end up being deleted.
    delete_paths = [item for item in previous_paths if item not in retained_paths]

    next_manifest = {
        "type": ROTATION_MANIFEST_TYPE,
        "version": 1,
        "latest_path": rotated_path,
        "updated_at": isoformat_utc(datetime.now(UTC)),
        "latest_exported_at": snapshot_meta.get("exported_at"),
        "rotation_interval_seconds": rotate_interval,
        "retention_count": rotate_keep,
        "retained_paths": retained_paths,
    }
    manifest_bytes = json.dumps(
        next_manifest,
        ensure_ascii=False,
        indent=2,
        sort_keys=True,
    ).encode("utf-8")

    # Add the snapshot, rewrite the manifest and prune old history files in
    # a single commit.
    operations = [
        CommitOperationAdd(path_in_repo=rotated_path, path_or_fileobj=str(local_path)),
        CommitOperationAdd(path_in_repo=manifest_path, path_or_fileobj=manifest_bytes),
        *(CommitOperationDelete(path_in_repo=item) for item in delete_paths),
    ]
    HfApi(token=token).create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Rotate daili usage snapshot to {PurePosixPath(rotated_path).name}",
    )
def main() -> int:
    """Command-line entry point: ``hf_snapshot.py download`` or ``hf_snapshot.py upload``.

    Both actions require USAGE_HF_TOKEN, USAGE_HF_REPO_ID, USAGE_HF_PATH and
    USAGE_SNAPSHOT_PATH in the environment.

    * download: copy the rotated snapshot found through the manifest to
      USAGE_SNAPSHOT_PATH. If there is none, copy the legacy file at
      USAGE_HF_PATH instead. Do nothing if neither can be downloaded.
    * upload: push USAGE_SNAPSHOT_PATH with upload_rotated_snapshot, unless
      the file is missing or empty.

    Returns 0 on every normal path. A bad command line raises SystemExit
    with a usage message.
    """
    arguments = sys.argv[1:]
    if len(arguments) != 1 or arguments[0] not in ("download", "upload"):
        raise SystemExit("usage: hf_snapshot.py [download|upload]")
    (action,) = arguments

    token = require_env("USAGE_HF_TOKEN")
    repo_id = require_env("USAGE_HF_REPO_ID")
    path_in_repo = require_env("USAGE_HF_PATH")
    local_path = Path(require_env("USAGE_SNAPSHOT_PATH"))

    if action == "upload":
        # Skip a missing or zero-byte snapshot: there is nothing to publish.
        if local_path.exists() and local_path.stat().st_size > 0:
            upload_rotated_snapshot(token, repo_id, path_in_repo, local_path)
        return 0

    source = download_rotation_snapshot(token, repo_id, manifest_path_for(path_in_repo))
    if source is None:
        # No usable rotation manifest: fall back to the single-file snapshot.
        source = download_repo_file(token, repo_id, path_in_repo)
    if source is not None:
        local_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, local_path)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())