File size: 2,215 Bytes
3d3526c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | import os
import sys
import tarfile
import time
from huggingface_hub import HfApi, hf_hub_download
api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")
FILENAME = "latest_backup.tar.gz"
BACKUP_PATH = "/root/.openclaw"
def restore():
"""Restore data from Hugging Face Dataset on startup."""
if not repo_id or not token:
print("⚠️ Skip Restore: HF_DATASET or HF_TOKEN not set")
return False
try:
print(f"⬇️ Downloading {FILENAME} from {repo_id}...")
path = hf_hub_download(repo_id=repo_id, filename=FILENAME, repo_type="dataset", token=token)
if os.path.exists(path):
with tarfile.open(path, "r:gz") as tar:
tar.extractall(path=BACKUP_PATH)
print(f"✅ Success: Restored from {FILENAME}")
return True
except Exception as e:
print(f"ℹ️ Restore Note: No existing backup found or error: {e}")
return False
def backup():
"""Backup data to Hugging Face Dataset periodically."""
if not repo_id or not token:
return
try:
temp_tar = "/tmp/latest_backup.tar.gz"
paths_to_backup = [
"sessions",
"agents/main/sessions",
"openclaw.json",
"credentials",
"agents/main/SOUL.md",
"agents/main/USER.md",
"agents/main/IDENTITY.md"
]
with tarfile.open(temp_tar, "w:gz") as tar:
for p in paths_to_backup:
full_path = os.path.join(BACKUP_PATH, p)
if os.path.exists(full_path):
arcname = p
tar.add(full_path, arcname=arcname)
api.upload_file(
path_or_fileobj=temp_tar,
path_in_repo=FILENAME,
repo_id=repo_id,
repo_type="dataset",
token=token
)
print(f"✅ Backup {FILENAME} Success at {time.strftime('%X')}")
if os.path.exists(temp_tar):
os.remove(temp_tar)
except Exception as e:
print(f"❌ Backup Error: {e}")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "backup":
backup()
else:
restore() |