#!/usr/bin/env python3
"""adaption_pipeline.py — drive the Adaption augmentation pipeline via API key.

Steps (each gated, never blindly spends credits):
  1. upload/initiate  -> presigned S3 PUT url
  2. PUT seed jsonl to S3
  3. upload/complete  -> creates+processes Dataset (returns dataset_id)
  4. GET status       -> wait until READY/processed
  5. launch estimate  -> credit cost for augmentation (estimate:true)   [GATE]
  6. (only if --go) launch real run -> poll status -> download

Uses curl for TLS (macOS framework python lacks a CA bundle). Never prints key.

Exit codes:
  0  finished (or stopped at the estimate gate because --go was not given)
  1  ADAPTION_URL / ADAPTION_API_KEY missing from the environment
  2  upload/initiate returned no upload_url
  3  S3 PUT failed
  4  upload/complete returned no dataset id
  5  dataset processing failed or did not become ready in time
  6  augmentation run failed or did not finish in time
  7  estimate or launch request did not return HTTP 2xx
  8  download request did not return HTTP 2xx
"""
from __future__ import annotations

import contextlib
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from urllib.parse import urlparse

ROOT = Path(__file__).resolve().parents[1]
SEED = ROOT / "data" / "adaption_seed.jsonl"
# Read leniently so importing this module never raises; main() refuses to run
# while either value is missing.
URL = os.environ.get("ADAPTION_URL", "").rstrip("/")
KEY = os.environ.get("ADAPTION_API_KEY", "")

# Marker appended by curl's -w option so the HTTP status can be split from the body.
_HTTP_MARK = "__HTTP__"
_FAILED_STATES = ("FAILED", "ERROR")


def curl(args, timeout=120):
    """Run ``curl -sS -m <timeout> <args...>``.

    Args:
        args: extra command-line arguments passed to curl verbatim.
        timeout: value for curl's ``-m`` (maximum total time, in seconds).

    Returns:
        ``(returncode, stdout, stderr)`` of the curl process.
    """
    p = subprocess.run(
        ["curl", "-sS", "-m", str(timeout), *args],
        capture_output=True,
        text=True,
        # Decode explicitly: the locale default may be ASCII, which would raise
        # on a non-ASCII response body.
        encoding="utf-8",
        errors="replace",
    )
    return p.returncode, p.stdout, p.stderr


def parse_response(raw):
    """Split curl output of the form ``<body>__HTTP__<status>`` into its parts.

    Args:
        raw: curl's stdout, expected to end with the ``-w`` status marker.

    Returns:
        ``(code, data)``. ``code`` is the stripped text after the last marker
        (empty string when the marker is absent). ``data`` is the decoded JSON
        body, ``{}`` for a blank body, or ``{"_raw": <first 500 chars>}`` when
        the body is not valid JSON.
    """
    code = ""
    body = raw
    if _HTTP_MARK in raw:
        body, code = raw.rsplit(_HTTP_MARK, 1)
    try:
        data = json.loads(body) if body.strip() else {}
    except json.JSONDecodeError:
        data = {"_raw": body[:500]}
    return code.strip(), data


def api(method, path, body=None, timeout=120):
    """Call the Adaption API through curl with the bearer token attached.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: request path appended to ``URL``.
        body: optional JSON-serialisable payload sent with ``--data-binary``.
        timeout: forwarded to :func:`curl`.

    Returns:
        ``(code, data)`` as produced by :func:`parse_response`. When curl itself
        exits non-zero and ``data`` is a dict, curl's stderr (truncated) is
        added under ``"_curl_error"`` so transport failures are not silent.
    """
    # NOTE(review): the token is passed on curl's command line, so it is visible
    # to other local users via the process list while the request runs. Consider
    # feeding the header through stdin or a config file if that matters here.
    args = [
        "-X", method, URL + path,
        "-H", f"Authorization: Bearer {KEY}",
        "-H", "Content-Type: application/json",
        "-w", "\n__HTTP__%{http_code}",
    ]
    if body is not None:
        args += ["--data-binary", json.dumps(body)]
    rc, out, err = curl(args, timeout)
    code, data = parse_response(out)
    if rc != 0 and isinstance(data, dict):
        data.setdefault("_curl_error", err.strip()[:500])
    return code, data


def _is_success(code):
    """Return True when *code* is a three-character HTTP 2xx status string."""
    return len(code) == 3 and code.startswith("2")


def _fields(data):
    """Return *data* if it is a dict, else an empty dict, so ``.get`` is safe."""
    return data if isinstance(data, dict) else {}


def _poll_status(ds_id, label, attempts, delay, ready_states):
    """Poll the dataset status endpoint until it is ready, failed, or attempts run out.

    Returns:
        ``(outcome, last_body)`` where outcome is ``"ready"``, ``"failed"`` or
        ``"timeout"`` and ``last_body`` is the last status response.
    """
    st = {}
    for i in range(attempts):
        code, st = api("GET", f"/api/v1/datasets/{ds_id}/status")
        info = _fields(st)
        s = info.get("status") or info.get("state") or json.dumps(st)[:120]
        print(f"{label}[{i}]:", code, s)
        state = str(s).upper()
        if state in ready_states:
            return "ready", st
        if state in _FAILED_STATES:
            return "failed", st
        if i + 1 < attempts:
            time.sleep(delay)
    return "timeout", st


