| |
| """adaption_pipeline.py — drive the Adaption augmentation pipeline via API key. |
| |
| Steps (each gated, never blindly spends credits): |
| 1. upload/initiate -> presigned S3 PUT url |
| 2. PUT seed jsonl to S3 |
| 3. upload/complete -> creates+processes Dataset (returns dataset_id) |
| 4. GET status -> wait until READY/processed |
| 5. launch estimate -> credit cost for augmentation (estimate:true) [GATE] |
| 6. (only if --go) launch real run -> poll status -> download |
| |
| Uses curl for TLS (macOS framework python lacks a CA bundle). Never prints key. |
| """ |
| from __future__ import annotations |
| import json, os, subprocess, sys, time |
| from pathlib import Path |
|
|
# Parent of the directory containing this file.
ROOT = Path(__file__).resolve().parents[1]
# Seed dataset that gets uploaded to Adaption.
SEED = ROOT.joinpath("data", "adaption_seed.jsonl")
# Required configuration: a missing variable raises KeyError at import time.
URL = os.environ["ADAPTION_URL"].rstrip("/")  # base URL, no trailing slash
KEY = os.environ["ADAPTION_API_KEY"]  # secret -- never print it
|
|
def curl(args, timeout=120):
    """Run the curl binary with *args* and return its result.

    curl is invoked with ``-sS`` (no progress meter, but errors still go to
    stderr) and ``-m timeout`` (hard limit on the whole transfer, seconds).

    Returns a ``(returncode, stdout, stderr)`` tuple with text-mode output.
    """
    cmd = ["curl", "-sS", "-m", str(timeout)]
    cmd.extend(args)
    result = subprocess.run(cmd, text=True, capture_output=True)
    return result.returncode, result.stdout, result.stderr
|
|
def api(method, path, body=None, timeout=120):
    """Call the Adaption REST API through curl.

    Args:
        method: HTTP method, e.g. "GET" or "POST".
        path: API path appended to URL, e.g. "/api/v1/datasets/x/status".
        body: optional JSON-serialisable request body.
        timeout: curl ``-m`` limit in seconds.

    Returns:
        ``(http_code, data)``:
        - ``http_code`` is the status as a string (e.g. "200"). It is ""
          if curl's write-out marker is missing from the output.
        - ``data`` is the decoded JSON body, or ``{}`` for an empty body.
          When the body is not JSON, it is ``{"_raw": <first 500 chars>}``.
        - When curl exits non-zero and ``data`` is a dict, curl's stderr is
          included under ``"_curl_error"`` so callers that print the
          response can see why the request failed.

    The Authorization header is handed to curl through a private temp file
    (``-H @file``, created by ``tempfile.mkstemp``) instead of argv. This
    keeps the API key out of the process list (``ps``).
    """
    import tempfile

    fd, hdr_path = tempfile.mkstemp(prefix="adaption_hdr_", suffix=".txt")
    try:
        with os.fdopen(fd, "w") as fh:
            fh.write(f"Authorization: Bearer {KEY}\n"
                     "Content-Type: application/json\n")
        args = ["-X", method, URL + path,
                "-H", f"@{hdr_path}",
                "-w", "\n__HTTP__%{http_code}"]
        if body is not None:
            args += ["--data-binary", json.dumps(body)]
        rc, out, err = curl(args, timeout)
    finally:
        # Remove the file holding the secret as soon as curl is done.
        os.unlink(hdr_path)

    code = ""
    if "__HTTP__" in out:
        # The -w marker is appended after the body; split it off.
        out, code = out.rsplit("__HTTP__", 1)
    try:
        data = json.loads(out) if out.strip() else {}
    except json.JSONDecodeError:
        data = {"_raw": out[:500]}
    if rc != 0 and isinstance(data, dict):
        # Transport-level failure (DNS, TLS, timeout...): keep the reason.
        data.setdefault("_curl_error", err.strip()[:300])
    return code.strip(), data
|
|
# Dataset states meaning "the uploaded seed finished processing".
_READY_STATES = ("READY", "PROCESSED", "COMPLETED", "ACTIVE", "SUCCEEDED", "DONE")
# States meaning "the augmentation run finished" (ACTIVE is not counted here).
_RUN_DONE_STATES = ("READY", "PROCESSED", "COMPLETED", "SUCCEEDED", "DONE")
_FAILED_STATES = ("FAILED", "ERROR")


def _http_ok(code):
    """Return True if *code* (the status string returned by api()) is 2xx."""
    return code.startswith("2")


def _poll_status(ds_id, label, done_states, attempts, delay):
    """Poll the dataset status endpoint until a done/failed state or timeout.

    Prints one line per poll as ``{label}[i]: <http code> <status>``.
    Sleeps *delay* seconds between polls (not after the last one).

    Returns ``(outcome, last_body)``, where outcome is "done", "failed" or
    "timeout".
    """
    st = {}
    for i in range(attempts):
        code, st = api("GET", f"/api/v1/datasets/{ds_id}/status")
        s = None
        if isinstance(st, dict):
            s = st.get("status") or st.get("state")
        # Fall back to a snippet of the raw body so the log shows something.
        s = s or json.dumps(st)[:120]
        print(f"{label}[{i}]:", code, s)
        state = str(s).upper()
        if state in done_states:
            return "done", st
        if state in _FAILED_STATES:
            return "failed", st
        if i + 1 < attempts:
            time.sleep(delay)
    return "timeout", st


