|
|
| """
|
| Atomic Dataset Persistence for OpenClaw AI
|
| Save state to Hugging Face Dataset with atomic operations
|
| """
|
|
|
| import os
|
| import sys
|
| import json
|
| import hashlib
|
| import time
|
| import tarfile
|
| import tempfile
|
| import shutil
|
| from datetime import datetime
|
| from pathlib import Path
|
| from typing import Dict, Any, Optional, List
|
| import requests
|
| import logging
|
|
|
| from huggingface_hub import HfApi, CommitOperationAdd
|
| from huggingface_hub.utils import RepositoryNotFoundError
|
| from huggingface_hub import hf_hub_download
|
|
|
|
|
| logging.basicConfig(
|
| level=logging.INFO,
|
| format='{"timestamp": "%(asctime)s", "level": "%(levelname)s", "module": "atomic-save", "message": "%(message)s"}'
|
| )
|
| logger = logging.getLogger(__name__)
|
|
|
| class AtomicDatasetSaver:
|
| """Atomic dataset persistence with proper error handling and retries"""
|
|
|
| def __init__(self, repo_id: str, dataset_path: str = "state"):
|
| self.repo_id = repo_id
|
| self.dataset_path = Path(dataset_path)
|
| self.api = HfApi()
|
| self.max_retries = 3
|
| self.base_delay = 1.0
|
| self.max_backups = 3
|
|
|
| logger.info("init", {
|
| "repo_id": repo_id,
|
| "dataset_path": dataset_path,
|
| "max_retries": self.max_retries,
|
| "max_backups": self.max_backups
|
| })
|
|
|
| def calculate_checksum(self, file_path: Path) -> str:
|
| """Calculate SHA256 checksum of file"""
|
| sha256_hash = hashlib.sha256()
|
| with open(file_path, "rb") as f:
|
| for chunk in iter(lambda: f.read(4096), b""):
|
| sha256_hash.update(chunk)
|
| return sha256_hash.hexdigest()
|
|
|
| def create_backup(self, current_commit: Optional[str] = None) -> Optional[str]:
|
| """Create backup of current state before overwriting"""
|
| try:
|
| if not current_commit:
|
| return None
|
|
|
|
|
| files = self.api.list_repo_files(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| revision=current_commit
|
| )
|
|
|
|
|
| state_files = [f for f in files if f.startswith(str(self.dataset_path))]
|
| if not state_files:
|
| return None
|
|
|
|
|
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
| backup_path = f"backups/state_{timestamp}"
|
|
|
| logger.info("creating_backup", {
|
| "current_commit": current_commit,
|
| "backup_path": backup_path,
|
| "files_count": len(state_files)
|
| })
|
|
|
|
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| tmpdir_path = Path(tmpdir)
|
|
|
|
|
| for file_path in state_files:
|
| file_content = hf_hub_download(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| filename=file_path,
|
| revision=current_commit,
|
| local_files_only=False
|
| )
|
| if file_content:
|
| shutil.copy2(file_content, tmpdir_path / Path(file_path).name)
|
|
|
|
|
| backup_files = []
|
| for file_path in state_files:
|
| local_path = tmpdir_path / file_path
|
| if local_path.exists():
|
| backup_file_path = f"{backup_path}/{Path(file_path).name}"
|
| backup_files.append(
|
| CommitOperationAdd(
|
| path_in_repo=backup_file_path,
|
| path_or_fileobj=str(local_path)
|
| )
|
| )
|
|
|
| if backup_files:
|
|
|
| commit_info = self.api.create_commit(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| operations=backup_files,
|
| commit_message=f"Backup state before update - {timestamp}",
|
| parent_commit=current_commit
|
| )
|
|
|
| logger.info("backup_created", {
|
| "backup_commit": commit_info.oid,
|
| "backup_path": backup_path
|
| })
|
|
|
| return commit_info.oid
|
|
|
| except Exception as e:
|
| logger.error("backup_failed", {"error": str(e), "current_commit": current_commit})
|
| return None
|
|
|
| def cleanup_old_backups(self, current_commit: Optional[str] = None) -> None:
|
| """Clean up old backups, keeping only the most recent ones"""
|
| try:
|
| if not current_commit:
|
| return
|
|
|
|
|
| files = self.api.list_repo_files(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| revision=current_commit
|
| )
|
|
|
|
|
| backup_dirs = set()
|
| for file_path in files:
|
| if file_path.startswith("backups/state_"):
|
| backup_dir = file_path.split("/")[1]
|
| backup_dirs.add(backup_dir)
|
|
|
|
|
| backup_list = sorted(backup_dirs)
|
| if len(backup_list) > self.max_backups:
|
| backups_to_remove = backup_list[:-self.max_backups]
|
|
|
| logger.info("cleaning_old_backups", {
|
| "total_backups": len(backup_list),
|
| "keeping": self.max_backups,
|
| "removing": len(backups_to_remove),
|
| "old_backups": backups_to_remove
|
| })
|
|
|
|
|
|
|
|
|
| except Exception as e:
|
| logger.error("backup_cleanup_failed", {"error": str(e)})
|
|
|
| def save_state_atomic(self, state_data: Dict[str, Any], source_paths: List[str]) -> Dict[str, Any]:
|
| """
|
| Save state to dataset atomically
|
|
|
| Args:
|
| state_data: Dictionary containing state information
|
| source_paths: List of file paths to include in the state
|
|
|
| Returns:
|
| Dictionary with operation result
|
| """
|
| operation_id = f"save_{int(time.time())}"
|
|
|
| logger.info("starting_atomic_save", {
|
| "operation_id": operation_id,
|
| "state_keys": list(state_data.keys()),
|
| "source_paths": source_paths
|
| })
|
|
|
| try:
|
|
|
| try:
|
| repo_info = self.api.repo_info(
|
| repo_id=self.repo_id,
|
| repo_type="dataset"
|
| )
|
| current_commit = repo_info.sha
|
| logger.info("current_commit_found", {"commit": current_commit})
|
| except RepositoryNotFoundError:
|
| current_commit = None
|
| logger.info("repository_not_found", {"action": "creating_new_repo"})
|
|
|
|
|
| backup_commit = self.create_backup(current_commit)
|
|
|
|
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| tmpdir_path = Path(tmpdir)
|
| state_dir = tmpdir_path / self.dataset_path
|
| state_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| metadata = {
|
| "timestamp": datetime.now().isoformat(),
|
| "operation_id": operation_id,
|
| "checksum": None,
|
| "backup_commit": backup_commit,
|
| "state_data": state_data
|
| }
|
|
|
| metadata_path = state_dir / "metadata.json"
|
| with open(metadata_path, "w") as f:
|
| json.dump(metadata, f, indent=2)
|
|
|
|
|
| operations = [CommitOperationAdd(path_in_repo=f"state/metadata.json", path_or_fileobj=str(metadata_path))]
|
|
|
| for source_path in source_paths:
|
| source = Path(source_path)
|
| if source.exists():
|
| dest_path = state_dir / source.name
|
| shutil.copy2(source, dest_path)
|
|
|
|
|
| checksum = self.calculate_checksum(dest_path)
|
|
|
| operations.append(
|
| CommitOperationAdd(
|
| path_in_repo=f"state/{source.name}",
|
| path_or_fileobj=str(dest_path)
|
| )
|
| )
|
|
|
| logger.info("file_added", {
|
| "source": source_path,
|
| "checksum": checksum,
|
| "operation_id": operation_id
|
| })
|
|
|
|
|
| final_metadata = metadata.copy()
|
| final_metadata["checksum"] = hashlib.sha256(
|
| json.dumps(state_data, sort_keys=True).encode()
|
| ).hexdigest()
|
|
|
|
|
| with open(metadata_path, "w") as f:
|
| json.dump(final_metadata, f, indent=2)
|
|
|
|
|
| commit_info = self.api.create_commit(
|
| repo_id=self.repo_id,
|
| repo_type="dataset",
|
| operations=operations,
|
| commit_message=f"Atomic state update - {operation_id}",
|
| parent_commit=current_commit
|
| )
|
|
|
|
|
| self.cleanup_old_backups(commit_info.oid)
|
|
|
| result = {
|
| "success": True,
|
| "operation_id": operation_id,
|
| "commit_id": commit_info.oid,
|
| "backup_commit": backup_commit,
|
| "timestamp": datetime.now().isoformat(),
|
| "files_count": len(source_paths)
|
| }
|
|
|
| logger.info("atomic_save_completed", result)
|
| return result
|
|
|
| except Exception as e:
|
| error_result = {
|
| "success": False,
|
| "operation_id": operation_id,
|
| "error": str(e),
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| logger.error("atomic_save_failed", error_result)
|
| raise Exception(f"Atomic save failed: {str(e)}")
|
|
|
| def main():
|
| """Main function for command line usage"""
|
| if len(sys.argv) < 3:
|
| print(json.dumps({
|
| "error": "Usage: python save_to_dataset_atomic.py <repo_id> <source_path1> [source_path2...]",
|
| "status": "error"
|
| }, indent=2))
|
| sys.exit(1)
|
|
|
| repo_id = sys.argv[1]
|
| source_paths = sys.argv[2:]
|
|
|
|
|
| for path in source_paths:
|
| if not os.path.exists(path):
|
| print(json.dumps({
|
| "error": f"Source path does not exist: {path}",
|
| "status": "error"
|
| }, indent=2))
|
| sys.exit(1)
|
|
|
| try:
|
|
|
| state_data = {
|
| "environment": "production",
|
| "version": "1.0.0",
|
| "platform": "huggingface-spaces",
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| saver = AtomicDatasetSaver(repo_id)
|
| result = saver.save_state_atomic(state_data, source_paths)
|
|
|
| print(json.dumps(result, indent=2))
|
|
|
| except Exception as e:
|
| print(json.dumps({
|
| "error": str(e),
|
| "status": "error"
|
| }, indent=2))
|
| sys.exit(1)
|
|
|
| if __name__ == "__main__":
|
| main() |