| |
| """根据指定的 HF Dataset,查找与关系(分包最多 + 总包最大 + 时间最新)的备份 meta.json, |
| 并可选择将匹配备份之前的旧备份全部删除。 |
| |
| 综合评分: score = parts/max_parts + size/max_size + time_rank/rank_total |
| 三个维度等权归一化,范围 0~3,最高分最优。 |
| |
| 用法: |
| python scripts/find-largest-backup.py <dataset_repo> [options] |
| python scripts/find-largest-backup.py <dataset_repo> --delete-before # 查找并清理旧备份 |
| |
| 示例: |
| python scripts/find-largest-backup.py my-space/my-space-backup |
| python scripts/find-largest-backup.py my-space/my-space-backup -v |
| python scripts/find-largest-backup.py my-space/my-space-backup --delete-before |
| python scripts/find-largest-backup.py my-space/my-space-backup --delete-before --yes |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import re |
| import sys |
| import time |
| from collections import defaultdict |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| from _hf_token import get_hf_token_interactive |
|
|
| try: |
| from huggingface_hub import HfApi, hf_hub_download |
| except ModuleNotFoundError: |
| print("Error: 'huggingface_hub' module not found.") |
| print() |
| print("Install it with:") |
| print(" uv add 'huggingface_hub[cli]'") |
| print() |
| print("Or with pip:") |
| print(" pip install 'huggingface_hub[cli]'") |
| sys.exit(1) |
|
|
| |
# Archive payload file names: "openclaw-backup-<8 digits>-<6 digits>.tar.gz",
# optionally followed by ".enc" and/or a two-letter ".part-xx" volume suffix.
# Group 1 captures the timestamp (e.g. "20260430-223004").
_ARCHIVE_NAME_RE = re.compile(
    r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?(?:\.part-[a-z]{2})?$"
)

# Sidecar metadata file names for the same archives (".meta.json" suffix).
# Group 1 captures the same timestamp so payloads and metadata can be grouped.
_META_NAME_RE = re.compile(
    r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?\.meta\.json$"
)

# Number of backup timestamps removed per delete commit.
_BATCH_SIZE = 100
|
|
|
|
def _human_size(bytes_: int) -> str:
    """Format a byte count with 1024-based units and two decimal places.

    Units run from "B" through "TB"; anything still >= 1024 after "TB" is
    reported in "PB".
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = bytes_
    index = 0
    # Step up one unit at a time until the value fits or the units run out.
    while index < len(units) and abs(value) >= 1024:
        value /= 1024
        index += 1
    suffix = units[index] if index < len(units) else "PB"
    return f"{value:.2f} {suffix}"
