# claw/sync.py
# Author: Simford.Dong
# refactor: move sync and startup logic to separate files to fix Docker build errors
# (commit 7eb2e6e)
import os, sys, tarfile, json
from huggingface_hub import HfApi, hf_hub_download
from datetime import datetime, timedelta
api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")
def restore():
try:
print(f"--- [SYNC] 启动恢复流程, 目标仓库: {repo_id} ---")
if repo_id and token:
files = api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
now = datetime.now()
found = False
for i in range(5):
day = (now - timedelta(days=i)).strftime("%Y-%m-%d")
name = f"new_backup_{day}.tar.gz"
if name in files:
print(f"--- [SYNC] 发现备份文件: {name}, 正在下载... ---")
path = hf_hub_download(repo_id=repo_id, filename=name, repo_type="dataset", token=token)
with tarfile.open(path, "r:gz") as tar: tar.extractall(path="/root/.openclaw/")
print(f"--- [SYNC] 恢复成功! 数据已覆盖至 /root/.openclaw/ ---")
found = True; break
if not found: print("--- [SYNC] 未找到最近 5 天的备份包 ---")
else: print("--- [SYNC] 跳过恢复: 未配置 HF_DATASET 或 HF_TOKEN ---")
# 强制清理所有残留的 .lock 文件,防止 session 锁定错误
count = 0
for root, _, fs in os.walk("/root/.openclaw/"):
for f in fs:
if f.endswith(".lock"):
try:
os.remove(os.path.join(root, f))
count += 1
except: pass
if count > 0: print(f"--- [SYNC] 已清理 {count} 个残留的锁定文件 ---")
# 清理孤立的 transcript 文件 (不在 sessions.json 中的会话)
sessions_dir = "/root/.openclaw/agents/main/sessions"
sessions_json = os.path.join(sessions_dir, "sessions.json")
valid_ids = set()
try:
if os.path.exists(sessions_json):
with open(sessions_json, "r") as f:
data = json.load(f)
valid_ids = set(data.get("sessions", {}).keys())
orphan_count = 0
for item in os.listdir(sessions_dir) if os.path.exists(sessions_dir) else []:
item_path = os.path.join(sessions_dir, item)
if os.path.isfile(item_path) and item.endswith(".transcript"):
session_id = item.replace(".transcript", "")
if session_id not in valid_ids:
try:
os.remove(item_path)
orphan_count += 1
except: pass
if orphan_count > 0:
print(f"--- [SYNC] 已清理 {orphan_count} 个孤立的 transcript 文件 ---")
except Exception as e:
print(f"--- [SYNC] 清理 orphan 文件时出错 (跳过): {e} ---")
return True
except Exception as e:
print(f"--- [SYNC] 恢复异常: {e} ---")
print(f"--- [SYNC] 网络错误或配置问题,跳过恢复继续启动 ---")
return True
def backup():
try:
day = datetime.now().strftime("%Y-%m-%d")
name = f"new_backup_{day}.tar.gz"
print(f"--- [SYNC] 正在执行全量备份: {name} ---")
# 读取有效的 session IDs 用于过滤孤立 transcript
sessions_dir = "/root/.openclaw/agents/main/sessions"
sessions_json = os.path.join(sessions_dir, "sessions.json")
valid_ids = set()
try:
if os.path.exists(sessions_json):
with open(sessions_json, "r") as f:
data = json.load(f)
valid_ids = set(data.get("sessions", {}).keys())
except: pass
def backup_filter(tarinfo):
# 排除 .lock 文件
if tarinfo.name.endswith(".lock"): return None
# 排除孤立的 transcript 文件
if tarinfo.name.endswith(".transcript"):
parts = tarinfo.name.split("/")
if len(parts) > 0:
filename = parts[-1]
session_id = filename.replace(".transcript", "")
if session_id not in valid_ids:
return None
return tarinfo
with tarfile.open(name, "w:gz") as tar:
for target in ["sessions", "workspace", "agents", "memory", "plugins", "openclaw.json"]:
full_path = f"/root/.openclaw/{target}"
if os.path.exists(full_path):
tar.add(full_path, arcname=target, filter=backup_filter)
api.upload_file(path_or_fileobj=name, path_in_repo=name, repo_id=repo_id, repo_type="dataset", token=token)
print(f"--- [SYNC] 备份上传成功! ---")
except Exception as e: print(f"--- [SYNC] 备份失败: {e} ---")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "backup": backup()
else: restore()