#!/usr/bin/env python3
"""
G2: WebDAV + HF Dataset dual-backup manager.

Features: full backup, incremental backup, restore, scheduler.

Restore model: a restore extracts the newest full backup and then the
incremental archive(s) taken after it. Incremental archives are cumulative
with respect to the last successful full backup (they hold every file whose
SHA-256 differs from the full-backup snapshot), so the single
``openclaw_incr_latest.tar.gz`` kept on the HF Dataset is enough on its own,
and pruning old WebDAV incrementals never loses data.

Known limitation: file deletions are not recorded, so a restore re-creates
files that existed in the full backup but were deleted afterwards.
"""

import hashlib
import io
import json
import os
import posixpath
import sys
import tarfile
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
from urllib.parse import unquote

import requests
import schedule
from huggingface_hub import (
    CommitOperationAdd,
    HfApi,
    create_repo,
    hf_hub_download,
    upload_file,
)

# (absolute path, path relative to the state dir with "/" separators, sha256 hex digest)
FileEntry = Tuple[str, str, str]


# G2.1: WebDAV transport layer
class WebDAVClient:
    """Minimal WebDAV client implementing PUT/GET/PROPFIND/MKCOL/DELETE.

    All methods are best-effort: transport errors are printed and reported
    through the return value rather than raised.
    """

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
        timeout: Optional[Tuple[float, float]] = (30, 600),
    ):
        """Create a client bound to *base_url* using HTTP basic auth.

        Args:
            base_url: Server root URL; a trailing slash is ignored.
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: ``(connect, read)`` timeout in seconds applied to every
                request so a stalled server cannot block the scheduler
                forever. ``None`` disables timeouts.
        """
        self.base_url = base_url.rstrip('/')
        self.auth = (username, password)
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = self.auth

    def _url(self, path: str) -> str:
        """Return the absolute URL for *path* (leading slashes are dropped)."""
        return f"{self.base_url}/{path.lstrip('/')}"

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send *method* for *path* through the shared session with the configured timeout."""
        return self.session.request(method, self._url(path), timeout=self.timeout, **kwargs)

    def mkcol(self, path: str) -> bool:
        """Create a collection (MKCOL); True if created or it already exists (405)."""
        try:
            resp = self._request("MKCOL", path)
            return resp.status_code in (201, 207, 405)
        except requests.RequestException as e:
            print(f"[WebDAV] MKCOL error: {e}")
            return False

    def put(self, path: str, data: bytes) -> bool:
        """Upload *data* to *path* (PUT); True on 200/201/204."""
        try:
            resp = self._request("PUT", path, data=data)
            # 201 = created; 200/204 = an existing resource was replaced.
            return resp.status_code in (200, 201, 204)
        except requests.RequestException as e:
            print(f"[WebDAV] PUT error: {e}")
            return False

    def get(self, path: str) -> Optional[bytes]:
        """Download *path* (GET); the response body on 200, otherwise None."""
        try:
            resp = self._request("GET", path)
        except requests.RequestException as e:
            print(f"[WebDAV] GET error: {e}")
            return None
        return resp.content if resp.status_code == 200 else None

    def propfind(self, path: str) -> List[Dict]:
        """List a collection (PROPFIND with ``Depth: 1``).

        Returns:
            One ``{"href": <href text>}`` dict per ``DAV:response`` element
            that carries a non-empty href; ``[]`` on any error or when the
            status is not 207 Multi-Status.
        """
        try:
            resp = self._request("PROPFIND", path, headers={"Depth": "1"})
            if resp.status_code != 207:
                return []
            root = ET.fromstring(resp.content)
        except (requests.RequestException, ET.ParseError) as e:
            print(f"[WebDAV] PROPFIND error: {e}")
            return []
        items = []
        for response in root.findall(".//{DAV:}response"):
            href = response.find("{DAV:}href")
            # Skip empty hrefs: callers treat the value as a string.
            if href is not None and href.text:
                items.append({"href": href.text})
        return items

    def exists(self, path: str) -> bool:
        """Return True if *path* exists (PROPFIND ``Depth: 0`` answers 207)."""
        try:
            resp = self._request("PROPFIND", path, headers={"Depth": "0"})
            return resp.status_code == 207
        except requests.RequestException:
            return False

    def delete(self, path: str) -> bool:
        """Delete *path* (DELETE); True on 200/204 or if it was already gone (404)."""
        try:
            resp = self._request("DELETE", path)
            return resp.status_code in (200, 204, 404)
        except requests.RequestException as e:
            print(f"[WebDAV] DELETE error: {e}")
            return False


# G2.2/G2.3: backup manager
class BackupManager:
    """Backup manager that writes to WebDAV and a Hugging Face Dataset.

    Local state (a JSON manifest of per-file SHA-256 digests) lives in
    ``<state_dir>/backups``, which is itself excluded from backups.
    """

    FULL_PREFIX = "openclaw_full_"
    INCR_PREFIX = "openclaw_incr_"
    ARCHIVE_SUFFIX = ".tar.gz"
    HF_FULL_NAME = "openclaw_full_latest.tar.gz"
    HF_INCR_NAME = "openclaw_incr_latest.tar.gz"
    WEBDAV_KEEP = 10  # archives of each kind (full / incremental) retained on WebDAV

    def __init__(self):
        """Read configuration from environment variables and create clients.

        Environment:
            OPENCLAW_STATE_DIR: directory to back up (default ``/root/.openclaw``).
            WEBDAV_URL / WEBDAV_USERNAME / WEBDAV_PASSWORD: WebDAV target; the
                client is only created when URL and user name are both set.
            WEBDAV_BACKUP_PATH: remote folder (default ``/openclaw-backups``).
            BACKUP_WEBDAV_INTERVAL: full-backup interval in minutes (default 1440).
            HF_TOKEN / HF_DATASET: Hugging Face token and dataset repo id.
            BACKUP_HF_INTERVAL: incremental-backup interval in minutes (default 10).
        """
        self.state_dir = os.environ.get("OPENCLAW_STATE_DIR", "/root/.openclaw")
        self.backup_dir = f"{self.state_dir}/backups"
        self.manifest_file = f"{self.backup_dir}/manifest.json"

        # WebDAV configuration
        self.webdav_url = os.environ.get("WEBDAV_URL", "")
        self.webdav_user = os.environ.get("WEBDAV_USERNAME", "")
        self.webdav_pass = os.environ.get("WEBDAV_PASSWORD", "")
        self.webdav_path = os.environ.get("WEBDAV_BACKUP_PATH", "/openclaw-backups")
        self.webdav_interval = int(os.environ.get("BACKUP_WEBDAV_INTERVAL", "1440"))  # minutes, default 24h

        # HF Dataset configuration
        self.hf_token = os.environ.get("HF_TOKEN", "")
        self.hf_repo = os.environ.get("HF_DATASET", "")
        self.hf_interval = int(os.environ.get("BACKUP_HF_INTERVAL", "10"))  # minutes, default 10min

        self.webdav: Optional[WebDAVClient] = None
        self.hf_api: Optional[HfApi] = None

        self._init_clients()
        os.makedirs(self.backup_dir, exist_ok=True)

    def _init_clients(self):
        """Create the WebDAV / HF clients for whichever targets are configured."""
        if self.webdav_url and self.webdav_user:
            self.webdav = WebDAVClient(self.webdav_url, self.webdav_user, self.webdav_pass)
            print(f"[Backup] WebDAV client initialized: {self.webdav_url}")

        if self.hf_token:
            self.hf_api = HfApi(token=self.hf_token)
            print("[Backup] HF API initialized")

    # ------------------------------------------------------------------ #
    # Local state helpers
    # ------------------------------------------------------------------ #

    def _sha256_file(self, filepath: str) -> str:
        """Return the hex SHA-256 of *filepath*, read in 8 KiB chunks."""
        sha256 = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def _load_manifest(self) -> Dict:
        """Load the local manifest, or return an empty one.

        Keys: ``files`` (digests as of the last successful backup of any kind),
        ``full_files`` (digests as of the last successful full backup; may be
        absent in manifests written by older versions) and ``last_full_backup``.
        An unreadable or corrupt manifest is treated as missing, so the next
        backup re-uploads everything instead of crashing.
        """
        manifest: Dict = {}
        if os.path.exists(self.manifest_file):
            try:
                with open(self.manifest_file, 'r', encoding='utf-8') as f:
                    manifest = json.load(f)
            except (OSError, ValueError) as e:
                print(f"[Backup] Ignoring unreadable manifest: {e}")
                manifest = {}
        if not isinstance(manifest, dict):
            manifest = {}
        manifest.setdefault("files", {})
        manifest.setdefault("last_full_backup", None)
        return manifest

    def _save_manifest(self, manifest: Dict):
        """Persist *manifest* atomically (temp file + rename) so a crash cannot truncate it."""
        tmp_path = f"{self.manifest_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)
        os.replace(tmp_path, self.manifest_file)

    def _get_state_files(self) -> List[FileEntry]:
        """List every regular file under the state dir except the ``backups`` folder.

        Files that disappear or cannot be read while scanning are skipped
        with a warning instead of aborting the whole backup.
        """
        files: List[FileEntry] = []
        state_path = Path(self.state_dir)
        for filepath in state_path.rglob("*"):
            if not filepath.is_file():
                continue
            rel = filepath.relative_to(state_path)
            # Never back up our own manifest / downloaded archives.
            if rel.parts[0] == "backups":
                continue
            try:
                digest = self._sha256_file(str(filepath))
            except OSError as e:
                print(f"[Backup] Skipping unreadable file {rel}: {e}")
                continue
            files.append((str(filepath), rel.as_posix(), digest))
        return files

    def _create_tar_gz(self, files: List[FileEntry]) -> bytes:
        """Return an in-memory tar.gz of *files* (an empty list yields an empty archive)."""
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
            for full_path, rel_path, _ in files:
                tar.add(full_path, arcname=rel_path)
        return buffer.getvalue()

    @classmethod
    def _archive_timestamp(cls, name: str, prefix: str) -> str:
        """Extract the ``%Y%m%d_%H%M%S`` part of ``<prefix><timestamp>.tar.gz``."""
        return name[len(prefix):-len(cls.ARCHIVE_SUFFIX)]

    # ------------------------------------------------------------------ #
    # Remote helpers
    # ------------------------------------------------------------------ #

    def _webdav_archive_names(self, prefix: str) -> List[str]:
        """Return sorted, de-duplicated archive file names in the WebDAV folder starting with *prefix*.

        Names embed a fixed-width timestamp, so lexical order is chronological.
        """
        names = set()
        for item in self.webdav.propfind(self.webdav_path):
            # hrefs may be absolute paths and percent-encoded; keep only the file name.
            name = posixpath.basename(unquote(item.get("href") or "").rstrip("/"))
            if name.startswith(prefix) and name.endswith(self.ARCHIVE_SUFFIX):
                names.add(name)
        return sorted(names)

    def _prune_webdav(self, prefix: str):
        """Delete all but the newest ``WEBDAV_KEEP`` archives with *prefix* (best-effort)."""
        try:
            names = self._webdav_archive_names(prefix)
            for stale in names[:-self.WEBDAV_KEEP]:
                if self.webdav.delete(f"{self.webdav_path}/{stale}"):
                    print(f"[Backup] Cleaned old WebDAV backup: {stale}")
        except Exception as e:
            print(f"[Backup] WebDAV cleanup error: {e}")

    def _upload_webdav(self, archive_name: str, archive_data: bytes, prefix: str) -> bool:
        """Upload one archive to WebDAV, then prune old archives of the same kind."""
        print(f"[Backup] Uploading to WebDAV: {archive_name}")
        self.webdav.mkcol(self.webdav_path)
        if not self.webdav.put(f"{self.webdav_path}/{archive_name}", archive_data):
            print("[Backup] WebDAV upload failed")
            return False
        print("[Backup] WebDAV upload successful")
        # Only prune once the new archive is known to be stored.
        self._prune_webdav(prefix)
        return True

    def _ensure_hf_repo(self):
        """Create the private HF dataset repo if it cannot be found."""
        try:
            self.hf_api.repo_info(repo_id=self.hf_repo, repo_type="dataset")
        except Exception:
            # Not found (or not visible): try to create it; exist_ok avoids a
            # spurious failure if it actually exists.
            create_repo(self.hf_repo, repo_type="dataset", token=self.hf_token,
                        private=True, exist_ok=True)

    def _upload_full_hf(self, archive_data: bytes, manifest_name: str,
                        manifest_data: bytes, timestamp: str) -> bool:
        """Upload a full backup to the HF dataset in a single commit."""
        print(f"[Backup] Uploading to HF Dataset: {self.HF_FULL_NAME}")
        try:
            self._ensure_hf_repo()
            self.hf_api.create_commit(
                repo_id=self.hf_repo,
                repo_type="dataset",
                operations=[
                    CommitOperationAdd(path_in_repo=self.HF_FULL_NAME,
                                       path_or_fileobj=archive_data),
                    # Reset the incremental in the same commit: an incremental
                    # taken before this full backup would otherwise be applied
                    # on restore and roll files back to older versions.
                    CommitOperationAdd(path_in_repo=self.HF_INCR_NAME,
                                       path_or_fileobj=self._create_tar_gz([])),
                    CommitOperationAdd(path_in_repo=manifest_name,
                                       path_or_fileobj=manifest_data),
                ],
                commit_message=f"OpenClaw full backup {timestamp}",
            )
        except Exception as e:
            print(f"[Backup] HF Dataset upload failed: {e}")
            return False
        print("[Backup] HF Dataset upload successful")
        return True

    def _download_hf(self, filename: str) -> bytes:
        """Download *filename* from the HF dataset into the backup dir and return its bytes."""
        local_path = hf_hub_download(
            repo_id=self.hf_repo,
            filename=filename,
            repo_type="dataset",
            token=self.hf_token,
            local_dir=self.backup_dir,
        )
        with open(local_path, 'rb') as f:
            return f.read()

    # G2.2: full backup
    def full_backup(self) -> bool:
        """Archive every state file and upload it to each configured target.

        Returns:
            True if every configured upload succeeded (or there was nothing
            to back up). The local manifest is only updated on success, so
            a failed run is retried by the next backup.
        """
        print("[Backup] Starting full backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        files = self._get_state_files()
        if not files:
            print("[Backup] No files to backup")
            return True

        archive_name = f"{self.FULL_PREFIX}{timestamp}{self.ARCHIVE_SUFFIX}"
        archive_data = self._create_tar_gz(files)

        # Replace (not merge) the digests so files deleted since the last run
        # do not linger in the manifest.
        snapshot = {rel_path: sha256 for _, rel_path, sha256 in files}
        manifest = self._load_manifest()
        manifest["files"] = dict(snapshot)
        manifest["full_files"] = dict(snapshot)
        manifest["last_full_backup"] = timestamp
        manifest_data = json.dumps(manifest, indent=2).encode()
        manifest_name = f"manifest_{timestamp}.json"

        success = True

        if self.webdav:
            if not self._upload_webdav(archive_name, archive_data, self.FULL_PREFIX):
                success = False
            if not self.webdav.put(f"{self.webdav_path}/{manifest_name}", manifest_data):
                print(f"[Backup] WebDAV manifest upload failed: {manifest_name}")

        if self.hf_api and self.hf_repo:
            if not self._upload_full_hf(archive_data, manifest_name, manifest_data, timestamp):
                success = False

        if success:
            self._save_manifest(manifest)
        else:
            print("[Backup] Manifest not updated because an upload failed")

        print(f"[Backup] Full backup completed: {timestamp}")
        return success

    # G2.3: incremental backup
    def incremental_backup(self) -> bool:
        """Upload the files that changed, detected by SHA-256 comparison.

        A run is skipped when nothing changed since the last successful
        backup. Otherwise the archive contains every file that differs from
        the last full backup (cumulative), so restoring needs only the newest
        full archive plus the newest incremental after it.

        Returns:
            True if every configured upload succeeded (or nothing changed).
        """
        print("[Backup] Starting incremental backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        manifest = self._load_manifest()
        current_files = self._get_state_files()

        last_backed_up = manifest["files"]
        changed_files = [entry for entry in current_files
                         if last_backed_up.get(entry[1]) != entry[2]]
        if not changed_files:
            print("[Backup] No changes detected")
            return True

        print(f"[Backup] {len(changed_files)} files changed")

        # Without a known full-backup snapshot (no full yet, or an older
        # manifest format) include everything so the archive is self-sufficient.
        full_snapshot = manifest.get("full_files") or {}
        archive_files = [entry for entry in current_files
                         if full_snapshot.get(entry[1]) != entry[2]]
        print(f"[Backup] {len(archive_files)} files differ from the last full backup")

        archive_name = f"{self.INCR_PREFIX}{timestamp}{self.ARCHIVE_SUFFIX}"
        archive_data = self._create_tar_gz(archive_files)

        success = True

        if self.webdav:
            if not self._upload_webdav(archive_name, archive_data, self.INCR_PREFIX):
                success = False

        if self.hf_api and self.hf_repo:
            print(f"[Backup] Uploading incremental to HF Dataset: {self.HF_INCR_NAME}")
            try:
                self._ensure_hf_repo()
                upload_file(
                    path_or_fileobj=io.BytesIO(archive_data),
                    path_in_repo=self.HF_INCR_NAME,
                    repo_id=self.hf_repo,
                    repo_type="dataset",
                    token=self.hf_token,
                )
                print("[Backup] HF Dataset incremental upload successful")
            except Exception as e:
                print(f"[Backup] HF Dataset incremental upload failed: {e}")
                success = False

        if success:
            manifest["files"] = {rel_path: sha256 for _, rel_path, sha256 in current_files}
            self._save_manifest(manifest)
        else:
            print("[Backup] Manifest not updated; changes will be retried next run")

        print(f"[Backup] Incremental backup completed: {timestamp}")
        return success

    # G2.4: restore
    def restore(self) -> bool:
        """Restore state: try the HF Dataset first, fall back to WebDAV.

        Returns:
            True if one of the sources restored something.
        """
        print("[Backup] Starting restore...")

        if self.hf_api and self.hf_repo:
            print("[Backup] Trying HF Dataset restore...")
            if self._restore_from_hf():
                return True

        if self.webdav:
            print("[Backup] Trying WebDAV restore...")
            if self._restore_from_webdav():
                return True

        print("[Backup] No backup found to restore")
        return False

    def _restore_from_webdav(self) -> bool:
        """Extract the newest WebDAV full backup, then the incrementals taken after it."""
        try:
            full_names = self._webdav_archive_names(self.FULL_PREFIX)
            if not full_names:
                return False

            backup_name = full_names[-1]
            print(f"[Backup] Found WebDAV backup: {backup_name}")

            backup_data = self.webdav.get(f"{self.webdav_path}/{backup_name}")
            if not backup_data:
                return False

            self._extract_tar_gz(backup_data)
            self._apply_incremental_webdav(self._archive_timestamp(backup_name, self.FULL_PREFIX))

            print("[Backup] WebDAV restore completed")
            return True
        except Exception as e:
            print(f"[Backup] WebDAV restore error: {e}")
            return False

    def _restore_from_hf(self) -> bool:
        """Extract the HF full backup (if any), then the HF incremental."""
        try:
            files = set(self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset"))
            if self.HF_FULL_NAME not in files and self.HF_INCR_NAME not in files:
                return False

            if self.HF_FULL_NAME in files:
                print(f"[Backup] Found HF backup: {self.HF_FULL_NAME}")
                self._extract_tar_gz(self._download_hf(self.HF_FULL_NAME))
            # With no full archive, the incremental was built against an empty
            # snapshot and therefore holds every file. NOTE(review): this assumes
            # the HF target was configured for every full backup.
            self._apply_incremental_hf()

            print("[Backup] HF Dataset restore completed")
            return True
        except Exception as e:
            print(f"[Backup] HF restore error: {e}")
            return False

    def _extract_tar_gz(self, data: bytes):
        """Extract a tar.gz archive into the state directory.

        The archive comes from remote storage, so members that would be
        written outside the state directory are refused.

        Raises:
            tarfile.TarError: on a corrupt archive or a refused member.
        """
        with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
            if hasattr(tarfile, "tar_filter"):
                # The "tar" filter strips leading slashes, refuses members that
                # resolve outside the destination and clears setuid/setgid/
                # sticky and group/other-write bits.
                tar.extractall(path=self.state_dir, filter="tar")
            else:
                # Interpreters without extraction filters: best-effort name check.
                root = os.path.realpath(self.state_dir)
                for member in tar.getmembers():
                    target = os.path.realpath(os.path.join(root, member.name))
                    if os.path.commonpath([root, target]) != root:
                        raise tarfile.TarError(
                            f"Refusing to extract outside {root}: {member.name}")
                tar.extractall(path=self.state_dir)
        print(f"[Backup] Extracted to {self.state_dir}")

    def _apply_incremental_webdav(self, after: str = ""):
        """Apply WebDAV incrementals newer than timestamp *after*, oldest first.

        Args:
            after: ``%Y%m%d_%H%M%S`` timestamp of the full backup just
                restored; older incrementals are skipped because they would
                roll files back. The default ``""`` applies all of them.
        """
        try:
            for backup_name in self._webdav_archive_names(self.INCR_PREFIX):
                if self._archive_timestamp(backup_name, self.INCR_PREFIX) <= after:
                    continue
                print(f"[Backup] Applying incremental: {backup_name}")
                data = self.webdav.get(f"{self.webdav_path}/{backup_name}")
                if data:
                    self._extract_tar_gz(data)
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    def _apply_incremental_hf(self):
        """Apply the HF dataset incremental archive(s), if present."""
        try:
            files = self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset")
            incr_backups = sorted(f for f in files if f.startswith("openclaw_incr_latest"))
            for backup_name in incr_backups:
                print(f"[Backup] Applying incremental: {backup_name}")
                self._extract_tar_gz(self._download_hf(backup_name))
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    # G2.5: scheduler
    def _run_job(self, job: Callable[[], bool]) -> None:
        """Run a scheduled *job*, logging any exception so the scheduler loop survives."""
        try:
            job()
        except Exception as e:
            print(f"[Backup] Scheduled job {job.__name__} failed: {e}")

    def scheduler(self):
        """Run backups forever at the configured intervals (blocking).

        Full backups run every ``webdav_interval`` minutes when WebDAV is
        configured; incremental backups run every ``hf_interval`` minutes when
        the HF dataset is configured. Pending jobs are checked once a minute.
        """
        print(f"[Backup] Starting scheduler: WebDAV every {self.webdav_interval}min, HF every {self.hf_interval}min")

        if self.webdav:
            schedule.every(self.webdav_interval).minutes.do(self._run_job, self.full_backup)

        if self.hf_api and self.hf_repo:
            schedule.every(self.hf_interval).minutes.do(self._run_job, self.incremental_backup)

        while True:
            schedule.run_pending()
            time.sleep(60)

    def run_scheduler(self):
        """Start :meth:`scheduler` in a daemon thread and return the thread."""
        thread = threading.Thread(target=self.scheduler, daemon=True)
        thread.start()
        return thread


def main():
    """Command-line entry point: ``backup-manager.py [full|incremental|restore|scheduler]``."""
    if len(sys.argv) < 2:
        print("Usage: backup-manager.py [full|incremental|restore|scheduler]")
        sys.exit(1)

    cmd = sys.argv[1]
    manager = BackupManager()

    if cmd == "full":
        manager.full_backup()
    elif cmd == "incremental":
        manager.incremental_backup()
    elif cmd == "restore":
        manager.restore()
    elif cmd == "scheduler":
        manager.scheduler()
    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)


if __name__ == "__main__":
    main()