|
|
|
|
def _download_meta(
    repo_id: str,
    meta_path: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> dict | None:
    """Download a remote meta.json and return its parsed content.

    Args:
        repo_id: Repository to download from.
        meta_path: Path of the meta.json file inside the repository.
        token: Access token forwarded to ``hf_hub_download``.
        repo_type: Repository type forwarded to ``hf_hub_download``.

    Returns:
        The parsed JSON object as a dict, or ``None`` when the download or
        parsing fails or the payload is not a JSON object. Failures are
        reported as a warning on stderr; this function never raises for them.
    """
    local_path = None
    try:
        local_path = hf_hub_download(
            repo_id=repo_id,
            filename=meta_path,
            repo_type=repo_type,
            token=token,
        )
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(local_path, encoding="utf-8") as f:
            meta = json.load(f)
    except Exception as e:
        # Deliberately best-effort: one unreadable meta must not abort a scan.
        print(f" [WARN] 下载 {meta_path} 失败: {e}", file=sys.stderr)
        return None
    finally:
        # Remove the local copy even when parsing failed.
        if local_path is not None:
            try:
                Path(local_path).unlink(missing_ok=True)
            except OSError:
                # Cleanup failure must not discard an already parsed result.
                pass

    if not isinstance(meta, dict):
        # Callers use dict accessors; a list/scalar payload is unusable.
        print(f" [WARN] {meta_path} 内容不是 JSON 对象,已忽略", file=sys.stderr)
        return None
    return meta
|
|
|
|
def list_backups(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
) -> list[dict]:
    """List all backups in the repository by scanning the file listing only.

    Nothing is downloaded. Only files located directly under ``path_prefix``
    (no deeper sub-directories) are considered, and they are grouped by the
    timestamp captured from their file name. If the listing cannot be
    fetched, an error is printed to stderr and the process exits with
    status 1.

    Returns:
        A list in descending timestamp order; each element looks like::

            {
                "timestamp": "20260430-223004",
                "files": [...],
                "part_count": 3,
                "total_files": 4,
                "has_meta": True,
                "meta_path": "backups/....meta.json",
            }
    """
    api = HfApi(token=token)
    try:
        repo_paths = api.list_repo_files(repo_id=repo_id, repo_type=repo_type)
    except Exception as e:
        print(f"[ERROR] 无法列出文件: {e}", file=sys.stderr)
        sys.exit(1)

    base = path_prefix.strip("/") if path_prefix else ""
    base_slash = f"{base}/" if base else ""

    grouped: dict[str, dict] = {}

    def _slot(ts: str) -> dict:
        # Create the per-timestamp record on first use.
        return grouped.setdefault(
            ts,
            {"files": [], "part_count": 0, "has_meta": False, "meta_path": None},
        )

    for full_path in repo_paths:
        if not isinstance(full_path, str):
            continue
        if base_slash and not full_path.startswith(base_slash):
            continue
        name = full_path[len(base_slash):]
        if "/" in name:
            # Skip anything nested below the prefix directory.
            continue

        archive_hit = _ARCHIVE_NAME_RE.fullmatch(name)
        if archive_hit:
            record = _slot(archive_hit.group(1))
            record["files"].append(full_path)
            if ".part-" in name:
                record["part_count"] += 1
            continue

        meta_hit = _META_NAME_RE.fullmatch(name)
        if meta_hit:
            record = _slot(meta_hit.group(1))
            record["files"].append(full_path)
            record["has_meta"] = True
            record["meta_path"] = full_path

    newest_first = sorted(grouped.items(), key=lambda item: item[0], reverse=True)
    return [
        {
            "timestamp": ts,
            "files": info["files"],
            "part_count": info["part_count"],
            "total_files": len(info["files"]),
            "has_meta": info["has_meta"],
            "meta_path": info["meta_path"],
        }
        for ts, info in newest_first
    ]
|
|
|
|
def _score_candidate(
    parts: int, max_parts: int, size: int, max_size: int, ts_rank: int, total: int
) -> float:
    """Return the combined score: the sum of three equally weighted ratios.

    The terms are ``parts / max_parts``, ``size / max_size`` and
    ``(total - ts_rank) / total``. A zero ``max_parts`` or ``max_size``
    contributes 0 for that term, and the time term is 1.0 when ``total`` is
    not greater than 1. The sum is rounded to four decimals.
    """
    part_ratio = parts / max_parts if max_parts else 0
    size_ratio = size / max_size if max_size else 0
    if total > 1:
        recency = (total - ts_rank) / total
    else:
        recency = 1.0
    combined = part_ratio + size_ratio + recency
    return round(combined, 4)
|
|
|
|
def _meta_archive_size(meta: object) -> int:
    """Extract a usable ``archive_size`` from a parsed meta.json payload.

    Returns 0 when the payload is not a dict, or the value is missing,
    non-numeric or negative, so scoring never fails on a malformed meta file.
    """
    if not isinstance(meta, dict):
        return 0
    raw = meta.get("archive_size")
    if raw is None or isinstance(raw, bool):
        return 0
    try:
        return max(int(raw), 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def find_best_backup(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
    use_size: bool = True,
) -> tuple[dict | None, dict | None]:
    """Pick the backup with the highest combined score.

    The score combines part count, archive size and recency (see
    ``_score_candidate``). Only backups that have a meta.json are considered.
    Ties are resolved in favour of the entry that comes first in the
    ``list_backups`` ordering.

    Args:
        repo_id: Repository to scan.
        token: Access token used for listing and downloads.
        repo_type: Repository type.
        path_prefix: Directory inside the repository that holds the backups.
        use_size: When False, no meta.json is downloaded and the size term
            of the score is 0 for every candidate.

    Returns:
        ``(best, size_info)``. ``best`` is the chosen backup record (with an
        extra ``"archive_size"`` key when ``use_size`` is True). ``size_info``
        holds ``archive_size``, ``archive_size_human`` and ``score``, plus
        ``volumes``, ``is_split`` and ``file_count`` when the meta.json could
        be read; it is ``None`` when ``use_size`` is False. ``(None, None)``
        is returned when there is no backup or none has a meta.json.
    """
    backups = list_backups(
        repo_id=repo_id,
        token=token,
        repo_type=repo_type,
        path_prefix=path_prefix,
    )

    if not backups:
        print(f"[INFO] 在 {repo_id} 中未找到任何备份文件", file=sys.stderr)
        return None, None

    valid = [b for b in backups if b["has_meta"]]
    if not valid:
        print("[WARN] 所有备份均缺少 meta.json 文件", file=sys.stderr)
        return None, None

    max_parts = max(b["part_count"] for b in valid)
    total = len(valid)

    if not use_size:
        scored = [
            (_score_candidate(b["part_count"], max_parts, 0, 1, rank, total), b)
            for rank, b in enumerate(valid)
        ]
        # The sort is stable, so ties keep the list_backups ordering.
        scored.sort(key=lambda x: x[0], reverse=True)
        return scored[0][1], None

    print(f"正在获取 {len(valid)} 个备份的容量信息(下载 meta.json)...")
    metas: dict[str, dict | None] = {}
    enriched = []
    for b in valid:
        meta = _download_meta(repo_id, b["meta_path"], token, repo_type)
        # Keep the payload so the winner's meta need not be fetched again.
        metas[b["timestamp"]] = meta if isinstance(meta, dict) else None
        enriched.append({**b, "archive_size": _meta_archive_size(meta)})

    # `enriched` mirrors `valid`, which is known to be non-empty here.
    max_size = max(e["archive_size"] for e in enriched)

    scored = [
        (
            _score_candidate(
                e["part_count"], max_parts,
                e["archive_size"], max_size,
                rank, total,
            ),
            e,
        )
        for rank, e in enumerate(enriched)
    ]
    scored.sort(key=lambda x: x[0], reverse=True)
    best_score, best = scored[0]

    print(f" 综合评分最高的备份: score={best_score:.4f}")
    print(f" (分包={best['part_count']}/{max_parts}, "
          f"容量={_human_size(best['archive_size'])}, "
          f"时间={best['timestamp']})")

    size_info = {
        "archive_size": best["archive_size"],
        "archive_size_human": _human_size(best["archive_size"]),
        "score": best_score,
    }

    meta = metas.get(best["timestamp"])
    if meta is None:
        # The first fetch failed; try once more for the detail fields.
        meta = _download_meta(repo_id, best["meta_path"], token, repo_type)
    if isinstance(meta, dict) and meta:
        size_info["volumes"] = meta.get("volumes", [])
        size_info["is_split"] = meta.get("is_split", False)
        size_info["file_count"] = meta.get("file_count", 0)

    return best, size_info
|
|
|
|
| def _delete_backups( |
| api: HfApi, |
| repo_id: str, |
| repo_type: str, |
| prefix: str, |
| timestamps_to_delete: list[str], |
| ) -> int: |
| """批量删除指定时间戳的备份(使用通配符模式 + api.delete_files)。 |
| |
| 返回成功删除的时间戳数量。 |
| """ |
| prefix_slash = f"{prefix}/" if prefix else "" |
| total = len(timestamps_to_delete) |
| total_batches = (total + _BATCH_SIZE - 1) // _BATCH_SIZE |
| deleted_count = 0 |
|
|
| for batch_idx in range(total_batches): |
| start = batch_idx * _BATCH_SIZE |
| end = min(start + _BATCH_SIZE, total) |
| batch_ts = timestamps_to_delete[start:end] |
|
|
| |
| patterns = [ |
| f"{prefix_slash}openclaw-backup-{ts}*" for ts in batch_ts |
| ] |
|
|
| try: |
| api.delete_files( |
| repo_id=repo_id, |
| repo_type=repo_type, |
| delete_patterns=patterns, |
| commit_message=( |
| f"backup cleanup: delete {len(batch_ts)} old backup(s) " |
| f"(batch {batch_idx + 1}/{total_batches})" |
| ), |
| ) |
| deleted_count += len(batch_ts) |
| print(f" ✓ 第 {batch_idx + 1}/{total_batches} 批: " |
| f"删除了 {len(batch_ts)} 个旧备份") |
| |
| if batch_idx + 1 < total_batches: |
| time.sleep(1) |
| except Exception as e: |
| print(f" ✗ 第 {batch_idx + 1}/{total_batches} 批失败: {e}", file=sys.stderr) |
| print(f" 已删除 {deleted_count}/{total} 个备份后中断") |
| break |
|
|
| return deleted_count |
|
|
|
|
def super_squash_history(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> bool:
    """Run ``HfApi.super_squash_history`` on the repository.

    A warning banner is printed first: the operation is destructive and
    irreversible because it collapses the Git history into a single commit.

    Returns:
        True when the call completed, False when it raised (the error is
        printed to stderr).
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print("执行超级压缩(super_squash_history)...")
    print(" 警告: 这是破坏性操作,Git 历史将永久丢失!")
    print(banner)

    try:
        client = HfApi(token=token)
        client.super_squash_history(
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message="Super squash: reclaim storage after backup cleanup",
        )
        print("✓ 超级压缩完成,存储空间已释放")
        print(" 注意: 存储配额变化可能在 36 小时内生效")
    except Exception as e:
        print(f"✗ 超级压缩失败: {e}", file=sys.stderr)
        return False
    return True
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="查找 HF Dataset 中分包最多、总包最大、时间最新的备份 meta.json"
    )
    parser.add_argument("dataset_repo", help="HF Dataset repo ID")
    # No default here: the interactive lookup runs only when this is omitted.
    parser.add_argument("--token", default=None, help="HF API token")
    parser.add_argument("--prefix", default="backups", help="备份路径前缀(默认: backups)")
    parser.add_argument(
        "--repo-type",
        default="dataset",
        choices=["dataset", "model", "space"],
        help="仓库类型(默认: dataset)",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="列出所有备份的统计信息"
    )
    parser.add_argument(
        "--no-size",
        action="store_true",
        help="不按容量排序(跳过下载 meta.json)",
    )
    parser.add_argument(
        "--delete-before",
        action="store_true",
        help="找到最佳备份后,删除所有比它更旧的备份",
    )
    parser.add_argument(
        "--yes", "-y",
        action="store_true",
        help="自动确认删除(跳过确认提示)",
    )
    parser.add_argument(
        "--no-super-squash",
        action="store_true",
        help="删除后不执行超级压缩(存储空间将在36小时后自动释放)",
    )
    return parser


def _confirm(prompt: str) -> bool:
    """Ask a yes/no question; only "yes" or "y" (any case) counts as yes.

    A closed stdin (EOF) is treated as "no" instead of raising.
    """
    try:
        answer = input(prompt)
    except EOFError:
        print()
        return False
    return answer.strip().lower() in ("yes", "y")


def _print_backup_table(dataset_repo: str, backups: list[dict]) -> None:
    """Print one row per backup with its size-independent score."""
    print(f"\n{'='*90}")
    print(f"Dataset: {dataset_repo}")
    print(f"{'='*90}")
    print(f"{'时间戳':<20} {'文件数':>8} {'分包数':>8} {'有meta':>8} {'评分':>8} meta.json")
    print(f"{'-'*20} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*40}")

    # Rank and normalise over backups that have a meta.json only, which is
    # the candidate set find_best_backup scores.
    with_meta = [b for b in backups if b["has_meta"]]
    max_parts = max((b["part_count"] for b in with_meta), default=0)
    total = len(with_meta)
    rank = 0
    for b in backups:
        if b["has_meta"]:
            score = _score_candidate(b["part_count"], max_parts, 0, 1, rank, total)
            rank += 1
        else:
            score = 0
        meta_flag = "✓" if b["has_meta"] else "✗"
        meta_file = b["meta_path"] or "-"
        print(
            f"{b['timestamp']:<20} {b['total_files']:>8} {b['part_count']:>8} "
            f"{meta_flag:>8} {score:>8.4f} {meta_file}"
        )
    print()


def _print_best_summary(dataset_repo: str, best: dict, size_info: dict | None) -> None:
    """Print the summary block for the selected backup."""
    print(f"\n{'='*80}")
    print("结果:")
    print(f" Dataset: {dataset_repo}")
    if size_info and "score" in size_info:
        print(f" 综合评分: {size_info['score']:.4f} / 3.0000")
    print(f" meta.json: {best['meta_path']}")
    print(f" 备份时间: {best['timestamp']}")
    print(f" 分包数量: {best['part_count']}")
    print(f" 总文件数: {best['total_files']}")
    if size_info:
        print(f" 总容量: {size_info['archive_size_human']} ({size_info['archive_size']:,} bytes)")
        print(f" 文件数量(meta): {size_info.get('file_count', 'N/A')}")
        if size_info.get("is_split"):
            volumes = size_info.get("volumes") or []
            print(f" 是否分片: ✓ (共 {len(volumes)} 个分片)")
        else:
            print(" 是否分片: ✗ (单文件)")
    else:
        print(" 总容量: (未获取)")
    print(f"{'='*80}\n")


def main():
    """CLI entry point: find the best backup and optionally prune older ones.

    Steps: parse arguments, resolve the token, list the backups, optionally
    print the verbose table, select and print the best backup, and, with
    ``--delete-before``, delete every backup older than it and offer a super
    squash afterwards.
    """
    # Parse first so that --help and an explicit --token never trigger the
    # interactive token lookup.
    args = _build_arg_parser().parse_args()
    token = args.token or get_hf_token_interactive()
    if not token:
        sys.exit(1)

    backups = list_backups(
        repo_id=args.dataset_repo,
        token=token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
    )
    if not backups:
        print(f"[INFO] 在 {args.dataset_repo} 中未找到任何备份文件")
        return

    if args.verbose:
        _print_backup_table(args.dataset_repo, backups)

    best, size_info = find_best_backup(
        repo_id=args.dataset_repo,
        token=token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
        use_size=not args.no_size,
    )
    if not best:
        print("[RESULT] 未找到符合条件的备份")
        return

    _print_best_summary(args.dataset_repo, best, size_info)

    if not args.delete_before:
        return

    # Timestamps compare as strings, as in list_backups' ordering.
    older = [b for b in backups if b["timestamp"] < best["timestamp"]]
    if not older:
        print("[INFO] 没有比当前备份更旧的备份需要清理")
        return

    total_older_files = sum(b["total_files"] for b in older)
    print(f"发现 {len(older)} 个旧备份(共 {total_older_files} 个文件):")
    for b in older:
        print(f" - {b['timestamp']} ({b['total_files']} 个文件)")

    if not args.yes:
        print()
        if not _confirm(f"确认删除这 {len(older)} 个旧备份?(yes/NO): "):
            print("[INFO] 已取消删除")
            return

    print(f"\n正在删除 {len(older)} 个旧备份...")
    api = HfApi(token=token)

    # Delete oldest first.
    older_ts = sorted(b["timestamp"] for b in older)
    deleted = _delete_backups(
        api=api,
        repo_id=args.dataset_repo,
        repo_type=args.repo_type,
        prefix=args.prefix,
        timestamps_to_delete=older_ts,
    )
    print(f"\n结果: 成功删除 {deleted}/{len(older_ts)} 个旧备份")

    if args.no_super_squash or deleted <= 0:
        return

    print()
    do_squash = args.yes or _confirm("是否执行超级压缩以真正释放存储空间?(yes/NO): ")
    if do_squash:
        success = super_squash_history(
            repo_id=args.dataset_repo,
            token=token,
            repo_type=args.repo_type,
        )
        if not success:
            print("\n[WARNING] 超级压缩失败,但旧备份已删除")
            print(" 提示: 您可以稍后手动执行超级压缩或联系 HF 支持")
    else:
        print("[INFO] 已跳过超级压缩")
        print(" 提示: 存储空间将在 36 小时后自动释放")
        # The script has no --super-squash flag; point at the manual route.
        print(" 如需立即释放,请稍后手动执行超级压缩(HfApi.super_squash_history)")
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|