File size: 5,619 Bytes
"""提交 **CPU Basic** Job:把 Hub 上 Cosmos(或其它源)服务端复制到你的 Bucket,并尝试解压 tar。
- 计费:见 https://huggingface.co/docs/hub/jobs-pricing(CPU Basic 约 $0.01/小时量级,以官网为准)。
- 默认挂载:代码 ``fuzirui/WJAD``、可写 Bucket;超时默认 7d(大仓库复制可能很久,可用 ``--timeout`` 调整)。
- 须 ``hf auth login``;NVIDIA 数据集须在网页接受条款。
用法::
python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data
python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data --follow
python scripts/push_cpu_ingest_job.py --bucket fuzirui/x --source 'hf://datasets/foo/bar/'
"""
from __future__ import annotations
import argparse
import os
import shlex
import sys

from huggingface_hub import HfApi, Volume, create_bucket, get_token
try:
from huggingface_hub.cli._cli_utils import parse_env_map
except Exception: # pragma: no cover
parse_env_map = None
# Default values for the command-line flags parsed in ``main()``.
# --code-repo: Hub repo mounted into the Job; must contain scripts/ingest_hub_to_bucket.py.
DEFAULT_CODE_REPO: str = "fuzirui/WJAD"
# --bucket: target Storage Bucket id.
# NOTE(review): same id as DEFAULT_CODE_REPO — confirm this is intended.
DEFAULT_BUCKET: str = "fuzirui/WJAD"
# --source: hf:// directory that is copied into the bucket.
DEFAULT_SOURCE: str = (
    "hf://datasets/nvidia/PhysicalAI-Autonomous-Vehicle-Cosmos-Drive-Dreams/"
)
# --dest-prefix: sub-path inside the bucket that receives the copy.
DEFAULT_DEST_PREFIX: str = "cosmos_hub_mirror"
# --image: container image the Job runs in.
DEFAULT_IMAGE: str = "python:3.12"
# --timeout: forwarded unchanged to HfApi.run_job.
DEFAULT_TIMEOUT: str = "7d"
def _secrets_for_job() -> dict[str, str] | None:
    """Resolve the ``HF_TOKEN`` secret to forward to the Job.

    Sources are tried in this order:

    1. ``parse_env_map(["HF_TOKEN"])`` from the huggingface_hub CLI helpers,
       when that private helper could be imported. Any error it raises is
       ignored, because it is a private API whose signature may change.
    2. The ``HF_TOKEN`` / ``HUGGING_FACE_HUB_TOKEN`` environment variables.
    3. ``huggingface_hub.get_token()``, which also returns the token saved
       by ``hf auth login`` (the usage notes ask users to log in that way).

    Returns:
        The mapping from ``parse_env_map`` when it yields a non-empty
        ``HF_TOKEN``; otherwise ``{"HF_TOKEN": token}`` when a token is
        found elsewhere; ``None`` when no token is available.
    """
    if parse_env_map is not None:
        try:
            env_map = parse_env_map(["HF_TOKEN"])
            if env_map.get("HF_TOKEN"):
                return env_map
        except Exception:
            # Deliberate best effort: fall through to the other token sources.
            pass
    token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
    if not token:
        # Without this, a user who only ran `hf auth login` (no exported
        # env var) would submit the Job with no secret at all.
        token = get_token()
    return {"HF_TOKEN": token} if token else None
def _build_parser() -> argparse.ArgumentParser:
    """Return the argument parser for this submit script."""
    parser = argparse.ArgumentParser(description="HF Jobs:CPU 拉取 Hub → Bucket + 解压")
    parser.add_argument(
        "--bucket",
        default=DEFAULT_BUCKET,
        help="目标 Storage Bucket id(默认自动 create_bucket;加 --skip-create-bucket 时须已存在)",
    )
    parser.add_argument(
        "--code-repo",
        default=DEFAULT_CODE_REPO,
        help="含 ingest 脚本的 Hub 仓库 id(类型由 --code-type 指定)",
    )
    parser.add_argument("--code-type", default="model", choices=("model", "space", "dataset"))
    parser.add_argument("--source", default=DEFAULT_SOURCE, help="hf:// 源目录")
    parser.add_argument("--dest-prefix", default=DEFAULT_DEST_PREFIX, help="bucket 内子路径")
    parser.add_argument(
        "--skip-create-bucket",
        action="store_true",
        help="不在本机预先 create_bucket(bucket 必须已存在,否则挂载失败)",
    )
    parser.add_argument(
        "--no-extract",
        action="store_true",
        help="只做 copy_files,不解压 tar",
    )
    parser.add_argument(
        "--max-tars",
        type=int,
        default=None,
        help="传给 ingest_hub_to_bucket.py --max-tars",
    )
    parser.add_argument(
        "--extract-out-prefix",
        default=None,
        metavar="NAME",
        help="解压输出子路径(默认 {dest-prefix}_unpacked)",
    )
    parser.add_argument(
        "--extract-beside-tar",
        action="store_true",
        help="旧行为:在每条 tar 旁解压为 _extracted",
    )
    parser.add_argument("--image", default=DEFAULT_IMAGE)
    parser.add_argument("--timeout", default=DEFAULT_TIMEOUT)
    parser.add_argument("--follow", action="store_true")
    parser.add_argument("--no-secrets", action="store_true")
    return parser


