File size: 5,619 Bytes
196e6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""提交 **CPU Basic** Job:把 Hub 上 Cosmos(或其它源)服务端复制到你的 Bucket,并尝试解压 tar。



- 计费:见 https://huggingface.co/docs/hub/jobs-pricing(CPU Basic 约 \\$0.01/ 小时量级,以官网为准)。

- 默认挂载:代码 ``fuzirui/WJAD``、可写 Bucket;超时默认 7d(可用 ``--timeout`` 修改;大仓库复制可能很久)。

- 须 ``hf auth login``;NVIDIA 数据集须在网页接受条款。



用法::



    python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data

    python scripts/push_cpu_ingest_job.py --bucket fuzirui/wjad-cosmos-data --follow

    python scripts/push_cpu_ingest_job.py --bucket fuzirui/x --source 'hf://datasets/foo/bar/'

"""

from __future__ import annotations

import argparse
import os
import shlex
import sys

from huggingface_hub import HfApi, Volume, create_bucket

try:
    from huggingface_hub.cli._cli_utils import parse_env_map
except Exception:  # pragma: no cover
    parse_env_map = None

# Hub repo (its type is selected with --code-type) that must contain
# scripts/ingest_hub_to_bucket.py; main() mounts it at /workspace inside the Job.
DEFAULT_CODE_REPO = "fuzirui/WJAD"
# Storage Bucket that receives the copied data; main() mounts it at /mnt/cosmos.
DEFAULT_BUCKET = "fuzirui/WJAD"
# hf:// source directory to copy from. The NVIDIA dataset requires accepting its
# terms on the website first (see module docstring).
DEFAULT_SOURCE = "hf://datasets/nvidia/PhysicalAI-Autonomous-Vehicle-Cosmos-Drive-Dreams/"
# Sub-path inside the bucket for the copy (forwarded as --dest-prefix).
DEFAULT_DEST_PREFIX = "cosmos_hub_mirror"
# Container image for the Job; huggingface_hub is pip-installed into it at start-up.
DEFAULT_IMAGE = "python:3.12"
# Job timeout forwarded to HfApi.run_job; generous because mirroring a large repo is slow.
DEFAULT_TIMEOUT = "7d"


def _secrets_for_job() -> dict | None:
    """Resolve the Hugging Face token to forward to the Job as the ``HF_TOKEN`` secret.

    Lookup order:

    1. ``parse_env_map(["HF_TOKEN"])`` from huggingface_hub's private CLI utils,
       when it could be imported at module load.
    2. The ``HF_TOKEN`` / ``HUGGING_FACE_HUB_TOKEN`` environment variables.
    3. The token saved locally by ``hf auth login`` (``huggingface_hub.get_token``),
       so a logged-in user does not also have to export the token.

    Returns:
        A mapping containing a non-empty ``"HF_TOKEN"`` entry, or ``None`` when
        no token could be found (gated datasets will then fail inside the Job).
    """
    if parse_env_map is not None:
        try:
            env_map = parse_env_map(["HF_TOKEN"])
            if env_map.get("HF_TOKEN"):
                return env_map
        except Exception:
            # Private CLI helper whose signature/behaviour may change between
            # huggingface_hub releases: any failure simply means "fall back to
            # the plain lookups below", so this is deliberately best-effort.
            pass

    token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")
    if not token:
        # The module docstring asks for `hf auth login`; honour the token that
        # login stored on disk instead of only looking at environment variables.
        try:
            from huggingface_hub import get_token
        except ImportError:  # very old huggingface_hub without get_token()
            pass
        else:
            token = get_token()
    return {"HF_TOKEN": token} if token else None


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for this job-submission script."""
    parser = argparse.ArgumentParser(description="HF Jobs:CPU 拉取 Hub → Bucket + 解压")
    parser.add_argument(
        "--bucket",
        default=DEFAULT_BUCKET,
        help="目标 Storage Bucket id(默认在本机自动 create_bucket;加 --skip-create-bucket 时须已存在)",
    )
    parser.add_argument("--code-repo", default=DEFAULT_CODE_REPO, help="含 ingest 脚本的 Hub model/space id")
    parser.add_argument("--code-type", default="model", choices=("model", "space", "dataset"))
    parser.add_argument("--source", default=DEFAULT_SOURCE, help="hf:// 源目录")
    parser.add_argument("--dest-prefix", default=DEFAULT_DEST_PREFIX, help="bucket 内子路径")
    parser.add_argument(
        "--skip-create-bucket",
        action="store_true",
        help="不在本机预先 create_bucket(bucket 必须已存在,否则挂载失败)",
    )
    parser.add_argument(
        "--no-extract",
        action="store_true",
        help="只做 copy_files,不解压 tar",
    )
    parser.add_argument(
        "--max-tars",
        type=int,
        default=None,
        help="传给 ingest_hub_to_bucket.py --max-tars",
    )
    parser.add_argument(
        "--extract-out-prefix",
        default=None,
        metavar="NAME",
        help="解压输出子路径(默认 {dest-prefix}_unpacked)",
    )
    parser.add_argument(
        "--extract-beside-tar",
        action="store_true",
        help="旧行为:在每条 tar 旁解压为 _extracted",
    )
    parser.add_argument("--image", default=DEFAULT_IMAGE)
    parser.add_argument("--timeout", default=DEFAULT_TIMEOUT)
    parser.add_argument("--follow", action="store_true")
    parser.add_argument("--no-secrets", action="store_true")
    return parser


