Spaces:
Runtime error
Runtime error
| import os | |
| import sys | |
| import time | |
| import shutil | |
| import zipfile | |
| from pathlib import Path | |
| from huggingface_hub import HfApi, hf_hub_download, list_repo_files | |
| # ========================= | |
| # CONFIG | |
| # ========================= | |
# Dataset repo id and access token come from the environment; either one is
# None when its variable is unset (backup() and restore() check for that).
REPO_ID = os.environ.get("HF_DATASET")
TOKEN = os.environ.get("HF_TOKEN")

# Hub client used for the upload and delete calls.
api = HfApi()

# Directory that is archived on backup and rebuilt on restore.
BASE_DIR = Path("/root/.openclaw")

# Archives are stored in the repo as "<PREFIX><timestamp>.zip".
PREFIX = "openclawai_backup_"

# File that restore() also fetches from the repo as a separate download.
CONFIG_FILE_NAME = "openclaw.json"

# Number of newest archives that cleanup_old_backups() leaves in the repo.
KEEP_LAST = 5
| # ========================= | |
| # UTILS | |
| # ========================= | |
def log(msg):
    """Print *msg* to stdout prefixed with the local time as ``[HH:MM:SS]``.

    The stream is flushed on every call so lines show up immediately.
    """
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
def should_ignore(path):
    """Return True when *path* must be left out of a backup archive.

    A path is skipped when it contains one of the literal fragments
    (lock/tmp/wal/shm files, ``__pycache__``, Python bytecode) or matches one
    of the shell-style patterns (``.jsonl`` files, ``.bak`` copies, names
    containing "clobbered").

    The shell-style patterns were previously tested with ``in`` and
    ``str.endswith``, where ``*`` is an ordinary character, so none of them
    could match a real path. They are now evaluated with ``fnmatchcase``.

    Args:
        path: A ``str`` or path-like object; it is converted with ``str()``.

    Returns:
        bool: True if the path should be excluded from the archive.
    """
    # Local import keeps this block independent of the file's import header.
    from fnmatch import fnmatchcase

    fragments = ('.lock', '.tmp', '.wal', '.shm', '__pycache__', '.pyc', '.pyo')
    patterns = ('*.jsonl', '*.bak*', '*clobbered*', '*.trajectory.jsonl')

    s = str(path)
    if any(fragment in s for fragment in fragments):
        return True
    # fnmatchcase is case-sensitive on every platform, and "*" also spans
    # path separators, so each pattern is applied to the whole relative path.
    return any(fnmatchcase(s, pattern) for pattern in patterns)
