#!/usr/bin/env python3
"""
backup-manager.py — dual-engine (WebDAV + HF Dataset) backup and restore.

Strategy:
  - Full backup (tar.gz): every 24h -> WebDAV + HF Dataset
  - Incremental backup (SHA256 manifest): every hour -> WebDAV (changed files only)
  - Restore: WebDAV first (full archive, then incremental overlay),
    falling back to the HF Dataset

Usage: backup-manager.py {restore|incremental|full|scheduler}
"""

import copy
import hashlib
import json
import os
import sys
import tarfile
import tempfile
import time
from pathlib import Path
from urllib.parse import quote

import requests

# ── Configuration ────────────────────────────────────────────────
STATE_DIR = os.environ.get("OPENCLAW_STATE_DIR", "/root/.openclaw")
WEBDAV_URL = os.environ.get("WEBDAV_URL", "").rstrip("/")
WEBDAV_USER = os.environ.get("WEBDAV_USERNAME", "")
WEBDAV_PASS = os.environ.get("WEBDAV_PASSWORD", "")
WEBDAV_PATH = os.environ.get("WEBDAV_BASE_PATH", "openclaw-backup")
HF_REPO = os.environ.get("HF_DATASET", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")

# Backup intervals, in minutes.
BACKUP_INCREMENT_INTERVAL = int(os.environ.get("BACKUP_INCREMENT_INTERVAL", "60"))
BACKUP_FULL_INTERVAL = int(os.environ.get("BACKUP_FULL_INTERVAL", "1440"))

FULL_NAME = "openclaw-full.tar.gz"
MANIFEST_NAME = "_incremental_manifest.json"

# Timeout (seconds) applied to every WebDAV request.
_HTTP_TIMEOUT = 60
# Chunk size used when streaming downloads and hashing files.
_CHUNK_SIZE = 1 << 20
# Characters left unescaped in URL paths: "/" plus the RFC 3986 sub-delims,
# ":" and "@".  Notably "#", "?" and "%" ARE escaped, so file names that
# contain them are not mistaken for a fragment, query string or escape.
_URL_SAFE = "/!$&'()*+,;=:@"


# ── WebDAV raw HTTP layer ────────────────────────────────────────
def _wd_auth():
    """Return the (user, password) tuple for basic auth, or None if no user is set."""
    return (WEBDAV_USER, WEBDAV_PASS) if WEBDAV_USER else None


def _wd_url(path=""):
    """Build the absolute URL of *path* below the configured WebDAV base path.

    *path* is percent-encoded (it is derived from local file names); the
    configured WEBDAV_URL / WEBDAV_PATH are used as given, apart from
    stripping redundant slashes.
    """
    base = WEBDAV_PATH.strip("/")
    prefix = f"{WEBDAV_URL}/{base}" if base else WEBDAV_URL
    return f"{prefix}/{quote(path.lstrip('/'), safe=_URL_SAFE)}"


def _wd_req(method, path="", **kwargs):
    """Send *method* for *path* and return the response.

    Extra keyword arguments are forwarded to ``requests.request``.

    Raises:
        requests.RequestException: on connection errors or a 4xx/5xx status.
    """
    resp = requests.request(
        method, _wd_url(path), auth=_wd_auth(), timeout=_HTTP_TIMEOUT, **kwargs
    )
    resp.raise_for_status()
    return resp


def wd_exists(path):
    """Return True if a PROPFIND on *path* succeeds, False on any request error."""
    try:
        # Depth 0: only the resource itself is of interest, not its children.
        _wd_req("PROPFIND", path, headers={"Depth": "0"})
    except requests.RequestException:
        return False
    return True


def wd_upload(path, data):
    """PUT *data* (bytes or an open binary file, which is streamed) to *path*."""
    return _wd_req("PUT", path, data=data)


def wd_download(path):
    """GET *path* and return its full content as bytes."""
    return _wd_req("GET", path).content


def _wd_download_to(path, dest):
    """Stream the remote file *path* into the local file *dest*."""
    with _wd_req("GET", path, stream=True) as resp, open(dest, "wb") as out:
        for chunk in resp.iter_content(chunk_size=_CHUNK_SIZE):
            out.write(chunk)


def wd_mkdir(parts):
    """Create the base collection, then each nested collection in *parts*, via MKCOL.

    ``wd_mkdir([])`` ensures only the base collection exists.  Previously the
    base collection was never created, so the first upload to a fresh server
    failed.
    """
    for depth in range(len(parts) + 1):
        try:
            _wd_req("MKCOL", "/".join(parts[:depth]))
        except requests.RequestException:
            # Best effort: MKCOL on an already existing collection is an
            # error status, which is the normal case after the first run.
            # A genuine failure surfaces on the PUT that follows.
            pass


# ── HF Dataset layer ─────────────────────────────────────────────
def _hf_upload(tarpath: str):
    """Upload the archive at *tarpath* to the HF dataset repo as FULL_NAME.

    Does nothing when HF_REPO or HF_TOKEN is not configured.
    """
    if not HF_REPO or not HF_TOKEN:
        return
    # Imported lazily so huggingface_hub is only required when HF is configured.
    from huggingface_hub import HfApi

    api = HfApi()
    with open(tarpath, "rb") as f:
        api.upload_file(
            path_or_fileobj=f,
            path_in_repo=FULL_NAME,
            repo_id=HF_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    print(f"[backup] Full backup mirrored to HF Dataset ({HF_REPO})")


def _hf_download() -> str | None:
    """Download FULL_NAME from the HF dataset repo.

    Returns:
        The local path reported by ``hf_hub_download``, or None when HF is
        not configured or the download fails.
    """
    if not HF_REPO or not HF_TOKEN:
        return None
    try:
        from huggingface_hub import hf_hub_download

        return hf_hub_download(
            repo_id=HF_REPO,
            filename=FULL_NAME,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        # Deliberately broad: any failure here just means "no HF fallback".
        print(f"[restore] HF fallback unavailable: {e}")
        return None


# ── File hashing ─────────────────────────────────────────────────
def _file_hash(path: str) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(65536):
            h.update(chunk)
    return h.hexdigest()


# ── Manifest (incremental index) ─────────────────────────────────
def _load_manifest() -> dict:
    """Fetch and parse the incremental manifest from WebDAV.

    Returns:
        The manifest dict, or an empty dict when it cannot be downloaded,
        is not valid JSON, or is not a JSON object.
    """
    try:
        manifest = json.loads(wd_download(MANIFEST_NAME))
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSONDecodeError and undecodable bytes.
        print(f"[manifest] No usable incremental manifest: {exc}")
        return {}
    return manifest if isinstance(manifest, dict) else {}


def _save_manifest(manifest: dict):
    """Serialize *manifest* as JSON and upload it to WebDAV."""
    wd_upload(MANIFEST_NAME, json.dumps(manifest, indent=2).encode())


# ── Safe archive extraction ──────────────────────────────────────
def _is_within(base: str, target: str) -> bool:
    """Return True if the absolute path *target* is *base* or lies below it."""
    try:
        return os.path.commonpath([base, target]) == base
    except ValueError:
        return False


def _safe_extract(tarpath: str, dest: str) -> None:
    """Extract the gzip tar at *tarpath* into *dest*, refusing unsafe members.

    The archive comes from a remote store, so member names and link targets
    are not trusted.

    Raises:
        tarfile.TarError: if a member would be written, or would link,
            outside *dest*.
    """
    with tarfile.open(tarpath, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Interpreter supports extraction filters: let the stdlib
            # reject absolute paths, "..", escaping links, device nodes, etc.
            tar.extractall(path=dest, filter="data")
            return

        # Fallback for interpreters without extraction filters.
        dest_root = os.path.realpath(dest)
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest_root, member.name))
            if not _is_within(dest_root, target):
                raise tarfile.TarError(f"unsafe path in archive: {member.name!r}")
            if member.issym():
                # Symlink targets are relative to the link's own directory.
                link = os.path.join(os.path.dirname(target), member.linkname)
            elif member.islnk():
                # Hard-link targets are relative to the archive root.
                link = os.path.join(dest_root, member.linkname)
            else:
                continue
            if not _is_within(dest_root, os.path.realpath(link)):
                raise tarfile.TarError(f"unsafe link in archive: {member.name!r}")
        tar.extractall(path=dest)


# ── Full backup ──────────────────────────────────────────────────
def full_backup():
    """Archive STATE_DIR and upload it to WebDAV (if configured) and HF Dataset.

    The archive is built in a private temporary directory that is removed
    even when an upload fails.
    """
    root = Path(STATE_DIR)
    with tempfile.TemporaryDirectory(prefix="openclaw-backup-") as workdir:
        tarpath = os.path.join(workdir, FULL_NAME)
        with tarfile.open(tarpath, "w:gz") as tar:
            if root.exists():
                for item in root.iterdir():
                    # exists() is False for dangling symlinks; those are skipped.
                    if item.exists():
                        tar.add(str(item), arcname=item.name)
        size = os.path.getsize(tarpath)
        print(f"[backup] Full archive created ({size} bytes)")

        # Upload to WebDAV
        if WEBDAV_URL:
            wd_mkdir([])
            with open(tarpath, "rb") as f:
                # Pass the file object so the archive is streamed, not
                # loaded into memory.
                wd_upload(FULL_NAME, f)
            print("[backup] Full backup uploaded to WebDAV")

        # Mirror to HF Dataset
        _hf_upload(tarpath)


# ── Incremental backup ───────────────────────────────────────────
def incremental_backup() -> int:
    """Upload files whose SHA-256 differs from the remote manifest.

    Files are stored under ``files/<relative path>`` on WebDAV.  Entries
    whose relative path starts with "." or "_incremental" are skipped.

    Returns:
        The number of files uploaded (0 when STATE_DIR is missing or
        WEBDAV_URL is not configured).

    Raises:
        requests.RequestException: if an upload fails.
    """
    root = Path(STATE_DIR)
    if not root.exists():
        return 0
    if not WEBDAV_URL:
        print("[backup] WEBDAV_URL not set; skipping incremental backup")
        return 0

    manifest = _load_manifest()
    ensured_dirs: set[tuple[str, ...]] = set()
    changed = 0

    for fpath in root.rglob("*"):
        rel = fpath.relative_to(root).as_posix()
        if rel == MANIFEST_NAME or rel.startswith((".", "_incremental")):
            continue

        # Local file operations only inside these try blocks: network errors
        # are also OSError subclasses and must not be swallowed here.
        try:
            if not fpath.is_file():
                continue
            cur_h = _file_hash(str(fpath))
        except OSError as exc:
            print(f"[backup] Skipping unreadable file {rel}: {exc}")
            continue

        prev = manifest.get(rel)
        prev_h = prev.get("sha256") if isinstance(prev, dict) else None
        if cur_h == prev_h:
            continue

        try:
            payload = fpath.read_bytes()
            st = fpath.stat()
        except OSError as exc:
            # The state directory is live; files can vanish mid-scan.
            print(f"[backup] Skipping unreadable file {rel}: {exc}")
            continue

        parts = ["files", *rel.split("/")]
        parent = tuple(parts[:-1])
        if parent not in ensured_dirs:
            # Issue MKCOL once per directory per run, not once per file.
            wd_mkdir(list(parent))
            ensured_dirs.add(parent)
        wd_upload("/".join(parts), payload)

        manifest[rel] = {
            # Hash the bytes actually uploaded, in case the file changed
            # between the streaming hash above and the read.
            "sha256": hashlib.sha256(payload).hexdigest(),
            "mtime": st.st_mtime,
            "size": st.st_size,
        }
        changed += 1

    if changed:
        _save_manifest(manifest)
    return changed


# ── Restore ──────────────────────────────────────────────────────
def _apply_incremental(root: Path, manifest: dict) -> int:
    """Download every manifest entry over the extracted full backup.

    Entries whose path would land outside *root* are ignored, and individual
    download/write failures are skipped (best effort).

    Returns:
        The number of files written.
    """
    base = root.resolve()
    applied = 0
    failed = 0
    for rel in manifest:
        target = (base / rel).resolve()
        # The manifest is remote data: reject "..", absolute paths, etc.
        if target == base or not target.is_relative_to(base):
            print(f"[restore] Ignoring unsafe manifest entry: {rel!r}")
            continue
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(wd_download(f"files/{rel}"))
        except OSError:
            # Covers both filesystem errors and requests' exceptions.
            failed += 1
        else:
            applied += 1
    if failed:
        print(f"[restore] {failed} incremental files could not be restored")
    return applied


def _restore_from_webdav(root: Path) -> None:
    """Restore the full archive from WebDAV, then overlay incremental files."""
    print("[restore] Downloading full backup from WebDAV...")
    with tempfile.TemporaryDirectory(prefix="openclaw-restore-") as workdir:
        tarpath = os.path.join(workdir, FULL_NAME)
        _wd_download_to(FULL_NAME, tarpath)
        _safe_extract(tarpath, STATE_DIR)

    # Apply incremental overrides
    manifest = _load_manifest()
    if manifest:
        count = _apply_incremental(root, manifest)
        print(f"[restore] Applied {count} incremental file overrides")
    else:
        print("[restore] No incremental manifest found (clean start)")
    print("[restore] Restore from WebDAV complete")


def restore():
    """Restore: WebDAV primary (full → incremental) → HF Dataset fallback."""
    root = Path(STATE_DIR)
    root.mkdir(parents=True, exist_ok=True)
    restored = False

    # Strategy 1: WebDAV full backup + incremental overrides
    if WEBDAV_URL and wd_exists(FULL_NAME):
        try:
            _restore_from_webdav(root)
            restored = True
        except Exception as exc:
            # Top-level boundary: a failed WebDAV restore must not prevent
            # the HF Dataset fallback from being attempted.
            print(f"[restore] WebDAV restore failed: {exc}")

    # Strategy 2: HF Dataset fallback
    if not restored:
        print("[restore] WebDAV unavailable, trying HF Dataset fallback...")
        path = _hf_download()
        if path:
            _safe_extract(path, STATE_DIR)
            print("[restore] Restore from HF Dataset complete")
            restored = True

    if not restored:
        print("[restore] No backup found — fresh start")


# ── Scheduler ────────────────────────────────────────────────────
def scheduler_loop():
    """Run forever: sleep, take an incremental backup, then a full one when due.

    A failing backup is logged and retried on a later cycle instead of
    terminating the loop.
    """
    last_full: float | None = None
    inc_interval = BACKUP_INCREMENT_INTERVAL
    full_interval = BACKUP_FULL_INTERVAL

    while True:
        time.sleep(inc_interval * 60)

        try:
            c = incremental_backup()
            print(f"[scheduler] Incremental: {c} files changed")
        except Exception as exc:
            print(f"[scheduler] Incremental backup failed: {exc}")

        # Monotonic clock: unaffected by wall-clock adjustments.
        now = time.monotonic()
        if last_full is None or (now - last_full) / 60 >= full_interval:
            try:
                full_backup()
            except Exception as exc:
                # last_full stays unchanged so the next cycle retries.
                print(f"[scheduler] Full backup failed: {exc}")
            else:
                last_full = now


# ── CLI ──────────────────────────────────────────────────────────
def main():
    """Dispatch on the first CLI argument (default: ``restore``)."""
    cmd = sys.argv[1] if len(sys.argv) > 1 else "restore"
    if cmd == "restore":
        restore()
    elif cmd == "incremental":
        c = incremental_backup()
        print(f"[backup] Incremental: {c} files changed")
    elif cmd == "full":
        full_backup()
    elif cmd == "scheduler":
        scheduler_loop()
    else:
        print(f"Usage: {sys.argv[0]} {{restore|incremental|full|scheduler}}")
        sys.exit(1)


if __name__ == "__main__":
    main()