def _build_job_script(args: argparse.Namespace, bucket_mount: str, code_mount: str) -> str:
    """Render the bash script executed inside the Job container.

    The script redirects temp/cache directories into ``<bucket_mount>/.wjad_ephemeral``,
    installs ``huggingface_hub``, then runs ``scripts/ingest_hub_to_bucket.py`` from
    the mounted code repo with the options taken from ``args``.

    The ingest command is built as an argv list and rendered with ``shlex.join`` on
    a single line. That way values containing quotes or spaces are escaped
    correctly, and no line-continuation backslash can be broken by a stray
    blank line.
    """
    ingest_cmd = [
        "python",
        f"{code_mount}/scripts/ingest_hub_to_bucket.py",
        "--bucket", args.bucket,
        "--source", args.source,
        "--dest-prefix", args.dest_prefix,
        "--bucket-mount", bucket_mount,
    ]
    # Optional flags, appended in the same order the script always used.
    if not args.no_extract:
        ingest_cmd.append("--extract-tars")
    if args.max_tars is not None:
        ingest_cmd += ["--max-tars", str(args.max_tars)]
    if args.extract_out_prefix:
        ingest_cmd += ["--extract-out-prefix", args.extract_out_prefix]
    if args.extract_beside_tar:
        ingest_cmd.append("--extract-beside-tar")

    script_lines = [
        "set -euo pipefail",
        f'Eph="{bucket_mount}/.wjad_ephemeral"',
        'mkdir -p "$Eph/tmp" "$Eph/hf_home/hub" "$Eph/xdg_cache"',
        'export TMPDIR="$Eph/tmp"',
        'export TMP="$TMPDIR" TEMP="$TMPDIR"',
        'export HF_HOME="$Eph/hf_home"',
        'export HF_HUB_CACHE="$HF_HOME/hub"',
        'export XDG_CACHE_HOME="$Eph/xdg_cache"',
        "pip install --root-user-action=ignore --no-cache-dir 'huggingface_hub>=0.30'",
        shlex.join(ingest_cmd),
    ]
    return "\n".join(script_lines) + "\n"


def main() -> None:
    """Submit a ``cpu-basic`` HF Job that copies a Hub source into a Storage Bucket.

    Steps:
      1. Parse CLI options and render the in-container bash script.
      2. Resolve the ``HF_TOKEN`` secret, unless ``--no-secrets`` is given; warn if
         it is missing.
      3. Create the bucket locally with ``exist_ok=True``, unless
         ``--skip-create-bucket`` is given.
      4. Mount the code repo at ``/workspace`` and the bucket at ``/mnt/cosmos``,
         then start the job.
      5. Print the job id and URL; with ``--follow``, stream its logs.
    """
    args = _build_arg_parser().parse_args()

    bucket_mount = "/mnt/cosmos"
    code_mount = "/workspace"
    script = _build_job_script(args, bucket_mount, code_mount)

    secrets = None if args.no_secrets else _secrets_for_job()
    if secrets is None and not args.no_secrets:
        print("[push_cpu_ingest] 警告: 无 HF_TOKEN,gated 数据会失败。", file=sys.stderr)

    if not args.skip_create_bucket:
        create_bucket(args.bucket, exist_ok=True)
        print(f"[push_cpu_ingest] bucket 已确保存在(或已存在): {args.bucket}")

    volumes = [
        Volume(type=args.code_type, source=args.code_repo, mount_path=code_mount),
        Volume(type="bucket", source=args.bucket, mount_path=bucket_mount),
    ]

    api = HfApi()
    job = api.run_job(
        image=args.image,
        command=["bash", "-lc", script],
        flavor="cpu-basic",
        volumes=volumes,
        secrets=secrets,
        timeout=args.timeout,
    )
    print(f"[push_cpu_ingest] Job ID: {job.id}")
    print(f"[push_cpu_ingest] URL:    {job.url}")

    if args.follow:
        for line in api.fetch_job_logs(job_id=job.id, namespace=job.owner.name, follow=True):
            # Log lines may or may not carry their own newline; avoid doubling it.
            print(line, end="" if str(line).endswith("\n") else "\n")


# Submit the job only when run as a script, not when this module is imported.
if __name__ == "__main__":
    main()