#!/usr/bin/env python3
"""Find the best backup ``meta.json`` in a Hugging Face repo and optionally prune.

The "best" backup is chosen by combining three criteria (most parts, largest
archive, most recent). Optionally, every backup older than the selected one
can be deleted afterwards.

Composite score:
    score = parts/max_parts + size/max_size + time_rank/rank_total
The three dimensions are normalised with equal weight, so the score lies in
the range 0~3 and the highest score wins.

Usage:
    python scripts/find-largest-backup.py DATASET_REPO [options]
    python scripts/find-largest-backup.py DATASET_REPO --delete-before  # find, then prune

Examples:
    python scripts/find-largest-backup.py my-space/my-space-backup
    python scripts/find-largest-backup.py my-space/my-space-backup -v
    python scripts/find-largest-backup.py my-space/my-space-backup --delete-before
    python scripts/find-largest-backup.py my-space/my-space-backup --delete-before --yes
"""

import argparse
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path

# Make the shared HF helper (_hf_token.py, same directory) importable.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _hf_token import get_hf_token_interactive  # noqa: E402

try:
    from huggingface_hub import HfApi, hf_hub_download
except ModuleNotFoundError:
    print("Error: 'huggingface_hub' module not found.")
    print()
    print("Install it with:")
    print(" uv add 'huggingface_hub[cli]'")
    print()
    print("Or with pip:")
    print(" pip install 'huggingface_hub[cli]'")
    sys.exit(1)

# Matches a backup archive file name (whole archive or one split part) and
# captures its timestamp.
_ARCHIVE_NAME_RE = re.compile(
    r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?(?:\.part-[a-z]{2})?$"
)

# Matches a backup's .meta.json file name and captures its timestamp.
_META_NAME_RE = re.compile(
    r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?\.meta\.json$"
)

# Number of timestamp patterns sent per delete_files call (one commit each).
_BATCH_SIZE = 100


def _human_size(bytes_: int) -> str:
    """Format a byte count with a binary-scaled unit (B .. PB), two decimals."""
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(bytes_) < 1024:
            return f"{bytes_:.2f} {unit}"
        bytes_ /= 1024
    return f"{bytes_:.2f} PB"


