#!/usr/bin/env python3
"""
G2: WebDAV + HF Dataset 双备份管理器 (dual-backup manager)
功能 (features): 全量备份、增量备份、恢复、调度器 (full backup, incremental backup, restore, scheduler)
"""
import hashlib
import io
import json
import os
import sys
import tarfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import unquote

import requests
import schedule
from huggingface_hub import HfApi, create_repo, hf_hub_download, upload_file
# G2.1: WebDAV transport layer
class WebDAVClient:
    """Minimal WebDAV client implementing PUT / GET / PROPFIND / MKCOL / DELETE.

    Every method catches errors raised while talking to the server, prints
    them, and reports failure through its return value (``False`` / ``None`` /
    ``[]``) instead of raising.
    """

    def __init__(self, base_url: str, username: str, password: str,
                 timeout: Optional[float] = 300):
        """Create a client bound to ``base_url`` using HTTP basic auth.

        Args:
            base_url: Server root URL; a trailing ``/`` is ignored.
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: ``requests`` timeout in seconds applied to every request
                (``None`` disables it). Without a timeout a stalled server can
                block the calling thread (e.g. the backup scheduler) forever.
        """
        self.base_url = base_url.rstrip('/')
        self.auth = (username, password)
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = self.auth

    def _url(self, path: str) -> str:
        """Join ``path`` (leading ``/`` optional) onto the base URL."""
        path = path.lstrip('/')
        return f"{self.base_url}/{path}"

    def mkcol(self, path: str) -> bool:
        """Create a collection (MKCOL).

        Returns True when the collection was created (201) or already exists
        (405), False on any other status or error.
        """
        try:
            resp = self.session.request("MKCOL", self._url(path), timeout=self.timeout)
            return resp.status_code in (201, 207, 405)  # 405 = already exists
        except Exception as e:
            print(f"[WebDAV] MKCOL error: {e}")
            return False

    def put(self, path: str, data: bytes) -> bool:
        """Upload ``data`` to ``path`` (PUT).

        Returns True for 201 (created) and for 200/204, which servers send
        when an existing resource is overwritten.
        """
        try:
            resp = self.session.put(self._url(path), data=data, timeout=self.timeout)
            return resp.status_code in (200, 201, 204)
        except Exception as e:
            print(f"[WebDAV] PUT error: {e}")
            return False

    def get(self, path: str) -> Optional[bytes]:
        """Download ``path`` (GET); returns the body on 200, otherwise None."""
        try:
            resp = self.session.get(self._url(path), timeout=self.timeout)
            if resp.status_code == 200:
                return resp.content
            return None
        except Exception as e:
            print(f"[WebDAV] GET error: {e}")
            return None

    def propfind(self, path: str) -> List[Dict]:
        """List a collection (PROPFIND with ``Depth: 1``).

        Returns one ``{"href": <text>}`` dict per ``{DAV:}response`` element
        that has an ``{DAV:}href`` child, or ``[]`` on a non-207 status or any
        error. Hrefs are returned exactly as the server sent them (they may be
        percent-encoded, and per WebDAV the collection itself is normally
        listed too).
        """
        try:
            resp = self.session.request(
                "PROPFIND", self._url(path), headers={"Depth": "1"}, timeout=self.timeout
            )
            if resp.status_code != 207:
                return []
            # Local import: XML parsing is only needed for directory listings.
            import xml.etree.ElementTree as ET
            root = ET.fromstring(resp.content)
            items = []
            for response in root.findall(".//{DAV:}response"):
                href = response.find("{DAV:}href")
                if href is not None:
                    items.append({"href": href.text})
            return items
        except Exception as e:
            print(f"[WebDAV] PROPFIND error: {e}")
            return []

    def exists(self, path: str) -> bool:
        """Return True if ``path`` exists (PROPFIND ``Depth: 0`` answers 207)."""
        try:
            resp = self.session.request(
                "PROPFIND", self._url(path), headers={"Depth": "0"}, timeout=self.timeout
            )
            return resp.status_code == 207
        except Exception as e:
            print(f"[WebDAV] PROPFIND error: {e}")
            return False

    def delete(self, path: str) -> bool:
        """Delete ``path`` (DELETE); a missing resource (404) counts as success."""
        try:
            resp = self.session.delete(self._url(path), timeout=self.timeout)
            return resp.status_code in (200, 204, 404)
        except Exception as e:
            print(f"[WebDAV] DELETE error: {e}")
            return False
