| |
|
|
| import json |
| import os |
| import shutil |
| import sys |
| from datetime import UTC, datetime |
| from pathlib import Path, PurePosixPath |
| from typing import Any |
|
|
| from huggingface_hub import CommitOperationAdd, CommitOperationDelete, HfApi, hf_hub_download |
| from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError |
|
|
|
|
# Tag written into the manifest JSON so readers can recognise this rotation schema.
ROTATION_MANIFEST_TYPE = "daili_usage_rotation_v1"
# Default bucket width for rotated snapshots: one hour, in seconds.
DEFAULT_ROTATE_INTERVAL_SECONDS = 3600
# Default number of rotated history files to retain in the repo.
DEFAULT_ROTATE_KEEP = 24
|
|
|
|
def require_env(name: str) -> str:
    """Return the stripped value of a mandatory environment variable.

    Raises SystemExit when the variable is unset or blank.
    """
    stripped = os.environ.get(name, "").strip()
    if stripped:
        return stripped
    raise SystemExit(f"missing environment variable: {name}")
|
|
|
|
def env_int(name: str, default: int) -> int:
    """Parse a non-negative integer from the environment.

    Unset/blank or unparsable values yield *default*; negative values
    are clamped to zero.
    """
    text = os.environ.get(name, "").strip()
    if not text:
        return default
    try:
        parsed = int(text)
    except ValueError:
        return default
    return parsed if parsed > 0 else 0
|
|
|
|
def env_bool(name: str) -> bool:
    """True when the variable holds a recognised truthy token (case-insensitive)."""
    truthy = ("1", "true", "yes", "on")
    return os.environ.get(name, "").strip().lower() in truthy
