lean-laguna / scripts /adaption_pipeline.py
art87able's picture
Upload folder using huggingface_hub
8cc969e verified
#!/usr/bin/env python3
"""adaption_pipeline.py — drive the Adaption augmentation pipeline via API key.
Steps (gated so credits are never spent blindly: the real launch in step 6 runs only with --go):
1. upload/initiate -> presigned S3 PUT url
2. PUT seed jsonl to S3
3. upload/complete -> creates+processes Dataset (returns dataset_id)
4. GET status -> wait until READY/processed
5. launch estimate -> credit cost for augmentation (estimate:true) [GATE]
6. (only if --go) launch real run -> poll status -> download
Uses curl for TLS (macOS framework python lacks a CA bundle). Never prints key.
"""
from __future__ import annotations
import json, os, subprocess, sys, time
from pathlib import Path
# Repository root: this script sits one directory below it (in scripts/).
ROOT = Path(__file__).resolve().parent.parent
# Seed JSONL that gets uploaded to Adaption.
SEED = ROOT.joinpath("data", "adaption_seed.jsonl")
# Both settings are required: a missing variable raises KeyError at import.
URL = os.environ["ADAPTION_URL"].rstrip("/")  # API base URL, no trailing "/"
KEY = os.environ["ADAPTION_API_KEY"]  # bearer token -- never print it
def curl(args, timeout=120, input_text=None):
    """Run ``curl -sS -m <timeout> *args`` and capture its output.

    Args:
        args: extra curl arguments (method, URL, headers, ...).
        timeout: overall transfer limit in seconds, passed to curl's ``-m``.
        input_text: optional text written to curl's stdin (used by ``api``
            to pass secret headers via ``-H @-``). ``None`` leaves stdin
            untouched, as before.

    Returns:
        ``(returncode, stdout, stderr)``. Output is decoded as UTF-8 with
        undecodable bytes replaced, so results do not depend on the locale.
    """
    proc = subprocess.run(
        ["curl", "-sS", "-m", str(timeout), *args],
        input=input_text,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    return proc.returncode, proc.stdout, proc.stderr


def api(method, path, body=None, timeout=120):
    """Call the Adaption REST API at ``URL + path``.

    The Authorization header is fed to curl on stdin (``-H @-``) instead of
    being placed on the command line, where it would be visible in process
    listings.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: API path appended to ``URL``.
        body: optional JSON-serialisable request body.
        timeout: seconds passed through to ``curl``.

    Returns:
        ``(http_code, data)``:
        - ``http_code`` is the status string from curl's ``-w`` output, or
          ``""`` if none was printed.
        - ``data`` is the decoded JSON body: ``{}`` for an empty body, or
          ``{"_raw": ...}`` (first 500 chars) for a non-JSON body.
        - When curl exits non-zero (transport failure), a dict ``data`` also
          carries ``_curl_rc`` and ``_curl_error`` so the failure is visible.
    """
    args = ["-X", method, URL + path,
            "-H", "@-",  # header lines (Authorization) are read from stdin
            "-H", "Content-Type: application/json",
            "-w", "\n__HTTP__%{http_code}"]
    if body is not None:
        args += ["--data-binary", json.dumps(body)]
    rc, out, err = curl(args, timeout,
                        input_text=f"Authorization: Bearer {KEY}\n")
    code = ""
    # The status code is appended after the last marker by -w; split it off.
    if "__HTTP__" in out:
        out, code = out.rsplit("__HTTP__", 1)
    try:
        data = json.loads(out) if out.strip() else {}
    except json.JSONDecodeError:
        data = {"_raw": out[:500]}
    if rc != 0 and isinstance(data, dict):
        # Surface curl's own error instead of silently returning {}.
        data.setdefault("_curl_rc", rc)
        data.setdefault("_curl_error", err.strip()[:500])
    return code.strip(), data
def _poll_status(ds_id, attempts, delay, done_states, label):
    """Poll the dataset status endpoint until a terminal state or attempts run out.

    Args:
        ds_id: dataset id returned by upload/complete.
        attempts: maximum number of GET .../status calls.
        delay: seconds to sleep between calls (no sleep after the last one).
        done_states: upper-cased status strings that count as success.
        label: prefix of the per-attempt progress line, e.g. "4 status".

    Returns:
        ``(outcome, body)``. ``outcome`` is one of:
        - ``"done"``: the status is in *done_states*;
        - ``"failed"``: the status is FAILED or ERROR;
        - ``"timeout"``: all attempts were used up.
        ``body`` is the last status response, or ``{}`` if no call was made.
    """
    st = {}
    for i in range(attempts):
        code, st = api("GET", f"/api/v1/datasets/{ds_id}/status")
        # Accept either "status" or "state"; otherwise log a raw snippet.
        s = st.get("status") or st.get("state") or json.dumps(st)[:120]
        print(f"{label}[{i}]:", code, s)
        state = str(s).upper()
        if state in done_states:
            return "done", st
        if state in ("FAILED", "ERROR"):
            return "failed", st
        if i + 1 < attempts:
            time.sleep(delay)
    return "timeout", st


def main():
    """Upload the seed, wait for processing, estimate cost, and optionally run.

    The real (credit-spending) launch happens only when ``--go`` is on the
    command line; otherwise the script stops after printing the estimate.

    Returns:
        A process exit code:
        - 0: success, or stopped at the --go gate;
        - 2: initiate returned no upload_url;
        - 3: S3 PUT failed;
        - 4: complete returned no dataset id;
        - 5: dataset processing failed or never became ready;
        - 6: augmentation launch/run failed or never finished;
        - 7: the credit-estimate request failed (nothing is launched).
    """
    from urllib.parse import urlparse

    go = "--go" in sys.argv
    seed_bytes = SEED.read_bytes()
    print(f"seed={SEED} ({len(seed_bytes)} bytes)")

    # 1. initiate upload -> presigned S3 PUT url
    code, init = api("POST", "/api/v1/datasets/upload/initiate",
                     {"name": "spec_rl_seed", "file_format": "jsonl"})
    print("1 initiate:", code, "keys=", list(init.keys()))
    upload_url = init.get("upload_url")
    if not upload_url:
        print("ABORT: no upload_url")
        return 2
    # s3_key = the object path after the bucket host, before the query string.
    # NOTE(review): the path stays percent-encoded, and for path-style S3 URLs
    # it would also start with the bucket name -- confirm the form that
    # upload/complete expects.
    s3_key = urlparse(upload_url).path.lstrip("/")
    print(" s3_key=", s3_key)

    # 2. PUT the seed to S3. No auth header: the presigned URL carries the
    # signature. curl uploads from a scratch copy that is always removed.
    tmp = ROOT / "data" / ".seed_put.jsonl"
    tmp.write_bytes(seed_bytes)
    try:
        rc, out, err = curl(["-X", "PUT", upload_url, "--upload-file", str(tmp),
                             "-w", "__HTTP__%{http_code}"], timeout=120)
    finally:
        try:
            tmp.unlink()
        except FileNotFoundError:
            pass
    if "__HTTP__" in out:
        put_body, put_code = out.rsplit("__HTTP__", 1)
        put_code = put_code.strip()
    else:
        put_body, put_code = out, "?"
    print("2 s3 PUT:", put_code, (err[:120] if rc else ""))
    if put_code not in ("200", "204"):
        print(" PUT body:", put_body[:300])
        print("ABORT: S3 PUT failed")
        return 3

    # 3. complete upload -> creates the dataset record
    code, comp = api("POST", "/api/v1/datasets/upload/complete",
                     {"s3_key": s3_key, "name": "spec_rl_seed",
                      "file_format": "jsonl",
                      "file_size_bytes": len(seed_bytes)})
    print("3 complete:", code, "keys=", list(comp.keys()))
    # Accept any of three possible locations for the new dataset's id.
    ds_id = (comp.get("dataset_id") or comp.get("id")
             or (comp.get("dataset") or {}).get("id"))
    if not ds_id:
        print(" complete body:", json.dumps(comp)[:600])
        print("ABORT: no dataset_id")
        return 4
    print(" dataset_id=", ds_id)

    # 4. wait for processing; never go on to the estimate/launch on timeout.
    outcome, st = _poll_status(
        ds_id, 20, 6,
        ("READY", "PROCESSED", "COMPLETED", "ACTIVE", "SUCCEEDED", "DONE"),
        "4 status")
    if outcome != "done":
        print(" status body:", json.dumps(st)[:600])
        if outcome == "timeout":
            print("ABORT: dataset not ready after polling")
        return 5

    # 5. credit estimate for augmentation [GATE]
    code, est = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": True})
    print("5 launch ESTIMATE:", code, json.dumps(est)[:600])
    if not code.startswith("2"):
        # Without a cost estimate we must not spend credits, even with --go.
        print("ABORT: credit estimate failed; not launching")
        print("dataset_id:", ds_id)
        return 7
    if not go:
        print("\nGATE: re-run with --go to actually launch the augmentation.")
        print("dataset_id:", ds_id)
        return 0

    # 6. real launch, then poll until the run finishes
    code, run = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": False,
                     "idempotency_key": f"specrl-{ds_id}"})
    print("6 launch RUN:", code, json.dumps(run)[:400])
    if not code.startswith("2"):
        print("ABORT: augmentation launch failed")
        return 6
    # NOTE(review): the dataset was already in a "done" state after step 4;
    # if the status does not change right after launch, the first poll may
    # report that leftover state -- confirm the API's status transitions.
    outcome, st = _poll_status(
        ds_id, 40, 10,
        ("READY", "PROCESSED", "COMPLETED", "SUCCEEDED", "DONE"),
        " run-status")
    if outcome == "failed":
        print(" FAILED:", json.dumps(st)[:400])
        return 6
    if outcome == "timeout":
        print(" TIMEOUT:", json.dumps(st)[:400])
        print("ABORT: augmentation did not finish while polling")
        return 6

    # 7. download
    code, dl = api("GET", f"/api/v1/datasets/{ds_id}/download")
    print("7 download:", code,
          json.dumps(dl)[:400] if isinstance(dl, dict) else str(dl)[:400])
    (ROOT / "data" / "adaption_download.json").write_text(json.dumps(dl))
    print(" wrote data/adaption_download.json")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())