"""提交 **CPU Basic** Job:把 Hub 上 Cosmos(或其它源)服务端复制到你的 Bucket,并尝试解压 tar。 - 计费:见 https://huggingface.co/docs/hub/jobs-pricing(CPU Basic 约 \\$0.01/ 小时量级,以官网为准)。 - 默认挂载:代码 ``fuzirui/WJAD``、可写 Bucket;超时默认 48h(大仓库复制可能很久)。 - 须 ``hf auth login``;NVIDIA 数据集须在网页接受条款。 用法:: python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data --follow python scripts/push_cpu_ingest_job.py --bucket fuzirui/x --source 'hf://datasets/foo/bar/' """ from __future__ import annotations import argparse import os import sys from huggingface_hub import HfApi, Volume, create_bucket try: from huggingface_hub.cli._cli_utils import parse_env_map except Exception: # pragma: no cover parse_env_map = None DEFAULT_CODE_REPO = "fuzirui/WJAD" DEFAULT_BUCKET = "fuzirui/WJAD" DEFAULT_SOURCE = "hf://datasets/nvidia/PhysicalAI-Autonomous-Vehicle-Cosmos-Drive-Dreams/" DEFAULT_DEST_PREFIX = "cosmos_hub_mirror" DEFAULT_IMAGE = "python:3.12" DEFAULT_TIMEOUT = "7d" def _secrets_for_job() -> dict | None: if parse_env_map is not None: try: m = parse_env_map(["HF_TOKEN"]) if m.get("HF_TOKEN"): return m except Exception: pass t = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") return {"HF_TOKEN": t} if t else None def main() -> None: parser = argparse.ArgumentParser(description="HF Jobs:CPU 拉取 Hub → Bucket + 解压") parser.add_argument("--bucket", default=DEFAULT_BUCKET, help="目标 Storage Bucket id(须已创建或加 --ensure-bucket)") parser.add_argument("--code-repo", default=DEFAULT_CODE_REPO, help="含 ingest 脚本的 Hub model/space id") parser.add_argument("--code-type", default="model", choices=("model", "space", "dataset")) parser.add_argument("--source", default=DEFAULT_SOURCE, help="hf:// 源目录") parser.add_argument("--dest-prefix", default=DEFAULT_DEST_PREFIX, help="bucket 内子路径") parser.add_argument( "--skip-create-bucket", action="store_true", help="不在本机预先 create_bucket(bucket 必须已存在,否则挂载失败)", ) parser.add_argument( "--no-extract", action="store_true", help="只做 copy_files,不解压 tar", ) parser.add_argument( "--max-tars", type=int, default=None, help="传给 ingest_hub_to_bucket.py --max-tars", ) parser.add_argument( "--extract-out-prefix", default=None, metavar="NAME", help="解压输出子路径(默认 {dest-prefix}_unpacked)", ) parser.add_argument( "--extract-beside-tar", action="store_true", help="旧行为:在每条 tar 旁解压为 _extracted", ) parser.add_argument("--image", default=DEFAULT_IMAGE) parser.add_argument("--timeout", default=DEFAULT_TIMEOUT) parser.add_argument("--follow", action="store_true") parser.add_argument("--no-secrets", action="store_true") args = parser.parse_args() bucket_mount = "/mnt/cosmos" code_mount = "/workspace" max_tars = "" if args.max_tars is not None: max_tars = f" --max-tars {args.max_tars}" extract_flag = "" if args.no_extract else " --extract-tars" extract_beside = " --extract-beside-tar" if args.extract_beside_tar else "" out_prefix = "" if args.extract_out_prefix: out_prefix = f" --extract-out-prefix '{args.extract_out_prefix}'" script = f"""set -euo pipefail Eph="{bucket_mount}/.wjad_ephemeral" mkdir -p "$Eph/tmp" "$Eph/hf_home/hub" "$Eph/xdg_cache" export TMPDIR="$Eph/tmp" export TMP="$TMPDIR" TEMP="$TMPDIR" export HF_HOME="$Eph/hf_home" export HF_HUB_CACHE="$HF_HOME/hub" export XDG_CACHE_HOME="$Eph/xdg_cache" pip install --root-user-action=ignore --no-cache-dir 'huggingface_hub>=0.30' python {code_mount}/scripts/ingest_hub_to_bucket.py \\ --bucket '{args.bucket}' \\ --source '{args.source}' \\ --dest-prefix '{args.dest_prefix}' \\ --bucket-mount '{bucket_mount}'{extract_flag}{max_tars}{out_prefix}{extract_beside} """ secrets = None if args.no_secrets else _secrets_for_job() if secrets is None and not args.no_secrets: print("[push_cpu_ingest] 警告: 无 HF_TOKEN,gated 数据会失败。", file=sys.stderr) if not args.skip_create_bucket: create_bucket(args.bucket, exist_ok=True) print(f"[push_cpu_ingest] bucket 已确保存在(或已存在): {args.bucket}") volumes = [ Volume(type=args.code_type, source=args.code_repo, mount_path=code_mount), Volume(type="bucket", source=args.bucket, mount_path=bucket_mount), ] api = HfApi() job = api.run_job( image=args.image, command=["bash", "-lc", script], flavor="cpu-basic", volumes=volumes, secrets=secrets, timeout=args.timeout, ) print(f"[push_cpu_ingest] Job ID: {job.id}") print(f"[push_cpu_ingest] URL: {job.url}") if args.follow: for line in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True): print(line, end="" if str(line).endswith("\n") else "\n") if __name__ == "__main__": main()