waltgrace's picture
sync: data_label_factory/runpod/cli.py
ef0fc31 verified
#!/usr/bin/env python3
"""
data_label_factory.runpod β€” orchestration CLI for the optional GPU path.
Provisions a RunPod GPU pod, runs the data_label_factory pipeline on it,
pulls results back, and (optionally) publishes the labeled dataset to
Hugging Face β€” all in one place.
Subcommands
-----------
up Provision a GPU pod
push Copy a project YAML + image manifest to the pod
run Execute a shell command on the pod
pull Download an experiment dir from the pod
publish Push a labeled experiment to a Hugging Face dataset repo
down Destroy the pod
pipeline One-shot: up β†’ push β†’ run β†’ pull β†’ publish β†’ down
build Build the worker Docker image
serverless Manage RunPod serverless endpoints (create / test / destroy)
Usage
-----
export RUNPOD_API_KEY=rpa_xxxxxxxxxx
python3 -m data_label_factory.runpod pipeline \\
--project projects/drones.yaml \\
--gpu L40S \\
--publish-to waltgrace/my-drone-dataset
See README.md in this folder for architecture, costs, and trade-offs.
"""
from __future__ import annotations
import argparse
import json
import os
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
# ---------- lazy SDK imports ----------
# We deliberately do NOT import `runpod` or `huggingface_hub` at module load,
# so a user who runs `data_label_factory ...` (without ever touching the
# runpod path) doesn't pay the import cost or get an ImportError if they
# haven't installed the optional deps.
def _runpod_sdk():
try:
import runpod # type: ignore
except ImportError:
raise SystemExit(
"the `runpod` package is not installed. install it with:\n"
" pip install -e \".[runpod]\"\n"
"or directly: pip install runpod"
)
api_key = os.environ.get("RUNPOD_API_KEY")
if not api_key:
raise SystemExit(
"RUNPOD_API_KEY is not set in your environment. get one at\n"
" https://runpod.io/console/user/settings\n"
"then: export RUNPOD_API_KEY=rpa_xxxxxxxxxx"
)
runpod.api_key = api_key
return runpod
def _hf_api(token: Optional[str] = None):
try:
from huggingface_hub import HfApi # type: ignore
except ImportError:
raise SystemExit(
"the `huggingface_hub` package is not installed. install it with:\n"
" pip install -e \".[runpod]\"\n"
"or directly: pip install huggingface_hub"
)
return HfApi(token=token or os.environ.get("HF_TOKEN"))
# ---------- pod state file ----------
# Tracks the currently active pod so subcommands like `push` and `run` can
# find it without the user having to copy/paste pod IDs around.
STATE_DIR = Path(os.path.expanduser("~/.data-label-factory"))
STATE_FILE = STATE_DIR / "active-pod.json"
def _save_state(pod_info: dict) -> None:
STATE_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps(pod_info, indent=2))
def _load_state() -> Optional[dict]:
if not STATE_FILE.exists():
return None
try:
return json.loads(STATE_FILE.read_text())
except Exception:
return None
def _clear_state() -> None:
if STATE_FILE.exists():
STATE_FILE.unlink()
# ---------- ssh helpers ----------
# RunPod pods expose SSH via a public proxy. We shell out to `ssh` rather
# than using paramiko so users can rely on their existing ~/.ssh/config.
def _ssh_command(pod: dict) -> list[str]:
"""Build the ssh command list for a given pod's runtime info."""
host = pod.get("ssh_host")
user = pod.get("ssh_user", "root")
port = pod.get("ssh_port", 22)
if not host:
raise SystemExit(
"pod has no SSH host yet β€” wait a few seconds for it to finish booting "
"and run `python3 -m data_label_factory.runpod up --refresh` to update."
)
return [
"ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
"-p", str(port), f"{user}@{host}",
]
def _scp_command(pod: dict) -> list[str]:
host = pod.get("ssh_host")
user = pod.get("ssh_user", "root")
port = pod.get("ssh_port", 22)
return [
"scp", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
"-P", str(port),
]
def _run_remote(pod: dict, command: str, *, capture: bool = False) -> subprocess.CompletedProcess:
full = _ssh_command(pod) + [command]
print(f"$ {shlex.join(full)}", file=sys.stderr)
return subprocess.run(full, capture_output=capture, text=capture)
# ============================================================
# SUBCOMMANDS
# ============================================================
def cmd_up(args):
"""Provision a GPU pod."""
runpod = _runpod_sdk()
print(f"provisioning {args.gpu} pod ({args.gpu_count}x) using image {args.image}…")
pod = runpod.create_pod(
name=args.name,
image_name=args.image,
gpu_type_id=args.gpu,
gpu_count=args.gpu_count,
cloud_type=args.cloud,
volume_in_gb=args.disk_gb,
container_disk_in_gb=20,
ports="22/tcp,8000/http",
env={
"HF_TOKEN": os.environ.get("HF_TOKEN", ""),
"QWEN_URL": os.environ.get("QWEN_URL", ""),
"GEMMA_URL": os.environ.get("GEMMA_URL", ""),
},
volume_mount_path="/workspace",
network_volume_id=args.network_volume,
)
pod_id = pod["id"]
print(f" pod id: {pod_id}")
print(f" waiting for pod to become ready (may take 1-5 minutes)…")
# Poll until SSH is reachable
info = {}
for attempt in range(60):
try:
full = runpod.get_pod(pod_id)
except Exception as e:
print(f" poll {attempt}: {e}")
time.sleep(5)
continue
runtime = full.get("runtime") or {}
ports = runtime.get("ports") or []
ssh_port_info = next((p for p in ports if p.get("privatePort") == 22), None)
if ssh_port_info and ssh_port_info.get("publicPort"):
info = {
"pod_id": pod_id,
"ssh_host": ssh_port_info["ip"],
"ssh_port": ssh_port_info["publicPort"],
"ssh_user": "root",
"gpu": args.gpu,
"image": args.image,
"started": time.time(),
}
break
time.sleep(5)
else:
raise SystemExit("timed out waiting for pod to expose SSH")
_save_state(info)
print(f" ready: ssh -p {info['ssh_port']} root@{info['ssh_host']}")
print(f" state cached at {STATE_FILE}")
def cmd_push(args):
"""Copy a project YAML (and optionally a local image dir) to the pod."""
pod = _load_state()
if not pod:
raise SystemExit("no active pod. run `up` first.")
project_path = Path(args.project)
if not project_path.exists():
raise SystemExit(f"project not found: {project_path}")
# 1. Make sure the remote workspace exists
_run_remote(pod, "mkdir -p /workspace/projects /workspace/data /workspace/experiments")
# 2. SCP the project YAML
scp = _scp_command(pod)
dest = f"root@{pod['ssh_host']}:/workspace/projects/{project_path.name}"
print(f"$ scp {project_path} β†’ {dest}")
subprocess.run(scp + [str(project_path), dest], check=True)
# 3. Optionally rsync local image dir
if args.images:
images_dir = Path(args.images).expanduser()
if not images_dir.exists():
raise SystemExit(f"images dir not found: {images_dir}")
print(f"$ rsync -avz {images_dir}/ β†’ root@{pod['ssh_host']}:/workspace/data/")
rsync_cmd = [
"rsync", "-avz",
"-e", f"ssh -p {pod['ssh_port']} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
f"{images_dir}/",
f"root@{pod['ssh_host']}:/workspace/data/",
]
subprocess.run(rsync_cmd, check=True)
print(f" pushed {project_path.name} to /workspace/projects/")
def cmd_run(args):
"""Run an arbitrary command on the pod."""
pod = _load_state()
if not pod:
raise SystemExit("no active pod. run `up` first.")
cmd = args.command or "data_label_factory --help"
result = _run_remote(pod, f"cd /workspace && {cmd}")
sys.exit(result.returncode)
def cmd_pull(args):
"""Download an experiment directory from the pod back to your machine."""
pod = _load_state()
if not pod:
raise SystemExit("no active pod. run `up` first.")
name = args.experiment
local_target = Path(args.out).expanduser()
local_target.mkdir(parents=True, exist_ok=True)
rsync_cmd = [
"rsync", "-avz",
"-e", f"ssh -p {pod['ssh_port']} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
f"root@{pod['ssh_host']}:/workspace/experiments/{name}/",
str(local_target / name) + "/",
]
print(f"$ {shlex.join(rsync_cmd)}")
subprocess.run(rsync_cmd, check=True)
print(f" pulled {name} β†’ {local_target / name}")
def cmd_publish(args):
"""Push a labeled experiment to a Hugging Face dataset repo.
Reuses the same Parquet builder as the local path so the schema matches
the reference dataset at waltgrace/fiber-optic-drones.
"""
api = _hf_api(token=args.hf_token)
exp_dir = Path(args.experiment).expanduser()
if not exp_dir.exists():
raise SystemExit(f"experiment dir not found: {exp_dir}")
# Find the COCO + verified files in the experiment dir
coco = _find_one(exp_dir, "*.coco.json", "label_falcon")
verified = _find_one(exp_dir, "*verified*.json", "verify_qwen")
if not coco:
raise SystemExit(
f"no COCO JSON found in {exp_dir}. did the label stage finish?"
)
print(f" COCO: {coco}")
print(f" verified: {verified or '(none β€” using raw Falcon labels)'}")
# Lazy import β€” pyarrow / datasets is heavy
from .builder import build_parquet_from_experiment
parquet_path = build_parquet_from_experiment(coco, verified, exp_dir / "data.parquet")
print(f" built parquet: {parquet_path} ({parquet_path.stat().st_size / 1024 / 1024:.1f} MB)")
repo_id = args.to
print(f" creating HF dataset repo: {repo_id}")
from huggingface_hub import create_repo
create_repo(repo_id, repo_type="dataset", private=args.private, exist_ok=True,
token=os.environ.get("HF_TOKEN"))
print(f" uploading parquet…")
api.upload_file(
repo_id=repo_id,
repo_type="dataset",
path_or_fileobj=str(parquet_path),
path_in_repo="data.parquet",
commit_message=f"add data.parquet from {exp_dir.name}",
)
# Auto-generate a minimal dataset card if one doesn't exist yet
if args.card:
print(f" uploading dataset card…")
api.upload_file(
repo_id=repo_id, repo_type="dataset",
path_or_fileobj=args.card, path_in_repo="README.md",
commit_message="add dataset card",
)
print(f" βœ“ published β†’ https://huggingface.co/datasets/{repo_id}")
def cmd_down(args):
"""Destroy the active pod."""
runpod = _runpod_sdk()
pod = _load_state()
if not pod:
raise SystemExit("no active pod to destroy.")
pod_id = pod["pod_id"]
print(f"terminating pod {pod_id}…")
runpod.terminate_pod(pod_id)
_clear_state()
print(f" done. uptime: {(time.time() - pod.get('started', time.time())) / 60:.1f} minutes")
def cmd_pipeline(args):
"""One-shot: up β†’ push β†’ run β†’ pull β†’ publish β†’ down."""
print("=" * 60)
print("ONE-SHOT RUNPOD PIPELINE")
print("=" * 60)
cmd_up(args)
cmd_push(args)
# Run the data_label_factory pipeline on the pod
project_name = Path(args.project).name
args.command = (
f"data_label_factory pipeline --project /workspace/projects/{project_name} "
f"--max-per-query {args.max_per_query} --backend qwen"
)
cmd_run(args)
# Pull the latest experiment
args.experiment = "latest"
args.out = str(Path.cwd() / "experiments")
cmd_pull(args)
if args.publish_to:
args.experiment = str(Path(args.out) / "latest")
args.to = args.publish_to
args.card = None
args.private = False
args.hf_token = os.environ.get("HF_TOKEN")
cmd_publish(args)
if not args.keep_pod:
cmd_down(args)
else:
print("--keep-pod set; pod left running. terminate with `down` when done.")
def cmd_build(args):
"""Build the worker Docker image (and optionally push to a registry)."""
here = Path(__file__).parent
dockerfile = here / "Dockerfile"
if not dockerfile.exists():
raise SystemExit(f"Dockerfile not found at {dockerfile}")
# The build context MUST be the repo root because the Dockerfile COPYs
# files from outside the runpod/ folder (pyproject.toml, the package dir, etc.)
repo_root = here.parent.parent # data_label_factory/runpod β†’ repo root
if not (repo_root / "pyproject.toml").exists():
raise SystemExit(f"expected pyproject.toml at {repo_root}; build context resolution failed")
print(f"building image {args.tag}")
print(f" context: {repo_root}")
print(f" dockerfile: {dockerfile}")
subprocess.run(
["docker", "build", "-t", args.tag, "-f", str(dockerfile), str(repo_root)],
check=True,
)
if args.push:
print(f"pushing {args.tag}…")
subprocess.run(["docker", "push", args.tag], check=True)
print("done.")
def cmd_serverless(args):
"""Manage RunPod serverless endpoints."""
runpod = _runpod_sdk()
if args.serverless_action == "create":
print(f"creating serverless endpoint for image {args.image}…")
endpoint = runpod.create_endpoint(
name=args.name,
template_id=None,
gpu_ids=args.gpu,
workers_min=args.workers_min,
workers_max=args.workers_max,
idle_timeout=args.idle_timeout,
execution_timeout_ms=300_000,
)
print(f" βœ“ endpoint id: {endpoint.get('id')}")
print(f" url: https://api.runpod.ai/v2/{endpoint.get('id')}/runsync")
elif args.serverless_action == "test":
endpoint_id = args.endpoint
if not endpoint_id:
raise SystemExit("--endpoint <id> required")
import base64, urllib.request, json as _json
with open(args.image_path, "rb") as f:
b64 = base64.b64encode(f.read()).decode()
payload = {"input": {"image_base64": b64, "query": args.query, "task": "segmentation"}}
url = f"https://api.runpod.ai/v2/{endpoint_id}/runsync"
req = urllib.request.Request(
url,
data=_json.dumps(payload).encode(),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ['RUNPOD_API_KEY']}",
},
)
with urllib.request.urlopen(req, timeout=300) as r:
print(_json.dumps(_json.loads(r.read()), indent=2))
elif args.serverless_action == "destroy":
endpoint_id = args.endpoint
runpod.delete_endpoint(endpoint_id)
print(f" destroyed {endpoint_id}")
else:
raise SystemExit(f"unknown serverless action: {args.serverless_action}")
# ---------- helpers ----------
def _find_one(root: Path, pattern: str, hint_subdir: str = "") -> Optional[Path]:
"""Find one matching file under root. Prefer files inside hint_subdir."""
if hint_subdir:
hint = root / hint_subdir
if hint.exists():
for p in hint.rglob(pattern):
return p
for p in root.rglob(pattern):
return p
return None
# ============================================================
# MAIN
# ============================================================
def main():
p = argparse.ArgumentParser(
prog="python3 -m data_label_factory.runpod",
description=(
"Optional GPU path for data-label-factory via RunPod. "
"Provision a pod, run the pipeline, pull results, optionally "
"publish to Hugging Face. See README.md in this folder for the "
"full architecture and cost notes."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = p.add_subparsers(dest="command", required=True)
def add_up_args(parser):
parser.add_argument("--gpu", default="L40S", help="GPU type (e.g. L40S, RTX_A4000, RTX_4090)")
parser.add_argument("--gpu-count", type=int, default=1)
parser.add_argument("--cloud", choices=["SECURE", "COMMUNITY"], default="COMMUNITY",
help="COMMUNITY is cheaper but slower to start")
parser.add_argument("--name", default="dlf-pod")
parser.add_argument("--image", default="walter-grace/data-label-factory-worker:latest",
help="Docker image to run on the pod")
parser.add_argument("--disk-gb", type=int, default=50)
parser.add_argument("--network-volume", default=None,
help="Optional persistent volume id (use for repeated runs)")
sup = sub.add_parser("up", help="Provision a GPU pod")
add_up_args(sup)
spu = sub.add_parser("push", help="Copy project YAML + images to pod")
spu.add_argument("--project", required=True, help="Path to a project YAML")
spu.add_argument("--images", default=None, help="Optional local image dir to rsync")
sr = sub.add_parser("run", help="Run a shell command on the pod")
sr.add_argument("--command", default=None, help="Shell command (defaults to `data_label_factory --help`)")
spl = sub.add_parser("pull", help="Download an experiment from the pod")
spl.add_argument("--experiment", default="latest", help="Experiment dir name (or 'latest')")
spl.add_argument("--out", default="./experiments", help="Local destination")
spb = sub.add_parser("publish", help="Publish an experiment to a HF dataset repo")
spb.add_argument("--experiment", required=True, help="Local experiment dir to publish")
spb.add_argument("--to", required=True, help="HF repo id, e.g. waltgrace/my-dataset")
spb.add_argument("--private", action="store_true")
spb.add_argument("--hf-token", default=None)
spb.add_argument("--card", default=None, help="Path to a README.md to use as the dataset card")
sd = sub.add_parser("down", help="Destroy the active pod")
spi = sub.add_parser("pipeline", help="One-shot: up β†’ push β†’ run β†’ pull β†’ publish β†’ down")
add_up_args(spi)
spi.add_argument("--project", required=True)
spi.add_argument("--images", default=None)
spi.add_argument("--max-per-query", type=int, default=30)
spi.add_argument("--publish-to", default=None, help="HF repo id to publish results to")
spi.add_argument("--keep-pod", action="store_true",
help="Don't destroy the pod when done (you'll be billed for idle time)")
sb = sub.add_parser("build", help="Build the worker Docker image")
sb.add_argument("--tag", default="walter-grace/data-label-factory-worker:latest")
sb.add_argument("--push", action="store_true", help="Push to registry after building")
ss = sub.add_parser("serverless", help="Manage RunPod serverless endpoints")
ss_sub = ss.add_subparsers(dest="serverless_action", required=True)
ss_create = ss_sub.add_parser("create")
ss_create.add_argument("--image", required=True)
ss_create.add_argument("--name", default="dlf-serverless")
ss_create.add_argument("--gpu", default="NVIDIA RTX A4000")
ss_create.add_argument("--workers-min", type=int, default=0)
ss_create.add_argument("--workers-max", type=int, default=3)
ss_create.add_argument("--idle-timeout", type=int, default=5)
ss_test = ss_sub.add_parser("test")
ss_test.add_argument("--endpoint", required=True)
ss_test.add_argument("--image-path", required=True)
ss_test.add_argument("--query", required=True)
ss_destroy = ss_sub.add_parser("destroy")
ss_destroy.add_argument("--endpoint", required=True)
args = p.parse_args()
handlers = {
"up": cmd_up,
"push": cmd_push,
"run": cmd_run,
"pull": cmd_pull,
"publish": cmd_publish,
"down": cmd_down,
"pipeline": cmd_pipeline,
"build": cmd_build,
"serverless": cmd_serverless,
}
handler = handlers.get(args.command)
if handler is None:
p.print_help()
sys.exit(1)
handler(args)
if __name__ == "__main__":
main()