| |
| """ |
| data_label_factory.runpod — orchestration CLI for the optional GPU path. |
| |
| Provisions a RunPod GPU pod, runs the data_label_factory pipeline on it, |
| pulls results back, and (optionally) publishes the labeled dataset to |
| Hugging Face — all in one place. |
| |
| Subcommands |
| ----------- |
| up Provision a GPU pod |
| push Copy a project YAML + image manifest to the pod |
| run Execute a shell command on the pod |
| pull Download an experiment dir from the pod |
| publish Push a labeled experiment to a Hugging Face dataset repo |
| down Destroy the pod |
| pipeline One-shot: up → push → run → pull → publish → down |
| |
| build Build the worker Docker image |
| serverless Manage RunPod serverless endpoints (create / test / destroy) |
| |
| Usage |
| ----- |
| export RUNPOD_API_KEY=rpa_xxxxxxxxxx |
| python3 -m data_label_factory.runpod pipeline \\ |
| --project projects/drones.yaml \\ |
| --gpu L40S \\ |
| --publish-to waltgrace/my-drone-dataset |
| |
| See README.md in this folder for architecture, costs, and trade-offs. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import shlex |
| import subprocess |
| import sys |
| import time |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
| |
| |
| |
| |
| |
|
|
def _runpod_sdk():
    """Import the ``runpod`` SDK, configure its API key, and return the module.

    The import happens inside the function, so the rest of this module can be
    loaded without the ``runpod`` package installed. The key is read from the
    ``RUNPOD_API_KEY`` environment variable and assigned to ``runpod.api_key``.

    Raises:
        SystemExit: if ``runpod`` cannot be imported, or if ``RUNPOD_API_KEY``
            is unset or empty.
    """
    try:
        import runpod as sdk
    except ImportError:
        raise SystemExit(
            "the `runpod` package is not installed. install it with:\n"
            " pip install -e \".[runpod]\"\n"
            "or directly: pip install runpod"
        )

    key = os.environ.get("RUNPOD_API_KEY")
    if key:
        sdk.api_key = key
        return sdk

    raise SystemExit(
        "RUNPOD_API_KEY is not set in your environment. get one at\n"
        " https://runpod.io/console/user/settings\n"
        "then: export RUNPOD_API_KEY=rpa_xxxxxxxxxx"
    )
|
|
|
|
def _hf_api(token: Optional[str] = None):
    """Return a ``huggingface_hub.HfApi`` client.

    Args:
        token: Hugging Face access token. When falsy, the ``HF_TOKEN``
            environment variable is used instead (which may itself be unset).

    Raises:
        SystemExit: if ``huggingface_hub`` cannot be imported.
    """
    try:
        from huggingface_hub import HfApi
    except ImportError:
        raise SystemExit(
            "the `huggingface_hub` package is not installed. install it with:\n"
            " pip install -e \".[runpod]\"\n"
            "or directly: pip install huggingface_hub"
        )

    resolved = token if token else os.environ.get("HF_TOKEN")
    return HfApi(token=resolved)
|
|
|
|
| |
| |
| |
# Per-user directory where this CLI keeps its local state.
STATE_DIR = Path(os.path.expanduser("~")) / ".data-label-factory"
# JSON record describing the currently provisioned pod.
STATE_FILE = STATE_DIR.joinpath("active-pod.json")
|
|
|
|
def _save_state(pod_info: dict) -> None:
    """Write ``pod_info`` to ``STATE_FILE`` as indented JSON.

    ``STATE_DIR`` (and any missing parents) is created first.
    """
    STATE_DIR.mkdir(exist_ok=True, parents=True)
    serialized = json.dumps(pod_info, indent=2)
    STATE_FILE.write_text(serialized)
|
|
|
|
def _load_state() -> Optional[dict]:
    """Return the cached pod record from ``STATE_FILE``, or ``None``.

    ``None`` is returned when the file is missing or unreadable, when it does
    not contain valid JSON, or when the JSON document is not an object.
    Callers index the result by key, so a non-dict payload is treated the
    same as a corrupt file.
    """
    # Read directly instead of checking exists() first: the file can vanish
    # between the check and the read.
    try:
        state = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    return state if isinstance(state, dict) else None
|
|
|
|
def _clear_state() -> None:
    """Delete ``STATE_FILE`` if it exists; do nothing when it is already gone."""
    # missing_ok avoids the exists()/unlink() race where the file disappears
    # between the check and the removal.
    STATE_FILE.unlink(missing_ok=True)
|
|
|
|
| |
| |
| |
|
|
def _ssh_command(pod: dict) -> list[str]:
    """Return the ``ssh`` argv prefix for connecting to ``pod``.

    Reads ``ssh_host``, ``ssh_user`` (default ``"root"``) and ``ssh_port``
    (default ``22``) from the pod record. Host-key checking is disabled and
    the known-hosts file is ``/dev/null``. The caller appends the remote
    command.

    Raises:
        SystemExit: if the record has no ``ssh_host``.
    """
    # NOTE(review): the message below points users at `up --refresh`, but the
    # argument parser visible in this file defines no such flag -- confirm.
    host = pod.get("ssh_host")
    if not host:
        raise SystemExit(
            "pod has no SSH host yet β wait a few seconds for it to finish booting "
            "and run `python3 -m data_label_factory.runpod up --refresh` to update."
        )

    login = f"{pod.get('ssh_user', 'root')}@{host}"
    port = str(pod.get("ssh_port", 22))

    argv = ["ssh"]
    argv += ["-o", "StrictHostKeyChecking=no"]
    argv += ["-o", "UserKnownHostsFile=/dev/null"]
    argv += ["-p", port, login]
    return argv
|
|
|
|
def _scp_command(pod: dict) -> list[str]:
    """Return the ``scp`` argv prefix (options only) for ``pod``.

    Only ``ssh_port`` (default ``22``) is read from the pod record. The
    caller appends the source path and the ``user@host:path`` destination.
    Host-key checking is disabled and the known-hosts file is ``/dev/null``.
    """
    port = pod.get("ssh_port", 22)
    return [
        "scp", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
        "-P", str(port),  # scp takes the port as upper-case -P (ssh uses -p)
    ]
|
|
|
|
def _run_remote(pod: dict, command: str, *, capture: bool = False) -> subprocess.CompletedProcess:
    """Run ``command`` on the pod over ssh and return the completed process.

    The full command line is echoed to stderr before it runs. When
    ``capture`` is true, output is captured and decoded as text; otherwise it
    is inherited from this process. The exit status is not checked here.
    """
    argv = [*_ssh_command(pod), command]
    print(f"$ {shlex.join(argv)}", file=sys.stderr)
    return subprocess.run(argv, capture_output=capture, text=capture)
|
|
|
|
| |
| |
| |
|
|
def cmd_up(args):
    """Provision a GPU pod and cache its SSH connection details.

    Creates the pod through the RunPod SDK, then polls it (up to 60 times,
    5 seconds apart) until a public mapping for private port 22 appears.

    The pod id is written to the state file as soon as the pod exists, so
    that `down` can still terminate it if the wait times out or is
    interrupted. The SSH fields are added once the pod is reachable.

    Args:
        args: parsed CLI namespace providing ``gpu``, ``gpu_count``,
            ``image``, ``name``, ``cloud``, ``disk_gb`` and
            ``network_volume``.

    Raises:
        SystemExit: if the SDK or API key is unavailable, or if SSH is not
            exposed before the polling budget runs out.
    """
    runpod = _runpod_sdk()

    print(f"provisioning {args.gpu} pod ({args.gpu_count}x) using image {args.image}…")
    pod = runpod.create_pod(
        name=args.name,
        image_name=args.image,
        gpu_type_id=args.gpu,
        gpu_count=args.gpu_count,
        cloud_type=args.cloud,
        volume_in_gb=args.disk_gb,
        container_disk_in_gb=20,
        ports="22/tcp,8000/http",
        env={
            "HF_TOKEN": os.environ.get("HF_TOKEN", ""),
            "QWEN_URL": os.environ.get("QWEN_URL", ""),
            "GEMMA_URL": os.environ.get("GEMMA_URL", ""),
        },
        volume_mount_path="/workspace",
        network_volume_id=args.network_volume,
    )
    pod_id = pod["id"]

    # Persist the id right away: from this point the pod exists, and losing
    # its id would leave it running with no way for `down` to find it.
    info = {
        "pod_id": pod_id,
        "gpu": args.gpu,
        "image": args.image,
        "started": time.time(),
    }
    _save_state(info)

    print(f" pod id: {pod_id}")
    print(" waiting for pod to become ready (may take 1-5 minutes)…")

    max_polls = 60
    poll_interval_s = 5
    for attempt in range(max_polls):
        try:
            full = runpod.get_pod(pod_id)
        except Exception as e:
            # Best-effort polling: report the failure and try again.
            print(f" poll {attempt}: {e}")
            time.sleep(poll_interval_s)
            continue
        # Tolerate an empty response as well as a missing/None runtime.
        runtime = (full or {}).get("runtime") or {}
        ports = runtime.get("ports") or []
        ssh_port_info = next((p for p in ports if p.get("privatePort") == 22), None)
        if ssh_port_info and ssh_port_info.get("publicPort") and ssh_port_info.get("ip"):
            info.update(
                ssh_host=ssh_port_info["ip"],
                ssh_port=ssh_port_info["publicPort"],
                ssh_user="root",
                started=time.time(),
            )
            break
        time.sleep(poll_interval_s)
    else:
        raise SystemExit(
            f"timed out waiting for pod to expose SSH; pod {pod_id} was not "
            "terminated — run `down` to destroy it"
        )

    _save_state(info)
    print(f" ready: ssh -p {info['ssh_port']} root@{info['ssh_host']}")
    print(f" state cached at {STATE_FILE}")
|
|
|
|
def cmd_push(args):
    """Copy a project YAML (and optionally a local image dir) to the pod.

    The YAML is copied with scp to ``/workspace/projects/``. When
    ``args.images`` is set, that directory's contents are rsynced to
    ``/workspace/data/``.

    Args:
        args: parsed CLI namespace providing ``project`` and ``images``.

    Raises:
        SystemExit: if there is no active pod, a local path is missing, or
            the remote directories cannot be created.
        subprocess.CalledProcessError: if scp or rsync fails.
    """
    pod = _load_state()
    if not pod:
        raise SystemExit("no active pod. run `up` first.")

    project_path = Path(args.project)
    if not project_path.exists():
        raise SystemExit(f"project not found: {project_path}")

    # Validate every local input before touching the pod, so a bad --images
    # path does not leave a half-finished upload behind.
    images_dir = Path(args.images).expanduser() if args.images else None
    if images_dir is not None and not images_dir.exists():
        raise SystemExit(f"images dir not found: {images_dir}")

    mkdir = _run_remote(pod, "mkdir -p /workspace/projects /workspace/data /workspace/experiments")
    if mkdir.returncode != 0:
        raise SystemExit(
            f"could not create /workspace directories on the pod (ssh exit code {mkdir.returncode})"
        )

    # _run_remote has already verified that ssh_host is present. Use the
    # stored user rather than a hard-coded one, matching _ssh_command.
    remote = f"{pod.get('ssh_user', 'root')}@{pod['ssh_host']}"

    scp = _scp_command(pod)
    dest = f"{remote}:/workspace/projects/{project_path.name}"
    print(f"$ scp {project_path} → {dest}")
    subprocess.run(scp + [str(project_path), dest], check=True)

    if images_dir is not None:
        print(f"$ rsync -avz {images_dir}/ → {remote}:/workspace/data/")
        rsync_cmd = [
            "rsync", "-avz",
            "-e", f"ssh -p {pod.get('ssh_port', 22)} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
            f"{images_dir}/",  # trailing slash: copy the contents, not the dir itself
            f"{remote}:/workspace/data/",
        ]
        subprocess.run(rsync_cmd, check=True)

    print(f" pushed {project_path.name} to /workspace/projects/")
|
|
|
|
def cmd_run(args):
    """Run a shell command on the pod from ``/workspace``.

    ``args.command`` is the command to run; when it is falsy,
    ``data_label_factory --help`` is run instead.

    Returns normally when the remote command succeeds, so callers such as
    the one-shot pipeline can continue with their next step. A non-zero
    remote exit status is propagated as this process's exit status.

    Raises:
        SystemExit: if there is no active pod, or with the remote exit
            status when the command fails.
    """
    pod = _load_state()
    if not pod:
        raise SystemExit("no active pod. run `up` first.")
    cmd = args.command or "data_label_factory --help"
    result = _run_remote(pod, f"cd /workspace && {cmd}")
    # Exit only on failure: sys.exit(0) would still raise SystemExit and
    # abort any caller that has more work to do after the run step.
    if result.returncode != 0:
        sys.exit(result.returncode)
|
|
|
|
def cmd_pull(args):
    """Download an experiment directory from the pod back to your machine.

    Rsyncs ``/workspace/experiments/<experiment>/`` on the pod into
    ``<out>/<experiment>/`` locally, creating ``<out>`` if needed.

    Args:
        args: parsed CLI namespace providing ``experiment`` and ``out``.

    Raises:
        SystemExit: if there is no active pod or the pod record has no SSH
            host.
        subprocess.CalledProcessError: if rsync fails.
    """
    pod = _load_state()
    if not pod:
        raise SystemExit("no active pod. run `up` first.")

    # Read connection details the same way _ssh_command does, instead of
    # failing with a bare KeyError on an incomplete pod record.
    host = pod.get("ssh_host")
    if not host:
        raise SystemExit(
            f"active pod has no SSH host recorded in {STATE_FILE}; "
            "it may not have finished booting."
        )
    user = pod.get("ssh_user", "root")
    port = pod.get("ssh_port", 22)

    name = args.experiment
    local_target = Path(args.out).expanduser()
    local_target.mkdir(parents=True, exist_ok=True)

    rsync_cmd = [
        "rsync", "-avz",
        "-e", f"ssh -p {port} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
        f"{user}@{host}:/workspace/experiments/{name}/",
        str(local_target / name) + "/",
    ]
    print(f"$ {shlex.join(rsync_cmd)}")
    subprocess.run(rsync_cmd, check=True)
    print(f" pulled {name} → {local_target / name}")
|
|
|
|
def cmd_publish(args):
    """Push a labeled experiment to a Hugging Face dataset repo.

    Locates the COCO JSON (and, if present, the verified JSON) inside the
    experiment directory, builds ``data.parquet`` with the package's
    ``build_parquet_from_experiment``, creates the dataset repo if needed,
    and uploads the parquet file plus an optional dataset card.

    Args:
        args: parsed CLI namespace providing ``experiment``, ``to``,
            ``private``, ``hf_token`` and ``card``.

    Raises:
        SystemExit: if ``huggingface_hub`` is missing, the experiment dir or
            card file does not exist, or no COCO JSON is found.
    """
    # Resolve the token once so repo creation and uploads use the same
    # credentials: an explicit --hf-token wins over the HF_TOKEN env var.
    token = args.hf_token or os.environ.get("HF_TOKEN")
    api = _hf_api(token=token)

    exp_dir = Path(args.experiment).expanduser()
    if not exp_dir.exists():
        raise SystemExit(f"experiment dir not found: {exp_dir}")

    # Check the card before uploading anything, so a bad path cannot leave
    # the repo with data but no card.
    if args.card and not Path(args.card).is_file():
        raise SystemExit(f"dataset card not found: {args.card}")

    coco = _find_one(exp_dir, "*.coco.json", "label_falcon")
    verified = _find_one(exp_dir, "*verified*.json", "verify_qwen")
    if not coco:
        raise SystemExit(
            f"no COCO JSON found in {exp_dir}. did the label stage finish?"
        )

    print(f" COCO: {coco}")
    print(f" verified: {verified or '(none — using raw Falcon labels)'}")

    from .builder import build_parquet_from_experiment
    parquet_path = build_parquet_from_experiment(coco, verified, exp_dir / "data.parquet")
    print(f" built parquet: {parquet_path} ({parquet_path.stat().st_size / 1024 / 1024:.1f} MB)")

    repo_id = args.to
    print(f" creating HF dataset repo: {repo_id}")
    from huggingface_hub import create_repo
    create_repo(repo_id, repo_type="dataset", private=args.private, exist_ok=True,
                token=token)

    print(" uploading parquet…")
    api.upload_file(
        repo_id=repo_id,
        repo_type="dataset",
        path_or_fileobj=str(parquet_path),
        path_in_repo="data.parquet",
        commit_message=f"add data.parquet from {exp_dir.name}",
    )

    if args.card:
        print(" uploading dataset card…")
        api.upload_file(
            repo_id=repo_id, repo_type="dataset",
            path_or_fileobj=args.card, path_in_repo="README.md",
            commit_message="add dataset card",
        )

    print(f" ✓ published → https://huggingface.co/datasets/{repo_id}")
|
|
|
|
def cmd_down(args):
    """Terminate the pod recorded in the state file and forget it.

    The state file is cleared only after the terminate call returns. The
    printed uptime is measured from the record's ``started`` timestamp.

    Raises:
        SystemExit: if the SDK or API key is unavailable, or if no pod is
            recorded.
    """
    sdk = _runpod_sdk()
    state = _load_state()
    if not state:
        raise SystemExit("no active pod to destroy.")

    target = state["pod_id"]
    print(f"terminating pod {target}β¦")
    sdk.terminate_pod(target)
    _clear_state()

    elapsed_s = time.time() - state.get("started", time.time())
    print(f" done. uptime: {elapsed_s / 60:.1f} minutes")
|
|
|
|
def cmd_pipeline(args):
    """One-shot: up -> push -> run -> pull -> publish -> down.

    Provisions a pod, uploads the project, runs the labeling pipeline
    remotely, pulls the ``latest`` experiment into ``./experiments`` and,
    when ``args.publish_to`` is set, publishes it to Hugging Face.

    Once the pod is up, it is destroyed on the way out even if a later step
    fails, unless ``--keep-pod`` was given.

    Args:
        args: parsed CLI namespace for the ``pipeline`` subcommand. It is
            mutated in place to carry the arguments each step expects.

    Raises:
        SystemExit: propagated from any step, including a non-zero exit
            status from the remote run.
    """
    print("=" * 60)
    print("ONE-SHOT RUNPOD PIPELINE")
    print("=" * 60)

    # Provision outside the try block: if `up` fails there may be no pod of
    # ours to tear down.
    cmd_up(args)
    try:
        cmd_push(args)

        project_name = Path(args.project).name
        # Quote the path so a project file name containing spaces or shell
        # metacharacters survives the remote shell.
        remote_project = shlex.quote(f"/workspace/projects/{project_name}")
        args.command = (
            f"data_label_factory pipeline --project {remote_project} "
            f"--max-per-query {args.max_per_query} --backend qwen"
        )
        try:
            cmd_run(args)
        except SystemExit as exc:
            # cmd_run may report its result through sys.exit(); a success
            # status must not end the pipeline before pull/publish/down.
            if exc.code not in (None, 0):
                raise

        args.experiment = "latest"
        args.out = str(Path.cwd() / "experiments")
        cmd_pull(args)

        if args.publish_to:
            args.experiment = str(Path(args.out) / "latest")
            args.to = args.publish_to
            args.card = None
            args.private = False
            args.hf_token = os.environ.get("HF_TOKEN")
            cmd_publish(args)
    finally:
        # Always release the (billed) pod unless the user asked to keep it.
        if not args.keep_pod:
            cmd_down(args)
        else:
            print("--keep-pod set; pod left running. terminate with `down` when done.")
|
|
|
|
def cmd_build(args):
    """Build the worker Docker image and, with ``args.push``, push it.

    The Dockerfile is expected next to this module. The build context is the
    directory two levels above it, which must contain ``pyproject.toml``.

    Raises:
        SystemExit: if the Dockerfile or ``pyproject.toml`` is missing.
        subprocess.CalledProcessError: if ``docker build`` or ``docker push``
            fails.
    """
    module_dir = Path(__file__).parent
    dockerfile = module_dir / "Dockerfile"
    if not dockerfile.exists():
        raise SystemExit(f"Dockerfile not found at {dockerfile}")

    repo_root = module_dir.parent.parent
    if not repo_root.joinpath("pyproject.toml").exists():
        raise SystemExit(f"expected pyproject.toml at {repo_root}; build context resolution failed")

    for line in (
        f"building image {args.tag}",
        f" context: {repo_root}",
        f" dockerfile: {dockerfile}",
    ):
        print(line)

    build_argv = ["docker", "build", "-t", args.tag, "-f", str(dockerfile), str(repo_root)]
    subprocess.run(build_argv, check=True)

    if args.push:
        print(f"pushing {args.tag}β¦")
        push_argv = ["docker", "push", args.tag]
        subprocess.run(push_argv, check=True)

    print("done.")
|
|
|
|
def cmd_serverless(args):
    """Create, test, or destroy a RunPod serverless endpoint.

    Dispatches on ``args.serverless_action``:

    * ``create``  - create an endpoint and print its id and runsync URL.
    * ``test``    - POST a base64-encoded image and query to the endpoint's
      ``runsync`` URL and pretty-print the JSON response.
    * ``destroy`` - delete the endpoint.

    Raises:
        SystemExit: if the SDK or API key is unavailable, ``test`` has no
            endpoint id, or the action is not one of the above.
    """
    runpod = _runpod_sdk()
    action = args.serverless_action

    if action == "create":
        # NOTE(review): args.image is only printed here; the endpoint is
        # created with template_id=None -- confirm how the image is meant to
        # be attached.
        print(f"creating serverless endpoint for image {args.image}β¦")
        endpoint = runpod.create_endpoint(
            name=args.name,
            template_id=None,
            gpu_ids=args.gpu,
            workers_min=args.workers_min,
            workers_max=args.workers_max,
            idle_timeout=args.idle_timeout,
            execution_timeout_ms=300_000,
        )
        print(f" β endpoint id: {endpoint.get('id')}")
        print(f" url: https://api.runpod.ai/v2/{endpoint.get('id')}/runsync")
        return

    if action == "test":
        endpoint_id = args.endpoint
        if not endpoint_id:
            raise SystemExit("--endpoint <id> required")

        import base64
        import urllib.request
        import json as _json

        with open(args.image_path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode()

        body = _json.dumps(
            {"input": {"image_base64": encoded, "query": args.query, "task": "segmentation"}}
        ).encode()
        request = urllib.request.Request(
            f"https://api.runpod.ai/v2/{endpoint_id}/runsync",
            data=body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.environ['RUNPOD_API_KEY']}",
            },
        )
        with urllib.request.urlopen(request, timeout=300) as response:
            reply = _json.loads(response.read())
            print(_json.dumps(reply, indent=2))
        return

    if action == "destroy":
        endpoint_id = args.endpoint
        runpod.delete_endpoint(endpoint_id)
        print(f" destroyed {endpoint_id}")
        return

    raise SystemExit(f"unknown serverless action: {args.serverless_action}")
|
|
|
|
| |
|
|
def _find_one(root: Path, pattern: str, hint_subdir: str = "") -> Optional[Path]:
    """Find one file matching ``pattern`` under ``root``.

    Files inside ``root / hint_subdir`` are preferred; the whole tree under
    ``root`` is searched only when the hint directory yields nothing. When
    several files match, the smallest path in sort order is returned, so the
    choice does not depend on directory listing order. Directories whose
    names match the pattern are ignored.

    Args:
        root: directory to search recursively.
        pattern: glob pattern passed to ``Path.rglob``.
        hint_subdir: optional subdirectory of ``root`` to search first.

    Returns:
        The chosen path, or ``None`` when no file matches.
    """
    search_dirs = []
    if hint_subdir:
        hint = root / hint_subdir
        if hint.is_dir():
            search_dirs.append(hint)
    search_dirs.append(root)

    for base in search_dirs:
        best = min((p for p in base.rglob(pattern) if p.is_file()), default=None)
        if best is not None:
            return best
    return None
|
|
|
|
| |
| |
| |
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for every subcommand of this CLI."""
    p = argparse.ArgumentParser(
        prog="python3 -m data_label_factory.runpod",
        description=(
            "Optional GPU path for data-label-factory via RunPod. "
            "Provision a pod, run the pipeline, pull results, optionally "
            "publish to Hugging Face. See README.md in this folder for the "
            "full architecture and cost notes."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # The chosen subcommand is stored as `subcommand`, not `command`: the
    # `run` subparser has a `--command` option, and argparse copies a
    # subparser's values over the parent namespace, which would overwrite the
    # subcommand name and make `run` impossible to dispatch.
    sub = p.add_subparsers(dest="subcommand", required=True)

    def add_up_args(parser):
        # Shared by `up` and `pipeline`, which both provision a pod.
        parser.add_argument("--gpu", default="L40S", help="GPU type (e.g. L40S, RTX_A4000, RTX_4090)")
        parser.add_argument("--gpu-count", type=int, default=1)
        parser.add_argument("--cloud", choices=["SECURE", "COMMUNITY"], default="COMMUNITY",
                            help="COMMUNITY is cheaper but slower to start")
        parser.add_argument("--name", default="dlf-pod")
        parser.add_argument("--image", default="walter-grace/data-label-factory-worker:latest",
                            help="Docker image to run on the pod")
        parser.add_argument("--disk-gb", type=int, default=50)
        parser.add_argument("--network-volume", default=None,
                            help="Optional persistent volume id (use for repeated runs)")

    sup = sub.add_parser("up", help="Provision a GPU pod")
    add_up_args(sup)

    spu = sub.add_parser("push", help="Copy project YAML + images to pod")
    spu.add_argument("--project", required=True, help="Path to a project YAML")
    spu.add_argument("--images", default=None, help="Optional local image dir to rsync")

    sr = sub.add_parser("run", help="Run a shell command on the pod")
    sr.add_argument("--command", default=None, help="Shell command (defaults to `data_label_factory --help`)")

    spl = sub.add_parser("pull", help="Download an experiment from the pod")
    spl.add_argument("--experiment", default="latest", help="Experiment dir name (or 'latest')")
    spl.add_argument("--out", default="./experiments", help="Local destination")

    spb = sub.add_parser("publish", help="Publish an experiment to a HF dataset repo")
    spb.add_argument("--experiment", required=True, help="Local experiment dir to publish")
    spb.add_argument("--to", required=True, help="HF repo id, e.g. waltgrace/my-dataset")
    spb.add_argument("--private", action="store_true")
    spb.add_argument("--hf-token", default=None)
    spb.add_argument("--card", default=None, help="Path to a README.md to use as the dataset card")

    sub.add_parser("down", help="Destroy the active pod")

    spi = sub.add_parser("pipeline", help="One-shot: up → push → run → pull → publish → down")
    add_up_args(spi)
    spi.add_argument("--project", required=True)
    spi.add_argument("--images", default=None)
    spi.add_argument("--max-per-query", type=int, default=30)
    spi.add_argument("--publish-to", default=None, help="HF repo id to publish results to")
    spi.add_argument("--keep-pod", action="store_true",
                     help="Don't destroy the pod when done (you'll be billed for idle time)")

    sb = sub.add_parser("build", help="Build the worker Docker image")
    sb.add_argument("--tag", default="walter-grace/data-label-factory-worker:latest")
    sb.add_argument("--push", action="store_true", help="Push to registry after building")

    ss = sub.add_parser("serverless", help="Manage RunPod serverless endpoints")
    ss_sub = ss.add_subparsers(dest="serverless_action", required=True)
    ss_create = ss_sub.add_parser("create")
    ss_create.add_argument("--image", required=True)
    ss_create.add_argument("--name", default="dlf-serverless")
    ss_create.add_argument("--gpu", default="NVIDIA RTX A4000")
    ss_create.add_argument("--workers-min", type=int, default=0)
    ss_create.add_argument("--workers-max", type=int, default=3)
    ss_create.add_argument("--idle-timeout", type=int, default=5)
    ss_test = ss_sub.add_parser("test")
    ss_test.add_argument("--endpoint", required=True)
    ss_test.add_argument("--image-path", required=True)
    ss_test.add_argument("--query", required=True)
    ss_destroy = ss_sub.add_parser("destroy")
    ss_destroy.add_argument("--endpoint", required=True)

    return p


def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Exits with status 1 after printing help if the parsed subcommand has no
    handler.
    """
    p = _build_parser()
    args = p.parse_args()
    handlers = {
        "up": cmd_up,
        "push": cmd_push,
        "run": cmd_run,
        "pull": cmd_pull,
        "publish": cmd_publish,
        "down": cmd_down,
        "pipeline": cmd_pipeline,
        "build": cmd_build,
        "serverless": cmd_serverless,
    }
    handler = handlers.get(args.subcommand)
    if handler is None:
        p.print_help()
        sys.exit(1)
    handler(args)
|
|
|
|
# Run the CLI when this file is executed as a script (e.g. via `python3 -m`);
# importing the module has no such side effect.
if __name__ == "__main__":
    main()
|
|