| |
| """ |
| Atomic Dataset Persistence for OpenClaw AI |
| Save state to Hugging Face Dataset with atomic operations |
| """ |
|
|
| import os |
| import sys |
| import json |
| import hashlib |
| import time |
| import tarfile |
| import tempfile |
| import shutil |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Dict, Any, Optional, List |
| import requests |
| import logging |
|
|
| from huggingface_hub import HfApi, CommitOperationAdd |
| from huggingface_hub.utils import RepositoryNotFoundError |
| from huggingface_hub import hf_hub_download |
|
|
| |
# Module-level logging: one JSON-shaped line per record, so logs can be
# ingested by line-oriented log collectors.
# NOTE(review): %(message)s is interpolated unescaped — a message containing
# double quotes or newlines produces an invalid JSON line; confirm downstream
# parsers tolerate that (or switch to a custom JSON Formatter).
logging.basicConfig(
    level=logging.INFO,
    format='{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "atomic-save", "message": "%(message)s"}'
)
# Per-module logger; all classes/functions in this file log through it.
logger = logging.getLogger(__name__)
|
|
class AtomicDatasetSaver:
    """Atomic dataset persistence with backups and commit retries.

    State files are committed under ``dataset_path`` inside a Hugging Face
    dataset repo. Before each save the previous state is copied into a
    timestamped ``backups/state_<ts>`` folder, and commits are chained with
    ``parent_commit`` so a concurrent writer fails fast instead of silently
    clobbering the other's update.
    """

    def __init__(self, repo_id: str, dataset_path: str = "state"):
        """
        Args:
            repo_id: Target dataset repo, e.g. ``"user/my-dataset"``.
            dataset_path: Folder inside the repo that holds the state files.
        """
        self.repo_id = repo_id
        self.dataset_path = Path(dataset_path)
        self.api = HfApi()
        self.max_retries = 3   # attempts per commit call (see _retry)
        self.base_delay = 1.0  # seconds; doubled after each failed attempt
        self.max_backups = 3   # timestamped backup folders kept under backups/

        self._log("init", {
            "repo_id": repo_id,
            "dataset_path": dataset_path,
            "max_retries": self.max_retries,
            "max_backups": self.max_backups,
        })

    @staticmethod
    def _log(event: str, payload: Dict[str, Any], level: int = logging.INFO) -> None:
        """Emit one structured log line: ``<event> <json payload>``.

        The previous code called ``logger.info(event, payload)``, which treats
        the dict as a %-format argument; since the message has no placeholders
        the payload was silently dropped. Serialize it explicitly instead.
        """
        logger.log(level, "%s %s", event, json.dumps(payload, default=str))

    def _retry(self, fn, *args, **kwargs):
        """Call *fn* with exponential backoff, up to ``self.max_retries`` tries.

        Fulfills the retry behavior the class docstring advertises; previously
        ``max_retries``/``base_delay`` were set but never used.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                if attempt >= self.max_retries:
                    raise
                delay = self.base_delay * (2 ** (attempt - 1))
                self._log("retrying", {
                    "attempt": attempt,
                    "delay": delay,
                    "error": str(e),
                }, logging.WARNING)
                time.sleep(delay)

    def calculate_checksum(self, file_path: Path) -> str:
        """Return the SHA256 hex digest of *file_path*, read in 4 KiB chunks."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()

    def create_backup(self, current_commit: Optional[str] = None) -> Optional[str]:
        """Copy the current state files into ``backups/state_<timestamp>/``.

        Best-effort by design: any failure is logged and ``None`` is returned
        rather than aborting the save.

        Args:
            current_commit: Revision to back up; ``None`` means a fresh repo
                with nothing to preserve.

        Returns:
            The sha of the backup commit, or ``None`` when there was nothing
            to back up or the backup failed.
        """
        try:
            if not current_commit:
                return None

            files = self.api.list_repo_files(
                repo_id=self.repo_id,
                repo_type="dataset",
                revision=current_commit,
            )

            prefix = self.dataset_path.as_posix()
            state_files = [f for f in files if f.startswith(prefix)]
            if not state_files:
                return None

            # YYYYMMDD_HHMMSS sorts lexicographically == chronologically,
            # which cleanup_old_backups relies on.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"backups/state_{timestamp}"

            self._log("creating_backup", {
                "current_commit": current_commit,
                "backup_path": backup_path,
                "files_count": len(state_files),
            })

            with tempfile.TemporaryDirectory() as tmpdir:
                tmpdir_path = Path(tmpdir)

                # Download each state file and stage it for the backup commit
                # in a single pass. (Previously files were copied to
                # tmpdir/<basename> but then looked up at tmpdir/<full path>,
                # so the backup commit was never created.)
                # NOTE: files are flattened by basename, so two state files
                # with the same name in different subfolders would collide.
                backup_ops = []
                for repo_file in state_files:
                    local_file = hf_hub_download(
                        repo_id=self.repo_id,
                        repo_type="dataset",
                        filename=repo_file,
                        revision=current_commit,
                        local_files_only=False,
                    )
                    if not local_file:
                        continue
                    staged = tmpdir_path / Path(repo_file).name
                    shutil.copy2(local_file, staged)
                    backup_ops.append(
                        CommitOperationAdd(
                            path_in_repo=f"{backup_path}/{staged.name}",
                            path_or_fileobj=str(staged),
                        )
                    )

                if not backup_ops:
                    return None

                # Commit inside the tempdir context: files are read at upload.
                commit_info = self._retry(
                    self.api.create_commit,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    operations=backup_ops,
                    commit_message=f"Backup state before update - {timestamp}",
                    parent_commit=current_commit,
                )

                self._log("backup_created", {
                    "backup_commit": commit_info.oid,
                    "backup_path": backup_path,
                })
                return commit_info.oid

        except Exception as e:
            self._log("backup_failed", {
                "error": str(e),
                "current_commit": current_commit,
            }, logging.ERROR)
            return None

    def cleanup_old_backups(self, current_commit: Optional[str] = None) -> None:
        """Delete old ``backups/state_*`` folders, keeping the newest
        ``self.max_backups``. Best-effort: failures are logged, never raised.
        """
        try:
            if not current_commit:
                return

            files = self.api.list_repo_files(
                repo_id=self.repo_id,
                repo_type="dataset",
                revision=current_commit,
            )

            # Collect distinct backup folder names (state_<timestamp>).
            backup_dirs = {
                file_path.split("/")[1]
                for file_path in files
                if file_path.startswith("backups/state_")
            }

            # Timestamped names sort chronologically; keep the newest ones.
            backup_list = sorted(backup_dirs)
            if len(backup_list) <= self.max_backups:
                return

            backups_to_remove = backup_list[:-self.max_backups]

            self._log("cleaning_old_backups", {
                "total_backups": len(backup_list),
                "keeping": self.max_backups,
                "removing": len(backups_to_remove),
                "old_backups": backups_to_remove,
            })

            # Actually delete them (previously computed but never removed).
            # Local import keeps the file's top-level import block untouched.
            from huggingface_hub import CommitOperationDelete

            delete_ops = [
                CommitOperationDelete(path_in_repo=f"backups/{d}/", is_folder=True)
                for d in backups_to_remove
            ]
            self._retry(
                self.api.create_commit,
                repo_id=self.repo_id,
                repo_type="dataset",
                operations=delete_ops,
                commit_message=f"Prune old state backups ({len(delete_ops)})",
                parent_commit=current_commit,
            )

        except Exception as e:
            self._log("backup_cleanup_failed", {"error": str(e)}, logging.ERROR)

    def save_state_atomic(self, state_data: Dict[str, Any], source_paths: List[str]) -> Dict[str, Any]:
        """
        Save state to dataset atomically.

        Args:
            state_data: Dictionary containing state information.
            source_paths: List of file paths to include in the state.

        Returns:
            Dictionary with operation result (success flag, commit ids,
            timestamp, file count).

        Raises:
            RuntimeError: If any step of the save fails (with the original
                exception chained as ``__cause__``).
        """
        operation_id = f"save_{int(time.time())}"

        self._log("starting_atomic_save", {
            "operation_id": operation_id,
            "state_keys": list(state_data.keys()),
            "source_paths": source_paths,
        })

        try:
            # Resolve the current HEAD; a missing repo means first-ever save.
            try:
                repo_info = self.api.repo_info(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                )
                current_commit = repo_info.sha
                self._log("current_commit_found", {"commit": current_commit})
            except RepositoryNotFoundError:
                current_commit = None
                self._log("repository_not_found", {"action": "creating_new_repo"})

            backup_commit = self.create_backup(current_commit)
            # A successful backup advanced HEAD, so the state commit must
            # chain off the backup commit — chaining off the stale
            # current_commit would make parent validation reject the save.
            parent_commit = backup_commit or current_commit

            with tempfile.TemporaryDirectory() as tmpdir:
                state_dir = Path(tmpdir) / self.dataset_path
                state_dir.mkdir(parents=True, exist_ok=True)

                repo_prefix = self.dataset_path.as_posix()
                operations = []
                file_checksums = {}

                # Stage each existing source file and record its checksum.
                for source_path in source_paths:
                    source = Path(source_path)
                    if not source.exists():
                        continue
                    dest_path = state_dir / source.name
                    shutil.copy2(source, dest_path)

                    checksum = self.calculate_checksum(dest_path)
                    file_checksums[source.name] = checksum

                    operations.append(
                        CommitOperationAdd(
                            path_in_repo=f"{repo_prefix}/{source.name}",
                            path_or_fileobj=str(dest_path),
                        )
                    )

                    self._log("file_added", {
                        "source": source_path,
                        "checksum": checksum,
                        "operation_id": operation_id,
                    })

                # Write metadata once, with its final checksum, before staging
                # it. (Previously a placeholder was staged and then the file
                # was rewritten, relying on the upload lazily re-reading it.)
                metadata = {
                    "timestamp": datetime.now().isoformat(),
                    "operation_id": operation_id,
                    "checksum": hashlib.sha256(
                        json.dumps(state_data, sort_keys=True).encode()
                    ).hexdigest(),
                    "file_checksums": file_checksums,
                    "backup_commit": backup_commit,
                    "state_data": state_data,
                }
                metadata_path = state_dir / "metadata.json"
                with open(metadata_path, "w") as f:
                    json.dump(metadata, f, indent=2)

                operations.insert(
                    0,
                    CommitOperationAdd(
                        path_in_repo=f"{repo_prefix}/metadata.json",
                        path_or_fileobj=str(metadata_path),
                    ),
                )

                # Single commit = the atomic step; retried on transient errors.
                commit_info = self._retry(
                    self.api.create_commit,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    operations=operations,
                    commit_message=f"Atomic state update - {operation_id}",
                    parent_commit=parent_commit,
                )

                # Prune old backups against the fresh HEAD (best-effort).
                self.cleanup_old_backups(commit_info.oid)

                result = {
                    "success": True,
                    "operation_id": operation_id,
                    "commit_id": commit_info.oid,
                    "backup_commit": backup_commit,
                    "timestamp": datetime.now().isoformat(),
                    "files_count": len(source_paths),
                }

                self._log("atomic_save_completed", result)
                return result

        except Exception as e:
            error_result = {
                "success": False,
                "operation_id": operation_id,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            }

            self._log("atomic_save_failed", error_result, logging.ERROR)
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"Atomic save failed: {str(e)}") from e
|
|
def main():
    """CLI entry point: save the given files as dataset state.

    Usage: ``python save_to_dataset_atomic.py <repo_id> <path1> [path2...]``.
    Prints a JSON result document to stdout and exits 1 on any error.
    """
    argv = sys.argv

    # Guard clause: need a repo id plus at least one source file.
    if len(argv) < 3:
        print(json.dumps({
            "error": "Usage: python save_to_dataset_atomic.py <repo_id> <source_path1> [source_path2...]",
            "status": "error"
        }, indent=2))
        sys.exit(1)

    repo_id, source_paths = argv[1], argv[2:]

    # Validate inputs up front so we fail before touching the Hub;
    # report the first missing path, matching the original behavior.
    missing = [p for p in source_paths if not os.path.exists(p)]
    if missing:
        print(json.dumps({
            "error": f"Source path does not exist: {missing[0]}",
            "status": "error"
        }, indent=2))
        sys.exit(1)

    try:
        # Fixed deployment descriptor recorded alongside the saved files.
        state_data = {
            "environment": "production",
            "version": "1.0.0",
            "platform": "huggingface-spaces",
            "timestamp": datetime.now().isoformat()
        }

        outcome = AtomicDatasetSaver(repo_id).save_state_atomic(state_data, source_paths)
        print(json.dumps(outcome, indent=2))

    except Exception as exc:
        print(json.dumps({
            "error": str(exc),
            "status": "error"
        }, indent=2))
        sys.exit(1)


if __name__ == "__main__":
    main()