"""Back up and restore the OpenClaw state directory via a Hugging Face dataset.

Run with the single argument ``backup`` to archive the state directory and
upload it; any other invocation restores the newest available backup and then
performs local housekeeping (stale lock files, orphaned transcripts).

Configuration is read from the environment:
    HF_DATASET  id of the dataset repository that stores the archives
    HF_TOKEN    access token used for listing, downloading and uploading
"""
import json
import os
import sys
import tarfile
import tempfile
from datetime import datetime, timedelta

from huggingface_hub import HfApi, hf_hub_download

api = HfApi()
repo_id = os.getenv("HF_DATASET")
token = os.getenv("HF_TOKEN")

# Root of the state directory that is archived and restored.
OPENCLAW_HOME = "/root/.openclaw/"
# Directory holding the session index and the per-session transcript files.
SESSIONS_DIR = "/root/.openclaw/agents/main/sessions"
SESSIONS_JSON = os.path.join(SESSIONS_DIR, "sessions.json")
# Entries (relative to OPENCLAW_HOME) that go into a backup archive.
BACKUP_TARGETS = ["sessions", "workspace", "agents", "memory", "plugins", "openclaw.json"]
# How many days back (today included) restore() looks for a backup archive.
LOOKBACK_DAYS = 5
LOCK_SUFFIX = ".lock"
TRANSCRIPT_SUFFIX = ".transcript"


def _backup_name(day):
    """Return the archive file name used for the backup taken on *day*."""
    return f"new_backup_{day:%Y-%m-%d}.tar.gz"


def _session_id_from_transcript(filename):
    """Return the session id encoded in a ``<id>.transcript`` file name."""
    # Strip only the trailing suffix; str.replace would also remove any
    # ".transcript" occurring in the middle of the name.
    return filename[: -len(TRANSCRIPT_SUFFIX)]


def _load_valid_session_ids():
    """Read the ids listed under the ``sessions`` key of sessions.json.

    Returns:
        A set of session ids, or ``None`` when sessions.json does not exist
        (callers must then treat the set of valid sessions as unknown).

    Raises:
        OSError: the file exists but cannot be read.
        ValueError: the file is not valid JSON or has an unexpected structure.
    """
    if not os.path.exists(SESSIONS_JSON):
        return None
    with open(SESSIONS_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    sessions = data.get("sessions", {}) if isinstance(data, dict) else None
    if not isinstance(sessions, dict):
        raise ValueError("unexpected sessions.json structure")
    return set(sessions)


def _extract_backup(archive_path, dest):
    """Extract the gzip-compressed tar archive at *archive_path* into *dest*."""
    with tarfile.open(archive_path, "r:gz") as tar:
        if hasattr(tarfile, "tar_filter"):
            # The archive comes from a remote repository: refuse members that
            # would land outside *dest*. The "tar" filter is used rather than
            # "data" so symlinks stored in a backup are still restored.
            tar.extractall(path=dest, filter="tar")
        else:
            # Older Python without extraction-filter support.
            tar.extractall(path=dest)


def _restore_latest_backup():
    """Download and extract the newest backup from the last LOOKBACK_DAYS days.

    Returns:
        True when an archive was found and extracted, False otherwise.
    """
    files = set(api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token))
    now = datetime.now()
    for i in range(LOOKBACK_DAYS):
        name = _backup_name(now - timedelta(days=i))
        if name not in files:
            continue
        print(f"--- [SYNC] 发现备份文件: {name}, 正在下载... ---")
        path = hf_hub_download(repo_id=repo_id, filename=name, repo_type="dataset", token=token)
        _extract_backup(path, OPENCLAW_HOME)
        print(f"--- [SYNC] 恢复成功! 数据已覆盖至 /root/.openclaw/ ---")
        return True
    print(f"--- [SYNC] 未找到最近 {LOOKBACK_DAYS} 天的备份包 ---")
    return False


def _remove_lock_files(top):
    """Delete every ``*.lock`` file below *top*; return how many were removed."""
    removed = 0
    for root, _, filenames in os.walk(top):
        for filename in filenames:
            if not filename.endswith(LOCK_SUFFIX):
                continue
            try:
                os.remove(os.path.join(root, filename))
                removed += 1
            except OSError:
                # Best effort: a lock that cannot be removed is left in place.
                pass
    return removed


def _remove_orphan_transcripts(valid_ids):
    """Delete transcripts in SESSIONS_DIR whose id is not in *valid_ids*.

    Returns:
        The number of transcript files removed.
    """
    if not os.path.isdir(SESSIONS_DIR):
        return 0
    removed = 0
    for item in os.listdir(SESSIONS_DIR):
        item_path = os.path.join(SESSIONS_DIR, item)
        if not (os.path.isfile(item_path) and item.endswith(TRANSCRIPT_SUFFIX)):
            continue
        if _session_id_from_transcript(item) in valid_ids:
            continue
        try:
            os.remove(item_path)
            removed += 1
        except OSError:
            # Best effort: skip files that cannot be deleted.
            pass
    return removed