def _download_meta(
    repo_id: str,
    meta_path: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> dict | None:
    """Download a remote meta.json and return it parsed.

    Args:
        repo_id: Repository to download from.
        meta_path: Path of the meta.json file inside the repository.
        token: Optional HF API token.
        repo_type: Repository type passed to ``hf_hub_download``.

    Returns:
        The parsed JSON object, or ``None`` when the download or parsing fails
        or the payload is not a JSON object. Failures are reported on stderr;
        this function is best-effort and does not raise.
    """
    try:
        local_path = hf_hub_download(
            repo_id=repo_id,
            filename=meta_path,
            repo_type=repo_type,
            token=token,
        )
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(local_path, encoding="utf-8") as f:
                meta = json.load(f)
        finally:
            # Remove the local copy even when parsing failed, so a corrupt
            # file is not left behind to be picked up again.
            Path(local_path).unlink(missing_ok=True)
    except Exception as e:
        print(f" [WARN] 下载 {meta_path} 失败: {e}", file=sys.stderr)
        return None

    if not isinstance(meta, dict):
        # Callers use ``.get`` on the result; anything else is unusable.
        print(f" [WARN] {meta_path} 不是 JSON 对象,已忽略", file=sys.stderr)
        return None
    return meta


def list_backups(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
) -> list[dict]:
    """List every backup in the repo by scanning file names only.

    Nothing is downloaded. Only files directly under ``path_prefix`` (not in
    sub-directories) are considered. Exits the process with status 1 if the
    repository file list cannot be fetched.

    Returns:
        A list sorted by timestamp, newest first. Each element looks like::

            {
                "timestamp": "20260430-223004",
                "files": [...],
                "part_count": 3,
                "total_files": 4,
                "has_meta": True,
                "meta_path": "backups/....meta.json",
            }
    """
    api = HfApi(token=token)
    try:
        all_files = api.list_repo_files(repo_id=repo_id, repo_type=repo_type)
    except Exception as e:
        print(f"[ERROR] 无法列出文件: {e}", file=sys.stderr)
        sys.exit(1)

    prefix = path_prefix.strip("/") if path_prefix else ""
    prefix_slash = f"{prefix}/" if prefix else ""

    # Keep only files that sit directly in the prefix directory.
    top_files = []
    for path in all_files:
        if not isinstance(path, str):
            continue
        if prefix_slash:
            if not path.startswith(prefix_slash):
                continue
            relative = path[len(prefix_slash) :]
        else:
            relative = path
        if "/" in relative:
            continue
        top_files.append((path, relative))

    # Group files by the timestamp embedded in their name.
    backups: dict[str, dict] = defaultdict(
        lambda: {"files": [], "part_count": 0, "has_meta": False, "meta_path": None}
    )
    for full_path, relative in top_files:
        match = _ARCHIVE_NAME_RE.fullmatch(relative)
        if match:
            entry = backups[match.group(1)]
            entry["files"].append(full_path)
            if ".part-" in relative:
                entry["part_count"] += 1
            continue

        meta_match = _META_NAME_RE.fullmatch(relative)
        if meta_match:
            entry = backups[meta_match.group(1)]
            entry["files"].append(full_path)
            entry["has_meta"] = True
            entry["meta_path"] = full_path

    result = [
        {
            "timestamp": ts,
            "files": d["files"],
            "part_count": d["part_count"],
            "total_files": len(d["files"]),
            "has_meta": d["has_meta"],
            "meta_path": d["meta_path"],
        }
        for ts, d in backups.items()
    ]
    # Timestamps are zero-padded YYYYMMDD-HHMMSS, so string order is time order.
    result.sort(key=lambda b: b["timestamp"], reverse=True)
    return result


def _score_candidate(
    parts: int, max_parts: int, size: int, max_size: int, ts_rank: int, total: int
) -> float:
    """Return the composite score of one backup, rounded to 4 decimals.

    The three components (part count, archive size, recency) are each
    normalised and summed, so the result lies in 0~3. ``ts_rank`` is the
    position in a newest-first list (0 = newest). A zero ``max_parts`` or
    ``max_size`` makes that component contribute 0.
    """
    p = parts / max_parts if max_parts else 0
    s = size / max_size if max_size else 0
    t = (total - ts_rank) / total if total > 1 else 1.0
    return round(p + s + t, 4)


def find_best_backup(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
    use_size: bool = True,
    *,
    backups: list[dict] | None = None,
) -> tuple[dict | None, dict | None]:
    """Pick the backup with the highest composite score.

    Only backups that have a meta.json are candidates.

    Args:
        repo_id: Repository to inspect.
        token: Optional HF API token.
        repo_type: Repository type.
        path_prefix: Directory inside the repo that holds the backups.
        use_size: When False, archive size is ignored and no meta.json is
            downloaded; scoring uses part count and recency only.
        backups: Optional result of a previous ``list_backups`` call, to
            avoid listing the repository a second time. Listed when omitted.

    Returns:
        ``(best, size_info)``. ``best`` is the chosen backup entry (with an
        added ``archive_size`` key when ``use_size`` is True) or ``None`` when
        there is no candidate. ``size_info`` is ``None`` when ``use_size`` is
        False or there is no candidate; otherwise a dict with
        ``archive_size``, ``archive_size_human``, ``score`` and, when the
        meta.json could be read, ``volumes``, ``is_split`` and ``file_count``.
    """
    if backups is None:
        backups = list_backups(
            repo_id=repo_id,
            token=token,
            repo_type=repo_type,
            path_prefix=path_prefix,
        )
    if not backups:
        print(f"[INFO] 在 {repo_id} 中未找到任何备份文件", file=sys.stderr)
        return None, None

    valid = [b for b in backups if b["has_meta"]]
    if not valid:
        print("[WARN] 所有备份均缺少 meta.json 文件", file=sys.stderr)
        return None, None

    max_parts = max(b["part_count"] for b in valid)
    total = len(valid)

    if not use_size:
        scored = [
            (_score_candidate(b["part_count"], max_parts, 0, 1, rank, total), b)
            for rank, b in enumerate(valid)
        ]
        # max() returns the first maximal element, i.e. the newest on ties.
        return max(scored, key=lambda pair: pair[0])[1], None

    print(f"正在获取 {len(valid)} 个备份的容量信息(下载 meta.json)...")
    # Keep each parsed meta so the winner's file is not fetched a second time.
    metas: dict[str, dict | None] = {}
    enriched = []
    for b in valid:
        meta = _download_meta(repo_id, b["meta_path"], token, repo_type)
        metas[b["timestamp"]] = meta
        size = (meta or {}).get("archive_size", 0) or 0
        enriched.append({**b, "archive_size": size})

    max_size = max(e["archive_size"] for e in enriched)
    scored = [
        (
            _score_candidate(
                e["part_count"], max_parts, e["archive_size"], max_size, rank, total
            ),
            e,
        )
        for rank, e in enumerate(enriched)
    ]
    best_score, best = max(scored, key=lambda pair: pair[0])

    print(f" 综合评分最高的备份: score={best_score:.4f}")
    print(f" (分包={best['part_count']}/{max_parts}, "
          f"容量={_human_size(best['archive_size'])}, "
          f"时间={best['timestamp']})")

    size_info = {
        "archive_size": best["archive_size"],
        "archive_size_human": _human_size(best["archive_size"]),
        "score": best_score,
    }
    meta = metas.get(best["timestamp"])
    if meta is None:
        # The first attempt failed; try once more for the detail fields.
        meta = _download_meta(repo_id, best["meta_path"], token, repo_type)
    if meta:
        size_info["volumes"] = meta.get("volumes", [])
        size_info["is_split"] = meta.get("is_split", False)
        size_info["file_count"] = meta.get("file_count", 0)
    return best, size_info


def _delete_backups(
    api: HfApi,
    repo_id: str,
    repo_type: str,
    prefix: str,
    timestamps_to_delete: list[str],
) -> int:
    """Delete the backups with the given timestamps, in batches.

    Each timestamp becomes one wildcard pattern; every batch of up to
    ``_BATCH_SIZE`` patterns is sent in a single ``api.delete_files`` call.
    Processing stops at the first failing batch.

    Returns:
        The number of timestamps whose batch was submitted successfully.
    """
    # Normalise like list_backups does, so "backups/" or "/backups" does not
    # yield "backups//..." patterns that silently match nothing.
    prefix = prefix.strip("/") if prefix else ""
    prefix_slash = f"{prefix}/" if prefix else ""

    total = len(timestamps_to_delete)
    total_batches = (total + _BATCH_SIZE - 1) // _BATCH_SIZE
    deleted_count = 0

    for batch_idx in range(total_batches):
        start = batch_idx * _BATCH_SIZE
        batch_ts = timestamps_to_delete[start : start + _BATCH_SIZE]
        # One wildcard pattern per timestamp covers archive, parts and meta.
        patterns = [f"{prefix_slash}openclaw-backup-{ts}*" for ts in batch_ts]
        try:
            api.delete_files(
                repo_id=repo_id,
                repo_type=repo_type,
                delete_patterns=patterns,
                commit_message=(
                    f"backup cleanup: delete {len(batch_ts)} old backup(s) "
                    f"(batch {batch_idx + 1}/{total_batches})"
                ),
            )
        except Exception as e:
            print(f" ✗ 第 {batch_idx + 1}/{total_batches} 批失败: {e}",
                  file=sys.stderr)
            print(f" 已删除 {deleted_count}/{total} 个备份后中断")
            break

        deleted_count += len(batch_ts)
        print(f" ✓ 第 {batch_idx + 1}/{total_batches} 批: "
              f"删除了 {len(batch_ts)} 个旧备份")
        # Throttle between batches.
        if batch_idx + 1 < total_batches:
            time.sleep(1)

    return deleted_count


def super_squash_history(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> bool:
    """Run ``HfApi.super_squash_history`` on the repository.

    This squashes the repository history into a single commit so that the
    storage of deleted files can be reclaimed. It is destructive and cannot
    be undone.

    Returns:
        True on success, False if the API call raised (reported on stderr).
    """
    print(f"\n{'='*60}")
    print("执行超级压缩(super_squash_history)...")
    print(" 警告: 这是破坏性操作,Git 历史将永久丢失!")
    print(f"{'='*60}")
    try:
        api = HfApi(token=token)
        api.super_squash_history(
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message="Super squash: reclaim storage after backup cleanup",
        )
    except Exception as e:
        print(f"✗ 超级压缩失败: {e}", file=sys.stderr)
        return False
    print("✓ 超级压缩完成,存储空间已释放")
    print(" 注意: 存储配额变化可能在 36 小时内生效")
    return True


def _confirm(prompt: str) -> bool:
    """Ask a yes/no question; only "yes"/"y" count, closed stdin means no."""
    try:
        answer = input(prompt)
    except EOFError:
        # Never take a destructive action without an explicit answer.
        return False
    return answer.strip().lower() in ("yes", "y")


def _print_backup_table(dataset_repo: str, backups: list[dict]) -> None:
    """Print one row of statistics per backup (the --verbose table).

    The score shown ignores archive size; backups without meta.json show 0.
    """
    print(f"\n{'='*90}")
    print(f"Dataset: {dataset_repo}")
    print(f"{'='*90}")
    print(f"{'时间戳':<20} {'文件数':>8} {'分包数':>8} {'有meta':>8} {'评分':>8} meta.json")
    print(f"{'-'*20} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*40}")
    max_parts = max(b["part_count"] for b in backups) if backups else 1
    total = len(backups)
    for rank, b in enumerate(backups):
        meta_flag = "✓" if b["has_meta"] else "✗"
        meta_file = b["meta_path"] or "-"
        score = (
            _score_candidate(b["part_count"], max_parts, 0, 1, rank, total)
            if b["has_meta"]
            else 0
        )
        print(
            f"{b['timestamp']:<20} {b['total_files']:>8} {b['part_count']:>8} "
            f"{meta_flag:>8} {score:>8.4f} {meta_file}"
        )
    print()


def _print_result(dataset_repo: str, best: dict, size_info: dict | None) -> None:
    """Print the summary of the selected backup."""
    print(f"\n{'='*80}")
    print("结果:")
    print(f" Dataset: {dataset_repo}")
    if size_info and "score" in size_info:
        print(f" 综合评分: {size_info['score']:.4f} / 3.0000")
    print(f" meta.json: {best['meta_path']}")
    print(f" 备份时间: {best['timestamp']}")
    print(f" 分包数量: {best['part_count']}")
    print(f" 总文件数: {best['total_files']}")
    if size_info:
        print(f" 总容量: {size_info['archive_size_human']} ({size_info['archive_size']:,} bytes)")
        print(f" 文件数量(meta): {size_info.get('file_count', 'N/A')}")
        if size_info.get("is_split"):
            print(f" 是否分片: ✓ (共 {len(size_info.get('volumes', []))} 个分片)")
        else:
            print(" 是否分片: ✗ (单文件)")
    else:
        print(" 总容量: (未获取)")
    print(f"{'='*80}\n")


def main():
    """CLI entry point: list backups, pick the best one, optionally prune."""
    parser = argparse.ArgumentParser(
        description="查找 HF Dataset 中分包最多、总包最大、时间最新的备份 meta.json"
    )
    parser.add_argument("dataset_repo", help="HF Dataset repo ID")
    parser.add_argument("--token", default=None, help="HF API token")
    parser.add_argument("--prefix", default="backups", help="备份路径前缀(默认: backups)")
    parser.add_argument(
        "--repo-type",
        default="dataset",
        choices=["dataset", "model", "space"],
        help="仓库类型(默认: dataset)",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="列出所有备份的统计信息"
    )
    parser.add_argument(
        "--no-size",
        action="store_true",
        help="不按容量排序(跳过下载 meta.json)",
    )
    parser.add_argument(
        "--delete-before",
        action="store_true",
        help="找到最佳备份后,删除所有比它更旧的备份",
    )
    parser.add_argument(
        "--yes", "-y", action="store_true", help="自动确认删除(跳过确认提示)",
    )
    parser.add_argument(
        "--no-super-squash",
        action="store_true",
        help="删除后不执行超级压缩(存储空间将在36小时后自动释放)",
    )
    # Parse first so --help and argument errors never trigger a token lookup.
    args = parser.parse_args()

    # An explicit --token wins; otherwise fall back to the shared helper.
    if not args.token:
        args.token = get_hf_token_interactive()
        if not args.token:
            sys.exit(1)

    # ---- 1. List all backups ----
    backups = list_backups(
        repo_id=args.dataset_repo,
        token=args.token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
    )
    if not backups:
        print(f"[INFO] 在 {args.dataset_repo} 中未找到任何备份文件")
        return

    # ---- 2. Verbose table ----
    if args.verbose:
        _print_backup_table(args.dataset_repo, backups)

    # ---- 3. Find the best backup (reusing the listing from step 1) ----
    best, size_info = find_best_backup(
        repo_id=args.dataset_repo,
        token=args.token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
        use_size=not args.no_size,
        backups=backups,
    )
    if not best:
        print("[RESULT] 未找到符合条件的备份")
        return

    # ---- 4. Print the result ----
    _print_result(args.dataset_repo, best, size_info)

    # ---- 5. Delete older backups (optional) ----
    if not args.delete_before:
        return

    # Everything strictly older than the selected backup.
    older = [b for b in backups if b["timestamp"] < best["timestamp"]]
    if not older:
        print("[INFO] 没有比当前备份更旧的备份需要清理")
        return

    total_older_files = sum(b["total_files"] for b in older)
    print(f"发现 {len(older)} 个旧备份(共 {total_older_files} 个文件):")
    for b in older:
        print(f" - {b['timestamp']} ({b['total_files']} 个文件)")

    if not args.yes:
        print()
        if not _confirm(f"确认删除这 {len(older)} 个旧备份?(yes/NO): "):
            print("[INFO] 已取消删除")
            return

    print(f"\n正在删除 {len(older)} 个旧备份...")
    api = HfApi(token=args.token)
    # Oldest first, which makes progress easier to follow.
    older_ts = sorted(b["timestamp"] for b in older)
    deleted = _delete_backups(
        api=api,
        repo_id=args.dataset_repo,
        repo_type=args.repo_type,
        prefix=args.prefix,
        timestamps_to_delete=older_ts,
    )
    print(f"\n结果: 成功删除 {deleted}/{len(older_ts)} 个旧备份")

    # ---- 6. Super squash (on by default; reclaims the storage) ----
    if args.no_super_squash or deleted <= 0:
        return

    print()
    do_squash = args.yes or _confirm(
        "是否执行超级压缩以真正释放存储空间?(yes/NO): "
    )
    if do_squash:
        success = super_squash_history(
            repo_id=args.dataset_repo,
            token=args.token,
            repo_type=args.repo_type,
        )
        if not success:
            print("\n[WARNING] 超级压缩失败,但旧备份已删除")
            print(" 提示: 您可以稍后手动执行超级压缩或联系 HF 支持")
    else:
        print("[INFO] 已跳过超级压缩")
        print(" 提示: 存储空间将在 36 小时后自动释放")
        # This script has no --super-squash flag; point at the real API call.
        print(" 如需立即释放,请手动调用 HfApi.super_squash_history")


if __name__ == "__main__":
    main()