WJAD / scripts /push_cpu_ingest_job.py
fuzirui's picture
Upload folder using huggingface_hub
196e6e0 verified
"""提交 **CPU Basic** Job:把 Hub 上 Cosmos(或其它源)服务端复制到你的 Bucket,并尝试解压 tar。
- 计费:见 https://huggingface.co/docs/hub/jobs-pricing(CPU Basic 约 \\$0.01/ 小时量级,以官网为准)。
- 默认挂载:代码 ``fuzirui/WJAD``、可写 Bucket;超时默认 48h(大仓库复制可能很久)。
- 须 ``hf auth login``;NVIDIA 数据集须在网页接受条款。
用法::
python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data
python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data --follow
python scripts/push_cpu_ingest_job.py --bucket fuzirui/x --source 'hf://datasets/foo/bar/'
"""
from __future__ import annotations
import argparse
import os
import sys
from huggingface_hub import HfApi, Volume, create_bucket
try:
from huggingface_hub.cli._cli_utils import parse_env_map
except Exception: # pragma: no cover
parse_env_map = None
# Hub repo id mounted into the job as the code volume (default for --code-repo).
DEFAULT_CODE_REPO = "fuzirui/WJAD"
# Default target bucket id (default for --bucket).
# NOTE(review): this is the same id as DEFAULT_CODE_REPO, while the usage
# examples pass ``fuzirui/wjad-cosmos-data`` — confirm this default is intended.
DEFAULT_BUCKET = "fuzirui/WJAD"
# Default ``hf://`` source directory to copy from (default for --source).
DEFAULT_SOURCE = "hf://datasets/nvidia/PhysicalAI-Autonomous-Vehicle-Cosmos-Drive-Dreams/"
# Default sub-path inside the bucket (default for --dest-prefix).
DEFAULT_DEST_PREFIX = "cosmos_hub_mirror"
# Container image the job runs in (default for --image).
DEFAULT_IMAGE = "python:3.12"
# Default job timeout handed to ``run_job`` (default for --timeout);
# presumably read as seven days — TODO confirm the accepted duration format.
DEFAULT_TIMEOUT = "7d"
def _secrets_for_job() -> dict | None:
    """Collect the ``HF_TOKEN`` secret to forward to the job, if any is available.

    When the CLI helper ``parse_env_map`` could be imported it is tried first,
    and its result is returned as-is if it carries a non-empty ``HF_TOKEN``.
    Any exception raised by the helper is ignored. Otherwise the token is read
    from the ``HF_TOKEN`` or ``HUGGING_FACE_HUB_TOKEN`` environment variables,
    in that order.

    Returns:
        A mapping containing ``HF_TOKEN``, or ``None`` when no token was found.
    """
    resolver = parse_env_map
    if resolver is not None:
        try:
            resolved = resolver(["HF_TOKEN"])
            if resolved.get("HF_TOKEN"):
                return resolved
        except Exception:
            # Best effort only: fall through to the plain environment lookup.
            pass

    for env_name in ("HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"):
        token = os.environ.get(env_name)
        if token:
            return {"HF_TOKEN": token}
    return None
def _build_ingest_script(args: argparse.Namespace, bucket_mount: str, code_mount: str) -> str:
    """Render the bash script that the job executes inside the container.

    Args:
        args: Parsed CLI arguments (bucket, source, prefixes, extract options).
        bucket_mount: Path at which the bucket volume is mounted in the job.
        code_mount: Path at which the code repo volume is mounted in the job.

    Returns:
        The script text, terminated by a newline, suitable for ``bash -lc``.
    """
    # Local import so this block does not depend on the file's import header.
    import shlex

    ingest_cmd = [
        "python",
        f"{code_mount}/scripts/ingest_hub_to_bucket.py",
        "--bucket", args.bucket,
        "--source", args.source,
        "--dest-prefix", args.dest_prefix,
        "--bucket-mount", bucket_mount,
    ]
    if not args.no_extract:
        ingest_cmd.append("--extract-tars")
    if args.max_tars is not None:
        ingest_cmd += ["--max-tars", str(args.max_tars)]
    if args.extract_out_prefix:
        ingest_cmd += ["--extract-out-prefix", args.extract_out_prefix]
    if args.extract_beside_tar:
        ingest_cmd.append("--extract-beside-tar")

    script_lines = [
        "set -euo pipefail",
        # Keep temp files and HF caches on the mounted bucket rather than on
        # the container's own filesystem.
        f'Eph="{bucket_mount}/.wjad_ephemeral"',
        'mkdir -p "$Eph/tmp" "$Eph/hf_home/hub" "$Eph/xdg_cache"',
        'export TMPDIR="$Eph/tmp"',
        'export TMP="$TMPDIR" TEMP="$TMPDIR"',
        'export HF_HOME="$Eph/hf_home"',
        'export HF_HUB_CACHE="$HF_HOME/hub"',
        'export XDG_CACHE_HOME="$Eph/xdg_cache"',
        "pip install --root-user-action=ignore --no-cache-dir 'huggingface_hub>=0.30'",
        # shlex.join quotes every argument, so user-supplied values containing
        # quotes, spaces or shell metacharacters cannot break out of the command.
        shlex.join(ingest_cmd),
    ]
    return "\n".join(script_lines) + "\n"


