File size: 5,271 Bytes
8cc969e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
"""adaption_pipeline.py — drive the Adaption augmentation pipeline via API key.

Steps (each gated, never blindly spends credits):
  1. upload/initiate  -> presigned S3 PUT url
  2. PUT seed jsonl to S3
  3. upload/complete   -> creates+processes Dataset (returns dataset_id)
  4. GET status        -> wait until READY/processed
  5. launch estimate   -> credit cost for augmentation (estimate:true) [GATE]
  6. (only if --go)    launch real run -> poll status -> download

Uses curl for TLS (macOS framework python lacks a CA bundle). Never prints key.
"""
from __future__ import annotations
import json, os, subprocess, sys, time
from pathlib import Path

# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Seed JSONL file that the pipeline uploads.
SEED = ROOT.joinpath("data", "adaption_seed.jsonl")
# Service base URL (trailing slashes removed) and API key. Both environment
# variables are mandatory: a missing one raises KeyError at import time.
URL = os.environ["ADAPTION_URL"].rstrip("/")
KEY = os.environ["ADAPTION_API_KEY"]

def curl(args, timeout=120):
    """Run the system ``curl`` binary and capture its result.

    The command is always ``curl -sS -m <timeout>`` followed by ``args``.

    Args:
        args: Extra command-line arguments appended after the fixed prefix.
        timeout: Value handed to curl's ``-m`` option (converted with ``str``).

    Returns:
        A ``(returncode, stdout, stderr)`` tuple; both streams are text.
    """
    command = ["curl", "-sS", "-m", str(timeout)]
    command.extend(args)
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr

def api(method, path, body=None, timeout=120):
    """Call the API at ``URL + path`` through curl and decode the JSON reply.

    Args:
        method: HTTP verb passed to curl's ``-X``.
        path: Path appended verbatim to the module-level ``URL``.
        body: Optional JSON-serialisable payload; sent with ``--data-binary``
            when it is not ``None``.
        timeout: Forwarded to ``curl()``.

    Returns:
        A ``(code, data)`` tuple. ``code`` is the text curl wrote for
        ``%{http_code}`` (empty string if the marker is absent from the
        output). ``data`` is the decoded JSON body, ``{}`` for an empty
        body, or ``{"_raw": <first 500 chars>}`` when the body is not valid
        JSON. If curl itself exits non-zero and ``data`` is a dict, the keys
        ``"_curl_rc"`` and ``"_curl_error"`` are added so that transport
        failures are not mistaken for an empty response.
    """
    # NOTE(review): the bearer token travels on curl's command line, so it can
    # be seen in the local process list while curl runs. Consider feeding the
    # header through a file/stdin (curl `-H @file` or `--config -`) instead.
    args = ["-X", method, URL + path,
            "-H", f"Authorization: Bearer {KEY}",
            "-H", "Content-Type: application/json",
            # Append a marker + status code so it can be split off the body.
            "-w", "\n__HTTP__%{http_code}"]
    if body is not None:
        args += ["--data-binary", json.dumps(body)]

    rc, out, err = curl(args, timeout)

    code = ""
    if "__HTTP__" in out:
        out, code = out.rsplit("__HTTP__", 1)

    try:
        data = json.loads(out) if out.strip() else {}
    except json.JSONDecodeError:
        data = {"_raw": out[:500]}

    # Surface transport-level failures instead of silently dropping them.
    # setdefault keeps any same-named key the server might have returned.
    if rc != 0 and isinstance(data, dict):
        data.setdefault("_curl_rc", rc)
        data.setdefault("_curl_error", err.strip()[:500])

    return code.strip(), data

# Upper-cased status values that mean the dataset/run has failed.
_FAILED_STATES = ("FAILED", "ERROR")


def _poll_status(ds_id, attempts, delay, done_states, label):
    """Poll the dataset status endpoint until it settles or attempts run out.

    Args:
        ds_id: Dataset identifier interpolated into the status path.
        attempts: Maximum number of status requests.
        delay: Seconds to sleep after each non-terminal reply.
        done_states: Upper-cased status values treated as success.
        label: Prefix of the per-attempt progress line.

    Returns:
        ``(outcome, st)`` where ``outcome`` is ``"done"``, ``"failed"`` or
        ``"timeout"`` and ``st`` is the last status body received.
    """
    st = {}
    for i in range(attempts):
        code, st = api("GET", f"/api/v1/datasets/{ds_id}/status")
        # The field name is not fixed: try "status", then "state", and fall
        # back to a truncated dump of the whole body.
        s = st.get("status") or st.get("state") or json.dumps(st)[:120]
        print(f"{label}[{i}]:", code, s)
        state = str(s).upper()
        if state in done_states:
            return "done", st
        if state in _FAILED_STATES:
            return "failed", st
        time.sleep(delay)
    return "timeout", st


