page / scripts /find-largest-backup.py
GGSheng's picture
feat: deploy to hf space | model=claude-22327b
31b91d7 verified
Raw
History Blame Contribute Delete
17.3 kB
#!/usr/bin/env python3
"""根据指定的 HF Dataset,查找与关系(分包最多 + 总包最大 + 时间最新)的备份 meta.json,
并可选择将匹配备份之前的旧备份全部删除。
综合评分: score = parts/max_parts + size/max_size + time_rank/rank_total
三个维度等权归一化,范围 0~3,最高分最优。
用法:
python scripts/find-largest-backup.py <dataset_repo> [options]
python scripts/find-largest-backup.py <dataset_repo> --delete-before # 查找并清理旧备份
示例:
python scripts/find-largest-backup.py my-space/my-space-backup
python scripts/find-largest-backup.py my-space/my-space-backup -v
python scripts/find-largest-backup.py my-space/my-space-backup --delete-before
python scripts/find-largest-backup.py my-space/my-space-backup --delete-before --yes
"""
import argparse
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path
# 引入 HF 共享 helper(_hf_token.py 同目录)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _hf_token import get_hf_token_interactive # noqa: E402
try:
from huggingface_hub import HfApi, hf_hub_download
except ModuleNotFoundError:
print("Error: 'huggingface_hub' module not found.")
print()
print("Install it with:")
print(" uv add 'huggingface_hub[cli]'")
print()
print("Or with pip:")
print(" pip install 'huggingface_hub[cli]'")
sys.exit(1)
# Matches a backup archive file name (a whole archive or one split part) and
# captures its timestamp (YYYYMMDD-HHMMSS). The optional ".enc" and
# ".part-xx" (two lowercase letters) suffixes are accepted.
_ARCHIVE_NAME_RE = re.compile(
r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?(?:\.part-[a-z]{2})?$"
)
# Matches the ".meta.json" sidecar of a backup and captures the same timestamp.
_META_NAME_RE = re.compile(
r"^openclaw-backup-(\d{8}-\d{6})\.tar\.gz(?:\.enc)?\.meta\.json$"
)
# Number of timestamp patterns sent per delete batch (one commit per batch).
_BATCH_SIZE = 100
def _human_size(bytes_: int) -> str:
for unit in ("B", "KB", "MB", "GB", "TB"):
if abs(bytes_) < 1024:
return f"{bytes_:.2f} {unit}"
bytes_ /= 1024
return f"{bytes_:.2f} PB"
def _download_meta(
    repo_id: str,
    meta_path: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> dict | None:
    """Download a remote meta.json and return its parsed content.

    Args:
        repo_id: Repository ID on the Hugging Face Hub.
        meta_path: Path of the meta.json file inside the repository.
        token: Optional API token forwarded to ``hf_hub_download``.
        repo_type: Repository type forwarded to ``hf_hub_download``.

    Returns:
        The parsed JSON object, or ``None`` when the download or parsing
        fails or the document is not a JSON object. Failures are reported
        as a warning on stderr and never raised.
    """
    try:
        local_path = hf_hub_download(
            repo_id=repo_id,
            filename=meta_path,
            repo_type=repo_type,
            token=token,
        )
        try:
            # JSON is UTF-8 by specification; do not depend on the locale's
            # default encoding.
            with open(local_path, encoding="utf-8") as f:
                meta = json.load(f)
        finally:
            # Remove the local copy even when parsing fails.
            Path(local_path).unlink(missing_ok=True)
    except Exception as e:
        print(f" [WARN] 下载 {meta_path} 失败: {e}", file=sys.stderr)
        return None
    if not isinstance(meta, dict):
        # Callers use dict methods on the result; reject any other JSON type.
        print(f" [WARN] {meta_path} 的内容不是 JSON 对象,已忽略", file=sys.stderr)
        return None
    return meta
def list_backups(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
) -> list[dict]:
    """List every backup in the repository by scanning file names only.

    Nothing is downloaded. Only files directly under ``path_prefix`` (not in
    sub-directories) are considered, and they are grouped by the timestamp
    captured from their names. If the file listing fails, an error is printed
    to stderr and the process exits with status 1.

    Returns a list sorted by timestamp, newest first. Each element looks like:
        {
            "timestamp": "20260430-223004",
            "files": [...],
            "part_count": 3,
            "total_files": 4,
            "has_meta": True,
            "meta_path": "backups/....meta.json",
        }
    """
    client = HfApi(token=token)
    try:
        repo_files = client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
    except Exception as e:
        print(f"[ERROR] 无法列出文件: {e}", file=sys.stderr)
        sys.exit(1)

    base = path_prefix.strip("/") if path_prefix else ""
    base_slash = f"{base}/" if base else ""

    grouped: dict[str, dict] = {}

    def _slot(stamp: str) -> dict:
        # Create the per-timestamp record on first use.
        return grouped.setdefault(
            stamp,
            {"files": [], "part_count": 0, "has_meta": False, "meta_path": None},
        )

    for repo_path in repo_files:
        if not isinstance(repo_path, str):
            continue
        if base_slash and not repo_path.startswith(base_slash):
            continue
        name = repo_path[len(base_slash):]
        if "/" in name:
            # Lives in a sub-directory of the prefix; not a top-level backup file.
            continue

        archive_hit = _ARCHIVE_NAME_RE.fullmatch(name)
        if archive_hit:
            record = _slot(archive_hit.group(1))
            record["files"].append(repo_path)
            if ".part-" in name:
                record["part_count"] += 1
            continue

        meta_hit = _META_NAME_RE.fullmatch(name)
        if meta_hit:
            record = _slot(meta_hit.group(1))
            record["files"].append(repo_path)
            record["has_meta"] = True
            record["meta_path"] = repo_path

    summaries = []
    for stamp, record in grouped.items():
        summaries.append(
            {
                "timestamp": stamp,
                "files": record["files"],
                "part_count": record["part_count"],
                "total_files": len(record["files"]),
                "has_meta": record["has_meta"],
                "meta_path": record["meta_path"],
            }
        )
    return sorted(summaries, key=lambda item: item["timestamp"], reverse=True)
def _score_candidate(
parts: int, max_parts: int, size: int, max_size: int, ts_rank: int, total: int
) -> float:
"""综合评分:三个维度等权归一化后求和,范围 0~3。"""
p = parts / max_parts if max_parts else 0
s = size / max_size if max_size else 0
t = (total - ts_rank) / total if total > 1 else 1.0
return round(p + s + t, 4)
def find_best_backup(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
    path_prefix: str = "backups",
    use_size: bool = True,
) -> tuple[dict | None, dict | None]:
    """Find the backup with the best combined part-count / size / recency score.

    Only backups that have a meta.json are candidates. Candidates are ranked
    newest first; on equal scores the newer backup wins.

    Args:
        repo_id: Repository ID on the Hugging Face Hub.
        token: Optional API token.
        repo_type: Repository type.
        path_prefix: Directory inside the repository that holds the backups.
        use_size: When False, no meta.json is downloaded and the size term of
            the score is 0 for every candidate.

    Returns:
        ``(best, size_info)``. ``best`` is the backup record (with an extra
        ``"archive_size"`` key when ``use_size`` is True). ``size_info`` holds
        the size, its human-readable form, the score and, when the meta could
        be read, ``volumes`` / ``is_split`` / ``file_count``. It is ``None``
        when ``use_size`` is False. ``(None, None)`` is returned when there is
        no backup or no backup has a meta.json.
    """
    backups = list_backups(
        repo_id=repo_id,
        token=token,
        repo_type=repo_type,
        path_prefix=path_prefix,
    )
    if not backups:
        print(f"[INFO] 在 {repo_id} 中未找到任何备份文件", file=sys.stderr)
        return None, None

    valid = [b for b in backups if b["has_meta"]]
    if not valid:
        print("[WARN] 所有备份均缺少 meta.json 文件", file=sys.stderr)
        return None, None

    max_parts = max(b["part_count"] for b in valid)
    total = len(valid)

    if not use_size:
        scored = [
            (_score_candidate(b["part_count"], max_parts, 0, 1, rank, total), b)
            for rank, b in enumerate(valid)
        ]
        # max() returns the first maximal element, i.e. the newest on ties.
        return max(scored, key=lambda pair: pair[0])[1], None

    print(f"正在获取 {len(valid)} 个备份的容量信息(下载 meta.json)...")
    metas: list[dict | None] = []
    enriched = []
    for b in valid:
        meta = _download_meta(repo_id, b["meta_path"], token, repo_type)
        if not isinstance(meta, dict):
            meta = None
        raw_size = meta.get("archive_size") if meta else None
        # A missing or malformed size counts as 0 instead of breaking max()
        # and the score division below.
        is_number = isinstance(raw_size, (int, float)) and not isinstance(raw_size, bool)
        size = raw_size if is_number else 0
        metas.append(meta)
        enriched.append({**b, "archive_size": size})

    max_size = max(e["archive_size"] for e in enriched)
    scores = [
        _score_candidate(
            e["part_count"], max_parts,
            e["archive_size"], max_size,
            rank, total,
        )
        for rank, e in enumerate(enriched)
    ]
    best_idx = max(range(total), key=scores.__getitem__)
    best_score = scores[best_idx]
    best = enriched[best_idx]

    print(f" 综合评分最高的备份: score={best_score:.4f}")
    print(f" (分包={best['part_count']}/{max_parts}, "
          f"容量={_human_size(best['archive_size'])}, "
          f"时间={best['timestamp']})")

    size_info = {
        "archive_size": best["archive_size"],
        "archive_size_human": _human_size(best["archive_size"]),
        "score": best_score,
    }

    # Reuse the meta fetched during scoring; retry once only if that failed.
    meta = metas[best_idx]
    if meta is None:
        meta = _download_meta(repo_id, best["meta_path"], token, repo_type)
    if isinstance(meta, dict) and meta:
        size_info["volumes"] = meta.get("volumes", [])
        size_info["is_split"] = meta.get("is_split", False)
        size_info["file_count"] = meta.get("file_count", 0)
    return best, size_info
def _delete_backups(
api: HfApi,
repo_id: str,
repo_type: str,
prefix: str,
timestamps_to_delete: list[str],
) -> int:
"""批量删除指定时间戳的备份(使用通配符模式 + api.delete_files)。
返回成功删除的时间戳数量。
"""
prefix_slash = f"{prefix}/" if prefix else ""
total = len(timestamps_to_delete)
total_batches = (total + _BATCH_SIZE - 1) // _BATCH_SIZE
deleted_count = 0
for batch_idx in range(total_batches):
start = batch_idx * _BATCH_SIZE
end = min(start + _BATCH_SIZE, total)
batch_ts = timestamps_to_delete[start:end]
# 每个时间戳生成一个通配符模式
patterns = [
f"{prefix_slash}openclaw-backup-{ts}*" for ts in batch_ts
]
try:
api.delete_files(
repo_id=repo_id,
repo_type=repo_type,
delete_patterns=patterns,
commit_message=(
f"backup cleanup: delete {len(batch_ts)} old backup(s) "
f"(batch {batch_idx + 1}/{total_batches})"
),
)
deleted_count += len(batch_ts)
print(f" ✓ 第 {batch_idx + 1}/{total_batches} 批: "
f"删除了 {len(batch_ts)} 个旧备份")
# 限速保护
if batch_idx + 1 < total_batches:
time.sleep(1)
except Exception as e:
print(f" ✗ 第 {batch_idx + 1}/{total_batches} 批失败: {e}", file=sys.stderr)
print(f" 已删除 {deleted_count}/{total} 个备份后中断")
break
return deleted_count
def super_squash_history(
    repo_id: str,
    token: str | None = None,
    repo_type: str = "dataset",
) -> bool:
    """Run ``HfApi.super_squash_history`` on the repository.

    A banner and a warning that the operation is destructive are printed
    first. Returns True when the API call completes and False when anything
    raises; the failure is reported on stderr.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print("执行超级压缩(super_squash_history)...")
    print(" 警告: 这是破坏性操作,Git 历史将永久丢失!")
    print(banner)
    try:
        HfApi(token=token).super_squash_history(
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message="Super squash: reclaim storage after backup cleanup",
        )
        print("✓ 超级压缩完成,存储空间已释放")
        print(" 注意: 存储配额变化可能在 36 小时内生效")
        return True
    except Exception as e:
        print(f"✗ 超级压缩失败: {e}", file=sys.stderr)
        return False
def _confirm(prompt: str) -> bool:
    """Ask a yes/no question on stdin; only "yes"/"y" counts as consent."""
    try:
        answer = input(prompt)
    except EOFError:
        # No interactive stdin (pipe, cron): treat as "no" instead of crashing.
        print()
        return False
    return answer.strip().lower() in ("yes", "y")


def main():
    """CLI entry point: list backups, pick the best one, optionally clean up.

    Steps:
        1. Parse arguments, then resolve the token (``--token`` if given,
           otherwise the interactive helper).
        2. List all backups; with ``--verbose`` print a table of them.
        3. Select the best backup and print the result.
        4. With ``--delete-before``: after confirmation (or ``--yes``) delete
           every backup older than the selected one, then optionally run a
           super squash unless ``--no-super-squash`` is given.
    """
    parser = argparse.ArgumentParser(
        description="查找 HF Dataset 中分包最多、总包最大、时间最新的备份 meta.json"
    )
    parser.add_argument("dataset_repo", help="HF Dataset repo ID")
    parser.add_argument("--token", default=None, help="HF API token")
    parser.add_argument("--prefix", default="backups", help="备份路径前缀(默认: backups)")
    parser.add_argument(
        "--repo-type",
        default="dataset",
        choices=["dataset", "model", "space"],
        help="仓库类型(默认: dataset)",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="列出所有备份的统计信息"
    )
    parser.add_argument(
        "--no-size",
        action="store_true",
        help="不按容量排序(跳过下载 meta.json)",
    )
    parser.add_argument(
        "--delete-before",
        action="store_true",
        help="找到最佳备份后,删除所有比它更旧的备份",
    )
    parser.add_argument(
        "--yes", "-y",
        action="store_true",
        help="自动确认删除(跳过确认提示)",
    )
    parser.add_argument(
        "--no-super-squash",
        action="store_true",
        help="删除后不执行超级压缩(存储空间将在36小时后自动释放)",
    )
    args = parser.parse_args()

    # Resolve the token only after parsing, so --help and argument errors do
    # not trigger the interactive prompt and an explicit --token is honoured.
    if not args.token:
        args.token = get_hf_token_interactive()
        if not args.token:
            sys.exit(1)

    # ---- 1. List all backups ----
    backups = list_backups(
        repo_id=args.dataset_repo,
        token=args.token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
    )
    if not backups:
        print(f"[INFO] 在 {args.dataset_repo} 中未找到任何备份文件")
        return

    # ---- 2. Verbose table ----
    if args.verbose:
        print(f"\n{'='*90}")
        print(f"Dataset: {args.dataset_repo}")
        print(f"{'='*90}")
        print(f"{'时间戳':<20} {'文件数':>8} {'分包数':>8} {'有meta':>8} {'评分':>8} meta.json")
        print(f"{'-'*20} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*40}")
        # Rank only backups that have a meta.json, exactly like the selection
        # step does, so the scores shown here match the ones it uses. The
        # size term is not included because no meta.json is downloaded here.
        candidates = [b for b in backups if b["has_meta"]]
        max_parts_v = max((b["part_count"] for b in candidates), default=0)
        total_v = len(candidates)
        rank_v = 0
        for b in backups:
            meta_flag = "✓" if b["has_meta"] else "✗"
            meta_file = b["meta_path"] or "-"
            if b["has_meta"]:
                score_v = _score_candidate(
                    b["part_count"], max_parts_v, 0, 1, rank_v, total_v
                )
                rank_v += 1
            else:
                score_v = 0
            print(
                f"{b['timestamp']:<20} {b['total_files']:>8} {b['part_count']:>8} "
                f"{meta_flag:>8} {score_v:>8.4f} {meta_file}"
            )
        print()

    # ---- 3. Select the best backup ----
    best, size_info = find_best_backup(
        repo_id=args.dataset_repo,
        token=args.token,
        repo_type=args.repo_type,
        path_prefix=args.prefix,
        use_size=not args.no_size,
    )
    if not best:
        print("[RESULT] 未找到符合条件的备份")
        return

    # ---- 4. Print the result ----
    print(f"\n{'='*80}")
    print("结果:")
    print(f" Dataset: {args.dataset_repo}")
    if size_info and "score" in size_info:
        print(f" 综合评分: {size_info['score']:.4f} / 3.0000")
    print(f" meta.json: {best['meta_path']}")
    print(f" 备份时间: {best['timestamp']}")
    print(f" 分包数量: {best['part_count']}")
    print(f" 总文件数: {best['total_files']}")
    if size_info:
        print(f" 总容量: {size_info['archive_size_human']} ({size_info['archive_size']:,} bytes)")
        print(f" 文件数量(meta): {size_info.get('file_count', 'N/A')}")
        if size_info.get("is_split"):
            print(f" 是否分片: ✓ (共 {len(size_info.get('volumes', []))} 个分片)")
        else:
            print(" 是否分片: ✗ (单文件)")
    else:
        print(" 总容量: (未获取)")
    print(f"{'='*80}\n")

    # ---- 5. Delete older backups (optional) ----
    if not args.delete_before:
        return

    # Timestamps are fixed-width, so string comparison orders them by time.
    older = [b for b in backups if b["timestamp"] < best["timestamp"]]
    if not older:
        print("[INFO] 没有比当前备份更旧的备份需要清理")
        return

    total_older_files = sum(b["total_files"] for b in older)
    print(f"发现 {len(older)} 个旧备份(共 {total_older_files} 个文件):")
    for b in older:
        print(f" - {b['timestamp']} ({b['total_files']} 个文件)")

    if not args.yes:
        print()
        if not _confirm(f"确认删除这 {len(older)} 个旧备份?(yes/NO): "):
            print("[INFO] 已取消删除")
            return

    print(f"\n正在删除 {len(older)} 个旧备份...")
    api = HfApi(token=args.token)
    # Oldest first, so progress is easy to follow.
    older_ts = sorted(b["timestamp"] for b in older)
    deleted = _delete_backups(
        api=api,
        repo_id=args.dataset_repo,
        repo_type=args.repo_type,
        prefix=args.prefix,
        timestamps_to_delete=older_ts,
    )
    print(f"\n结果: 成功删除 {deleted}/{len(older_ts)} 个旧备份")

    # ---- 6. Super squash (on by default) ----
    if not args.no_super_squash and deleted > 0:
        print()
        do_squash = args.yes or _confirm(
            "是否执行超级压缩以真正释放存储空间?(yes/NO): "
        )
        if do_squash:
            success = super_squash_history(
                repo_id=args.dataset_repo,
                token=args.token,
                repo_type=args.repo_type,
            )
            if not success:
                print("\n[WARNING] 超级压缩失败,但旧备份已删除")
                print(" 提示: 您可以稍后手动执行超级压缩或联系 HF 支持")
        else:
            print("[INFO] 已跳过超级压缩")
            print(" 提示: 存储空间将在 36 小时后自动释放")
            # This script has no --super-squash flag; point at the manual route.
            print(" 如需立即释放,请稍后手动执行超级压缩(HfApi.super_squash_history)")


if __name__ == "__main__":
    main()