def _build_job_script(args: argparse.Namespace, *, bucket_mount: str, code_mount: str) -> str:
    """Render the bash script that runs inside the Job container.

    The script points TMPDIR, HF_HOME, HF_HUB_CACHE and XDG_CACHE_HOME at a
    ``.wjad_ephemeral`` directory inside the bucket mount. It then installs
    ``huggingface_hub`` and runs ``scripts/ingest_hub_to_bucket.py`` from the
    code mount with options taken from *args*.

    Every user-supplied value goes through ``shlex.quote``, so quotes,
    spaces or shell metacharacters in a CLI value cannot break the script.
    """
    q = shlex.quote
    ingest_opts = [
        f"--bucket {q(args.bucket)}",
        f"--source {q(args.source)}",
        f"--dest-prefix {q(args.dest_prefix)}",
        f"--bucket-mount {q(bucket_mount)}",
    ]
    if not args.no_extract:
        ingest_opts.append("--extract-tars")
    if args.max_tars is not None:
        ingest_opts.append(f"--max-tars {args.max_tars}")
    if args.extract_out_prefix:
        ingest_opts.append(f"--extract-out-prefix {q(args.extract_out_prefix)}")
    if args.extract_beside_tar:
        ingest_opts.append("--extract-beside-tar")
    ingest_cmd = " \\\n  ".join(
        [f"python {q(code_mount + '/scripts/ingest_hub_to_bucket.py')}", *ingest_opts]
    )
    lines = [
        "set -euo pipefail",
        f"Eph={q(bucket_mount + '/.wjad_ephemeral')}",
        'mkdir -p "$Eph/tmp" "$Eph/hf_home/hub" "$Eph/xdg_cache"',
        'export TMPDIR="$Eph/tmp"',
        'export TMP="$TMPDIR" TEMP="$TMPDIR"',
        'export HF_HOME="$Eph/hf_home"',
        'export HF_HUB_CACHE="$HF_HOME/hub"',
        'export XDG_CACHE_HOME="$Eph/xdg_cache"',
        "pip install --root-user-action=ignore --no-cache-dir 'huggingface_hub>=0.30'",
        ingest_cmd,
    ]
    return "\n".join(lines) + "\n"


def main() -> None:
    """Parse CLI flags and submit the CPU ingest Job to Hugging Face Jobs.

    Steps:

    1. Build the in-container bash script.
    2. Resolve the ``HF_TOKEN`` secret, unless ``--no-secrets`` is given.
    3. Create the bucket, unless ``--skip-create-bucket`` is given.
    4. Submit a ``cpu-basic`` Job that mounts the code repo at ``/workspace``
       and the bucket at ``/mnt/cosmos``.
    5. Print the Job id and URL, and stream the Job logs when ``--follow``
       is given.
    """
    args = _build_parser().parse_args()
    bucket_mount = "/mnt/cosmos"
    code_mount = "/workspace"
    script = _build_job_script(args, bucket_mount=bucket_mount, code_mount=code_mount)

    secrets = None if args.no_secrets else _secrets_for_job()
    if secrets is None and not args.no_secrets:
        print("[push_cpu_ingest] 警告: 无 HF_TOKEN,gated 数据会失败。", file=sys.stderr)

    if not args.skip_create_bucket:
        create_bucket(args.bucket, exist_ok=True)
        print(f"[push_cpu_ingest] bucket 已确保存在(或已存在): {args.bucket}")

    volumes = [
        Volume(type=args.code_type, source=args.code_repo, mount_path=code_mount),
        Volume(type="bucket", source=args.bucket, mount_path=bucket_mount),
    ]
    api = HfApi()
    job = api.run_job(
        image=args.image,
        command=["bash", "-lc", script],
        flavor="cpu-basic",
        volumes=volumes,
        secrets=secrets,
        timeout=args.timeout,
    )
    print(f"[push_cpu_ingest] Job ID: {job.id}")
    print(f"[push_cpu_ingest] URL: {job.url}")
    if args.follow:
        for line in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
            text = str(line)
            # Log lines may or may not carry their own newline; avoid doubling it.
            print(text, end="" if text.endswith("\n") else "\n")


if __name__ == "__main__":
    main()
|