File size: 4,982 Bytes
eaa272b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import hashlib
import logging
import os
import sys
import tarfile
import time
import zlib
from datetime import datetime, timezone

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError

# ── Logging setup
# The timestamp format below ends in a literal "Z" (the UTC designator), so
# records must actually be stamped in UTC. logging formats times with
# time.localtime unless the Formatter converter is replaced.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
# Module-wide logger used by every function in this script.
log = logging.getLogger("sync")

# ── Configuration
# Hub client, plus the target dataset repo id and access token read from the
# environment (each is None when its variable is unset).
api = HfApi()
repo_id = os.environ.get("HF_DATASET")
token = os.environ.get("HF_TOKEN")

# Archive name inside the repo, its local staging path, and the directory
# that backups are taken from and restored into.
FILENAME = "latest_backup.tar.gz"
BACKUP_PATH = "/tmp/" + FILENAME
BASE_DIR = "/home/node/.openclaw"

# Paths under BASE_DIR that are included in each backup.
PATHS_TO_BACKUP = [
    f"{BASE_DIR}/{relative}"
    for relative in (
        "sessions",
        "agents/main/sessions",
        "credentials",
        "workspace",
        "extensions",
        "openclaw.json",
    )
]

# ── Utility functions
def _check_env() -> bool:
    """Report whether both the dataset repo id and the token are configured.

    Returns:
        True when the module-level ``repo_id`` and ``token`` are both
        non-empty; otherwise logs a warning and returns False.
    """
    if repo_id and token:
        return True
    log.warning("HF_DATASET 或 HF_TOKEN 未设置,跳过同步。")
    return False

def _sha256(path: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 64 KiB pieces so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()

def _verify_tar(path: str) -> bool:
    """Check that *path* is a readable, non-empty gzip-compressed tar archive.

    The whole member list is read, which forces the gzip stream to be
    decompressed up to the last header and so surfaces truncation.

    Args:
        path: Filesystem path of the archive to check.

    Returns:
        True when the archive opens and lists at least one member. False
        (after logging) when it is empty, unreadable, truncated or corrupt.
    """
    try:
        with tarfile.open(path, "r:gz") as tar:
            members = tar.getmembers()
    except (tarfile.TarError, OSError, EOFError, zlib.error) as e:
        # TarError alone is not enough: a truncated gzip stream raises
        # EOFError, corrupt deflate data raises zlib.error, and a missing or
        # unreadable file raises OSError.
        log.error(f"压缩包损坏: {e}")
        return False

    if not members:
        log.warning("压缩包为空,跳过。")
        return False

    log.info(f"压缩包验证通过,共 {len(members)} 个条目。")
    return True

# ── restore
def _extract_within(tar: tarfile.TarFile, dest: str) -> None:
    """Extract every member of *tar* into *dest*, refusing paths that escape it.

    Raises:
        tarfile.TarError: If a member would be written outside *dest*.
    """
    if hasattr(tarfile, "data_filter"):
        # Extraction filters are available: the "tar" filter strips leading
        # slashes and rejects members that resolve outside the destination,
        # while still keeping symlinks and directory modes from the backup.
        tar.extractall(path=dest, filter="tar")
        return

    # Older interpreters have no filter support, so check containment by hand
    # before extracting anything.
    root = os.path.realpath(dest)
    for member in tar.getmembers():
        target = os.path.realpath(os.path.join(root, member.name))
        if os.path.commonpath([root, target]) != root:
            raise tarfile.TarError(f"unsafe member path: {member.name}")
    tar.extractall(path=dest)


def restore() -> bool:
    """Download the latest backup archive and unpack it into ``BASE_DIR``.

    Returns:
        True when the archive was downloaded, verified and extracted.
        False when the environment is not configured, no backup exists yet,
        the download fails, the archive fails verification, or extraction
        fails. Every failure is logged rather than raised.
    """
    if not _check_env():
        return False

    log.info(f"开始恢复:从 {repo_id} 下载 {FILENAME} ...")

    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=FILENAME,
            repo_type="dataset",
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        # A missing repo or file is reported as a first run, not an error.
        log.info("仓库中尚无备份文件,首次运行,跳过恢复。")
        return False
    except Exception as e:
        log.error(f"下载失败: {e}")
        return False

    if not _verify_tar(path):
        log.error("备份文件验证失败,放弃解压。")
        return False

    log.info(f"文件 SHA-256: {_sha256(path)}")

    try:
        os.makedirs(BASE_DIR, exist_ok=True)
        with tarfile.open(path, "r:gz") as tar:
            # The archive comes from a remote repo, so member paths are not
            # trusted: extraction is confined to BASE_DIR.
            _extract_within(tar, BASE_DIR)
        log.info(f"恢复成功 → {BASE_DIR}")
        return True
    except Exception as e:
        log.error(f"解压失败: {e}")
        return False

# ── backup
def backup() -> bool:
    """Pack the configured paths into a tar.gz archive and upload it.

    Only the entries of ``PATHS_TO_BACKUP`` that exist are archived, with
    names relative to ``BASE_DIR``. The archive is verified before upload,
    and the local temporary file is removed on every exit path once packing
    has started.

    Returns:
        True when the archive was built, verified and uploaded. False when
        the environment is not configured, nothing exists to back up, or
        packing, verification or upload fails. Failures are logged rather
        than raised.
    """
    if not _check_env():
        return False

    existing = [p for p in PATHS_TO_BACKUP if os.path.exists(p)]
    if not existing:
        log.warning("所有备份路径均不存在,跳过备份。")
        return False

    log.info(f"开始备份,共 {len(existing)} 个路径...")

    try:
        try:
            with tarfile.open(BACKUP_PATH, "w:gz") as tar:
                for p in existing:
                    # Store names relative to BASE_DIR so extraction never
                    # carries an absolute path. relpath only removes the
                    # leading prefix, unlike str.replace, which would also
                    # remove later occurrences inside the path.
                    arcname = os.path.relpath(p, BASE_DIR)
                    tar.add(p, arcname=arcname, recursive=True)
                    log.info(f"  已打包: {p} → {arcname}")
        except Exception as e:
            log.error(f"打包失败: {e}")
            return False

        if not _verify_tar(BACKUP_PATH):
            log.error("生成的压缩包验证失败,取消上传。")
            return False

        log.info(f"压缩包大小: {os.path.getsize(BACKUP_PATH)/1024:.1f} KB,SHA-256: {_sha256(BACKUP_PATH)}")

        try:
            # Timezone-aware "now" replaces the deprecated datetime.utcnow().
            stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
            api.upload_file(
                path_or_fileobj=BACKUP_PATH,
                path_in_repo=FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
                commit_message=f"backup {stamp} UTC",
            )
        except Exception as e:
            log.error(f"上传失败: {e}")
            return False

        log.info(f"备份上传成功 → {repo_id}/{FILENAME}")
        return True
    finally:
        # Runs for packing and verification failures too, so a partial or
        # rejected archive is never left behind in the temp directory.
        if os.path.exists(BACKUP_PATH):
            os.remove(BACKUP_PATH)
            log.info("本地临时文件已清理。")

# ── Entry point
if __name__ == "__main__":
    # The first command-line argument selects the operation; with no
    # argument the script restores.
    cli_args = sys.argv[1:]
    action = cli_args[0] if cli_args else "restore"

    handlers = {"backup": backup, "restore": restore}
    if action not in handlers:
        log.error(f"未知命令: {action},用法: python sync.py [backup|restore]")
        sys.exit(1)

    # The exit status mirrors the boolean result of the chosen operation.
    succeeded = handlers[action]()
    sys.exit(0 if succeeded else 1)