def main() -> None:
    """Parse CLI arguments and submit the CPU ingest job to Hugging Face Jobs.

    Unless ``--skip-create-bucket`` is given, the target bucket is created
    first (``exist_ok=True``). The job mounts the code repo at ``/workspace``
    and the bucket at ``/mnt/cosmos``, runs on the ``cpu-basic`` flavor, and
    receives ``HF_TOKEN`` as a secret unless ``--no-secrets`` is given. With
    ``--follow`` the job logs are streamed to stdout after submission.
    """
    parser = argparse.ArgumentParser(description="HF Jobs:CPU 拉取 Hub → Bucket + 解压")
    parser.add_argument(
        "--bucket",
        default=DEFAULT_BUCKET,
        # The old help text pointed at a nonexistent --ensure-bucket flag;
        # creation is the default and --skip-create-bucket turns it off.
        help="目标 Storage Bucket id(默认会自动创建;若加 --skip-create-bucket 则须已存在)",
    )
    parser.add_argument("--code-repo", default=DEFAULT_CODE_REPO, help="含 ingest 脚本的 Hub model/space id")
    parser.add_argument("--code-type", default="model", choices=("model", "space", "dataset"))
    parser.add_argument("--source", default=DEFAULT_SOURCE, help="hf:// 源目录")
    parser.add_argument("--dest-prefix", default=DEFAULT_DEST_PREFIX, help="bucket 内子路径")
    parser.add_argument(
        "--skip-create-bucket",
        action="store_true",
        help="不在本机预先 create_bucket(bucket 必须已存在,否则挂载失败)",
    )
    parser.add_argument(
        "--no-extract",
        action="store_true",
        help="只做 copy_files,不解压 tar",
    )
    parser.add_argument(
        "--max-tars",
        type=int,
        default=None,
        help="传给 ingest_hub_to_bucket.py --max-tars",
    )
    parser.add_argument(
        "--extract-out-prefix",
        default=None,
        metavar="NAME",
        help="解压输出子路径(默认 {dest-prefix}_unpacked)",
    )
    parser.add_argument(
        "--extract-beside-tar",
        action="store_true",
        help="旧行为:在每条 tar 旁解压为 _extracted",
    )
    parser.add_argument("--image", default=DEFAULT_IMAGE)
    parser.add_argument("--timeout", default=DEFAULT_TIMEOUT)
    parser.add_argument("--follow", action="store_true")
    parser.add_argument("--no-secrets", action="store_true")
    args = parser.parse_args()

    bucket_mount = "/mnt/cosmos"
    code_mount = "/workspace"
    script = _build_ingest_script(args, bucket_mount, code_mount)

    secrets = None
    if not args.no_secrets:
        secrets = _secrets_for_job()
        if secrets is None:
            print("[push_cpu_ingest] 警告: 无 HF_TOKEN,gated 数据会失败。", file=sys.stderr)

    if not args.skip_create_bucket:
        create_bucket(args.bucket, exist_ok=True)
        print(f"[push_cpu_ingest] bucket 已确保存在(或已存在): {args.bucket}")

    volumes = [
        Volume(type=args.code_type, source=args.code_repo, mount_path=code_mount),
        Volume(type="bucket", source=args.bucket, mount_path=bucket_mount),
    ]
    api = HfApi()
    job = api.run_job(
        image=args.image,
        command=["bash", "-lc", script],
        flavor="cpu-basic",
        volumes=volumes,
        secrets=secrets,
        timeout=args.timeout,
    )
    print(f"[push_cpu_ingest] Job ID: {job.id}")
    print(f"[push_cpu_ingest] URL: {job.url}")

    if args.follow:
        for line in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
            # Log lines may or may not carry their own newline; avoid doubling it.
            print(line, end="" if str(line).endswith("\n") else "\n")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()