| import os |
| import sys |
| import tarfile |
| import tempfile |
| from pathlib import Path |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError, HfHubHTTPError |
|
|
| api = HfApi() |
|
|
| REPO_ID = os.getenv("HF_DATASET") |
| TOKEN = os.getenv("HF_TOKEN") |
| FILENAME = "latest_backup.tar.gz" |
| BASE_DIR = Path("/root/.openclaw") |
| PATHS_TO_BACKUP = [ |
| BASE_DIR / "sessions", |
| BASE_DIR / "agents" / "main" / "sessions", |
| BASE_DIR / "openclaw.json", |
| ] |
|
|
|
|
| def log(msg: str) -> None: |
| print(msg, flush=True) |
|
|
|
|
| def log_err(msg: str) -> None: |
| print(msg, file=sys.stderr, flush=True) |
|
|
|
|
| def is_subpath(child: Path, parent: Path) -> bool: |
| try: |
| child.resolve().relative_to(parent.resolve()) |
| return True |
| except ValueError: |
| return False |
|
|
|
|
| def safe_extract(tar: tarfile.TarFile, target_dir: Path) -> None: |
| target_dir = target_dir.resolve() |
|
|
| for member in tar.getmembers(): |
| member_path = target_dir / member.name |
| if not is_subpath(member_path, target_dir): |
| raise RuntimeError(f"Unsafe path detected in archive: {member.name}") |
|
|
| tar.extractall(path=target_dir) |
|
|
|
|
| def restore() -> bool: |
| if not REPO_ID or not TOKEN: |
| log("[RESTORE] Skip: HF_DATASET or HF_TOKEN not set") |
| return False |
|
|
| try: |
| log(f"[RESTORE] Downloading {FILENAME} from dataset repo: {REPO_ID}") |
| path = hf_hub_download( |
| repo_id=REPO_ID, |
| filename=FILENAME, |
| repo_type="dataset", |
| token=TOKEN, |
| ) |
|
|
| BASE_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| with tarfile.open(path, "r:gz") as tar: |
| safe_extract(tar, BASE_DIR) |
|
|
| log(f"[RESTORE] Success: restored from {FILENAME}") |
| return True |
|
|
| except EntryNotFoundError: |
| log(f"[RESTORE] Note: {FILENAME} not found in repo, probably first run") |
| return False |
| except RepositoryNotFoundError: |
| log_err(f"[RESTORE] Error: dataset repo not found: {REPO_ID}") |
| return False |
| except HfHubHTTPError as e: |
| log_err(f"[RESTORE] Hub HTTP error: {e}") |
| return False |
| except tarfile.TarError as e: |
| log_err(f"[RESTORE] Invalid tar archive: {e}") |
| return False |
| except Exception as e: |
| log_err(f"[RESTORE] Unexpected error: {e}") |
| return False |
|
|
|
|
| def backup() -> bool: |
| if not REPO_ID or not TOKEN: |
| log("[BACKUP] Skip: HF_DATASET or HF_TOKEN not set") |
| return False |
|
|
| existing_paths = [p for p in PATHS_TO_BACKUP if p.exists()] |
| if not existing_paths: |
| log("[BACKUP] Skip: no paths to backup") |
| return False |
|
|
| temp_path = None |
|
|
| try: |
| with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp: |
| temp_path = Path(tmp.name) |
|
|
| with tarfile.open(temp_path, "w:gz") as tar: |
| for p in existing_paths: |
| if p.is_relative_to(BASE_DIR): |
| arcname = p.relative_to(BASE_DIR) |
| else: |
| arcname = p.name |
| tar.add(str(p), arcname=str(arcname)) |
| log(f"[BACKUP] Added: {p} -> {arcname}") |
|
|
| api.upload_file( |
| path_or_fileobj=str(temp_path), |
| path_in_repo=FILENAME, |
| repo_id=REPO_ID, |
| repo_type="dataset", |
| token=TOKEN, |
| commit_message=f"Update {FILENAME}", |
| ) |
|
|
| log(f"[BACKUP] Success: uploaded {FILENAME}") |
| return True |
|
|
| except RepositoryNotFoundError: |
| log_err(f"[BACKUP] Error: dataset repo not found: {REPO_ID}") |
| return False |
| except HfHubHTTPError as e: |
| log_err(f"[BACKUP] Hub HTTP error: {e}") |
| return False |
| except tarfile.TarError as e: |
| log_err(f"[BACKUP] Tar error: {e}") |
| return False |
| except Exception as e: |
| log_err(f"[BACKUP] Unexpected error: {e}") |
| return False |
| finally: |
| if temp_path and temp_path.exists(): |
| try: |
| temp_path.unlink() |
| except Exception as e: |
| log_err(f"[BACKUP] Warning: failed to delete temp file {temp_path}: {e}") |
|
|
|
|
| if __name__ == "__main__": |
| action = sys.argv[1].strip().lower() if len(sys.argv) > 1 else "restore" |
|
|
| if action == "backup": |
| ok = backup() |
| sys.exit(0 if ok else 1) |
| else: |
| ok = restore() |
| sys.exit(0 if ok else 1) |