|
|
|
|
| def isoformat_utc(value: datetime) -> str: |
| return value.astimezone(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") |
|
|
|
|
def manifest_path_for(path_in_repo: str) -> str:
    """Derive the sibling manifest filename for a snapshot path.

    ``dir/usage.json`` -> ``dir/usage.manifest.json``; a missing suffix
    defaults to ``.json``.
    """
    path = PurePosixPath(path_in_repo)
    manifest_name = path.stem + ".manifest" + (path.suffix or ".json")
    parent = path.parent
    return manifest_name if str(parent) == "." else str(parent / manifest_name)
|
|
|
|
def history_dir_for(path_in_repo: str) -> str:
    """Derive the directory that holds rotated copies of a snapshot.

    ``dir/usage.json`` -> ``dir/usage.history``.
    """
    path = PurePosixPath(path_in_repo)
    history_name = path.stem + ".history"
    parent = path.parent
    return history_name if str(parent) == "." else str(parent / history_name)
|
|
|
|
def history_path_for(path_in_repo: str, bucket_time: datetime) -> str:
    """Build the repo path of the rotated snapshot file for *bucket_time*.

    The filename is the bucket timestamp (``YYYYmmddTHHMMSSZ``) plus the
    snapshot's suffix (defaulting to ``.json``), placed inside the
    snapshot's history directory.
    """
    base = PurePosixPath(path_in_repo)
    suffix = base.suffix or ".json"
    filename = bucket_time.strftime("%Y%m%dT%H%M%SZ") + suffix
    # Bug fix: the computed filename was previously discarded in favour of a
    # literal "(unknown)" component, collapsing every bucket onto one path.
    return f"{history_dir_for(path_in_repo)}/{filename}"
|
|
|
|
def load_json_file(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and return the decoded JSON object."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
|
|
|
|
| def parse_snapshot_time(payload: dict[str, Any]) -> datetime: |
| exported_at = payload.get("exported_at") |
| if isinstance(exported_at, str): |
| normalized = exported_at.replace("Z", "+00:00") |
| try: |
| return datetime.fromisoformat(normalized).astimezone(UTC) |
| except ValueError: |
| pass |
| return datetime.now(UTC) |
|
|
|
|
| def floor_time(value: datetime, interval_seconds: int) -> datetime: |
| if interval_seconds <= 0: |
| return value.astimezone(UTC) |
| timestamp = int(value.astimezone(UTC).timestamp()) |
| bucket = timestamp - (timestamp % interval_seconds) |
| return datetime.fromtimestamp(bucket, UTC) |
|
|
|
|
def is_rotation_manifest(payload: Any) -> bool:
    """Check that *payload* is a dict tagged with the rotation manifest type."""
    if not isinstance(payload, dict):
        return False
    return payload.get("type") == ROTATION_MANIFEST_TYPE
|
|
|
|
def download_repo_file(token: str, repo_id: str, path_in_repo: str) -> Path | None:
    """Fetch one file from the dataset repo.

    Returns the local cache path, or None when the file or the repository
    does not exist.
    """
    try:
        local = hf_hub_download(
            repo_type="dataset",
            repo_id=repo_id,
            filename=path_in_repo,
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        return None
    return Path(local)
|
|
|
|
def download_rotation_snapshot(token: str, repo_id: str, manifest_path: str) -> Path | None:
    """Resolve the manifest's latest rotated snapshot and download it.

    Returns None when the manifest is absent/invalid or names no latest
    snapshot path.
    """
    # Consistency fix: this previously duplicated load_manifest()'s
    # download/parse/validate sequence inline; delegate to it instead.
    manifest = load_manifest(token, repo_id, manifest_path)
    if manifest is None:
        return None

    latest_path = manifest.get("latest_path")
    if not isinstance(latest_path, str) or not latest_path:
        return None

    return download_repo_file(token, repo_id, latest_path)
|
|
|
|
def load_manifest(token: str, repo_id: str, manifest_path: str) -> dict[str, Any] | None:
    """Download and validate the rotation manifest.

    Returns the parsed manifest dict, or None when it is missing,
    unreadable, malformed JSON, or not tagged as a rotation manifest.
    """
    local = download_repo_file(token, repo_id, manifest_path)
    if local is None:
        return None
    try:
        payload = load_json_file(local)
    except (OSError, json.JSONDecodeError):
        return None
    return payload if is_rotation_manifest(payload) else None
|
|
|
|
def upload_legacy_snapshot(
    api: HfApi,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Overwrite the single, unrotated snapshot file in the dataset repo.

    Used when rotation is disabled via environment configuration.
    """
    api.upload_file(
        path_or_fileobj=str(local_path),
        repo_type="dataset",
        repo_id=repo_id,
        path_in_repo=path_in_repo,
        commit_message="Update daili usage snapshot",
    )
|
|
|
|
def upload_rotated_snapshot(
    token: str,
    repo_id: str,
    path_in_repo: str,
    local_path: Path,
) -> None:
    """Publish the snapshot into a timestamped history slot and refresh the manifest.

    Rotation keeps the newest ``USAGE_HF_ROTATE_KEEP`` copies, bucketed by
    ``USAGE_HF_ROTATE_INTERVAL`` seconds.  Setting either to a non-positive
    value falls back to overwriting a single legacy snapshot file.
    """
    interval = env_int("USAGE_HF_ROTATE_INTERVAL", DEFAULT_ROTATE_INTERVAL_SECONDS)
    keep = env_int("USAGE_HF_ROTATE_KEEP", DEFAULT_ROTATE_KEEP)
    force = env_bool("USAGE_HF_FORCE_UPLOAD")

    # Rotation disabled: keep the old single-file behaviour.
    if interval <= 0 or keep <= 0:
        upload_legacy_snapshot(HfApi(token=token), repo_id, path_in_repo, local_path)
        return

    snapshot = load_json_file(local_path)
    exported = parse_snapshot_time(snapshot)
    # A forced upload gets a second-resolution slot instead of a floored bucket.
    bucket = exported if force else floor_time(exported, interval)
    rotated_path = history_path_for(path_in_repo, bucket)
    manifest_path = manifest_path_for(path_in_repo)
    manifest = load_manifest(token, repo_id, manifest_path) or {}

    # Same bucket already published and not forcing: nothing to do.
    if not force and manifest.get("latest_path") == rotated_path:
        return

    previous = [
        entry
        for entry in manifest.get("retained_paths", [])
        if isinstance(entry, str) and entry
    ]
    retained = ([rotated_path] + [entry for entry in previous if entry != rotated_path])[:keep]
    stale = [entry for entry in previous if entry not in retained]

    next_manifest = {
        "type": ROTATION_MANIFEST_TYPE,
        "version": 1,
        "latest_path": rotated_path,
        "updated_at": isoformat_utc(datetime.now(UTC)),
        "latest_exported_at": snapshot.get("exported_at"),
        "rotation_interval_seconds": interval,
        "retention_count": keep,
        "retained_paths": retained,
    }
    manifest_blob = json.dumps(
        next_manifest,
        ensure_ascii=False,
        indent=2,
        sort_keys=True,
    ).encode("utf-8")

    # One atomic commit: new history file + manifest, minus evicted copies.
    operations = [
        CommitOperationAdd(path_in_repo=rotated_path, path_or_fileobj=str(local_path)),
        CommitOperationAdd(path_in_repo=manifest_path, path_or_fileobj=manifest_blob),
    ]
    for entry in stale:
        operations.append(CommitOperationDelete(path_in_repo=entry))

    HfApi(token=token).create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Rotate daili usage snapshot to {PurePosixPath(rotated_path).name}",
    )
|
|
|
|
def main() -> int:
    """CLI entry point: ``download`` or ``upload`` the usage snapshot.

    Configuration comes from environment variables (token, repo id, repo
    path, local snapshot path).  Always returns 0; bad usage or missing
    configuration raises SystemExit.
    """
    if len(sys.argv) != 2 or sys.argv[1] not in {"download", "upload"}:
        raise SystemExit("usage: hf_snapshot.py [download|upload]")

    action = sys.argv[1]
    token = require_env("USAGE_HF_TOKEN")
    repo_id = require_env("USAGE_HF_REPO_ID")
    path_in_repo = require_env("USAGE_HF_PATH")
    local_path = Path(require_env("USAGE_SNAPSHOT_PATH"))

    if action == "download":
        # Prefer the rotation manifest's latest file; fall back to the
        # legacy flat snapshot path.
        source = download_rotation_snapshot(token, repo_id, manifest_path_for(path_in_repo))
        if source is None:
            source = download_repo_file(token, repo_id, path_in_repo)
        if source is not None:
            local_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(source, local_path)
        return 0

    # Upload: silently skip when there is no non-empty snapshot to push.
    if local_path.exists() and local_path.stat().st_size > 0:
        upload_rotated_snapshot(token, repo_id, path_in_repo, local_path)
    return 0
|
|
|
|
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
|
|