Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| HuggingClaw workspace/state backup via huggingface_hub. | |
| This keeps OpenClaw workspace data, app state, and optional WhatsApp | |
| credentials inside a private HF dataset without embedding HF tokens in git | |
| remotes or requiring a manual HF_USERNAME secret. | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import shutil | |
| import signal | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| from pathlib import Path | |
| os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1") | |
| # huggingface_hub reads HF_HUB_VERBOSITY at import time and overrides any | |
| # logging.getLogger().setLevel() we apply afterwards. Set it before import | |
| # to silence the "No files have been modified..." spam from | |
| # upload_large_folder workers (logger.warning level). | |
| os.environ.setdefault("HF_HUB_VERBOSITY", "error") | |
| from huggingface_hub import HfApi, snapshot_download, upload_folder | |
| from huggingface_hub.errors import HfHubHTTPError, RepositoryNotFoundError | |
| # Belt-and-suspenders: also raise the level after import in case the env var | |
| # wasn't honored (older hub versions, or message logged via a sub-logger). | |
| logging.getLogger("huggingface_hub").setLevel(logging.ERROR) | |
# --- Filesystem layout -------------------------------------------------------
OPENCLAW_HOME = Path("/home/node/.openclaw")
WORKSPACE = OPENCLAW_HOME / "workspace"
STATUS_FILE = Path("/tmp/sync-status.json")

# --- Timing (seconds), overridable through the environment -------------------
INTERVAL = int(os.getenv("SYNC_INTERVAL", "180"))
INITIAL_DELAY = int(os.getenv("SYNC_START_DELAY", "10"))

# --- Hugging Face identity / target dataset ----------------------------------
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_USERNAME = os.getenv("HF_USERNAME", "").strip()
SPACE_AUTHOR_NAME = os.getenv("SPACE_AUTHOR_NAME", "").strip()
BACKUP_DATASET_NAME = os.getenv("BACKUP_DATASET_NAME", "huggingclaw-backup").strip()

# Only the literal string "true" (any case, surrounding whitespace ignored)
# switches WhatsApp credential handling on.
WHATSAPP_ENABLED = os.getenv("WHATSAPP_ENABLED", "").strip().lower() == "true"

# --- What is left out of the upload ------------------------------------------
# Path components that cause an entry to be skipped by the sync helpers.
EXCLUDED_SYNC_DIRS = {
    "node_modules",
    ".git",
    "__pycache__",
    ".venv",
    "venv",
    ".npm",
    ".cache",
    ".yarn",
    "dist",
    "build",
    ".next",
    ".nuxt",
    ".turbo",
    ".parcel-cache",
    "target",
    ".gradle",
    ".mvn",
}
# Files larger than this many bytes are skipped (default: 50 MiB).
MAX_FILE_SIZE_BYTES = int(os.getenv("SYNC_MAX_FILE_BYTES", str(50 * 1024 * 1024)))

# --- OpenClaw state embedded inside the workspace ----------------------------
STATE_DIR = WORKSPACE / ".huggingclaw-state"
OPENCLAW_STATE_BACKUP_DIR = STATE_DIR / "openclaw"
# Top-level names under OPENCLAW_HOME that are not copied into the state backup.
EXCLUDED_STATE_NAMES = {
    "workspace",
    "openclaw-app",
    "gateway.log",
    "browser",
}

# --- WhatsApp credentials -----------------------------------------------------
WHATSAPP_CREDS_DIR = OPENCLAW_HOME / "credentials" / "whatsapp" / "default"
WHATSAPP_BACKUP_DIR = STATE_DIR / "credentials" / "whatsapp" / "default"
# When this file exists, the backed-up WhatsApp credentials are removed.
RESET_MARKER = WORKSPACE / ".reset_credentials"

# --- Process-wide runtime objects ---------------------------------------------
# No API client is built without a token.
HF_API = None if not HF_TOKEN else HfApi(token=HF_TOKEN)
STOP_EVENT = threading.Event()
# Memoized "<namespace>/<dataset>" repo id; filled in by resolve_backup_namespace().
_REPO_ID_CACHE: str | None = None
def write_status(status: str, message: str) -> None:
    """Publish the current sync state to ``STATUS_FILE`` as a JSON document.

    The document carries ``status``, ``message`` and a UTC ``timestamp``.
    It is written to a sibling ``.tmp`` file and then renamed over
    ``STATUS_FILE`` rather than being written in place.
    """
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    document = json.dumps(
        {
            "status": status,
            "message": message,
            "timestamp": stamp,
        }
    )
    scratch_file = STATUS_FILE.with_suffix(".tmp")
    scratch_file.write_text(document, encoding="utf-8")
    scratch_file.replace(STATUS_FILE)