def main():
    """Run the upload -> status -> estimate (-> launch -> download) pipeline.

    Without ``--go`` on the command line the function stops after printing
    the launch estimate. With ``--go`` it launches the run, polls its
    status, and writes the download response to
    ``data/adaption_download.json``.

    Returns:
        Process exit status: 0 on success (or when stopped at the gate),
        2 if the initiate reply has no ``upload_url``, 3 if the S3 PUT does
        not return 200/204, 4 if no dataset id is found in the complete
        reply, 5 if dataset processing reports a failure, 6 if the launched
        run reports a failure.
    """
    from urllib.parse import urlparse

    go = "--go" in sys.argv
    seed_bytes = SEED.read_bytes()
    print(f"seed={SEED} ({len(seed_bytes)} bytes)")

    # 1. ask the service for a presigned upload URL
    code, init = api("POST", "/api/v1/datasets/upload/initiate",
                     {"name": "spec_rl_seed", "file_format": "jsonl"})
    print("1 initiate:", code, "keys=", list(init.keys()))
    upload_url = init.get("upload_url")
    if not upload_url:
        print("ABORT: no upload_url")
        return 2
    # s3_key = the URL path (leading "/" removed), i.e. everything between
    # the host and the query string.
    s3_key = urlparse(upload_url).path.lstrip("/")
    print("   s3_key=", s3_key)

    # 2. PUT the seed to the presigned URL. No Authorization header and no
    # explicit Content-Type are passed. The bytes are staged in a scratch
    # file that is removed again whether or not the upload succeeds.
    tmp = ROOT / "data" / ".seed_put.jsonl"
    try:
        tmp.write_bytes(seed_bytes)
        rc, out, err = curl(["-X", "PUT", upload_url, "--upload-file", str(tmp),
                             "-w", "__HTTP__%{http_code}"], timeout=120)
    finally:
        try:
            tmp.unlink()
        except OSError:
            # Best-effort cleanup: a leftover scratch file is not fatal.
            pass
    put_code = out.rsplit("__HTTP__", 1)[-1] if "__HTTP__" in out else "?"
    print("2 s3 PUT:", put_code, (err[:120] if rc else ""))
    if put_code not in ("200", "204"):
        print("   PUT body:", out[:300])
        print("ABORT: S3 PUT failed")
        return 3

    # 3. tell the service the upload is complete
    code, comp = api("POST", "/api/v1/datasets/upload/complete",
                     {"s3_key": s3_key, "name": "spec_rl_seed", "file_format": "jsonl",
                      "file_size_bytes": len(seed_bytes)})
    print("3 complete:", code, "keys=", list(comp.keys()))
    # The id may appear under several keys; take the first one present.
    ds_id = comp.get("dataset_id") or comp.get("id") or (comp.get("dataset") or {}).get("id")
    if not ds_id:
        print("   complete body:", json.dumps(comp)[:600])
        print("ABORT: no dataset_id")
        return 4
    print("   dataset_id=", ds_id)

    # 4. poll status (up to 20 attempts, 6 s apart)
    outcome, st = _poll_status(
        ds_id, 20, 6,
        ("READY", "PROCESSED", "COMPLETED", "ACTIVE", "SUCCEEDED", "DONE"),
        "4 status")
    if outcome == "failed":
        print("   status body:", json.dumps(st)[:600])
        return 5
    if outcome == "timeout":
        # Keep the original continue-anyway behaviour, but make it visible.
        print("   WARN: dataset not reported ready after 20 polls; continuing")

    # 5. credit estimate for augmentation
    code, est = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": True})
    print("5 launch ESTIMATE:", code, json.dumps(est)[:600])

    if not go:
        print("\nGATE: re-run with --go to actually launch the augmentation.")
        print("dataset_id:", ds_id)
        return 0

    # 6. real launch, then poll (up to 40 attempts, 10 s apart)
    code, run = api("POST", f"/api/v1/datasets/{ds_id}/launch",
                    {"samples_to_process": 12, "estimate": False,
                     "idempotency_key": f"specrl-{ds_id}"})
    print("6 launch RUN:", code, json.dumps(run)[:400])
    outcome, st = _poll_status(
        ds_id, 40, 10,
        ("READY", "PROCESSED", "COMPLETED", "SUCCEEDED", "DONE"),
        "   run-status")
    if outcome == "failed":
        print("   FAILED:", json.dumps(st)[:400])
        return 6
    if outcome == "timeout":
        print("   WARN: run not reported finished after 40 polls; downloading anyway")

    # 7. download
    code, dl = api("GET", f"/api/v1/datasets/{ds_id}/download")
    print("7 download:", code, json.dumps(dl)[:400] if isinstance(dl, dict) else str(dl)[:400])
    (ROOT / "data" / "adaption_download.json").write_text(json.dumps(dl))
    print("   wrote data/adaption_download.json")
    return 0

if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())