xiaoxiaxia / backup-manager.py
Claude
fix: restore priority HF Dataset first, WebDAV fallback
e27fcc9
#!/usr/bin/env python3
"""
G2: WebDAV + HF Dataset 双备份管理器
功能: 全量备份、增量备份、恢复、调度器
"""
import os
import sys
import json
import hashlib
import tarfile
import io
import time
import schedule
import threading
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import requests
from huggingface_hub import HfApi, hf_hub_download, upload_file, create_repo
# G2.1: WebDAV 通信层
class WebDAVClient:
    """Minimal WebDAV client supporting MKCOL, PUT, GET, PROPFIND and DELETE.

    Every operation is best-effort: transport errors are printed and reported
    through the return value (``False``, ``None`` or ``[]``) instead of being
    raised, so callers only need to check the result.
    """

    def __init__(self, base_url: str, username: str, password: str,
                 timeout: float = 60.0):
        """Create a client bound to ``base_url`` using HTTP basic auth.

        Args:
            base_url: Root URL of the WebDAV share; a trailing slash is dropped.
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: Seconds passed to every request as its ``timeout`` so a
                stalled server cannot block a backup run indefinitely.
        """
        self.base_url = base_url.rstrip('/')
        self.auth = (username, password)
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = self.auth

    def _url(self, path: str) -> str:
        """Join ``path`` onto the base URL with exactly one separating slash."""
        path = path.lstrip('/')
        return f"{self.base_url}/{path}"

    def mkcol(self, path: str) -> bool:
        """Create a collection (directory) with MKCOL.

        Returns:
            True when the server answers 201, 207 or 405 (405 is treated as
            "already exists"); False on any other status or transport error.
        """
        try:
            resp = self.session.request(
                "MKCOL", self._url(path), timeout=self.timeout
            )
            return resp.status_code in (201, 207, 405)
        except Exception as e:
            print(f"[WebDAV] MKCOL error: {e}")
            return False

    def put(self, path: str, data: bytes) -> bool:
        """Upload ``data`` to ``path`` with PUT.

        Returns:
            True for 200, 201 or 204. 200 is accepted because it is a valid
            success status for PUT; it used to be reported as a failure.
        """
        try:
            resp = self.session.put(
                self._url(path), data=data, timeout=self.timeout
            )
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            print(f"[WebDAV] PUT error: {e}")
            return False

    def get(self, path: str) -> Optional[bytes]:
        """Download ``path`` and return its bytes, or None unless the status is 200."""
        try:
            resp = self.session.get(self._url(path), timeout=self.timeout)
            if resp.status_code == 200:
                return resp.content
            return None
        except Exception as e:
            print(f"[WebDAV] GET error: {e}")
            return None

    def propfind(self, path: str) -> List[Dict]:
        """List a collection with a depth-1 PROPFIND.

        Returns:
            One ``{"href": <text>}`` dict per ``DAV:response`` element that has
            a ``DAV:href`` child. Empty list on a non-207 status, on a
            transport error, or when the body cannot be parsed as XML.
        """
        try:
            resp = self.session.request(
                "PROPFIND",
                self._url(path),
                headers={"Depth": "1"},
                timeout=self.timeout,
            )
            if resp.status_code != 207:
                return []
            # Only the href of each response is needed, so a plain
            # ElementTree walk is enough here.
            import xml.etree.ElementTree as ET
            root = ET.fromstring(resp.content)
            items = []
            for response in root.findall(".//{DAV:}response"):
                href = response.find("{DAV:}href")
                if href is not None:
                    items.append({"href": href.text})
            return items
        except Exception as e:
            print(f"[WebDAV] PROPFIND error: {e}")
            return []

    def exists(self, path: str) -> bool:
        """Return True when a depth-0 PROPFIND on ``path`` answers 207."""
        try:
            resp = self.session.request(
                "PROPFIND",
                self._url(path),
                headers={"Depth": "0"},
                timeout=self.timeout,
            )
            return resp.status_code == 207
        except Exception as e:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit.
            print(f"[WebDAV] PROPFIND error: {e}")
            return False

    def delete(self, path: str) -> bool:
        """Delete ``path``; 404 counts as success because the target is already gone."""
        try:
            resp = self.session.delete(self._url(path), timeout=self.timeout)
            return resp.status_code in (200, 204, 404)
        except Exception as e:
            print(f"[WebDAV] DELETE error: {e}")
            return False