def count_files(path: Path) -> int:
    """Return the number of regular files found anywhere beneath *path*.

    A path that does not exist counts as zero files.
    """
    if not path.exists():
        return 0
    total = 0
    for entry in path.rglob("*"):
        if entry.is_file():
            total += 1
    return total
def _snapshot_openclaw_state() -> None:
    """Mirror every non-excluded top-level entry of OPENCLAW_HOME into the workspace.

    The previous mirror is discarded first. Each entry is copied on its own so
    that one entry that cannot be copied (for example a file that changes or
    disappears mid-copy) does not prevent the remaining entries from being
    backed up.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    if OPENCLAW_STATE_BACKUP_DIR.exists():
        shutil.rmtree(OPENCLAW_STATE_BACKUP_DIR, ignore_errors=True)
    OPENCLAW_STATE_BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    for source_path in OPENCLAW_HOME.iterdir():
        if source_path.name in EXCLUDED_STATE_NAMES:
            continue
        backup_path = OPENCLAW_STATE_BACKUP_DIR / source_path.name
        try:
            if source_path.is_dir():
                # dirs_exist_ok: the rmtree above ignores errors, so a
                # leftover directory must not make this copy fail.
                shutil.copytree(source_path, backup_path, dirs_exist_ok=True)
            elif source_path.is_file():
                shutil.copy2(source_path, backup_path)
            # Anything else (sockets, FIFOs, dangling links) is not copied.
        except OSError as exc:  # shutil.Error is an OSError subclass
            print(
                "Warning: could not snapshot OpenClaw state entry "
                f"{source_path.name}: {exc}"
            )


def _snapshot_whatsapp_credentials() -> None:
    """Copy the live WhatsApp credentials into the workspace state directory.

    Does nothing unless WHATSAPP_ENABLED is set. If RESET_MARKER exists, the
    backed-up credentials are removed instead and the marker is deleted.
    Credential directories holding fewer than two files are treated as
    incomplete and are not backed up.
    """
    if not WHATSAPP_ENABLED:
        return
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    if RESET_MARKER.exists():
        if WHATSAPP_BACKUP_DIR.exists():
            shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True)
            print("Removed backed-up WhatsApp credentials after reset request.")
        RESET_MARKER.unlink(missing_ok=True)
        return
    if not WHATSAPP_CREDS_DIR.exists():
        return
    file_count = count_files(WHATSAPP_CREDS_DIR)
    if file_count < 2:
        if file_count > 0:
            print(f"WhatsApp backup skipped: credentials incomplete ({file_count} files).")
        return
    WHATSAPP_BACKUP_DIR.parent.mkdir(parents=True, exist_ok=True)
    if WHATSAPP_BACKUP_DIR.exists():
        shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True)
    shutil.copytree(WHATSAPP_CREDS_DIR, WHATSAPP_BACKUP_DIR)


def snapshot_state_into_workspace() -> None:
    """Stage OpenClaw state and WhatsApp credentials inside the workspace.

    Both steps are best-effort: a failure is reported as a warning on stdout
    and never raised, and a failure in the first step does not stop the
    second from running.
    """
    try:
        _snapshot_openclaw_state()
    except Exception as exc:
        print(f"Warning: could not snapshot OpenClaw state: {exc}")
    try:
        _snapshot_whatsapp_credentials()
    except Exception as exc:
        print(f"Warning: could not snapshot WhatsApp state: {exc}")
def restore_embedded_state() -> None:
    """Copy state embedded in the workspace back to its live locations.

    Two things are restored:

    * every entry under ``STATE_DIR / "openclaw"`` replaces the entry of the
      same name under OPENCLAW_HOME; entries whose name is in
      EXCLUDED_STATE_NAMES are deleted from the backup instead of restored;
    * when WHATSAPP_ENABLED is set and the backup holds at least two files,
      the WhatsApp credentials replace WHATSAPP_CREDS_DIR and the credential
      tree is restricted to owner-only permissions.

    Errors from the copy operations are not caught here and propagate to the
    caller.
    """
    state_backup_root = STATE_DIR / "openclaw"
    if state_backup_root.is_dir():
        for source_path in state_backup_root.iterdir():
            name = source_path.name
            if name in EXCLUDED_STATE_NAMES:
                # Excluded names must never be restored; purge them from the backup.
                if source_path.is_dir():
                    shutil.rmtree(source_path, ignore_errors=True)
                else:
                    source_path.unlink(missing_ok=True)
                continue
            target_path = OPENCLAW_HOME / name
            # Remove whatever currently occupies the target. rmtree refuses to
            # operate on symlinks, so links (including dangling ones) are
            # unlinked explicitly rather than left behind to break the copy.
            if target_path.is_dir() and not target_path.is_symlink():
                shutil.rmtree(target_path, ignore_errors=True)
            else:
                target_path.unlink(missing_ok=True)
            target_path.parent.mkdir(parents=True, exist_ok=True)
            if source_path.is_dir():
                shutil.copytree(source_path, target_path)
            else:
                shutil.copy2(source_path, target_path)
        print("OpenClaw state restored.")
    if WHATSAPP_ENABLED and WHATSAPP_BACKUP_DIR.is_dir():
        file_count = count_files(WHATSAPP_BACKUP_DIR)
        if file_count >= 2:
            shutil.rmtree(WHATSAPP_CREDS_DIR, ignore_errors=True)
            WHATSAPP_CREDS_DIR.parent.mkdir(parents=True, exist_ok=True)
            shutil.copytree(WHATSAPP_BACKUP_DIR, WHATSAPP_CREDS_DIR)
            # Lock down dir tree: 0700 on directories, 0600 on every file
            # so the WhatsApp session secrets can't be read by other users.
            os.chmod(OPENCLAW_HOME / "credentials", 0o700)
            # rglob("*") yields only descendants, so the credentials directory
            # itself and its parent are listed explicitly.
            lockdown_paths = (
                WHATSAPP_CREDS_DIR.parent,
                WHATSAPP_CREDS_DIR,
                *WHATSAPP_CREDS_DIR.rglob("*"),
            )
            for path in lockdown_paths:
                try:
                    if path.is_dir():
                        os.chmod(path, 0o700)
                    elif path.is_file():
                        os.chmod(path, 0o600)
                except OSError as exc:
                    # Best-effort: keep going, but make the failure visible.
                    print(f"Warning: could not restrict permissions on {path}: {exc}")
            print("WhatsApp credentials restored.")
        else:
            print(f"Warning: saved WhatsApp credentials incomplete ({file_count} files), skipping restore.")
def resolve_backup_namespace() -> str:
    """Return the ``<namespace>/<dataset>`` repo id used for backups.

    The namespace is taken from HF_USERNAME, then SPACE_AUTHOR_NAME, and only
    if both are empty (and an API client exists) from ``HF_API.whoami()``.
    The result is memoized in ``_REPO_ID_CACHE``.

    Raises:
        RuntimeError: if no namespace could be determined.
    """
    global _REPO_ID_CACHE
    if not _REPO_ID_CACHE:
        owner = HF_USERNAME or SPACE_AUTHOR_NAME
        if HF_API is not None and not owner:
            account = HF_API.whoami()
            owner = account.get("name") or account.get("user") or ""
        owner = str(owner).strip()
        if not owner:
            raise RuntimeError(
                "Could not determine the Hugging Face username for backups. "
                "Set HF_USERNAME or use a token tied to your account."
            )
        _REPO_ID_CACHE = f"{owner}/{BACKUP_DATASET_NAME}"
    return _REPO_ID_CACHE
def ensure_repo_exists() -> str:
    """Make sure the private backup dataset exists and return its repo id.

    The repo is looked up first and created as a private dataset only when
    the lookup reports it missing.

    Returns:
        The ``<namespace>/<dataset>`` repo id from resolve_backup_namespace().
    """
    repo_id = resolve_backup_namespace()
    try:
        HF_API.repo_info(repo_id=repo_id, repo_type="dataset")
    except RepositoryNotFoundError:
        # exist_ok=True: another process (e.g. a shutdown "sync-once" running
        # alongside the loop) may create the repo between the lookup above and
        # this call; that must not turn into a failed sync.
        HF_API.create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            private=True,
            exist_ok=True,
        )
    return repo_id
def _should_exclude(rel_posix: str, path: Path) -> bool:
    """Decide whether a workspace entry must be left out of the sync.

    Args:
        rel_posix: The entry's path relative to the sync root, POSIX style.
        path: The entry's actual location on disk.

    Returns:
        True when the entry lies inside (or is) a directory named in
        EXCLUDED_SYNC_DIRS, or when it is a regular file larger than
        MAX_FILE_SIZE_BYTES; False otherwise.
    """
    parts = Path(rel_posix).parts
    is_regular_file = path.is_file()
    # The exclusion set names directories. For a regular file only its parent
    # components are compared, so a file that merely happens to be called
    # "build", "dist" or "target" is still backed up. Every other kind of
    # entry (directories, dangling links, ...) is matched on all components.
    dir_parts = parts[:-1] if is_regular_file else parts
    if any(part in EXCLUDED_SYNC_DIRS for part in dir_parts):
        return True
    if is_regular_file:
        try:
            if path.stat().st_size > MAX_FILE_SIZE_BYTES:
                return True
        except OSError:
            # The size could not be read (the file may have just vanished);
            # deliberately fall through and do not exclude on that basis.
            pass
    return False
def metadata_marker(root: Path) -> tuple[int, int, int]:
    """Summarize *root* as ``(file count, total bytes, newest mtime in ns)``.

    Only regular files that ``_should_exclude`` does not reject are counted;
    files whose ``stat()`` fails are skipped. A missing *root* yields
    ``(0, 0, 0)``.
    """
    if not root.exists():
        return (0, 0, 0)
    count = 0
    size_sum = 0
    latest_ns = 0
    for entry in root.rglob("*"):
        if not entry.is_file():
            continue
        if _should_exclude(entry.relative_to(root).as_posix(), entry):
            continue
        try:
            info = entry.stat()
        except OSError:
            continue
        count += 1
        size_sum += int(info.st_size)
        candidate_ns = int(info.st_mtime_ns)
        if candidate_ns > latest_ns:
            latest_ns = candidate_ns
    return (count, size_sum, latest_ns)
def fingerprint_dir(root: Path) -> str:
    """Return a SHA-256 hex digest covering the syncable files under *root*.

    Files are visited in sorted order and entries rejected by
    ``_should_exclude`` are ignored. A missing *root* yields the digest of
    empty input.

    Each file contributes a length-prefixed relative path followed by the
    fixed-size SHA-256 digest of its content. This framing keeps the hashed
    byte stream unambiguous: without it, file ``a`` containing ``bc`` and
    file ``ab`` containing ``c`` would feed identical bytes to the hasher.
    """
    hasher = hashlib.sha256()
    if not root.exists():
        return hasher.hexdigest()
    for path in sorted(p for p in root.rglob("*") if p.is_file()):
        rel = path.relative_to(root).as_posix()
        if _should_exclude(rel, path):
            continue
        # os.fsencode round-trips file names that are not valid UTF-8, where
        # str.encode("utf-8") would raise UnicodeEncodeError.
        rel_bytes = os.fsencode(rel)
        hasher.update(len(rel_bytes).to_bytes(8, "big"))
        hasher.update(rel_bytes)
        content_hasher = hashlib.sha256()
        with path.open("rb") as handle:
            for chunk in iter(lambda: handle.read(1024 * 1024), b""):
                content_hasher.update(chunk)
        hasher.update(content_hasher.digest())
    return hasher.hexdigest()
def create_snapshot_dir(source_root: Path) -> Path:
    """Copy the syncable part of *source_root* into a fresh temporary directory.

    Entries rejected by ``_should_exclude`` are left out. Directories are
    recreated, regular files are copied with ``shutil.copy2``, and anything
    else (FIFOs, sockets, dangling symlinks) is skipped, because copying such
    entries would block or fail.

    Returns:
        The staging directory. The caller owns it and must delete it.

    Raises:
        Whatever the copy raised; the staging directory is removed first so a
        failed snapshot does not leave a partial tree behind in the temp dir.
    """
    staging_root = Path(tempfile.mkdtemp(prefix="huggingclaw-sync-"))
    try:
        for path in sorted(source_root.rglob("*")):
            rel = path.relative_to(source_root)
            if _should_exclude(rel.as_posix(), path):
                continue
            target = staging_root / rel
            if path.is_dir():
                target.mkdir(parents=True, exist_ok=True)
            elif path.is_file():
                target.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(path, target)
    except BaseException:
        # Nobody else holds a reference to the staging directory yet, so it
        # has to be cleaned up here before the error propagates.
        shutil.rmtree(staging_root, ignore_errors=True)
        raise
    return staging_root
def restore_workspace() -> bool:
    """Replace the local workspace with the contents of the backup dataset.

    The dataset is downloaded into a temporary directory; if it has content,
    everything in WORKSPACE except ``.git`` is deleted, the download is copied
    in, and ``restore_embedded_state()`` is run. Progress and outcome are
    reported through ``write_status``.

    Returns:
        True when the workspace was restored or there is nothing to restore
        yet (empty or missing dataset); False when HF_TOKEN is not configured
        or the restore failed.
    """
    if not HF_TOKEN:
        write_status("disabled", "HF_TOKEN is not configured.")
        return False
    # Resolving the repo id can raise (no username available, whoami request
    # failing); report that like any other restore failure instead of letting
    # it escape as an unhandled exception.
    try:
        repo_id = resolve_backup_namespace()
    except Exception as exc:
        write_status("error", f"Restore failed: {exc}")
        print(f"Restore failed: {exc}", file=sys.stderr)
        return False
    write_status("restoring", f"Restoring workspace from {repo_id}")
    # ".cache" is where huggingface_hub keeps its download bookkeeping inside
    # local_dir (see the hub download guide); it is not backup content.
    skipped_names = (".git", ".cache")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            snapshot_download(
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir=tmpdir,
            )
            tmp_path = Path(tmpdir)
            downloaded = [
                child for child in tmp_path.iterdir() if child.name not in skipped_names
            ]
            if not downloaded:
                write_status("fresh", "Backup dataset is empty. Starting fresh.")
                return True
            WORKSPACE.mkdir(parents=True, exist_ok=True)
            # Clear the current workspace (keeping .git) before copying.
            for child in list(WORKSPACE.iterdir()):
                if child.name == ".git":
                    continue
                if child.is_dir():
                    shutil.rmtree(child, ignore_errors=True)
                else:
                    child.unlink(missing_ok=True)
            for child in downloaded:
                destination = WORKSPACE / child.name
                if child.is_dir():
                    shutil.copytree(child, destination)
                else:
                    shutil.copy2(child, destination)
        restore_embedded_state()
        write_status("restored", f"Restored workspace from {repo_id}")
        return True
    except RepositoryNotFoundError:
        write_status("fresh", f"Backup dataset {repo_id} does not exist yet.")
        return True
    except HfHubHTTPError as exc:
        if exc.response is not None and exc.response.status_code == 404:
            write_status("fresh", f"Backup dataset {repo_id} does not exist yet.")
            return True
        write_status("error", f"Restore failed: {exc}")
        print(f"Restore failed: {exc}", file=sys.stderr)
        return False
    except Exception as exc:
        write_status("error", f"Restore failed: {exc}")
        print(f"Restore failed: {exc}", file=sys.stderr)
        return False
def sync_once(
    last_fingerprint: str | None = None,
    last_marker: tuple[int, int, int] | None = None,
) -> tuple[str, tuple[int, int, int]]:
    """Upload the workspace to the backup dataset if it changed.

    Change detection is two-staged: the metadata marker is compared first,
    and the content fingerprint is only computed when the marker differs.

    Args:
        last_fingerprint: Fingerprint returned by the previous call, or None
            to force the content comparison to be treated as "changed".
        last_marker: Marker returned by the previous call, or None to skip
            the marker shortcut.

    Returns:
        ``(fingerprint, marker)`` to pass to the next call.

    Raises:
        Any exception from repo creation, snapshotting or the upload; the
        temporary snapshot directory is removed in every case.
    """
    if not HF_TOKEN:
        write_status("disabled", "HF_TOKEN is not configured.")
        return (last_fingerprint or "", last_marker or (0, 0, 0))
    snapshot_state_into_workspace()
    repo_id = ensure_repo_exists()
    current_marker = metadata_marker(WORKSPACE)
    if last_marker is not None and current_marker == last_marker:
        write_status("synced", "No workspace changes detected.")
        return (last_fingerprint or "", current_marker)
    current_fingerprint = fingerprint_dir(WORKSPACE)
    if last_fingerprint is not None and current_fingerprint == last_fingerprint:
        write_status("synced", "No workspace changes detected.")
        return (last_fingerprint, current_marker)
    write_status("syncing", f"Uploading workspace to {repo_id}")
    snapshot_dir = create_snapshot_dir(WORKSPACE)
    try:
        # Choose the upload path by checking for the method up front. Catching
        # AttributeError around the call would also swallow AttributeErrors
        # raised inside a running upload and silently start a second upload.
        upload_large_folder = getattr(HF_API, "upload_large_folder", None)
        if upload_large_folder is not None:
            upload_large_folder(
                repo_id=repo_id,
                repo_type="dataset",
                folder_path=str(snapshot_dir),
                num_workers=2,
                print_report=False,
            )
        else:
            upload_folder(
                folder_path=str(snapshot_dir),
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"HuggingClaw sync {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}",
                ignore_patterns=[".git/*", ".git"],
            )
    finally:
        shutil.rmtree(snapshot_dir, ignore_errors=True)
    write_status("success", f"Uploaded workspace to {repo_id}")
    return (current_fingerprint, current_marker)
def handle_signal(_sig, _frame) -> None:
    """Signal handler: request shutdown by setting ``STOP_EVENT``.

    Both arguments (signal number and stack frame) are ignored.
    """
    STOP_EVENT.set()
def loop() -> int:
    """Run the periodic backup loop until SIGTERM or SIGINT is received.

    After an initial delay of INITIAL_DELAY seconds, ``sync_once`` is called
    every INTERVAL seconds. A failing sync is reported and retried on the
    next tick; it never ends the loop.

    Returns:
        0 on a requested shutdown, 1 when the backup repo id could not be
        resolved at startup.
    """
    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)
    try:
        repo_id = resolve_backup_namespace()
        write_status("configured", f"Backup loop active for {repo_id} with {INTERVAL}s interval.")
    except Exception as exc:
        write_status("error", str(exc))
        print(f"Workspace sync error: {exc}")
        return 1
    # Baseline taken at startup so an unchanged workspace is not re-uploaded.
    # If it cannot be computed (e.g. a file disappears while being hashed),
    # start without one: sync_once then treats the workspace as changed.
    last_fingerprint: str | None
    last_marker: tuple[int, int, int] | None
    try:
        last_fingerprint = fingerprint_dir(WORKSPACE)
        last_marker = metadata_marker(WORKSPACE)
    except OSError as exc:
        print(f"Workspace sync: could not compute initial baseline: {exc}")
        last_fingerprint = None
        last_marker = None
    # Wait on the stop event instead of sleeping so a signal received during
    # the initial delay ends the process immediately.
    if STOP_EVENT.wait(INITIAL_DELAY):
        return 0
    print(f"Workspace sync started: every {INTERVAL}s -> {repo_id}")
    while not STOP_EVENT.is_set():
        try:
            last_fingerprint, last_marker = sync_once(last_fingerprint, last_marker)
        except Exception as exc:
            write_status("error", f"Sync failed: {exc}")
            print(f"Workspace sync failed: {exc}")
        if STOP_EVENT.wait(INTERVAL):
            break
    return 0
def main() -> int:
    """Command-line entry point; returns the process exit code.

    Usage: ``[loop | restore | sync-once]``. With no argument the backup loop
    runs. An unknown command prints an error to stderr and yields 1.
    """
    WORKSPACE.mkdir(parents=True, exist_ok=True)
    arguments = sys.argv[1:]
    if not arguments:
        return loop()
    command = arguments[0]
    if command == "loop":
        return loop()
    if command == "restore":
        restored = restore_workspace()
        return 0 if restored else 1
    if command != "sync-once":
        print(f"Unknown command: {command}", file=sys.stderr)
        return 1
    # One-shot sync.
    try:
        sync_once()
    except Exception as exc:
        write_status("error", f"Shutdown sync failed: {exc}")
        print(f"Workspace sync: shutdown sync failed: {exc}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())