#!/usr/bin/env python3
"""
HuggingClaw workspace/state backup via huggingface_hub.

This keeps OpenClaw workspace data, app state, and optional WhatsApp
credentials inside a private HF dataset without embedding HF tokens in
git remotes or requiring a manual HF_USERNAME secret.
"""

import hashlib
import json
import logging
import os
import shutil
import signal
import sys
import tempfile
import threading
import time
from pathlib import Path

os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
# huggingface_hub reads HF_HUB_VERBOSITY at import time and overrides any
# logging.getLogger().setLevel() we apply afterwards. Set it before import
# to silence the "No files have been modified..." spam from
# upload_large_folder workers (logger.warning level).
os.environ.setdefault("HF_HUB_VERBOSITY", "error")

from huggingface_hub import HfApi, snapshot_download, upload_folder
from huggingface_hub.errors import HfHubHTTPError, RepositoryNotFoundError

# Belt-and-suspenders: also raise the level after import in case the env var
# wasn't honored (older hub versions, or message logged via a sub-logger).
logging.getLogger("huggingface_hub").setLevel(logging.ERROR) OPENCLAW_HOME = Path("/home/node/.openclaw") WORKSPACE = OPENCLAW_HOME / "workspace" STATUS_FILE = Path("/tmp/sync-status.json") INTERVAL = int(os.environ.get("SYNC_INTERVAL", "180")) INITIAL_DELAY = int(os.environ.get("SYNC_START_DELAY", "10")) HF_TOKEN = os.environ.get("HF_TOKEN", "").strip() HF_USERNAME = os.environ.get("HF_USERNAME", "").strip() SPACE_AUTHOR_NAME = os.environ.get("SPACE_AUTHOR_NAME", "").strip() BACKUP_DATASET_NAME = os.environ.get("BACKUP_DATASET_NAME", "huggingclaw-backup").strip() WHATSAPP_ENABLED = os.environ.get("WHATSAPP_ENABLED", "").strip().lower() == "true" EXCLUDED_SYNC_DIRS = { "node_modules", ".git", "__pycache__", ".venv", "venv", ".npm", ".cache", ".yarn", "dist", "build", ".next", ".nuxt", ".turbo", ".parcel-cache", "target", ".gradle", ".mvn", } MAX_FILE_SIZE_BYTES = int(os.environ.get("SYNC_MAX_FILE_BYTES", str(50 * 1024 * 1024))) STATE_DIR = WORKSPACE / ".huggingclaw-state" OPENCLAW_STATE_BACKUP_DIR = STATE_DIR / "openclaw" EXCLUDED_STATE_NAMES = { "workspace", "openclaw-app", "gateway.log", "browser", } WHATSAPP_CREDS_DIR = OPENCLAW_HOME / "credentials" / "whatsapp" / "default" WHATSAPP_BACKUP_DIR = STATE_DIR / "credentials" / "whatsapp" / "default" RESET_MARKER = WORKSPACE / ".reset_credentials" HF_API = HfApi(token=HF_TOKEN) if HF_TOKEN else None STOP_EVENT = threading.Event() _REPO_ID_CACHE: str | None = None def write_status(status: str, message: str) -> None: payload = { "status": status, "message": message, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } tmp_path = STATUS_FILE.with_suffix(".tmp") tmp_path.write_text(json.dumps(payload), encoding="utf-8") tmp_path.replace(STATUS_FILE) def count_files(path: Path) -> int: if not path.exists(): return 0 return sum(1 for child in path.rglob("*") if child.is_file()) def snapshot_state_into_workspace() -> None: try: STATE_DIR.mkdir(parents=True, exist_ok=True) if OPENCLAW_STATE_BACKUP_DIR.exists(): 
shutil.rmtree(OPENCLAW_STATE_BACKUP_DIR, ignore_errors=True) OPENCLAW_STATE_BACKUP_DIR.mkdir(parents=True, exist_ok=True) for source_path in OPENCLAW_HOME.iterdir(): if source_path.name in EXCLUDED_STATE_NAMES: continue backup_path = OPENCLAW_STATE_BACKUP_DIR / source_path.name if source_path.is_dir(): shutil.copytree(source_path, backup_path) elif source_path.is_file(): shutil.copy2(source_path, backup_path) except Exception as exc: print(f"Warning: could not snapshot OpenClaw state: {exc}") try: if not WHATSAPP_ENABLED: return STATE_DIR.mkdir(parents=True, exist_ok=True) if RESET_MARKER.exists(): if WHATSAPP_BACKUP_DIR.exists(): shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True) print("Removed backed-up WhatsApp credentials after reset request.") RESET_MARKER.unlink(missing_ok=True) return if not WHATSAPP_CREDS_DIR.exists(): return file_count = count_files(WHATSAPP_CREDS_DIR) if file_count < 2: if file_count > 0: print(f"WhatsApp backup skipped: credentials incomplete ({file_count} files).") return WHATSAPP_BACKUP_DIR.parent.mkdir(parents=True, exist_ok=True) if WHATSAPP_BACKUP_DIR.exists(): shutil.rmtree(WHATSAPP_BACKUP_DIR, ignore_errors=True) shutil.copytree(WHATSAPP_CREDS_DIR, WHATSAPP_BACKUP_DIR) except Exception as exc: print(f"Warning: could not snapshot WhatsApp state: {exc}") def restore_embedded_state() -> None: state_backup_root = STATE_DIR / "openclaw" if state_backup_root.is_dir(): for source_path in state_backup_root.iterdir(): name = source_path.name if name in EXCLUDED_STATE_NAMES: if source_path.is_dir(): shutil.rmtree(source_path, ignore_errors=True) else: source_path.unlink(missing_ok=True) continue target_path = OPENCLAW_HOME / name shutil.rmtree(target_path, ignore_errors=True) if target_path.is_file(): target_path.unlink(missing_ok=True) target_path.parent.mkdir(parents=True, exist_ok=True) if source_path.is_dir(): shutil.copytree(source_path, target_path) else: shutil.copy2(source_path, target_path) print("OpenClaw state restored.") if 
WHATSAPP_ENABLED and WHATSAPP_BACKUP_DIR.is_dir(): file_count = count_files(WHATSAPP_BACKUP_DIR) if file_count >= 2: shutil.rmtree(WHATSAPP_CREDS_DIR, ignore_errors=True) WHATSAPP_CREDS_DIR.parent.mkdir(parents=True, exist_ok=True) shutil.copytree(WHATSAPP_BACKUP_DIR, WHATSAPP_CREDS_DIR) # Lock down dir tree: 0700 on directories, 0600 on every file # so the WhatsApp session secrets can't be read by other users. os.chmod(OPENCLAW_HOME / "credentials", 0o700) for path in WHATSAPP_CREDS_DIR.rglob("*"): try: if path.is_dir(): os.chmod(path, 0o700) elif path.is_file(): os.chmod(path, 0o600) except OSError: pass print("WhatsApp credentials restored.") else: print(f"Warning: saved WhatsApp credentials incomplete ({file_count} files), skipping restore.") def resolve_backup_namespace() -> str: global _REPO_ID_CACHE if _REPO_ID_CACHE: return _REPO_ID_CACHE namespace = HF_USERNAME or SPACE_AUTHOR_NAME if not namespace and HF_API is not None: whoami = HF_API.whoami() namespace = whoami.get("name") or whoami.get("user") or "" namespace = str(namespace).strip() if not namespace: raise RuntimeError( "Could not determine the Hugging Face username for backups. " "Set HF_USERNAME or use a token tied to your account." 
) _REPO_ID_CACHE = f"{namespace}/{BACKUP_DATASET_NAME}" return _REPO_ID_CACHE def ensure_repo_exists() -> str: repo_id = resolve_backup_namespace() try: HF_API.repo_info(repo_id=repo_id, repo_type="dataset") except RepositoryNotFoundError: HF_API.create_repo(repo_id=repo_id, repo_type="dataset", private=True) return repo_id def _should_exclude(rel_posix: str, path: Path) -> bool: parts = Path(rel_posix).parts if any(part in EXCLUDED_SYNC_DIRS for part in parts): return True if path.is_file(): try: if path.stat().st_size > MAX_FILE_SIZE_BYTES: return True except OSError: pass return False def metadata_marker(root: Path) -> tuple[int, int, int]: if not root.exists(): return (0, 0, 0) file_count = 0 total_size = 0 newest_mtime = 0 for path in root.rglob("*"): if not path.is_file(): continue rel = path.relative_to(root).as_posix() if _should_exclude(rel, path): continue try: stat = path.stat() except OSError: continue file_count += 1 total_size += int(stat.st_size) newest_mtime = max(newest_mtime, int(stat.st_mtime_ns)) return (file_count, total_size, newest_mtime) def fingerprint_dir(root: Path) -> str: hasher = hashlib.sha256() if not root.exists(): return hasher.hexdigest() for path in sorted(p for p in root.rglob("*") if p.is_file()): rel = path.relative_to(root).as_posix() if _should_exclude(rel, path): continue hasher.update(rel.encode("utf-8")) with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): hasher.update(chunk) return hasher.hexdigest() def create_snapshot_dir(source_root: Path) -> Path: staging_root = Path(tempfile.mkdtemp(prefix="huggingclaw-sync-")) for path in sorted(source_root.rglob("*")): rel = path.relative_to(source_root) rel_posix = rel.as_posix() if _should_exclude(rel_posix, path): continue target = staging_root / rel if path.is_dir(): target.mkdir(parents=True, exist_ok=True) continue target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(path, target) return staging_root def restore_workspace() -> 
bool: if not HF_TOKEN: write_status("disabled", "HF_TOKEN is not configured.") return False repo_id = resolve_backup_namespace() write_status("restoring", f"Restoring workspace from {repo_id}") try: with tempfile.TemporaryDirectory() as tmpdir: snapshot_download( repo_id=repo_id, repo_type="dataset", token=HF_TOKEN, local_dir=tmpdir, ) tmp_path = Path(tmpdir) if not any(tmp_path.iterdir()): write_status("fresh", "Backup dataset is empty. Starting fresh.") return True WORKSPACE.mkdir(parents=True, exist_ok=True) for child in list(WORKSPACE.iterdir()): if child.name == ".git": continue if child.is_dir(): shutil.rmtree(child, ignore_errors=True) else: child.unlink(missing_ok=True) for child in tmp_path.iterdir(): if child.name == ".git": continue destination = WORKSPACE / child.name if child.is_dir(): shutil.copytree(child, destination) else: shutil.copy2(child, destination) restore_embedded_state() write_status("restored", f"Restored workspace from {repo_id}") return True except RepositoryNotFoundError: write_status("fresh", f"Backup dataset {repo_id} does not exist yet.") return True except HfHubHTTPError as exc: if exc.response is not None and exc.response.status_code == 404: write_status("fresh", f"Backup dataset {repo_id} does not exist yet.") return True write_status("error", f"Restore failed: {exc}") print(f"Restore failed: {exc}", file=sys.stderr) return False except Exception as exc: write_status("error", f"Restore failed: {exc}") print(f"Restore failed: {exc}", file=sys.stderr) return False def sync_once( last_fingerprint: str | None = None, last_marker: tuple[int, int, int] | None = None, ) -> tuple[str, tuple[int, int, int]]: if not HF_TOKEN: write_status("disabled", "HF_TOKEN is not configured.") return (last_fingerprint or "", last_marker or (0, 0, 0)) snapshot_state_into_workspace() repo_id = ensure_repo_exists() current_marker = metadata_marker(WORKSPACE) if last_marker is not None and current_marker == last_marker: write_status("synced", "No workspace 
changes detected.") return (last_fingerprint or "", current_marker) current_fingerprint = fingerprint_dir(WORKSPACE) if last_fingerprint is not None and current_fingerprint == last_fingerprint: write_status("synced", "No workspace changes detected.") return (last_fingerprint, current_marker) write_status("syncing", f"Uploading workspace to {repo_id}") snapshot_dir = create_snapshot_dir(WORKSPACE) try: try: HF_API.upload_large_folder( repo_id=repo_id, repo_type="dataset", folder_path=str(snapshot_dir), num_workers=2, print_report=False, ) except AttributeError: upload_folder( folder_path=str(snapshot_dir), repo_id=repo_id, repo_type="dataset", token=HF_TOKEN, commit_message=f"HuggingClaw sync {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}", ignore_patterns=[".git/*", ".git"], ) finally: shutil.rmtree(snapshot_dir, ignore_errors=True) write_status("success", f"Uploaded workspace to {repo_id}") return (current_fingerprint, current_marker) def handle_signal(_sig, _frame) -> None: STOP_EVENT.set() def loop() -> int: signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) try: repo_id = resolve_backup_namespace() write_status("configured", f"Backup loop active for {repo_id} with {INTERVAL}s interval.") except Exception as exc: write_status("error", str(exc)) print(f"Workspace sync error: {exc}") return 1 last_fingerprint = fingerprint_dir(WORKSPACE) last_marker = metadata_marker(WORKSPACE) time.sleep(INITIAL_DELAY) print(f"Workspace sync started: every {INTERVAL}s -> {repo_id}") while not STOP_EVENT.is_set(): try: last_fingerprint, last_marker = sync_once(last_fingerprint, last_marker) except Exception as exc: write_status("error", f"Sync failed: {exc}") print(f"Workspace sync failed: {exc}") if STOP_EVENT.wait(INTERVAL): break return 0 def main() -> int: WORKSPACE.mkdir(parents=True, exist_ok=True) if len(sys.argv) < 2: return loop() command = sys.argv[1] if command == "restore": return 0 if restore_workspace() else 1 if command == 
"sync-once": try: sync_once() return 0 except Exception as exc: write_status("error", f"Shutdown sync failed: {exc}") print(f"Workspace sync: shutdown sync failed: {exc}") return 1 if command == "loop": return loop() print(f"Unknown command: {command}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())