xiaoxiaxia / backup-manager.py
sharween's picture
Upload backup-manager.py with huggingface_hub
13abd1d verified
#!/usr/bin/env python3
"""
backup-manager.py — WebDAV + HF Dataset 双引擎备份/恢复
策略:
- 全量备份 (tar.gz): 每 24h → WebDAV + HF Dataset
- 增量备份 (SHA256 manifest): 每小时 → WebDAV (仅变更文件)
- 恢复: 优先 WebDAV (全量→增量叠加) → fallback HF Dataset
"""
import hashlib, json, os, tarfile, time, sys, copy
from pathlib import Path
import requests
# ── 配置 ──────────────────────────────────────────────────────────
# Local directory whose contents are backed up and restored.
STATE_DIR = os.getenv("OPENCLAW_STATE_DIR", "/root/.openclaw")

# WebDAV endpoint (trailing slashes removed), credentials and remote folder.
WEBDAV_URL = os.getenv("WEBDAV_URL", "").rstrip("/")
WEBDAV_USER = os.getenv("WEBDAV_USERNAME", "")
WEBDAV_PASS = os.getenv("WEBDAV_PASSWORD", "")
WEBDAV_PATH = os.getenv("WEBDAV_BASE_PATH", "openclaw-backup")

# Hugging Face dataset repo id and access token used for the mirror copy.
HF_REPO = os.getenv("HF_DATASET", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Backup periods, in minutes.
BACKUP_INCREMENT_INTERVAL = int(os.getenv("BACKUP_INCREMENT_INTERVAL", "60"))
BACKUP_FULL_INTERVAL = int(os.getenv("BACKUP_FULL_INTERVAL", "1440"))

# Remote object names for the full archive and the incremental index.
FULL_NAME = "openclaw-full.tar.gz"
MANIFEST_NAME = "_incremental_manifest.json"
# ── WebDAV 原始 HTTP 层 ──────────────────────────────────────────
def _wd_auth():
    """Return the ``(username, password)`` tuple used for WebDAV requests.

    Returns ``None`` when no username is configured, so that requests are
    sent without credentials.
    """
    if not WEBDAV_USER:
        return None
    return (WEBDAV_USER, WEBDAV_PASS)
def _wd_url(path=""):
    """Build the absolute WebDAV URL for *path* inside the backup folder.

    Args:
        path: Path relative to ``WEBDAV_PATH``; leading slashes are ignored.
            An empty path addresses the backup folder itself (the URL then
            ends with a slash).

    Returns:
        ``WEBDAV_URL/WEBDAV_PATH/path`` with *path* percent-encoded.
    """
    from urllib.parse import quote

    # Tolerate a base path configured as "", "/backup" or "backup/" without
    # producing empty ("//") URL segments.
    base = WEBDAV_PATH.strip("/")
    prefix = f"{WEBDAV_URL}/{base}" if base else WEBDAV_URL
    # File names come straight from disk; without escaping, characters such
    # as '#', '?' or a literal '%' would be read as URL syntax and address a
    # different resource. '/' is kept as the segment separator.
    return f"{prefix}/{quote(path.lstrip('/'))}"
def _wd_req(method, path="", **kwargs):
    """Send a single WebDAV request and return the response.

    Args:
        method: HTTP/WebDAV verb, e.g. ``"PUT"`` or ``"MKCOL"``.
        path: Path relative to the backup folder.
        **kwargs: Passed through to ``requests.request``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.request(
        method,
        _wd_url(path),
        auth=_wd_auth(),
        timeout=60,
        **kwargs,
    )
    response.raise_for_status()
    return response
def wd_exists(path):
    """Return ``True`` if *path* exists in the WebDAV backup folder.

    The check is a ``PROPFIND`` request. Any failure yields ``False`` so
    callers can fall back to another source, but failures other than a plain
    404 are logged so that outages or bad credentials are not mistaken for a
    missing backup.
    """
    try:
        # Without a Depth header PROPFIND means "infinity", which servers may
        # refuse; only the resource itself is of interest here.
        _wd_req("PROPFIND", path, headers={"Depth": "0"})
    except Exception as exc:
        response = getattr(exc, "response", None)
        status = getattr(response, "status_code", None)
        if status != 404:
            print(f"[webdav] Existence check for {path!r} failed: {exc}")
        return False
    return True
def wd_upload(path, data):
    """PUT *data* to *path* in the backup folder and return the response."""
    response = _wd_req("PUT", path, data=data)
    return response
def wd_download(path):
    """GET *path* from the backup folder and return its body as bytes."""
    response = _wd_req("GET", path)
    return response.content
def wd_mkdir(parts):
    """Create the directory chain *parts* below the backup folder via MKCOL.

    One MKCOL is issued per path prefix, shortest first. Errors are ignored
    on purpose: the request also fails when the directory already exists.
    """
    prefix = []
    for segment in parts:
        prefix.append(segment)
        try:
            _wd_req("MKCOL", "/".join(prefix))
        except Exception:
            pass
# ── HF Dataset 层 ────────────────────────────────────────────────
def _hf_upload(tarpath: str):
    """Upload the archive at *tarpath* to the HF dataset repo as FULL_NAME.

    Does nothing unless both ``HF_REPO`` and ``HF_TOKEN`` are configured.
    """
    if not (HF_REPO and HF_TOKEN):
        return

    # Imported lazily so the dependency is only needed when HF is in use.
    from huggingface_hub import HfApi

    client = HfApi()
    with open(tarpath, "rb") as archive:
        client.upload_file(
            path_or_fileobj=archive,
            path_in_repo=FULL_NAME,
            repo_id=HF_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    print(f"[backup] Full backup mirrored to HF Dataset ({HF_REPO})")
def _hf_download() -> str | None:
    """Fetch the full archive from the HF dataset repo.

    Returns:
        The local path reported by ``hf_hub_download``, or ``None`` when HF
        is not configured or the download fails (the error is logged).
    """
    if not (HF_REPO and HF_TOKEN):
        return None
    try:
        from huggingface_hub import hf_hub_download

        local_path = hf_hub_download(
            repo_id=HF_REPO,
            filename=FULL_NAME,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as err:
        print(f"[restore] HF fallback unavailable: {err}")
        return None
    return local_path
# ── 文件哈希 ──────────────────────────────────────────────────────
def _file_hash(path: str) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 64 KiB pieces so large files are never held in
    memory in full.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for piece in iter(lambda: stream.read(65536), b""):
            digest.update(piece)
    return digest.hexdigest()
# ── Manifest (增量索引) ──────────────────────────────────────────
def _load_manifest() -> dict:
    """Download and parse the incremental manifest from WebDAV.

    Returns an empty dict when the manifest cannot be fetched or parsed, so
    every local file is then treated as changed.
    """
    try:
        return json.loads(wd_download(MANIFEST_NAME))
    except Exception:
        return {}
def _save_manifest(manifest: dict):
    """Serialize *manifest* as indented JSON and upload it to WebDAV."""
    payload = json.dumps(manifest, indent=2).encode()
    wd_upload(MANIFEST_NAME, payload)
# ── 全量备份 ──────────────────────────────────────────────────────
def full_backup():
    """Archive the state directory and upload it to WebDAV and HF.

    Every top-level entry of ``STATE_DIR`` is packed into a gzip tarball,
    which is uploaded to WebDAV (when ``WEBDAV_URL`` is set) and then
    mirrored to the HF dataset repo. The temporary archive is always removed,
    including when an upload raises.

    Raises:
        Whatever the archive creation or the uploads raise; nothing is
        swallowed except the best-effort creation of the remote base folder.
    """
    import tempfile

    root = Path(STATE_DIR)
    if not root.exists():
        # An empty archive would overwrite the last good remote backup.
        print(f"[backup] State dir {STATE_DIR} does not exist; skipping full backup")
        return

    # Unique temp file instead of a fixed, predictable /tmp path.
    fd, tarpath = tempfile.mkstemp(prefix="openclaw-full-", suffix=".tar.gz")
    os.close(fd)
    try:
        with tarfile.open(tarpath, "w:gz") as tar:
            for item in root.iterdir():
                # Skips entries that vanished meanwhile and dangling symlinks.
                if item.exists():
                    tar.add(str(item), arcname=item.name)
        size = os.path.getsize(tarpath)
        print(f"[backup] Full archive created ({size} bytes)")

        # Upload to WebDAV
        if WEBDAV_URL:
            try:
                # Make sure the backup folder itself exists; this fails
                # harmlessly when it is already there.
                _wd_req("MKCOL", "")
            except Exception:
                pass
            with open(tarpath, "rb") as f:
                # Pass the file object so the archive is streamed rather
                # than loaded into memory in one piece.
                wd_upload(FULL_NAME, f)
            print("[backup] Full backup uploaded to WebDAV")

        # Mirror to HF Dataset
        _hf_upload(tarpath)
    finally:
        if os.path.exists(tarpath):
            os.remove(tarpath)
# ── 增量备份 ──────────────────────────────────────────────────────
def incremental_backup() -> int:
    """Upload files whose SHA-256 differs from the remote manifest.

    Walks ``STATE_DIR`` recursively, skipping paths whose relative name
    starts with ``"."`` or ``"_incremental"``. Changed files are uploaded
    below ``files/`` and recorded in the manifest with hash, mtime and size.

    Returns:
        Number of files uploaded. ``0`` when WebDAV is not configured or the
        state directory does not exist.

    Raises:
        Upload errors propagate, but the manifest is saved first (best
        effort) so files already uploaded are not sent again next run.
    """
    root = Path(STATE_DIR)
    # Incremental backups are WebDAV-only; without an endpoint there is
    # nothing to do (mirrors the WEBDAV_URL guard in full_backup).
    if not WEBDAV_URL or not root.exists():
        return 0

    manifest = _load_manifest()
    changed = 0
    ensured_dirs = set()  # remote directories already created in this run

    try:
        for fpath in root.rglob("*"):
            # Local filesystem work only: a file that vanishes or cannot be
            # read is skipped instead of aborting the whole run. Uploads are
            # kept outside this handler because requests' exceptions are
            # OSError subclasses too and must not be swallowed here.
            try:
                if not fpath.is_file():
                    continue
                relative = fpath.relative_to(root)
                rel = relative.as_posix()
                if rel.startswith((".", "_incremental")):
                    continue
                cur_h = _file_hash(str(fpath))
                if cur_h == manifest.get(rel, {}).get("sha256"):
                    continue
                payload = fpath.read_bytes()
                info = fpath.stat()
            except OSError as exc:
                print(f"[backup] Skipping unreadable file {fpath}: {exc}")
                continue

            dir_parts = ("files", *relative.parts[:-1])
            if dir_parts not in ensured_dirs:
                if not ensured_dirs:
                    try:
                        # Make sure the backup folder itself exists; fails
                        # harmlessly when it is already there.
                        _wd_req("MKCOL", "")
                    except Exception:
                        pass
                # One MKCOL chain per directory instead of one per file.
                wd_mkdir(list(dir_parts))
                ensured_dirs.add(dir_parts)

            wd_upload("/".join((*dir_parts, relative.name)), payload)
            manifest[rel] = {
                "sha256": cur_h,
                "mtime": info.st_mtime,
                "size": info.st_size,
            }
            changed += 1
    except Exception:
        if changed:
            try:
                _save_manifest(manifest)
            except Exception as save_exc:
                print(f"[backup] Could not save manifest: {save_exc}")
        raise

    if changed:
        _save_manifest(manifest)
    return changed
# ── 恢复 ──────────────────────────────────────────────────────────
def _extract_archive(tarpath, dest):
    """Extract the gzip tarball *tarpath* into *dest*."""
    with tarfile.open(tarpath, "r:gz") as tar:
        if hasattr(tarfile, "tar_filter"):
            # The "tar" extraction filter refuses members that would end up
            # outside *dest* while keeping ordinary symlinks and modes.
            tar.extractall(path=dest, filter="tar")
        else:
            # Older Python without extraction filters.
            tar.extractall(path=dest)


def _apply_incremental_overrides(root):
    """Overlay manifest-listed files onto *root*; return how many were written."""
    manifest = _load_manifest()
    base = root.resolve()
    count = 0
    for rel in manifest:
        target = root / rel
        # The manifest is remote data: never write outside the state dir.
        if base not in target.resolve().parents:
            print(f"[restore] Skipping unsafe manifest path: {rel}")
            continue
        try:
            data = wd_download(f"files/{rel}")
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(data)
            count += 1
        except Exception as exc:
            # Best effort per file, but say which one is missing.
            print(f"[restore] Could not restore {rel}: {exc}")
    return count


def _restore_from_webdav(root):
    """Download and unpack the full WebDAV archive, then apply overrides."""
    import tempfile

    print("[restore] Downloading full backup from WebDAV...")
    fd, tarpath = tempfile.mkstemp(prefix="openclaw-restore-", suffix=".tar.gz")
    try:
        with os.fdopen(fd, "wb") as out:
            # Stream to disk so the archive is never held in memory in full.
            resp = _wd_req("GET", FULL_NAME, stream=True)
            try:
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    out.write(chunk)
            finally:
                resp.close()
        _extract_archive(tarpath, STATE_DIR)
    finally:
        if os.path.exists(tarpath):
            os.remove(tarpath)

    # Apply incremental overrides
    try:
        count = _apply_incremental_overrides(root)
        print(f"[restore] Applied {count} incremental file overrides")
    except Exception:
        print("[restore] No incremental manifest found (clean start)")
    print("[restore] Restore from WebDAV complete")


def restore():
    """Restore: WebDAV primary (full → incremental) → HF Dataset fallback.

    The state directory is created if needed. If the WebDAV restore fails
    for any reason (download error, corrupt archive), the error is logged
    and the HF dataset copy is tried instead of aborting. Errors while
    extracting the HF archive propagate to the caller.
    """
    root = Path(STATE_DIR)
    root.mkdir(parents=True, exist_ok=True)
    restored = False

    # Strategy 1: WebDAV full backup + incremental overrides
    if WEBDAV_URL and wd_exists(FULL_NAME):
        try:
            _restore_from_webdav(root)
            restored = True
        except Exception as exc:
            print(f"[restore] WebDAV restore failed: {exc}")

    # Strategy 2: HF Dataset fallback
    if not restored:
        print("[restore] WebDAV unavailable, trying HF Dataset fallback...")
        path = _hf_download()
        if path:
            _extract_archive(path, STATE_DIR)
            print("[restore] Restore from HF Dataset complete")
            restored = True

    if not restored:
        print("[restore] No backup found — fresh start")
# ── 调度器 ────────────────────────────────────────────────────────
def scheduler_loop():
    """Run incremental and full backups forever on the configured intervals.

    Each cycle sleeps for the incremental interval, runs an incremental
    backup, then runs a full backup if none has succeeded yet or the full
    interval has elapsed. A failing backup is logged and retried on the next
    cycle instead of terminating the scheduler. Never returns.
    """
    # A zero interval would hammer the server in a busy loop and a negative
    # one makes time.sleep raise, so enforce at least one minute.
    inc_seconds = max(1, BACKUP_INCREMENT_INTERVAL) * 60
    full_seconds = BACKUP_FULL_INTERVAL * 60
    # Monotonic timestamps are immune to wall-clock adjustments.
    last_full: float | None = None

    while True:
        time.sleep(inc_seconds)

        try:
            c = incremental_backup()
            print(f"[scheduler] Incremental: {c} files changed")
        except Exception as exc:
            print(f"[scheduler] Incremental backup failed: {exc}")

        now = time.monotonic()
        if last_full is None or now - last_full >= full_seconds:
            try:
                full_backup()
            except Exception as exc:
                print(f"[scheduler] Full backup failed: {exc}")
            else:
                # Only a successful run resets the full-backup timer.
                last_full = now
# ── CLI ───────────────────────────────────────────────────────────
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "restore"
if cmd == "restore":
restore()
elif cmd == "incremental":
c = incremental_backup()
print(f"[backup] Incremental: {c} files changed")
elif cmd == "full":
full_backup()
elif cmd == "scheduler":
scheduler_loop()
else:
print(f"Usage: {sys.argv[0]} {{restore|incremental|full|scheduler}}")
sys.exit(1)