def main():
    """Run the pipeline; see the module docstring for the steps and exit codes.

    Returns:
        Process exit code (0 on success or when stopped at the estimate gate).
    """
    if not URL or not KEY:
        print("ABORT: ADAPTION_URL and ADAPTION_API_KEY must be set in the environment")
        return 1

    go = "--go" in sys.argv
    seed_bytes = SEED.read_bytes()
    print(f"seed={SEED} ({len(seed_bytes)} bytes)")

    # 1. initiate upload
    code, init = api("POST", "/api/v1/datasets/upload/initiate",
                     {"name": "spec_rl_seed", "file_format": "jsonl"})
    init = _fields(init)
    print("1 initiate:", code, "keys=", list(init.keys()))
    upload_url = init.get("upload_url")
    if not upload_url:
        print(" initiate body:", json.dumps(init)[:600])
        print("ABORT: no upload_url")
        return 2
    # s3_key = the object path after the bucket host, before the query string
    s3_key = urlparse(upload_url).path.lstrip("/")
    print(" s3_key=", s3_key)

    # 2. PUT to S3 (no auth header; presigned). content-type must match if signed; try plain.
    tmp = ROOT / "data" / ".seed_put.jsonl"
    tmp.write_bytes(seed_bytes)
    try:
        rc, out, err = curl(["-X", "PUT", upload_url, "--upload-file", str(tmp),
                             "-w", "__HTTP__%{http_code}"], timeout=120)
    finally:
        # Best-effort cleanup of the scratch copy; a leftover file is harmless.
        with contextlib.suppress(OSError):
            tmp.unlink()
    put_code = out.rsplit("__HTTP__", 1)[-1] if "__HTTP__" in out else "?"
    print("2 s3 PUT:", put_code, (err[:120] if rc else ""))
    if put_code not in ("200", "204"):
        print(" PUT body:", out[:300])
        print("ABORT: S3 PUT failed")
        return 3

    # 3. complete upload -> dataset id
    code, comp = api("POST", "/api/v1/datasets/upload/complete",
                     {"s3_key": s3_key, "name": "spec_rl_seed", "file_format": "jsonl",
                      "file_size_bytes": len(seed_bytes)})
    comp = _fields(comp)
    print("3 complete:", code, "keys=", list(comp.keys()))
    ds_id = comp.get("dataset_id") or comp.get("id") or _fields(comp.get("dataset")).get("id")
    if not ds_id:
        print(" complete body:", json.dumps(comp)[:600])
        print("ABORT: no dataset_id")
        return 4
    print(" dataset_id=", ds_id)

    # 4. poll status; do not go on to launch calls for a dataset that never became ready
    outcome, st = _poll_status(
        ds_id, "4 status", attempts=20, delay=6,
        ready_states=("READY", "PROCESSED", "COMPLETED", "ACTIVE", "SUCCEEDED", "DONE"))
    if outcome == "failed":
        print(" status body:", json.dumps(st)[:600])
        return 5
    if outcome == "timeout":
        print("ABORT: dataset not ready after polling")
        return 5

    # 5. credit estimate for augmentation
    code, est = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": True})
    print("5 launch ESTIMATE:", code, json.dumps(est)[:600])
    if not _is_success(code):
        # The estimate is the gate; without a valid one, never spend credits.
        print("ABORT: estimate request failed")
        return 7
    if not go:
        print("\nGATE: re-run with --go to actually launch the augmentation.")
        print("dataset_id:", ds_id)
        return 0

    # 6. real run
    code, run = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": False,
                     "idempotency_key": f"specrl-{ds_id}"})
    print("6 launch RUN:", code, json.dumps(run)[:400])
    if not _is_success(code):
        print("ABORT: launch request failed")
        return 7

    # NOTE(review): this polls the same status endpoint that already reported a
    # ready state in step 4; confirm the API moves the dataset out of that state
    # when a run starts, otherwise the first poll here may return immediately.
    outcome, st = _poll_status(
        ds_id, " run-status", attempts=40, delay=10,
        ready_states=("READY", "PROCESSED", "COMPLETED", "SUCCEEDED", "DONE"))
    if outcome == "failed":
        print(" FAILED:", json.dumps(st)[:400])
        return 6
    if outcome == "timeout":
        print("ABORT: run did not finish after polling")
        return 6

    # download
    code, dl = api("GET", f"/api/v1/datasets/{ds_id}/download")
    print("7 download:", code, json.dumps(dl)[:400] if isinstance(dl, dict) else str(dl)[:400])
    # Always keep the response body on disk (useful for debugging), but report
    # a non-2xx download as a failure instead of exiting 0.
    (ROOT / "data" / "adaption_download.json").write_text(json.dumps(dl))
    print(" wrote data/adaption_download.json")
    if not _is_success(code):
        print("ABORT: download request failed")
        return 8
    return 0


if __name__ == "__main__":
    raise SystemExit(main())