# G2.2/G2.3: Backup manager
class BackupManager:
    """Backup manager with two targets: WebDAV and a Hugging Face dataset repo.

    Local bookkeeping (the manifest of file hashes and downloaded archives)
    lives in ``<state_dir>/backups`` and is excluded from the backups.

    Remote layout:
      * WebDAV: timestamped ``openclaw_full_<ts>.tar.gz`` / ``openclaw_incr_<ts>.tar.gz``
        archives (the newest ``KEEP_WEBDAV_BACKUPS`` of each kind are kept) plus
        ``manifest_<ts>.json``.
      * HF dataset: ``openclaw_full_latest.tar.gz`` and ``openclaw_incr_latest.tar.gz``,
        overwritten on every upload, plus ``manifest_<ts>.json``.

    Incremental archives are cumulative: each contains every file whose hash
    differs from the snapshot taken at the last full backup. A restore
    therefore needs only the latest full archive plus the incremental(s)
    created after it. This is what makes the single, overwritten HF
    incremental and the bounded WebDAV retention safe.
    Deleted files are not tracked; a restore never removes files.
    """

    FULL_PREFIX = "openclaw_full_"
    INCR_PREFIX = "openclaw_incr_"
    HF_FULL_NAME = "openclaw_full_latest.tar.gz"
    HF_INCR_NAME = "openclaw_incr_latest.tar.gz"
    KEEP_WEBDAV_BACKUPS = 10  # per archive kind (full / incremental)

    def __init__(self):
        """Read configuration from environment variables and create clients.

        Environment: OPENCLAW_STATE_DIR (default ``/root/.openclaw``),
        WEBDAV_URL, WEBDAV_USERNAME, WEBDAV_PASSWORD, WEBDAV_BACKUP_PATH
        (default ``/openclaw-backups``), BACKUP_WEBDAV_INTERVAL (minutes,
        default 1440), HF_TOKEN, HF_DATASET, BACKUP_HF_INTERVAL (minutes,
        default 10).
        """
        self.state_dir = os.environ.get("OPENCLAW_STATE_DIR", "/root/.openclaw")
        self.backup_dir = f"{self.state_dir}/backups"
        self.manifest_file = f"{self.backup_dir}/manifest.json"
        # WebDAV configuration
        self.webdav_url = os.environ.get("WEBDAV_URL", "")
        self.webdav_user = os.environ.get("WEBDAV_USERNAME", "")
        self.webdav_pass = os.environ.get("WEBDAV_PASSWORD", "")
        self.webdav_path = os.environ.get("WEBDAV_BACKUP_PATH", "/openclaw-backups")
        self.webdav_interval = int(os.environ.get("BACKUP_WEBDAV_INTERVAL", "1440"))  # minutes, default 24h
        # HF Dataset configuration
        self.hf_token = os.environ.get("HF_TOKEN", "")
        self.hf_repo = os.environ.get("HF_DATASET", "")
        self.hf_interval = int(os.environ.get("BACKUP_HF_INTERVAL", "10"))  # minutes, default 10min
        self.webdav: Optional[WebDAVClient] = None
        self.hf_api: Optional[HfApi] = None
        self._init_clients()
        os.makedirs(self.backup_dir, exist_ok=True)

    def _init_clients(self):
        """Create the WebDAV / HF clients for whichever targets are configured."""
        if self.webdav_url and self.webdav_user:
            self.webdav = WebDAVClient(self.webdav_url, self.webdav_user, self.webdav_pass)
            print(f"[Backup] WebDAV client initialized: {self.webdav_url}")
        if self.hf_token:
            self.hf_api = HfApi(token=self.hf_token)
            print("[Backup] HF API initialized")

    def _sha256_file(self, filepath: str) -> str:
        """Return the hex SHA-256 of a file, read in 8 KiB chunks."""
        sha256 = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    # ---- manifest -------------------------------------------------------

    def _load_manifest(self) -> Dict:
        """Load the local manifest, or return an empty one.

        Keys: ``files`` (rel_path -> sha256 at the last successful backup),
        ``full_files`` (rel_path -> sha256 at the last full backup; absent in
        a fresh or pre-upgrade manifest) and ``last_full_backup`` (timestamp
        or None). A missing or corrupt manifest yields an empty one, which
        makes the next incremental run start with a full backup.
        """
        try:
            with open(self.manifest_file, 'r', encoding='utf-8') as f:
                manifest = json.load(f)
        except FileNotFoundError:
            manifest = {}
        except (OSError, ValueError) as e:
            print(f"[Backup] Manifest unreadable, starting fresh: {e}")
            manifest = {}
        if not isinstance(manifest, dict):
            manifest = {}
        manifest.setdefault("files", {})
        manifest.setdefault("last_full_backup", None)
        return manifest

    def _save_manifest(self, manifest: Dict):
        """Write the manifest atomically (temp file + rename) so a crash cannot corrupt it."""
        os.makedirs(self.backup_dir, exist_ok=True)
        tmp_path = f"{self.manifest_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)
        os.replace(tmp_path, self.manifest_file)

    # ---- local files / archives -----------------------------------------

    def _get_state_files(self) -> List[Tuple[str, str, str]]:
        """List state files as ``(full_path, rel_path, sha256)``, sorted by path.

        The backup directory itself is skipped. Files that disappear or
        cannot be read while scanning are skipped with a warning.
        """
        files = []
        state_path = Path(self.state_dir)
        backup_path = Path(self.backup_dir)
        for filepath in sorted(state_path.rglob("*")):
            if not filepath.is_file():
                continue
            if backup_path in filepath.parents:
                continue
            try:
                sha256 = self._sha256_file(str(filepath))
            except OSError as e:
                print(f"[Backup] Skipping unreadable file {filepath}: {e}")
                continue
            rel_path = filepath.relative_to(state_path).as_posix()
            files.append((str(filepath), rel_path, sha256))
        return files

    def _create_tar_gz(self, files: List[Tuple[str, str, str]]) -> bytes:
        """Build an in-memory tar.gz of ``files``, stored under their relative paths."""
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
            for full_path, rel_path, _ in files:
                tar.add(full_path, arcname=rel_path)
        return buffer.getvalue()

    def _extract_tar_gz(self, data: bytes):
        """Extract a tar.gz archive (given as bytes) into the state directory.

        Archives come from remote storage, so any member that would land
        outside the state directory (absolute or ``..`` paths, escaping
        links, device files) is skipped with a warning. On Pythons with
        extraction filters, ``tarfile.data_filter`` is used. That filter also
        normalizes permissions and rejects absolute symlinks.
        """
        def skip_unsafe(member: tarfile.TarInfo, dest: str) -> Optional[tarfile.TarInfo]:
            try:
                return tarfile.data_filter(member, dest)
            except tarfile.FilterError as e:
                print(f"[Backup] Skipping unsafe archive member {member.name!r}: {e}")
                return None  # returning None tells extractall to skip the member

        with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=self.state_dir, filter=skip_unsafe)
            else:
                tar.extractall(path=self.state_dir, members=self._safe_members(tar))
        print(f"[Backup] Extracted to {self.state_dir}")

    def _safe_members(self, tar: tarfile.TarFile):
        """Yield members that stay inside the state dir (fallback for Pythons without filters).

        This is a generator, so each check runs just before its member is
        extracted and sees links created by earlier members.
        """
        root = os.path.realpath(self.state_dir)
        for member in tar.getmembers():
            dest = os.path.realpath(os.path.join(root, member.name))
            targets = [dest]
            if member.issym():
                targets.append(os.path.realpath(os.path.join(os.path.dirname(dest), member.linkname)))
            elif member.islnk():
                targets.append(os.path.realpath(os.path.join(root, member.linkname)))
            if member.isdev() or any(os.path.commonpath([root, t]) != root for t in targets):
                print(f"[Backup] Skipping unsafe archive member {member.name!r}")
                continue
            yield member

    @staticmethod
    def _changed_files(baseline: Dict[str, str],
                       files: List[Tuple[str, str, str]]) -> List[Tuple[str, str, str]]:
        """Return the entries of ``files`` whose sha256 differs from ``baseline``."""
        return [entry for entry in files if baseline.get(entry[1]) != entry[2]]

    @staticmethod
    def _archive_timestamp(name: str, prefix: str) -> str:
        """Extract ``YYYYmmdd_HHMMSS`` from ``<prefix><timestamp>.tar.gz``."""
        return name[len(prefix):].split(".", 1)[0]

    # ---- remote helpers -------------------------------------------------

    def _webdav_file(self, name: str) -> str:
        """Path of ``name`` inside the WebDAV backup directory."""
        return f"{self.webdav_path.rstrip('/')}/{name}"

    def _ensure_webdav_dir(self):
        """Create the WebDAV backup directory, including any missing parents."""
        current = ""
        for part in self.webdav_path.strip("/").split("/"):
            if part:
                current = f"{current}/{part}"
                self.webdav.mkcol(current)

    def _webdav_list(self, prefix: str) -> List[str]:
        """Return base names in the WebDAV backup dir starting with ``prefix``, oldest first.

        The timestamp format ``%Y%m%d_%H%M%S`` makes lexical order chronological.
        """
        names = []
        for item in self.webdav.propfind(self.webdav_path):
            href = (item.get("href") or "").rstrip("/")
            name = unquote(href.rsplit("/", 1)[-1])
            if name.startswith(prefix):
                names.append(name)
        return sorted(names)

    def _prune_webdav(self, prefix: str, label: str):
        """Delete all but the newest ``KEEP_WEBDAV_BACKUPS`` WebDAV archives with ``prefix``."""
        try:
            names = self._webdav_list(prefix)
            excess = len(names) - self.KEEP_WEBDAV_BACKUPS
            for name in names[:max(excess, 0)]:
                if self.webdav.delete(self._webdav_file(name)):
                    print(f"[Backup] Cleaned old WebDAV {label}: {name}")
                else:
                    print(f"[Backup] Failed to delete old WebDAV {label}: {name}")
        except Exception as e:
            print(f"[Backup] WebDAV cleanup error: {e}")

    def _hf_upload(self, data: bytes, path_in_repo: str):
        """Upload ``data`` to ``path_in_repo`` in the HF dataset repo (raises on failure)."""
        upload_file(
            path_or_fileobj=io.BytesIO(data),
            path_in_repo=path_in_repo,
            repo_id=self.hf_repo,
            repo_type="dataset",
            token=self.hf_token,
        )

    # G2.2: Full backup
    def full_backup(self) -> bool:
        """Archive every state file and upload it to all configured targets.

        The local manifest (including the new full-backup snapshot) is saved
        only if every archive upload succeeded. A failed run is retried
        instead of being silently recorded as backed up.

        Returns:
            True if there was nothing to back up or all uploads succeeded.
        """
        print("[Backup] Starting full backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        files = self._get_state_files()
        if not files:
            print("[Backup] No files to backup")
            return True

        archive_name = f"{self.FULL_PREFIX}{timestamp}.tar.gz"
        archive_data = self._create_tar_gz(files)

        manifest = self._load_manifest()
        snapshot = {rel_path: sha256 for _, rel_path, sha256 in files}
        manifest["files"].update(snapshot)
        manifest["full_files"] = snapshot
        manifest["last_full_backup"] = timestamp
        success = True

        if self.webdav:
            print(f"[Backup] Uploading to WebDAV: {archive_name}")
            self._ensure_webdav_dir()
            if self.webdav.put(self._webdav_file(archive_name), archive_data):
                print("[Backup] WebDAV upload successful")
            else:
                print("[Backup] WebDAV upload failed")
                success = False
            self._prune_webdav(self.FULL_PREFIX, "backup")

        if self.hf_api and self.hf_repo:
            print(f"[Backup] Uploading to HF Dataset: {self.HF_FULL_NAME}")
            try:
                # Make sure the dataset repo exists.
                try:
                    self.hf_api.repo_info(repo_id=self.hf_repo, repo_type="dataset")
                except Exception:
                    create_repo(self.hf_repo, repo_type="dataset", token=self.hf_token,
                                private=True, exist_ok=True)
                self._hf_upload(archive_data, self.HF_FULL_NAME)
                print("[Backup] HF Dataset upload successful")
            except Exception as e:
                print(f"[Backup] HF Dataset upload failed: {e}")
                success = False
            else:
                # The current HF incremental was built against the previous full backup
                # and may hold older file versions. Replace it with an empty archive so a
                # restore cannot apply it on top of this newer full backup.
                try:
                    self._hf_upload(self._create_tar_gz([]), self.HF_INCR_NAME)
                except Exception as e:
                    print(f"[Backup] HF Dataset incremental reset failed: {e}")
                    success = False

        # Upload the manifest; a failure here is logged but does not fail the backup.
        manifest_data = json.dumps(manifest, indent=2).encode()
        manifest_name = f"manifest_{timestamp}.json"
        if self.webdav and not self.webdav.put(self._webdav_file(manifest_name), manifest_data):
            print("[Backup] WebDAV manifest upload failed")
        if self.hf_api and self.hf_repo:
            try:
                self._hf_upload(manifest_data, manifest_name)
            except Exception as e:
                print(f"[Backup] HF Dataset manifest upload failed: {e}")

        if success:
            self._save_manifest(manifest)
            print(f"[Backup] Full backup completed: {timestamp}")
        else:
            print(f"[Backup] Full backup finished with errors: {timestamp}")
        return success

    # G2.3: Incremental backup
    def incremental_backup(self) -> bool:
        """Upload a cumulative incremental archive if anything changed.

        Change detection compares SHA-256s with the last successful backup.
        The archive contains every file that differs from the last full
        backup, so the newest incremental alone brings a restore up to date.
        If no full backup is recorded (a fresh manifest, or one written
        before full-backup snapshots existed), a full backup runs instead,
        because an incremental has nothing to apply on top of.

        Returns:
            True if nothing changed or all uploads succeeded.
        """
        print("[Backup] Starting incremental backup...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        manifest = self._load_manifest()
        if not manifest.get("last_full_backup") or "full_files" not in manifest:
            print("[Backup] No full backup recorded, running full backup first")
            return self.full_backup()

        current_files = self._get_state_files()
        changed_files = self._changed_files(manifest["files"], current_files)
        if not changed_files:
            print("[Backup] No changes detected")
            return True
        print(f"[Backup] {len(changed_files)} files changed")

        # Cumulative since the last full backup. The archive may be empty if
        # files were reverted; it must still be uploaded to supersede the
        # previous incremental.
        since_full = self._changed_files(manifest["full_files"], current_files)
        archive_name = f"{self.INCR_PREFIX}{timestamp}.tar.gz"
        archive_data = self._create_tar_gz(since_full)
        success = True

        if self.webdav:
            print(f"[Backup] Uploading incremental to WebDAV: {archive_name}")
            self._ensure_webdav_dir()
            if self.webdav.put(self._webdav_file(archive_name), archive_data):
                print("[Backup] WebDAV incremental upload successful")
            else:
                print("[Backup] WebDAV incremental upload failed")
                success = False
            self._prune_webdav(self.INCR_PREFIX, "incremental backup")

        if self.hf_api and self.hf_repo:
            print(f"[Backup] Uploading incremental to HF Dataset: {self.HF_INCR_NAME}")
            try:
                self._hf_upload(archive_data, self.HF_INCR_NAME)
                print("[Backup] HF Dataset incremental upload successful")
            except Exception as e:
                print(f"[Backup] HF Dataset incremental upload failed: {e}")
                success = False

        # Record the new hashes only on success, so failed uploads are retried next run.
        if success:
            for _, rel_path, sha256 in changed_files:
                manifest["files"][rel_path] = sha256
            self._save_manifest(manifest)
        print(f"[Backup] Incremental backup completed: {timestamp}")
        return success

    # G2.4: Restore
    def restore(self) -> bool:
        """Restore the state directory, trying the HF dataset first and then WebDAV.

        Returns:
            True if a backup was found and restored from either target.
        """
        print("[Backup] Starting restore...")
        if self.hf_api and self.hf_repo:
            print("[Backup] Trying HF Dataset restore...")
            if self._restore_from_hf():
                return True
        if self.webdav:
            print("[Backup] Trying WebDAV restore...")
            if self._restore_from_webdav():
                return True
        print("[Backup] No backup found to restore")
        return False

    def _restore_from_webdav(self) -> bool:
        """Restore the newest WebDAV full archive, then the incrementals created after it."""
        try:
            backups = self._webdav_list(self.FULL_PREFIX)
            if not backups:
                return False
            backup_name = backups[-1]
            print(f"[Backup] Found WebDAV backup: {backup_name}")
            backup_data = self.webdav.get(self._webdav_file(backup_name))
            if not backup_data:
                return False
            self._extract_tar_gz(backup_data)
            self._apply_incremental_webdav(
                after=self._archive_timestamp(backup_name, self.FULL_PREFIX)
            )
            print("[Backup] WebDAV restore completed")
            return True
        except Exception as e:
            print(f"[Backup] WebDAV restore error: {e}")
            return False

    def _restore_from_hf(self) -> bool:
        """Restore the HF full archive, then the HF incremental archive if present."""
        try:
            files = self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset")
            backups = [f for f in files if f.startswith("openclaw_full_latest")]
            if not backups:
                return False
            backup_name = sorted(backups)[-1]
            print(f"[Backup] Found HF backup: {backup_name}")
            local_path = hf_hub_download(
                repo_id=self.hf_repo,
                filename=backup_name,
                repo_type="dataset",
                token=self.hf_token,
                local_dir=self.backup_dir,
            )
            with open(local_path, 'rb') as f:
                backup_data = f.read()
            self._extract_tar_gz(backup_data)
            self._apply_incremental_hf()
            print("[Backup] HF Dataset restore completed")
            return True
        except Exception as e:
            print(f"[Backup] HF restore error: {e}")
            return False

    def _apply_incremental_webdav(self, after: Optional[str] = None):
        """Apply WebDAV incrementals in chronological order.

        Args:
            after: Timestamp of the full backup just restored. Incrementals
                older than it hold stale file versions and are skipped. Ones
                from the same second are applied, since they were most likely
                taken right after the full backup. ``None`` applies all.
        """
        try:
            for backup_name in self._webdav_list(self.INCR_PREFIX):
                if after is not None and self._archive_timestamp(backup_name, self.INCR_PREFIX) < after:
                    continue
                print(f"[Backup] Applying incremental: {backup_name}")
                data = self.webdav.get(self._webdav_file(backup_name))
                if data:
                    self._extract_tar_gz(data)
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    def _apply_incremental_hf(self):
        """Apply the HF incremental archive(s) on top of a restored full backup."""
        try:
            files = self.hf_api.list_repo_files(self.hf_repo, repo_type="dataset")
            incr_backups = sorted(f for f in files if f.startswith("openclaw_incr_latest"))
            for backup_name in incr_backups:
                print(f"[Backup] Applying incremental: {backup_name}")
                local_path = hf_hub_download(
                    repo_id=self.hf_repo,
                    filename=backup_name,
                    repo_type="dataset",
                    token=self.hf_token,
                    local_dir=self.backup_dir,
                )
                with open(local_path, 'rb') as f:
                    self._extract_tar_gz(f.read())
        except Exception as e:
            print(f"[Backup] Apply incremental error: {e}")

    # G2.5: Scheduler
    @staticmethod
    def _run_job(job):
        """Run a scheduled job and log exceptions instead of propagating them.

        An exception escaping ``run_pending`` would otherwise end the
        scheduler loop, silently stopping all future backups.
        """
        try:
            job()
        except Exception as e:
            print(f"[Backup] Scheduled job {getattr(job, '__name__', job)} failed: {e}")

    def scheduler(self):
        """Run backups forever: full backups at the WebDAV interval (if WebDAV is
        configured) and incrementals at the HF interval (if HF is configured).

        A private ``schedule.Scheduler`` is used so repeated calls do not pile
        up duplicate jobs in the module-global scheduler.
        """
        print(f"[Backup] Starting scheduler: WebDAV every {self.webdav_interval}min, HF every {self.hf_interval}min")
        jobs = schedule.Scheduler()
        if self.webdav:
            jobs.every(self.webdav_interval).minutes.do(self._run_job, self.full_backup)
        if self.hf_api and self.hf_repo:
            jobs.every(self.hf_interval).minutes.do(self._run_job, self.incremental_backup)
        if not jobs.jobs:
            print("[Backup] No backup target configured; scheduler is idle")
        while True:
            jobs.run_pending()
            time.sleep(60)

    def run_scheduler(self):
        """Start ``scheduler`` in a daemon thread and return the thread."""
        thread = threading.Thread(target=self.scheduler, daemon=True)
        thread.start()
        return thread
def main():
    """Command-line entry point.

    Usage: ``backup-manager.py [full|incremental|restore|scheduler]``.

    Exits with status 1 on usage errors and when a ``full`` or ``incremental``
    backup reports failure, so cron jobs and entrypoint scripts can detect it.
    ``restore`` still exits 0 when no backup was found, because a fresh
    deployment legitimately has nothing to restore. ``scheduler`` never returns.
    """
    if len(sys.argv) < 2:
        print("Usage: backup-manager.py [full|incremental|restore|scheduler]")
        sys.exit(1)
    cmd = sys.argv[1]
    if cmd not in ("full", "incremental", "restore", "scheduler"):
        # Reject bad input before BackupManager() creates directories and API clients.
        print(f"Unknown command: {cmd}")
        sys.exit(1)
    manager = BackupManager()
    if cmd == "full":
        ok = manager.full_backup()
    elif cmd == "incremental":
        ok = manager.incremental_backup()
    elif cmd == "restore":
        manager.restore()
        ok = True
    else:
        manager.scheduler()
        ok = True
    if not ok:
        sys.exit(1)


if __name__ == "__main__":
    main()