|
|
| """
|
| OpenClaw Full Directory Persistence for Hugging Face Spaces
|
| ========================================================
|
|
|
| This script provides atomic, complete persistence of the entire ~/.openclaw directory.
|
| It implements the comprehensive persistence plan:
|
|
|
| - Config & Credentials (openclaw.json, credentials/)
|
| - Workspace (workspace/ with AGENTS.md, SOUL.md, TOOLS.md, MEMORY.md, skills/, memory/)
|
| - Sessions (agents/*/sessions/*.jsonl)
|
| - Memory Index (memory/*.sqlite)
|
| - QMD Backend (agents/*/qmd/)
|
| - Extensions (extensions/)
|
| - All other state in ~/.openclaw
|
|
|
| Usage:
|
| # Backup (save)
|
| python3 openclaw_persist.py save
|
|
|
| # Restore (load)
|
| python3 openclaw_persist.py load
|
|
|
| Environment Variables:
|
| HF_TOKEN - Hugging Face access token with write permissions
|
| OPENCLAW_DATASET_REPO - Dataset repo ID (e.g., "username/openclaw-state")
|
| OPENCLAW_HOME - OpenClaw home directory (default: ~/.openclaw)
|
| """
|
|
|
| import os
|
| import sys
|
| import json
|
| import tarfile
|
| import tempfile
|
| import shutil
|
| import hashlib
|
| import time
|
| import signal
|
| from datetime import datetime
|
| from pathlib import Path
|
| from typing import Optional, List, Set, Dict, Any
|
|
|
| from huggingface_hub import HfApi, hf_hub_download
|
| from huggingface_hub.utils import RepositoryNotFoundError
|
|
|
|
|
|
|
|
|
|
|
|
|
| class Config:
|
| """Configuration for persistence system"""
|
|
|
|
|
| OPENCLAW_HOME = Path(os.environ.get("OPENCLAW_HOME", "~/.openclaw")).expanduser()
|
| BACKUP_FILENAME = "openclaw-full.tar.gz"
|
| BACKUP_STATE_FILE = ".persistence-state.json"
|
| LOCK_FILE = ".persistence.lock"
|
|
|
|
|
| MAX_BACKUPS = 5
|
| BACKUP_PREFIX = "backup-"
|
|
|
|
|
| EXCLUDE_PATTERNS = [
|
| "*.lock",
|
| "*.tmp",
|
| "*.pyc",
|
| "*__pycache__*",
|
| "*.socket",
|
| "*.pid",
|
| "node_modules",
|
| ".DS_Store",
|
| ".git",
|
| ]
|
|
|
|
|
| SKIP_DIRS = {
|
| ".cache",
|
| "logs",
|
| "temp",
|
| "tmp",
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
| def log(level: str, message: str, **kwargs):
|
| """Structured logging"""
|
| timestamp = datetime.now().isoformat()
|
| log_entry = {
|
| "timestamp": timestamp,
|
| "level": level,
|
| "message": message,
|
| **kwargs
|
| }
|
| print(json.dumps(log_entry), flush=True)
|
|
|
|
|
| def calculate_file_hash(filepath: Path) -> str:
|
| """Calculate SHA256 hash of a file"""
|
| sha256 = hashlib.sha256()
|
| try:
|
| with open(filepath, "rb") as f:
|
| for chunk in iter(lambda: f.read(65536), b""):
|
| sha256.update(chunk)
|
| return sha256.hexdigest()
|
| except Exception:
|
| return ""
|
|
|
|
|
| def get_directory_size(directory: Path) -> int:
|
| """Calculate total size of directory in bytes"""
|
| total_size = 0
|
| try:
|
| for dirpath, dirnames, filenames in os.walk(directory):
|
| for filename in filenames:
|
| filepath = Path(dirpath) / filename
|
| try:
|
| total_size += filepath.stat().st_size
|
| except Exception:
|
| pass
|
| except Exception:
|
| pass
|
| return total_size
|
|
|
|
|
| def should_exclude(path: str, exclude_patterns: List[str]) -> bool:
|
| """Check if a path should be excluded based on patterns"""
|
| path_normalized = path.replace("\\", "/")
|
|
|
| for pattern in exclude_patterns:
|
| pattern = pattern.lstrip("/")
|
| if pattern.startswith("*"):
|
| suffix = pattern[1:]
|
| if path_normalized.endswith(suffix):
|
| return True
|
| elif pattern in path_normalized:
|
| return True
|
|
|
| return False
|
|
|
|
|
|
|
|
|
|
|
|
|
| class OpenClawPersistence:
|
| """
|
| Manages persistence of OpenClaw state to Hugging Face Dataset
|
|
|
| Features:
|
| - Atomic full-directory backup/restore
|
| - Proper exclusion of lock files and temporary data
|
| - Safe handling of SQLite databases
|
| - Backup rotation
|
| - Integrity verification
|
| """
|
|
|
| def __init__(self):
|
| self.api = None
|
| self.repo_id = os.environ.get("OPENCLAW_DATASET_REPO")
|
| self.token = os.environ.get("HF_TOKEN")
|
| self.home_dir = Config.OPENCLAW_HOME
|
| self.lock_file = self.home_dir / Config.LOCK_FILE
|
| self.state_file = self.home_dir / Config.BACKUP_STATE_FILE
|
|
|
|
|
| if not self.repo_id:
|
| log("ERROR", "OPENCLAW_DATASET_REPO not set")
|
| raise ValueError("OPENCLAW_DATASET_REPO environment variable required")
|
|
|
| if not self.token:
|
| log("ERROR", "HF_TOKEN not set")
|
| raise ValueError("HF_TOKEN environment variable required")
|
|
|
|
|
| self.api = HfApi(token=self.token)
|
|
|
| log("INFO", "Initialized persistence manager",
|
| repo_id=self.repo_id,
|
| home_dir=str(self.home_dir))
|
|
|
|
|
|
|
|
|
|
|
| def save(self) -> Dict[str, Any]:
|
| """
|
| Save current state to Hugging Face Dataset
|
|
|
| Creates a complete backup of ~/.openclaw directory as a tar.gz file.
|
| """
|
| operation_id = f"save-{int(time.time())}"
|
| start_time = time.time()
|
|
|
| log("INFO", "Starting save operation", operation_id=operation_id)
|
|
|
|
|
| if not self.home_dir.exists():
|
| log("WARNING", "OpenClaw home directory does not exist, creating")
|
| self.home_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| if self.lock_file.exists():
|
| log("WARNING", "Lock file exists, another operation may be in progress")
|
|
|
|
|
|
|
| try:
|
| self.lock_file.write_text(str(os.getpid()))
|
| except Exception as e:
|
| log("WARNING", "Could not create lock file", error=str(e))
|
|
|
| try:
|
|
|
| dir_size = get_directory_size(self.home_dir)
|
| log("INFO", "Directory size calculated",
|
| size_bytes=dir_size,
|
| size_mb=f"{dir_size / (1024*1024):.2f}")
|
|
|
|
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| tar_path = Path(tmpdir) / Config.BACKUP_FILENAME
|
| manifest = self._create_tar_archive(tar_path)
|
|
|
|
|
| tar_size = tar_path.stat().st_size
|
| log("INFO", "Archive created",
|
| size_bytes=tar_size,
|
| size_mb=f"{tar_size / (1024*1024):.2f}",
|
| files_count=manifest["file_count"])
|
|
|
|
|
| remote_path = f"{Config.BACKUP_PREFIX}{datetime.now().strftime('%Y%m%d_%H%M%S')}.tar.gz"
|
| upload_result = self._upload_archive(tar_path, remote_path)
|
|
|
|
|
| self._update_state({
|
| "last_save_time": datetime.now().isoformat(),
|
| "last_save_operation": operation_id,
|
| "last_save_remote_path": remote_path,
|
| "last_save_commit": upload_result.get("commit_id"),
|
| "last_save_manifest": manifest,
|
| })
|
|
|
|
|
| self._rotate_backups()
|
|
|
| duration = time.time() - start_time
|
| log("INFO", "Save completed successfully",
|
| operation_id=operation_id,
|
| duration_seconds=f"{duration:.2f}")
|
|
|
| return {
|
| "success": True,
|
| "operation_id": operation_id,
|
| "remote_path": remote_path,
|
| "commit_id": upload_result.get("commit_id"),
|
| "duration": duration,
|
| "manifest": manifest
|
| }
|
|
|
| except Exception as e:
|
| log("ERROR", "Save operation failed",
|
| operation_id=operation_id,
|
| error=str(e),
|
| exc_info=True)
|
| return {
|
| "success": False,
|
| "operation_id": operation_id,
|
| "error": str(e)
|
| }
|
| finally:
|
|
|
| if self.lock_file.exists():
|
| try:
|
| self.lock_file.unlink()
|
| except Exception:
|
| pass
|
|
|
| def _create_tar_archive(self, tar_path: Path) -> Dict[str, Any]:
|
| """Create tar.gz archive of OpenClaw home directory"""
|
| manifest = {
|
| "created_at": datetime.now().isoformat(),
|
| "version": "1.0",
|
| "file_count": 0,
|
| "excluded_patterns": [],
|
| "included_dirs": [],
|
| "skipped_dirs": [],
|
| }
|
|
|
| excluded_count = 0
|
|
|
| def tar_filter(tarinfo: tarfile.TarInfo) -> Optional[tarfile.TarInfo]:
|
| nonlocal excluded_count, manifest
|
|
|
|
|
| if tarinfo.name.endswith(Config.LOCK_FILE):
|
| excluded_count += 1
|
| return None
|
|
|
|
|
| if tarinfo.name.endswith(Config.BACKUP_STATE_FILE):
|
| return None
|
|
|
|
|
| rel_path = tarinfo.name
|
| if rel_path.startswith("./"):
|
| rel_path = rel_path[2:]
|
|
|
|
|
| if should_exclude(rel_path, Config.EXCLUDE_PATTERNS):
|
| excluded_count += 1
|
| manifest["excluded_patterns"].append(rel_path)
|
| return None
|
|
|
|
|
| path_parts = Path(rel_path).parts
|
| if path_parts and path_parts[0] in Config.SKIP_DIRS:
|
| excluded_count += 1
|
| return None
|
|
|
|
|
| manifest["file_count"] += 1
|
| if path_parts and path_parts[0] not in manifest["included_dirs"]:
|
| manifest["included_dirs"].append(path_parts[0])
|
|
|
| return tarinfo
|
|
|
|
|
| with tarfile.open(tar_path, "w:gz") as tar:
|
| tar.add(self.home_dir, arcname=".", filter=tar_filter)
|
|
|
| manifest["excluded_count"] = excluded_count
|
| manifest["skipped_dirs"] = list(Config.SKIP_DIRS)
|
|
|
| return manifest
|
|
|
| def _upload_archive(self, local_path: Path, remote_path: str) -> Dict[str, Any]:
|
| """Upload archive to Hugging Face Dataset"""
|
| try:
|
|
|
| try:
|
| self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
|
| except RepositoryNotFoundError:
|
| log("INFO", "Creating new dataset repository")
|
| self.api.create_repo(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| private=True
|
| )
|
|
|
|
|
| commit_info = self.api.upload_file(
|
| path_or_fileobj=str(local_path),
|
| path_in_repo=remote_path,
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| commit_message=f"OpenClaw state backup - {datetime.now().isoformat()}"
|
| )
|
|
|
| log("INFO", "File uploaded successfully",
|
| remote_path=remote_path,
|
| commit_url=commit_info.commit_url)
|
|
|
| return {
|
| "success": True,
|
| "commit_id": commit_info.oid,
|
| "commit_url": commit_info.commit_url
|
| }
|
|
|
| except Exception as e:
|
| log("ERROR", "Upload failed", error=str(e))
|
| raise
|
|
|
| def _update_state(self, state_update: Dict[str, Any]):
|
| """Update persistence state file"""
|
| try:
|
| current_state = {}
|
| if self.state_file.exists():
|
| with open(self.state_file, 'r') as f:
|
| current_state = json.load(f)
|
|
|
| current_state.update(state_update)
|
|
|
| self.state_file.parent.mkdir(parents=True, exist_ok=True)
|
| with open(self.state_file, 'w') as f:
|
| json.dump(current_state, f, indent=2)
|
|
|
| except Exception as e:
|
| log("WARNING", "Could not update state file", error=str(e))
|
|
|
| def _rotate_backups(self):
|
| """Rotate old backups, keeping only MAX_BACKUPS most recent"""
|
| try:
|
| files = self.api.list_repo_files(
|
| repo_id=self.repo_id,
|
| repo_type="dataset"
|
| )
|
|
|
|
|
| backups = [
|
| f for f in files
|
| if f.startswith(Config.BACKUP_PREFIX) and f.endswith(".tar.gz")
|
| ]
|
|
|
|
|
| backups = sorted(backups)
|
|
|
|
|
| if len(backups) > Config.MAX_BACKUPS:
|
| to_delete = backups[:-Config.MAX_BACKUPS]
|
| log("INFO", "Rotating backups",
|
| total=len(backups),
|
| keeping=Config.MAX_BACKUPS,
|
| deleting=len(to_delete))
|
|
|
| for old_backup in to_delete:
|
| try:
|
| self.api.delete_file(
|
| path_in_repo=old_backup,
|
| repo_id=self.repo_id,
|
| repo_type="dataset"
|
| )
|
| log("INFO", "Deleted old backup", file=old_backup)
|
| except Exception as e:
|
| log("WARNING", "Could not delete backup",
|
| file=old_backup,
|
| error=str(e))
|
|
|
| except Exception as e:
|
| log("WARNING", "Backup rotation failed", error=str(e))
|
|
|
|
|
|
|
|
|
|
|
| def load(self, force: bool = False) -> Dict[str, Any]:
|
| """
|
| Load state from Hugging Face Dataset
|
|
|
| Restores the most recent backup. If force is False and local state
|
| exists, it will create a backup before restoring.
|
| """
|
| operation_id = f"load-{int(time.time())}"
|
| start_time = time.time()
|
|
|
| log("INFO", "Starting load operation",
|
| operation_id=operation_id,
|
| force=force)
|
|
|
| try:
|
|
|
| backup_info = self._find_latest_backup()
|
|
|
| if not backup_info:
|
| log("WARNING", "No backups found, starting fresh")
|
|
|
| self.home_dir.mkdir(parents=True, exist_ok=True)
|
| return {
|
| "success": True,
|
| "operation_id": operation_id,
|
| "restored": False,
|
| "message": "No backups found, starting fresh"
|
| }
|
|
|
| log("INFO", "Found backup to restore",
|
| backup_file=backup_info["filename"],
|
| timestamp=backup_info.get("timestamp"))
|
|
|
|
|
| if self.home_dir.exists() and not force:
|
| backup_dir = self._create_local_backup()
|
| log("INFO", "Created local backup", backup_dir=str(backup_dir))
|
|
|
|
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| tar_path = Path(tmpdir) / "backup.tar.gz"
|
|
|
|
|
| log("INFO", "Downloading backup...")
|
| downloaded_path = hf_hub_download(
|
| repo_id=self.repo_id,
|
| filename=backup_info["filename"],
|
| repo_type="dataset",
|
| token=self.token,
|
| local_dir=tmpdir,
|
| local_dir_use_symlinks=False
|
| )
|
|
|
|
|
| log("INFO", "Extracting archive...")
|
| self._extract_archive(downloaded_path)
|
|
|
| duration = time.time() - start_time
|
| log("INFO", "Load completed successfully",
|
| operation_id=operation_id,
|
| duration_seconds=f"{duration:.2f}")
|
|
|
| return {
|
| "success": True,
|
| "operation_id": operation_id,
|
| "restored": True,
|
| "backup_file": backup_info["filename"],
|
| "duration": duration
|
| }
|
|
|
| except Exception as e:
|
| log("ERROR", "Load operation failed",
|
| operation_id=operation_id,
|
| error=str(e),
|
| exc_info=True)
|
| return {
|
| "success": False,
|
| "operation_id": operation_id,
|
| "error": str(e)
|
| }
|
|
|
| def _find_latest_backup(self) -> Optional[Dict[str, Any]]:
|
| """Find the latest backup file in the dataset"""
|
| try:
|
| files = self.api.list_repo_files(
|
| repo_id=self.repo_id,
|
| repo_type="dataset"
|
| )
|
|
|
|
|
| backups = sorted(
|
| [f for f in files if f.startswith(Config.BACKUP_PREFIX) and f.endswith(".tar.gz")],
|
| reverse=True
|
| )
|
|
|
| if not backups:
|
| return None
|
|
|
| latest = backups[0]
|
|
|
|
|
| timestamp_str = latest.replace(Config.BACKUP_PREFIX, "").replace(".tar.gz", "")
|
| try:
|
| timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S").isoformat()
|
| except ValueError:
|
| timestamp = None
|
|
|
| return {
|
| "filename": latest,
|
| "timestamp": timestamp
|
| }
|
|
|
| except Exception as e:
|
| log("ERROR", "Could not find latest backup", error=str(e))
|
| return None
|
|
|
| def _create_local_backup(self) -> Optional[Path]:
|
| """Create a backup of local state before restore"""
|
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| backup_dir = self.home_dir.parent / f"{self.home_dir.name}_backup_{timestamp}"
|
|
|
| try:
|
| if self.home_dir.exists():
|
| shutil.copytree(self.home_dir, backup_dir)
|
| return backup_dir
|
| except Exception as e:
|
| log("WARNING", "Could not create local backup", error=str(e))
|
|
|
| return None
|
|
|
| def _extract_archive(self, tar_path: Path):
|
| """Extract tar.gz archive to home directory"""
|
|
|
| self.home_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| with tarfile.open(tar_path, "r:gz") as tar:
|
| tar.extractall(self.home_dir)
|
|
|
| log("INFO", "Archive extracted successfully",
|
| destination=str(self.home_dir))
|
|
|
|
|
|
|
|
|
|
|
|
|
| def main():
|
| if len(sys.argv) < 2:
|
| print("Usage: python openclaw_persist.py [save|load|status]", file=sys.stderr)
|
| print("", file=sys.stderr)
|
| print("Commands:", file=sys.stderr)
|
| print(" save - Save current state to dataset", file=sys.stderr)
|
| print(" load - Load state from dataset", file=sys.stderr)
|
| print(" status - Show persistence status", file=sys.stderr)
|
| sys.exit(1)
|
|
|
| command = sys.argv[1].lower()
|
|
|
| try:
|
| manager = OpenClawPersistence()
|
|
|
| if command == "save":
|
| result = manager.save()
|
| print(json.dumps(result, indent=2))
|
| sys.exit(0 if result.get("success") else 1)
|
|
|
| elif command == "load":
|
| force = "--force" in sys.argv or "-f" in sys.argv
|
| result = manager.load(force=force)
|
| print(json.dumps(result, indent=2))
|
| sys.exit(0 if result.get("success") else 1)
|
|
|
| elif command == "status":
|
|
|
| status = {
|
| "configured": True,
|
| "repo_id": manager.repo_id,
|
| "home_dir": str(manager.home_dir),
|
| "home_exists": manager.home_dir.exists(),
|
| }
|
|
|
|
|
| if manager.state_file.exists():
|
| with open(manager.state_file, 'r') as f:
|
| state = json.load(f)
|
| status["state"] = state
|
|
|
|
|
| backups = manager._find_latest_backup()
|
| status["latest_backup"] = backups
|
|
|
| print(json.dumps(status, indent=2))
|
| sys.exit(0)
|
|
|
| else:
|
| print(f"Unknown command: {command}", file=sys.stderr)
|
| sys.exit(1)
|
|
|
| except Exception as e:
|
| print(json.dumps({
|
| "success": False,
|
| "error": str(e)
|
| }, indent=2))
|
| sys.exit(1)
|
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|