# G2.2/G2.3: 备份管理器
class BackupManager:
    """Back up the state directory to WebDAV and/or a Hugging Face Dataset.

    Configuration comes from environment variables read in ``__init__``. A
    local manifest (relative path -> SHA-256) stored under
    ``<state_dir>/backups`` records what has already been uploaded, so that
    incremental backups only ship files whose content changed.

    NOTE(review): WebDAV keeps only the newest ``KEEP_WEBDAV_ARCHIVES``
    incremental archives, and the HF Dataset keeps a single
    ``openclaw_incr_latest.tar.gz``. If more incrementals than that are taken
    between two full backups, the older ones are no longer available to
    ``restore`` - confirm this retention is intended.
    """

    FULL_PREFIX = "openclaw_full_"
    INCR_PREFIX = "openclaw_incr_"
    KEEP_WEBDAV_ARCHIVES = 10

    def __init__(self):
        """Read configuration from the environment and set up the clients."""
        self.state_dir = os.environ.get("OPENCLAW_STATE_DIR", "/root/.openclaw")
        self.backup_dir = f"{self.state_dir}/backups"
        self.manifest_file = f"{self.backup_dir}/manifest.json"
        # WebDAV settings
        self.webdav_url = os.environ.get("WEBDAV_URL", "")
        self.webdav_user = os.environ.get("WEBDAV_USERNAME", "")
        self.webdav_pass = os.environ.get("WEBDAV_PASSWORD", "")
        self.webdav_path = os.environ.get("WEBDAV_BACKUP_PATH", "/openclaw-backups")
        # Minutes between scheduled full backups (default 24h).
        self.webdav_interval = int(os.environ.get("BACKUP_WEBDAV_INTERVAL", "1440"))
        # HF Dataset settings
        self.hf_token = os.environ.get("HF_TOKEN", "")
        self.hf_repo = os.environ.get("HF_DATASET", "")
        # Minutes between scheduled incremental backups (default 10min).
        self.hf_interval = int(os.environ.get("BACKUP_HF_INTERVAL", "10"))
        self.webdav: Optional[WebDAVClient] = None
        self.hf_api: Optional[HfApi] = None
        self._init_clients()
        os.makedirs(self.backup_dir, exist_ok=True)

    def _init_clients(self):
        """Instantiate the WebDAV and HF clients that are configured."""
        if self.webdav_url and self.webdav_user:
            self.webdav = WebDAVClient(self.webdav_url, self.webdav_user, self.webdav_pass)
            print(f"[Backup] WebDAV client initialized: {self.webdav_url}")
        if self.hf_token:
            self.hf_api = HfApi(token=self.hf_token)
            print("[Backup] HF API initialized")

    # ------------------------------------------------------------------
    # Local helpers
    # ------------------------------------------------------------------
    def _sha256_file(self, filepath: str) -> str:
        """Return the hex SHA-256 digest of ``filepath``, read in chunks."""
        sha256 = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def _load_manifest(self) -> Dict:
        """Load the local manifest.

        Returns:
            The stored manifest, or a fresh empty one when the file is
            missing, unreadable or malformed. A corrupt manifest used to
            raise and abort every subsequent backup.
        """
        empty = {"files": {}, "last_full_backup": None}
        try:
            with open(self.manifest_file, 'r') as f:
                manifest = json.load(f)
        except FileNotFoundError:
            return empty
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError.
            print(f"[Backup] Manifest unreadable, starting fresh: {e}")
            return empty
        if not isinstance(manifest, dict) or not isinstance(manifest.get("files"), dict):
            print("[Backup] Manifest malformed, starting fresh")
            return empty
        manifest.setdefault("last_full_backup", None)
        return manifest

    def _save_manifest(self, manifest: Dict):
        """Write the manifest via a temp file + rename so a crash cannot truncate it."""
        os.makedirs(os.path.dirname(self.manifest_file) or ".", exist_ok=True)
        tmp_path = f"{self.manifest_file}.tmp"
        with open(tmp_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        os.replace(tmp_path, self.manifest_file)

    def _get_state_files(self) -> List[Tuple[str, str, str]]:
        """Collect every regular file under the state directory.

        Returns:
            ``(absolute path, path relative to state_dir, sha256)`` tuples
            sorted by relative path. Anything under ``backups/`` is skipped,
            and files that disappear or cannot be read while scanning are
            skipped with a message instead of aborting the run.
        """
        files = []
        state_path = Path(self.state_dir)
        for filepath in state_path.rglob("*"):
            rel_path = str(filepath.relative_to(state_path))
            # Never back up the backup directory itself.
            if rel_path.startswith("backups/"):
                continue
            try:
                if not filepath.is_file():
                    continue
                sha256 = self._sha256_file(str(filepath))
            except OSError as e:
                print(f"[Backup] Skipping unreadable file {rel_path}: {e}")
                continue
            files.append((str(filepath), rel_path, sha256))
        files.sort(key=lambda entry: entry[1])
        return files

    def _create_tar_gz(self, files: List[Tuple[str, str, str]]) -> bytes:
        """Pack ``files`` into an in-memory tar.gz keyed by their relative paths."""
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
            for full_path, rel_path, _ in files:
                try:
                    tar.add(full_path, arcname=rel_path)
                except (FileNotFoundError, PermissionError) as e:
                    # Both are raised before anything is written for this
                    # member, so skipping keeps the archive well-formed.
                    print(f"[Backup] Skipping {rel_path}: {e}")
        return buffer.getvalue()

    def _extract_tar_gz(self, data: bytes):
        """Unpack a tar.gz into the state directory.

        Members whose destination would resolve outside the state directory
        (absolute names, ``..`` components) are skipped, because the archive
        comes from remote storage. Link targets are not inspected.
        """
        root = os.path.realpath(self.state_dir)
        with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
            safe_members = []
            for member in tar.getmembers():
                target = os.path.realpath(os.path.join(root, member.name))
                if os.path.commonpath([root, target]) != root:
                    print(f"[Backup] Skipping unsafe archive member: {member.name}")
                    continue
                safe_members.append(member)
            tar.extractall(path=self.state_dir, members=safe_members)
        print(f"[Backup] Extracted to {self.state_dir}")

    # ------------------------------------------------------------------
    # Remote helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _archive_timestamp(name: str, prefix: str) -> str:
        """Return the ``YYYYmmdd_HHMMSS`` part of ``<prefix><timestamp>.tar.gz``."""
        return name[len(prefix):].split(".", 1)[0]

    def _list_webdav_archives(self, prefix: str) -> List[str]:
        """Return the sorted file names in the WebDAV backup folder starting with ``prefix``.

        ``WebDAVClient.propfind`` yields ``{"href": ...}`` dicts; only the last
        path component of each href is kept. The timestamp format makes
        lexical order equal to chronological order.
        """
        names = set()
        for item in self.webdav.propfind(self.webdav_path):
            href = item.get("href") or ""
            name = href.rstrip("/").rsplit("/", 1)[-1]
            if name.startswith(prefix):
                names.add(name)
        return sorted(names)

    def _prune_webdav(self, prefix: str, label: str):
        """Delete all but the newest ``KEEP_WEBDAV_ARCHIVES`` archives with ``prefix``."""
        try:
            names = self._list_webdav_archives(prefix)
            for stale in names[:-self.KEEP_WEBDAV_ARCHIVES]:
                self.webdav.delete(f"{self.webdav_path}/{stale}")
                print(f"[Backup] Cleaned old WebDAV {label}: {stale}")
        except Exception as e:
            print(f"[Backup] WebDAV cleanup error: {e}")

    def _ensure_hf_repo(self):
        """Create the dataset repo when it cannot be looked up; raises on failure."""
        try:
            self.hf_api.repo_info(repo_id=self.hf_repo, repo_type="dataset")
        except Exception:
            # exist_ok covers the case where the lookup failed for a reason
            # other than the repo being absent.
            create_repo(
                self.hf_repo,
                repo_type="dataset",
                token=self.hf_token,
                private=True,
                exist_ok=True,
            )

    def _upload_to_hf(self, data: bytes, path_in_repo: str):
        """Upload ``data`` to the dataset repo as ``path_in_repo``; raises on failure."""
        upload_file(
            path_or_fileobj=io.BytesIO(data),
            path_in_repo=path_in_repo,
            repo_id=self.hf_repo,
            repo_type="dataset",
            token=self.hf_token,
        )

    def _download_from_hf(self, filename: str) -> bytes:
        """Download ``filename`` from the dataset repo into backup_dir and return its bytes."""
        local_path = hf_hub_download(
            repo_id=self.hf_repo,
            filename=filename,
            repo_type="dataset",
            token=self.hf_token,
            local_dir=self.backup_dir,
        )
        with open(local_path, 'rb') as f:
            return f.read()

    # ------------------------------------------------------------------
    # G2.2: full backup
    # ------------------------------------------------------------------
    def full_backup(self) -> bool:
        """Archive every state file and upload it to each configured destination.

        Returns:
            True when every configured destination accepted the archive (or
            there was nothing to back up); False otherwise.
        """
        print("[Backup] Starting full backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        files = self._get_state_files()
        if not files:
            print("[Backup] No files to backup")
            return True

        archive_name = f"{self.FULL_PREFIX}{timestamp}.tar.gz"
        archive_data = self._create_tar_gz(files)

        # A full backup defines the complete file set, so entries of files
        # that no longer exist are dropped; otherwise a file recreated later
        # with identical content would look unchanged to incremental backups.
        manifest = self._load_manifest()
        manifest["files"] = {rel_path: sha256 for _, rel_path, sha256 in files}
        manifest["last_full_backup"] = timestamp
        manifest_data = json.dumps(manifest, indent=2).encode()
        manifest_name = f"manifest_{timestamp}.json"

        results = []  # one bool per configured destination

        if self.webdav:
            print(f"[Backup] Uploading to WebDAV: {archive_name}")
            self.webdav.mkcol(self.webdav_path)
            webdav_ok = self.webdav.put(f"{self.webdav_path}/{archive_name}", archive_data)
            if webdav_ok:
                print("[Backup] WebDAV upload successful")
                if not self.webdav.put(f"{self.webdav_path}/{manifest_name}", manifest_data):
                    print("[Backup] WebDAV manifest upload failed")
            else:
                print("[Backup] WebDAV upload failed")
            results.append(webdav_ok)
            self._prune_webdav(self.FULL_PREFIX, "backup")

        if self.hf_api and self.hf_repo:
            print("[Backup] Uploading to HF Dataset: openclaw_full_latest.tar.gz")
            try:
                self._ensure_hf_repo()
                self._upload_to_hf(archive_data, "openclaw_full_latest.tar.gz")
                print("[Backup] HF Dataset upload successful")
                hf_ok = True
            except Exception as e:
                print(f"[Backup] HF Dataset upload failed: {e}")
                hf_ok = False
            if hf_ok:
                # Previously unguarded: an error here crashed the whole run.
                try:
                    self._upload_to_hf(manifest_data, manifest_name)
                except Exception as e:
                    print(f"[Backup] HF Dataset manifest upload failed: {e}")
            results.append(hf_ok)

        # Only record files as backed up when at least one destination holds
        # them (or none is configured); otherwise the next incremental run
        # would skip files that were never uploaded anywhere.
        if not results or any(results):
            self._save_manifest(manifest)
        else:
            print("[Backup] All uploads failed; local manifest left unchanged")

        print(f"[Backup] Full backup completed: {timestamp}")
        return all(results)

    # ------------------------------------------------------------------
    # G2.3: incremental backup
    # ------------------------------------------------------------------
    def incremental_backup(self) -> bool:
        """Upload only the files whose SHA-256 differs from the local manifest.

        Returns:
            True when nothing changed or every configured destination accepted
            the archive; False otherwise.
        """
        print("[Backup] Starting incremental backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        manifest = self._load_manifest()

        known = manifest["files"]
        changed_files = [
            entry for entry in self._get_state_files()
            if known.get(entry[1]) != entry[2]
        ]
        if not changed_files:
            print("[Backup] No changes detected")
            return True
        print(f"[Backup] {len(changed_files)} files changed")

        archive_name = f"{self.INCR_PREFIX}{timestamp}.tar.gz"
        archive_data = self._create_tar_gz(changed_files)
        for _, rel_path, sha256 in changed_files:
            known[rel_path] = sha256

        results = []  # one bool per configured destination

        if self.webdav:
            print(f"[Backup] Uploading incremental to WebDAV: {archive_name}")
            # The folder may not exist yet if no full backup has run.
            self.webdav.mkcol(self.webdav_path)
            webdav_ok = self.webdav.put(f"{self.webdav_path}/{archive_name}", archive_data)
            if webdav_ok:
                print("[Backup] WebDAV incremental upload successful")
            else:
                print("[Backup] WebDAV incremental upload failed")
            results.append(webdav_ok)
            self._prune_webdav(self.INCR_PREFIX, "incremental backup")

        if self.hf_api and self.hf_repo:
            print("[Backup] Uploading incremental to HF Dataset: openclaw_incr_latest.tar.gz")
            try:
                self._ensure_hf_repo()
                self._upload_to_hf(archive_data, "openclaw_incr_latest.tar.gz")
                print("[Backup] HF Dataset incremental upload successful")
                hf_ok = True
            except Exception as e:
                print(f"[Backup] HF Dataset incremental upload failed: {e}")
                hf_ok = False
            results.append(hf_ok)

        # Same rule as full_backup: do not mark files as backed up when no
        # destination received them.
        if not results or any(results):
            self._save_manifest(manifest)
        else:
            print("[Backup] All uploads failed; local manifest left unchanged")

        print(f"[Backup] Incremental backup completed: {timestamp}")
        return all(results)

    # ------------------------------------------------------------------
    # G2.4: restore
    # ------------------------------------------------------------------
    def restore(self) -> bool:
        """Restore the state directory: HF Dataset first, WebDAV as fallback.

        Returns:
            True as soon as one source restored successfully, else False.
        """
        print("[Backup] Starting restore...")
        if self.hf_api and self.hf_repo:
            print("[Backup] Trying HF Dataset restore...")
            if self._restore_from_hf():
                return True
        if self.webdav:
            print("[Backup] Trying WebDAV restore...")
            if self._restore_from_webdav():
                return True
        print("[Backup] No backup found to restore")
        return False

    def _restore_from_webdav(self) -> bool:
        """Restore the newest full WebDAV archive, then replay newer incrementals."""
        try:
            backups = self._list_webdav_archives(self.FULL_PREFIX)
            if not backups:
                return False
            backup_name = backups[-1]
            print(f"[Backup] Found WebDAV backup: {backup_name}")
            backup_data = self.webdav.get(f"{self.webdav_path}/{backup_name}")
            if not backup_data:
                return False
            self._extract_tar_gz(backup_data)
            self._apply_incremental_webdav(
                since=self._archive_timestamp(backup_name, self.FULL_PREFIX)
            )
            print("[Backup] WebDAV restore completed")
            return True
        except Exception as e:
            print(f"[Backup] WebDAV restore error: {e}")
            return False

    def _restore_from_hf(self) -> bool:
        """Restore ``openclaw_full_latest*`` from the HF Dataset, then the incremental."""
        try:
            files = self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset")
            backups = sorted(f for f in files if f.startswith("openclaw_full_latest"))
            if not backups:
                return False
            backup_name = backups[-1]
            print(f"[Backup] Found HF backup: {backup_name}")
            self._extract_tar_gz(self._download_from_hf(backup_name))
            self._apply_incremental_hf()
            print("[Backup] HF Dataset restore completed")
            return True
        except Exception as e:
            print(f"[Backup] HF restore error: {e}")
            return False

    def _apply_incremental_webdav(self, since: Optional[str] = None):
        """Replay WebDAV incremental archives in chronological order.

        Args:
            since: Timestamp (``YYYYmmdd_HHMMSS``) of the full archive that was
                just restored. Incrementals older than it are skipped, since
                replaying them would overwrite newer files with older
                versions. ``None`` replays every incremental.
        """
        try:
            for backup_name in self._list_webdav_archives(self.INCR_PREFIX):
                stamp = self._archive_timestamp(backup_name, self.INCR_PREFIX)
                if since is not None and stamp < since:
                    continue
                print(f"[Backup] Applying incremental: {backup_name}")
                data = self.webdav.get(f"{self.webdav_path}/{backup_name}")
                if data:
                    self._extract_tar_gz(data)
                else:
                    print(f"[Backup] Could not download incremental: {backup_name}")
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    def _apply_incremental_hf(self):
        """Replay every ``openclaw_incr_latest*`` archive found in the HF Dataset.

        NOTE(review): the HF file names carry no timestamp, so this cannot
        tell whether the incremental predates the full archive - confirm that
        replaying it unconditionally is acceptable.
        """
        try:
            files = self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset")
            for backup_name in sorted(f for f in files if f.startswith("openclaw_incr_latest")):
                print(f"[Backup] Applying incremental: {backup_name}")
                self._extract_tar_gz(self._download_from_hf(backup_name))
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    # ------------------------------------------------------------------
    # G2.5: scheduler
    # ------------------------------------------------------------------
    def _run_job(self, job):
        """Run a scheduled job, logging any exception so the scheduler loop survives."""
        try:
            job()
        except Exception as e:
            print(f"[Backup] Scheduled job failed: {e}")

    def scheduler(self):
        """Run scheduled backups forever (blocking).

        A full backup is scheduled every ``webdav_interval`` minutes when
        WebDAV is configured, and an incremental backup every ``hf_interval``
        minutes when the HF Dataset is configured. Pending jobs are checked
        once a minute.
        """
        print(f"[Backup] Starting scheduler: WebDAV every {self.webdav_interval}min, HF every {self.hf_interval}min")
        if self.webdav:
            schedule.every(self.webdav_interval).minutes.do(self._run_job, self.full_backup)
        if self.hf_api and self.hf_repo:
            schedule.every(self.hf_interval).minutes.do(self._run_job, self.incremental_backup)
        while True:
            schedule.run_pending()
            time.sleep(60)

    def run_scheduler(self):
        """Start ``scheduler`` in a daemon thread and return that thread."""
        thread = threading.Thread(target=self.scheduler, daemon=True)
        thread.start()
        return thread
def main():
    """Command-line entry point.

    Expects one sub-command in ``sys.argv[1]``: ``full``, ``incremental``,
    ``restore`` or ``scheduler``. Prints a usage line and exits with status 1
    when no sub-command is given; prints an error and exits with status 1
    when the sub-command is not recognised.
    """
    args = sys.argv[1:]
    if not args:
        print("Usage: backup-manager.py [full|incremental|restore|scheduler]")
        sys.exit(1)

    command = args[0]
    # The manager is built before the command is validated, as before.
    manager = BackupManager()
    actions = {
        "full": manager.full_backup,
        "incremental": manager.incremental_backup,
        "restore": manager.restore,
        "scheduler": manager.scheduler,
    }
    action = actions.get(command)
    if action is None:
        print(f"Unknown command: {command}")
        sys.exit(1)
    action()


if __name__ == "__main__":
    main()