def _cleanup_orphan_transcripts():
    """Remove transcripts of sessions that sessions.json no longer lists."""
    try:
        valid_ids = _load_valid_session_ids()
        if valid_ids is None:
            # Without the index every transcript would look orphaned; deleting
            # them all would destroy data, so leave them untouched.
            print("--- [SYNC] 未找到 sessions.json, 跳过 transcript 清理 ---")
            return
        orphan_count = _remove_orphan_transcripts(valid_ids)
        if orphan_count > 0:
            print(f"--- [SYNC] 已清理 {orphan_count} 个孤立的 transcript 文件 ---")
    except Exception as e:
        print(f"--- [SYNC] 清理 orphan 文件时出错 (跳过): {e} ---")


def restore():
    """Restore the newest backup and tidy up the local state directory.

    Steps:
      1. If HF_DATASET and HF_TOKEN are set, download and extract the newest
         archive from the last LOOKBACK_DAYS days into OPENCLAW_HOME.
      2. Remove all ``*.lock`` files under OPENCLAW_HOME. This runs even when
         step 1 failed, because stale locks are a local problem.
      3. Unless step 1 failed, remove transcripts whose session is not listed
         in sessions.json.

    Returns:
        Always True: failures are logged and startup is allowed to continue.
    """
    try:
        print(f"--- [SYNC] 启动恢复流程, 目标仓库: {repo_id} ---")
        restore_failed = False
        if repo_id and token:
            try:
                _restore_latest_backup()
            except Exception as e:
                restore_failed = True
                print(f"--- [SYNC] 恢复异常: {e} ---")
                print(f"--- [SYNC] 网络错误或配置问题,跳过恢复继续启动 ---")
        else:
            print("--- [SYNC] 跳过恢复: 未配置 HF_DATASET 或 HF_TOKEN ---")

        # Force-remove leftover .lock files to prevent session lock errors.
        count = _remove_lock_files(OPENCLAW_HOME)
        if count > 0:
            print(f"--- [SYNC] 已清理 {count} 个残留的锁定文件 ---")

        # After a failed restore the on-disk state may be inconsistent, so do
        # not delete anything based on it.
        if not restore_failed:
            _cleanup_orphan_transcripts()
        return True
    except Exception as e:
        print(f"--- [SYNC] 恢复异常: {e} ---")
        print(f"--- [SYNC] 网络错误或配置问题,跳过恢复继续启动 ---")
        return True


def backup():
    """Archive the state directory and upload it to the dataset repository.

    The archive is named after the current date, so a second backup on the
    same day is uploaded under the same repository path. ``*.lock`` entries
    are never archived. Transcripts of sessions missing from sessions.json
    are left out, but only when sessions.json could be read; otherwise all
    transcripts are kept so a damaged index cannot silently drop them.

    All errors are logged; nothing is raised to the caller.
    """
    try:
        if not (repo_id and token):
            # Nothing could be uploaded, so do not build the archive at all.
            print("--- [SYNC] 跳过备份: 未配置 HF_DATASET 或 HF_TOKEN ---")
            return

        name = _backup_name(datetime.now())
        print(f"--- [SYNC] 正在执行全量备份: {name} ---")

        # Valid session ids, used to filter out orphaned transcripts.
        try:
            valid_ids = _load_valid_session_ids()
        except (OSError, ValueError) as e:
            print(f"--- [SYNC] 无法读取 sessions.json, 备份将保留全部 transcript: {e} ---")
            valid_ids = None

        def backup_filter(tarinfo):
            # Exclude .lock entries.
            if tarinfo.name.endswith(LOCK_SUFFIX):
                return None
            # Exclude orphaned transcripts, but only when the index is known.
            if valid_ids is not None and tarinfo.name.endswith(TRANSCRIPT_SUFFIX):
                filename = tarinfo.name.split("/")[-1]
                if _session_id_from_transcript(filename) not in valid_ids:
                    return None
            return tarinfo

        # Build the archive in a temporary directory so it is removed after
        # the upload instead of piling up in the working directory.
        with tempfile.TemporaryDirectory() as tmp_dir:
            archive_path = os.path.join(tmp_dir, name)
            with tarfile.open(archive_path, "w:gz") as tar:
                for target in BACKUP_TARGETS:
                    full_path = os.path.join(OPENCLAW_HOME, target)
                    if os.path.exists(full_path):
                        tar.add(full_path, arcname=target, filter=backup_filter)
            api.upload_file(
                path_or_fileobj=archive_path,
                path_in_repo=name,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )
        print(f"--- [SYNC] 备份上传成功! ---")
    except Exception as e:
        print(f"--- [SYNC] 备份失败: {e} ---")


def main():
    """Run ``backup`` when requested on the command line, otherwise restore."""
    if len(sys.argv) > 1 and sys.argv[1] == "backup":
        backup()
    else:
        restore()


if __name__ == "__main__":
    main()