def _s3_put(upload_url, seed_bytes):
    """PUT *seed_bytes* to the presigned S3 URL.

    The bytes already read in main() are written to a temp file and that file
    is uploaded. The uploaded content is therefore exactly what was counted
    for ``file_size_bytes``. The temp file is always removed afterwards.

    Returns ``(http_code, curl_rc, response_body, curl_stderr)``. http_code
    is "?" if curl's write-out marker is missing.
    """
    tmp = ROOT / "data" / ".seed_put.jsonl"
    tmp.write_bytes(seed_bytes)
    try:
        rc, out, err = curl(["-X", "PUT", upload_url, "--upload-file", str(tmp),
                             "-w", "__HTTP__%{http_code}"], timeout=120)
    finally:
        try:
            tmp.unlink()
        except FileNotFoundError:
            pass
    body, sep, put_code = out.rpartition("__HTTP__")
    if not sep:
        body, put_code = out, "?"
    return put_code, rc, body, err


def main():
    """Upload the seed, wait for processing, estimate cost; launch only with --go.

    Exit codes:
        0  success, or stopped at the --go gate
        1  seed file missing
        2  no upload_url from upload/initiate
        3  S3 PUT failed
        4  no dataset_id from upload/complete
        5  dataset processing failed or did not become ready in time
        6  augmentation run failed or did not finish in time
        7  estimate or launch request rejected (non-2xx)
        8  download request rejected (non-2xx)
    """
    from urllib.parse import urlparse

    go = "--go" in sys.argv
    if not SEED.is_file():
        print(f"ABORT: seed file not found: {SEED}")
        return 1
    seed_bytes = SEED.read_bytes()
    print(f"seed={SEED} ({len(seed_bytes)} bytes)")

    # 1. initiate upload -> presigned S3 PUT url
    code, init = api("POST", "/api/v1/datasets/upload/initiate",
                     {"name": "spec_rl_seed", "file_format": "jsonl"})
    print("1 initiate:", code, "keys=", list(init.keys()))
    upload_url = init.get("upload_url")
    if not upload_url:
        print("ABORT: no upload_url")
        return 2
    # The S3 object key is the presigned URL's path without the leading "/".
    s3_key = urlparse(upload_url).path.lstrip("/")
    print(" s3_key=", s3_key)

    # 2. PUT the seed bytes to S3
    put_code, rc, put_body, err = _s3_put(upload_url, seed_bytes)
    print("2 s3 PUT:", put_code, (err[:120] if rc else ""))
    if put_code not in ("200", "204"):
        print(" PUT body:", put_body[:300])
        print("ABORT: S3 PUT failed")
        return 3

    # 3. complete upload -> server creates the Dataset
    code, comp = api("POST", "/api/v1/datasets/upload/complete",
                     {"s3_key": s3_key, "name": "spec_rl_seed",
                      "file_format": "jsonl",
                      "file_size_bytes": len(seed_bytes)})
    print("3 complete:", code, "keys=", list(comp.keys()))
    # Accept the id from any of three possible locations in the response.
    ds_id = (comp.get("dataset_id") or comp.get("id")
             or (comp.get("dataset") or {}).get("id"))
    if not ds_id:
        print(" complete body:", json.dumps(comp)[:600])
        print("ABORT: no dataset_id")
        return 4
    print(" dataset_id=", ds_id)

    # 4. wait for the dataset to finish processing
    outcome, st = _poll_status(ds_id, "4 status", _READY_STATES,
                               attempts=20, delay=6)
    if outcome != "done":
        print(" status body:", json.dumps(st)[:600])
        if outcome == "timeout":
            print("ABORT: dataset not ready after 20 polls; dataset_id:", ds_id)
        return 5

    # 5. cost estimate only (estimate: True) -- the gate before spending
    code, est = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": True})
    print("5 launch ESTIMATE:", code, json.dumps(est)[:600])
    if not _http_ok(code):
        print("ABORT: estimate request rejected; dataset_id:", ds_id)
        return 7

    if not go:
        print("\nGATE: re-run with --go to actually launch the augmentation.")
        print("dataset_id:", ds_id)
        return 0

    # 6. real launch, then poll until the run finishes
    code, run = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": False,
                     "idempotency_key": f"specrl-{ds_id}"})
    print("6 launch RUN:", code, json.dumps(run)[:400])
    if not _http_ok(code):
        print("ABORT: launch request rejected; dataset_id:", ds_id)
        return 7
    # NOTE(review): step 4 left the dataset in a "ready" state. If the status
    # endpoint still reports that state on the first poll after launch, this
    # loop exits immediately. Confirm how the API reports a running job.
    outcome, st = _poll_status(ds_id, " run-status", _RUN_DONE_STATES,
                               attempts=40, delay=10)
    if outcome == "failed":
        print(" FAILED:", json.dumps(st)[:400])
        return 6
    if outcome == "timeout":
        print("ABORT: run not finished after 40 polls; dataset_id:", ds_id)
        return 6

    # 7. download results; do not overwrite the output file with an error body
    code, dl = api("GET", f"/api/v1/datasets/{ds_id}/download")
    print("7 download:", code,
          json.dumps(dl)[:400] if isinstance(dl, dict) else str(dl)[:400])
    if not _http_ok(code):
        print("ABORT: download request rejected; dataset_id:", ds_id)
        return 8
    (ROOT / "data" / "adaption_download.json").write_text(json.dumps(dl))
    print(" wrote data/adaption_download.json")
    return 0
|
|
if __name__ == "__main__":
    # main() returns an int; hand it to the OS as the process exit status.
    sys.exit(main())
|
|