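"""Submit a **CPU Basic** Job that mirrors a Hub source into your Bucket.

The job copies Cosmos (or any other source) from the Hub into your Bucket
server-side and then tries to extract the tar archives.

- Billing: see https://huggingface.co/docs/hub/jobs-pricing (CPU Basic is on
  the order of $0.01 per hour; the official page is authoritative).
- Default mounts: the code repo ``fuzirui/WJAD`` and a writable Bucket; the
  default timeout is 7d (copying a large repository can take a long time).
- Requires ``hf auth login``; for NVIDIA datasets you must also accept the
  terms on the dataset's web page.

Usage::

    python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data
    python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data --follow
    python scripts/push_cpu_ingest_job.py --bucket fuzirui/x --source 'hf://datasets/foo/bar/'
"""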
|
|
|
from __future__ import annotations

import argparse
import os
import shlex
import sys

from huggingface_hub import HfApi, Volume, create_bucket, get_token

try:
    from huggingface_hub.cli._cli_utils import parse_env_map
except Exception:
    parse_env_map = None
|
|
|
# --- What gets copied, and where it lands -----------------------------------
# Default for --source: the hf:// directory that is copied into the bucket.
DEFAULT_SOURCE: str = "hf://datasets/nvidia/PhysicalAI-Autonomous-Vehicle-Cosmos-Drive-Dreams/"
# Default for --bucket: the Storage Bucket mounted into the job.
DEFAULT_BUCKET: str = "fuzirui/WJAD"
# Default for --dest-prefix: sub-path inside the bucket.
DEFAULT_DEST_PREFIX: str = "cosmos_hub_mirror"

# --- How the job runs ---------------------------------------------------------
# Default for --code-repo: Hub repo mounted as the code volume.
DEFAULT_CODE_REPO: str = "fuzirui/WJAD"
# Default for --image: container image the job is started from.
DEFAULT_IMAGE: str = "python:3.12"
# Default for --timeout: passed through unchanged to ``HfApi.run_job``.
DEFAULT_TIMEOUT: str = "7d"
|
|
|
|
|
def _secrets_for_job() -> dict | None:
    """Return the ``secrets`` mapping to send with the job, or ``None``.

    The token is looked up in this order; the first non-empty hit wins:

    1. ``parse_env_map(["HF_TOKEN"])`` when that private CLI helper could be
       imported (its result is returned as-is if it carries ``HF_TOKEN``).
    2. The ``HF_TOKEN`` / ``HUGGING_FACE_HUB_TOKEN`` environment variables.
    3. ``huggingface_hub.get_token()``, so that a token saved by
       ``hf auth login`` is still found when step 1 is unavailable.

    Returns:
        A dict containing ``HF_TOKEN``, or ``None`` when no token was found.
    """
    if parse_env_map is not None:
        try:
            env_map = parse_env_map(["HF_TOKEN"])
            if env_map.get("HF_TOKEN"):
                return env_map
        except Exception:
            # Deliberate best effort: parse_env_map is a private helper whose
            # behavior may change between releases, so any failure simply
            # falls through to the public lookups below.
            pass

    token = (
        os.environ.get("HF_TOKEN")
        or os.environ.get("HUGGING_FACE_HUB_TOKEN")
        or get_token()
    )
    return {"HF_TOKEN": token} if token else None
|
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ingest-job launcher."""
    parser = argparse.ArgumentParser(description="HF Jobs:CPU 拉取 Hub → Bucket + 解压")
    parser.add_argument(
        "--bucket",
        default=DEFAULT_BUCKET,
        # The bucket is created automatically unless --skip-create-bucket is
        # given; the old text pointed at a flag that does not exist.
        help="目标 Storage Bucket id(默认会自动创建;加 --skip-create-bucket 时须已存在)",
    )
    parser.add_argument("--code-repo", default=DEFAULT_CODE_REPO, help="含 ingest 脚本的 Hub model/space id")
    parser.add_argument("--code-type", default="model", choices=("model", "space", "dataset"))
    parser.add_argument("--source", default=DEFAULT_SOURCE, help="hf:// 源目录")
    parser.add_argument("--dest-prefix", default=DEFAULT_DEST_PREFIX, help="bucket 内子路径")
    parser.add_argument(
        "--skip-create-bucket",
        action="store_true",
        help="不在本机预先 create_bucket(bucket 必须已存在,否则挂载失败)",
    )
    parser.add_argument(
        "--no-extract",
        action="store_true",
        help="只做 copy_files,不解压 tar",
    )
    parser.add_argument(
        "--max-tars",
        type=int,
        default=None,
        help="传给 ingest_hub_to_bucket.py --max-tars",
    )
    parser.add_argument(
        "--extract-out-prefix",
        default=None,
        metavar="NAME",
        help="解压输出子路径(默认 {dest-prefix}_unpacked)",
    )
    parser.add_argument(
        "--extract-beside-tar",
        action="store_true",
        help="旧行为:在每条 tar 旁解压为 _extracted",
    )
    parser.add_argument("--image", default=DEFAULT_IMAGE)
    parser.add_argument("--timeout", default=DEFAULT_TIMEOUT)
    parser.add_argument("--follow", action="store_true")
    parser.add_argument("--no-secrets", action="store_true")
    return parser


def _build_job_script(args: argparse.Namespace, bucket_mount: str, code_mount: str) -> str:
    """Render the bash script that the job container executes.

    Every user-supplied value is quoted with ``shlex`` so that spaces, quotes
    or shell metacharacters in an argument cannot break or alter the command.
    """
    ingest_cmd = [
        "python",
        f"{code_mount}/scripts/ingest_hub_to_bucket.py",
        "--bucket",
        args.bucket,
        "--source",
        args.source,
        "--dest-prefix",
        args.dest_prefix,
        "--bucket-mount",
        bucket_mount,
    ]
    if not args.no_extract:
        ingest_cmd.append("--extract-tars")
    if args.max_tars is not None:
        ingest_cmd += ["--max-tars", str(args.max_tars)]
    if args.extract_out_prefix:
        ingest_cmd += ["--extract-out-prefix", args.extract_out_prefix]
    if args.extract_beside_tar:
        ingest_cmd.append("--extract-beside-tar")

    lines = [
        "set -euo pipefail",
        # Temp files and the HF / XDG caches are redirected to a directory on
        # the bucket mount instead of the container's default locations.
        f"Eph={shlex.quote(bucket_mount + '/.wjad_ephemeral')}",
        'mkdir -p "$Eph/tmp" "$Eph/hf_home/hub" "$Eph/xdg_cache"',
        'export TMPDIR="$Eph/tmp"',
        'export TMP="$TMPDIR" TEMP="$TMPDIR"',
        'export HF_HOME="$Eph/hf_home"',
        'export HF_HUB_CACHE="$HF_HOME/hub"',
        'export XDG_CACHE_HOME="$Eph/xdg_cache"',
        "pip install --root-user-action=ignore --no-cache-dir 'huggingface_hub>=0.30'",
        shlex.join(ingest_cmd),
    ]
    return "\n".join(lines) + "\n"


def main(argv: list[str] | None = None) -> None:
    """Submit the CPU ingest job and optionally stream its logs.

    Steps: parse the command line, render the in-container script, resolve
    the token secret (unless ``--no-secrets``), make sure the bucket exists
    (unless ``--skip-create-bucket``), submit the job on the ``cpu-basic``
    flavor with the code repo and the bucket mounted as volumes, print the
    job id and URL, and with ``--follow`` print the job logs as they arrive.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the previous behavior.
    """
    args = _build_arg_parser().parse_args(argv)

    bucket_mount = "/mnt/cosmos"
    code_mount = "/workspace"
    script = _build_job_script(args, bucket_mount, code_mount)

    secrets = None if args.no_secrets else _secrets_for_job()
    if secrets is None and not args.no_secrets:
        print("[push_cpu_ingest] 警告: 无 HF_TOKEN,gated 数据会失败。", file=sys.stderr)

    if not args.skip_create_bucket:
        create_bucket(args.bucket, exist_ok=True)
        print(f"[push_cpu_ingest] bucket 已确保存在(或已存在): {args.bucket}")

    volumes = [
        Volume(type=args.code_type, source=args.code_repo, mount_path=code_mount),
        Volume(type="bucket", source=args.bucket, mount_path=bucket_mount),
    ]

    api = HfApi()
    job = api.run_job(
        image=args.image,
        command=["bash", "-lc", script],
        flavor="cpu-basic",
        volumes=volumes,
        secrets=secrets,
        timeout=args.timeout,
    )
    print(f"[push_cpu_ingest] Job ID: {job.id}")
    print(f"[push_cpu_ingest] URL: {job.url}")

    if args.follow:
        for line in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
            # Log lines may or may not carry their own newline; avoid doubling it.
            print(line, end="" if str(line).endswith("\n") else "\n")
|
|
|
|
|
# Run the launcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|