| # ========================= | |
| # CREATE ZIP | |
| # ========================= | |
def create_zip(zip_path):
    """Write every non-ignored file under BASE_DIR into a new zip archive.

    Files are stored under their path relative to BASE_DIR, compressed with
    DEFLATE. A progress line is rewritten in place on stdout while zipping.

    Files that disappear between the directory scan and the write are skipped
    and reported once the archive is finished, instead of aborting the whole
    archive.

    Args:
        zip_path: Destination path of the archive.

    Returns:
        bool: True when an archive was written to *zip_path*; False when no
        file qualified, in which case nothing is created at *zip_path*.
    """
    # Collect the complete file list first so progress has a known total.
    all_files = []
    for root, _, files in os.walk(BASE_DIR):
        for name in files:
            p = Path(root) / name
            rel = p.relative_to(BASE_DIR)
            if not should_ignore(rel):
                all_files.append((p, rel))

    total = len(all_files)
    if not total:
        log("⚠️ No files to zip")
        return False

    log(f"📦 Zipping {total} files...")
    step = max(1, total // 50)  # refresh the progress line about every 2%
    vanished = 0
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for i, (p, rel) in enumerate(all_files, 1):
            try:
                z.write(p, rel)
            except FileNotFoundError:
                # Raised when the source can no longer be stat'ed/opened, i.e.
                # before any of its data reaches the archive.
                vanished += 1
            if i % step == 0 or i == total:
                percent = (i / total) * 100
                print(f"\r📊 Progress: {i}/{total} ({percent:.1f}%)", end="", flush=True)
    print()  # end the in-place progress line
    if vanished:
        log(f"⚠️ Skipped {vanished} file(s) that disappeared during zipping")
    return True
| # ========================= | |
| # BACKUP | |
| # ========================= | |
def backup():
    """Zip BASE_DIR, upload the archive to the dataset repo, prune old ones.

    The archive is built under /tmp as ``<PREFIX><YYYYmmdd_HHMMSS>.zip`` and
    is always removed from /tmp afterwards, whether or not the upload worked.

    Returns:
        bool: True when the archive was uploaded. False when REPO_ID/TOKEN
        are unset, BASE_DIR does not exist, no archive was produced, or
        creating/uploading the archive raised. A failure while pruning old
        archives is logged but does not turn a successful upload into False.
    """
    if not REPO_ID or not TOKEN:
        log("❌ Missing config")
        return False
    if not BASE_DIR.exists():
        log("⚠️ Nothing to backup")
        return False

    timestamp = time.strftime("%Y%m%d_%H%M%S")
    zip_name = f"{PREFIX}{timestamp}.zip"
    zip_path = Path(f"/tmp/{zip_name}")

    try:
        log(f"📦 Creating {zip_name}")
        create_zip(zip_path)
        # create_zip() writes nothing when every file is filtered out;
        # uploading a path that does not exist could only fail.
        if not zip_path.exists():
            log("⚠️ Nothing to backup")
            return False

        log("📤 Uploading...")
        api.upload_file(
            path_or_fileobj=str(zip_path),
            path_in_repo=zip_name,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=TOKEN,
            commit_message=f"Full backup {timestamp}",
        )
        log(f"✅ Uploaded: {zip_name}")
    except Exception as e:
        # Boundary of one backup run: report the failure through the bool
        # result so a scheduling loop can carry on with the next run.
        log(f"❌ Backup failed: {type(e).__name__}: {e}")
        return False
    finally:
        if zip_path.exists():
            zip_path.unlink()

    # Pruning is housekeeping; the new archive is already in the repo.
    try:
        cleanup_old_backups()
    except Exception as e:
        log(f"⚠️ Cleanup failed: {type(e).__name__}: {e}")
    return True
| # ========================= | |
| # CLEANUP | |
| # ========================= | |
def cleanup_old_backups():
    """Delete all but the newest KEEP_LAST backup archives from the repo.

    Archives are the repo files whose name starts with PREFIX and ends with
    ``.zip``. They are ordered by name, which is chronological for the
    ``YYYYmmdd_HHMMSS`` timestamps that backup() puts in the name.

    A deletion that fails is logged and the remaining ones are still tried.
    Errors from listing the repo files are not caught here.
    """
    contents = list_repo_files(REPO_ID, repo_type="dataset", token=TOKEN)
    backups = sorted(
        f for f in contents
        if f.startswith(PREFIX) and f.endswith(".zip")
    )

    # Slice by an explicit count: backups[:-KEEP_LAST] is empty when
    # KEEP_LAST is 0, which would silently keep every archive.
    surplus = len(backups) - max(KEEP_LAST, 0)
    if surplus <= 0:
        return

    for old in backups[:surplus]:
        try:
            api.delete_file(
                path_in_repo=old,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=TOKEN,
            )
            log(f"🗑️ Deleted: {old}")
        except Exception as e:
            log(f"⚠️ delete fail {old}: {e}")
| # ========================= | |
| # RESTORE | |
| # ========================= | |
def restore():
    """Rebuild BASE_DIR from the dataset repo.

    Step 1 downloads the newest ``<PREFIX>....zip`` archive (newest by name)
    and replaces BASE_DIR with its contents. The archive is first extracted
    into a sibling staging directory and only swapped in once extraction has
    succeeded, so a corrupt or partial archive leaves the current BASE_DIR
    untouched. When the repo holds no archive, BASE_DIR is just created if
    missing.

    Step 2 copies CONFIG_FILE_NAME from the repo into BASE_DIR when the repo
    contains it. This step is best-effort: a failure is logged and does not
    change the return value.

    Returns:
        bool: False when REPO_ID/TOKEN are unset, True otherwise.

    Raises:
        Whatever listing the repo, downloading the archive or extracting it
        raises; those errors are not caught here.
    """
    if not REPO_ID or not TOKEN:
        log("❌ Missing config")
        return False

    contents = list_repo_files(REPO_ID, repo_type="dataset", token=TOKEN)

    # 1. Restore the full backup (zip)
    backups = sorted(
        f for f in contents
        if f.startswith(PREFIX) and f.endswith(".zip")
    )
    if backups:
        latest = backups[-1]
        log(f"📥 Step 1: Restoring full backup from {latest}")
        zip_path = hf_hub_download(
            repo_id=REPO_ID, filename=latest, repo_type="dataset", token=TOKEN
        )

        log("📦 Extracting full backup...")
        # Extract beside BASE_DIR first; the existing data is removed only
        # after the whole archive has been unpacked successfully.
        staging = BASE_DIR.with_name(BASE_DIR.name + ".restoring")
        if staging.exists():
            shutil.rmtree(staging)  # leftover from an interrupted restore
        staging.mkdir(parents=True)
        try:
            with zipfile.ZipFile(zip_path, "r") as z:
                z.extractall(staging)
        except Exception:
            shutil.rmtree(staging, ignore_errors=True)
            raise

        if BASE_DIR.exists():
            shutil.rmtree(BASE_DIR)
        staging.rename(BASE_DIR)
        log("✅ Full backup restored.")
    else:
        log("⚠️ No full backup found. Skipping Step 1.")
        BASE_DIR.mkdir(parents=True, exist_ok=True)

    # 2. Restore openclaw.json (stored as a separate file in the repo)
    if CONFIG_FILE_NAME in contents:
        log("📥 Step 2: Restoring openclaw.json from Dataset")
        try:
            config_path = hf_hub_download(
                repo_id=REPO_ID, filename=CONFIG_FILE_NAME, repo_type="dataset", token=TOKEN
            )
            shutil.copy(config_path, BASE_DIR / CONFIG_FILE_NAME)
            log("✅ Config file restored.")
        except Exception as e:
            log(f"⚠️ Failed to restore config: {e}")
    else:
        log("⚠️ No separate openclaw.json found in Dataset. Step 2 skipped.")

    return True
| # ========================= | |
| # LOOP: every hour at :00 | |
| # ========================= | |
def backup_loop_every_1hour_at_minute_00():
    """Run backup() at the start of every local-time hour, forever.

    Each iteration sleeps until the next XX:00:00 and then runs one backup.
    An exception escaping backup() is logged and the loop keeps going, so a
    single failed run does not end the schedule. This function never returns.
    """
    log("🔄 Backup every hour at XX:00")
    while True:
        now = time.localtime()
        # Seconds remaining until the next full hour. struct_time allows
        # tm_sec up to 61, which at minute 59 would give 0 or -1; clamp to 1
        # because time.sleep() rejects negative values.
        wait_sec = max(1, (60 - now.tm_min) * 60 - now.tm_sec)
        log(f"⏳ Waiting {wait_sec}s...")
        time.sleep(wait_sec)
        try:
            backup()
        except Exception as e:
            log(f"❌ Backup failed: {type(e).__name__}: {e}")
| # ========================= | |
| # LOOP: TEST 5 MIN | |
| # ========================= | |
def backup_loop_5min():
    """Test-mode scheduler: run backup() immediately, then every 300 seconds.

    The 300 seconds are counted from the end of one run to the start of the
    next. An exception escaping backup() is logged and the loop keeps going.
    This function never returns.
    """
    log("🧪 TEST MODE: every 5 minutes")
    while True:
        try:
            backup()
        except Exception as e:
            log(f"❌ Backup failed: {type(e).__name__}: {e}")
        log("⏳ Waiting 5 minutes...")
        time.sleep(300)
| # ========================= | |
| # MAIN | |
| # ========================= | |
if __name__ == "__main__":
    # The first command-line argument picks the mode (case-insensitive).
    # "backup" starts the hourly loop; anything else, or no argument at all,
    # performs a restore.
    cli_args = sys.argv[1:]
    action = cli_args[0].lower() if cli_args else "restore"
    if action != "backup":
        restore()
    else:
        # For a short test cycle, call backup_loop_5min() here instead.
        backup_loop_every_1hour_at_minute_00()