| import os |
| import sys |
| import time |
| import shutil |
| import zipfile |
| from pathlib import Path |
| from huggingface_hub import HfApi, hf_hub_download, list_repo_files |
|
|
| |
| |
| |
| api = HfApi() |
| REPO_ID = os.getenv("HF_DATASET") |
| TOKEN = os.getenv("HF_TOKEN") |
|
|
| BASE_DIR = Path("/root/.openclaw") |
| PREFIX = "openclaw_backup_" |
| CONFIG_FILE_NAME = "openclaw.json" |
|
|
| KEEP_LAST = 5 |
|
|
| |
| |
| |
| def log(msg): |
| print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) |
|
|
| def should_ignore(path): |
| ignore = ['.lock', '.tmp', '.wal', '.shm'] |
| s = str(path) |
| return any(x in s or s.endswith(x) for x in ignore) |
|
|
| |
| |
| |
| def create_zip(zip_path): |
| |
| all_files = [] |
| for root, _, files in os.walk(BASE_DIR): |
| for f in files: |
| p = Path(root) / f |
| rel = p.relative_to(BASE_DIR) |
|
|
| if should_ignore(rel): |
| continue |
|
|
| all_files.append((p, rel)) |
|
|
| total = len(all_files) |
| if total == 0: |
| log("⚠️ No files to zip") |
| return |
|
|
| log(f"📦 Zipping {total} files...") |
|
|
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z: |
| for i, (p, rel) in enumerate(all_files, 1): |
| z.write(p, rel) |
|
|
| |
| if i % max(1, total // 50) == 0 or i == total: |
| percent = (i / total) * 100 |
| print(f"\r📊 Progress: {i}/{total} ({percent:.1f}%)", end="", flush=True) |
|
|
| print() |
|
|
| |
| |
| |
| def backup(): |
| if not REPO_ID or not TOKEN: |
| log("❌ Missing config") |
| return False |
|
|
| if not BASE_DIR.exists(): |
| log("⚠️ Nothing to backup") |
| return False |
|
|
| timestamp = time.strftime("%Y%m%d_%H%M%S") |
| zip_name = f"{PREFIX}{timestamp}.zip" |
| zip_path = Path(f"/tmp/{zip_name}") |
|
|
| try: |
| log(f"📦 Creating {zip_name}") |
| create_zip(zip_path) |
|
|
| log("📤 Uploading...") |
| api.upload_file( |
| path_or_fileobj=str(zip_path), |
| path_in_repo=zip_name, |
| repo_id=REPO_ID, |
| repo_type="dataset", |
| token=TOKEN, |
| commit_message=f"Full backup {timestamp}" |
| ) |
|
|
| log(f"✅ Uploaded: {zip_name}") |
|
|
| cleanup_old_backups() |
| return True |
|
|
| finally: |
| if zip_path.exists(): |
| zip_path.unlink() |
|
|
| |
| |
| |
| def cleanup_old_backups(): |
| contents = list_repo_files(REPO_ID, repo_type="dataset", token=TOKEN) |
|
|
| backups = sorted([ |
| f for f in contents |
| if f.startswith(PREFIX) and f.endswith(".zip") |
| ]) |
|
|
| if len(backups) <= KEEP_LAST: |
| return |
|
|
| for old in backups[:-KEEP_LAST]: |
| try: |
| api.delete_file( |
| path_in_repo=old, |
| repo_id=REPO_ID, |
| repo_type="dataset", |
| token=TOKEN |
| ) |
| log(f"🗑️ Deleted: {old}") |
| except Exception as e: |
| log(f"⚠️ delete fail {old}: {e}") |
|
|
| |
| |
| |
| def restore(): |
| if not REPO_ID or not TOKEN: |
| log("❌ Missing config") |
| return False |
| |
| contents = list_repo_files(REPO_ID, repo_type="dataset", token=TOKEN) |
| |
| |
| backups = sorted([f for f in contents if f.startswith(PREFIX) and f.endswith(".zip")]) |
| if backups: |
| latest = backups[-1] |
| log(f"📥 Step 1: Restoring full backup from {latest}") |
| zip_path = hf_hub_download( |
| repo_id=REPO_ID, filename=latest, repo_type="dataset", token=TOKEN |
| ) |
| if BASE_DIR.exists(): |
| shutil.rmtree(BASE_DIR) |
| BASE_DIR.mkdir(parents=True) |
| log("📦 Extracting full backup...") |
| with zipfile.ZipFile(zip_path, "r") as z: |
| z.extractall(BASE_DIR) |
| log("✅ Full backup restored.") |
| else: |
| log("⚠️ No full backup found. Skipping Step 1.") |
| BASE_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| |
| config_exists_in_repo = CONFIG_FILE_NAME in contents |
| if config_exists_in_repo: |
| log(f"📥 Step 2: Restoring openclaw.json from Dataset") |
| try: |
| config_path = hf_hub_download( |
| repo_id=REPO_ID, filename=CONFIG_FILE_NAME, repo_type="dataset", token=TOKEN |
| ) |
| shutil.copy(config_path, BASE_DIR / CONFIG_FILE_NAME) |
| log("✅ Config file restored.") |
| except Exception as e: |
| log(f"⚠️ Failed to restore config: {e}") |
| else: |
| log("⚠️ No separate openclaw.json found in Dataset. Step 2 skipped.") |
|
|
| return True |
| |
| |
| |
| |
| def backup_loop_every_1hour_at_minute_00(): |
| log("🔄 Backup every hour at :00") |
|
|
| while True: |
| now = time.localtime() |
| wait_sec = (60 - now.tm_min) * 60 - now.tm_sec |
|
|
| log(f"⏳ Waiting {wait_sec}s...") |
| time.sleep(wait_sec) |
|
|
| backup() |
|
|
| |
| |
| |
| def backup_loop_5min(): |
| log("🧪 TEST MODE: every 5 minutes") |
|
|
| while True: |
| backup() |
| log("⏳ Waiting 5 minutes...") |
| time.sleep(300) |
|
|
| |
| |
| |
| if __name__ == "__main__": |
| action = sys.argv[1].lower() if len(sys.argv) > 1 else "restore" |
|
|
| if action == "backup": |
| backup_loop_every_1hour_at_minute_00() |
| |
| else: |
| restore() |