OpenTransformer's picture
download
raw
17.4 kB
#!/usr/bin/env python3
"""Single-file AGILLM4.1 lease coordinator for trusted or untrusted helpers.
The server exposes HTTPS lease/request and result/submit endpoints. It never
exposes coordinator SSH. Results from public helpers are written to quarantine;
an operator or a separate validator decides what becomes merge-eligible.
"""
from __future__ import annotations
import argparse
import base64
import hashlib
import hmac
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import json
import os
from pathlib import Path
import secrets
import shutil
import ssl
import sys
import threading
import time
from typing import Any
from urllib.parse import urlparse
VERSION = "2026-06-02"
MAX_JSON = 256 * 1024
def utc() -> int:
return int(time.time())
def b64u(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def read_json(path: Path, default: Any = None) -> Any:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def write_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")
os.replace(tmp, path)
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def parse_kv(items: list[str]) -> dict[str, Any]:
out: dict[str, Any] = {}
for item in items:
if "=" not in item:
raise SystemExit(f"expected key=value, got {item!r}")
k, v = item.split("=", 1)
try:
out[k] = json.loads(v)
except Exception:
out[k] = v
return out
def load_secret(path: Path) -> bytes:
if path.exists():
return path.read_bytes().strip()
secret = b64u(secrets.token_bytes(32)).encode("ascii")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(secret + b"\n")
try:
os.chmod(path, 0o600)
except Exception:
pass
return secret
class LeaseStore:
def __init__(self, spool: Path, secret: bytes, public_base_url: str):
self.spool = spool
self.secret = secret
self.public_base_url = public_base_url.rstrip("/")
self.lock = threading.Lock()
for name in ("available", "leased", "quarantine", "accepted", "artifacts"):
(spool / name).mkdir(parents=True, exist_ok=True)
def token(self, lease_id: str, expires_at: int, package_sha256: str) -> str:
msg = f"{lease_id}.{expires_at}.{package_sha256}".encode("utf-8")
return b64u(hmac.new(self.secret, msg, hashlib.sha256).digest())
def verify_token(self, lease: dict[str, Any], token: str) -> bool:
expected = self.token(lease["lease_id"], int(lease["expires_at"]), lease["package"]["sha256"])
return hmac.compare_digest(expected, token)
def add_lease(
self,
package: Path,
ttl_sec: int,
worker_args: dict[str, Any],
metadata: dict[str, Any],
frozen: Path | None,
max_result_bytes: int,
copy_artifacts: bool,
) -> dict[str, Any]:
package = package.resolve()
if not package.exists():
raise SystemExit(f"package not found: {package}")
artifact_pkg = package
artifact_frozen = frozen.resolve() if frozen else None
if copy_artifacts:
dst = self.spool / "artifacts" / package.name
shutil.copy2(package, dst)
artifact_pkg = dst.resolve()
if frozen:
fdst = self.spool / "artifacts" / frozen.name
shutil.copy2(frozen, fdst)
artifact_frozen = fdst.resolve()
lease_name = f"{utc()}_{secrets.token_hex(6)}_{package.name}.json"
data: dict[str, Any] = {
"state": "available",
"created_at": utc(),
"ttl_sec": ttl_sec,
"package": {
"path": str(artifact_pkg),
"name": artifact_pkg.name,
"sha256": sha256_file(artifact_pkg),
"bytes": artifact_pkg.stat().st_size,
},
"worker_args": worker_args,
"metadata": metadata,
"result": {"max_bytes": max_result_bytes, "min_bytes": 1},
}
if artifact_frozen:
data["frozen"] = {
"path": str(artifact_frozen),
"name": artifact_frozen.name,
"sha256": sha256_file(artifact_frozen),
"bytes": artifact_frozen.stat().st_size,
}
write_json(self.spool / "available" / lease_name, data)
return data
def request(self, node_id: str, capabilities: dict[str, Any]) -> dict[str, Any] | None:
with self.lock:
candidates = sorted((self.spool / "available").glob("*.json"))
if not candidates:
return None
src = candidates[0]
lease = read_json(src, {})
lease_id = secrets.token_urlsafe(18)
expires_at = utc() + int(lease.get("ttl_sec", 900))
lease.update(
{
"state": "leased",
"lease_id": lease_id,
"node_id": node_id,
"capabilities": capabilities,
"leased_at": utc(),
"expires_at": expires_at,
}
)
token = self.token(lease_id, expires_at, lease["package"]["sha256"])
lease["token_hint"] = "stored server-side hash only; token returned once"
dst = self.spool / "leased" / f"{lease_id}.json"
write_json(dst, lease)
src.unlink()
return self.public_lease(lease, token)
def public_lease(self, lease: dict[str, Any], token: str) -> dict[str, Any]:
lease_id = lease["lease_id"]
out = {
"version": VERSION,
"lease_id": lease_id,
"token": token,
"expires_at": lease["expires_at"],
"package": {
"url": f"{self.public_base_url}/api/v1/leases/{lease_id}/package",
"sha256": lease["package"]["sha256"],
"bytes": lease["package"]["bytes"],
"name": lease["package"]["name"],
},
"worker_args": lease.get("worker_args", {}),
"metadata": lease.get("metadata", {}),
"submit_url": f"{self.public_base_url}/api/v1/leases/{lease_id}/submit",
"security": {
"transport": "https strongly recommended; http is test-only",
"result_policy": "quarantine",
"ssh": "not exposed",
},
}
if "frozen" in lease:
out["frozen"] = {
"url": f"{self.public_base_url}/api/v1/leases/{lease_id}/frozen",
"sha256": lease["frozen"]["sha256"],
"bytes": lease["frozen"]["bytes"],
"name": lease["frozen"]["name"],
}
return out
def leased(self, lease_id: str) -> dict[str, Any] | None:
return read_json(self.spool / "leased" / f"{lease_id}.json")
def quarantine_result(self, lease: dict[str, Any], result_path: Path, result_sha: str) -> None:
lease_id = lease["lease_id"]
meta = dict(lease)
meta.pop("token_hint", None)
meta["state"] = "quarantined"
meta["submitted_at"] = utc()
meta["result_file"] = str(result_path)
meta["result_sha256"] = result_sha
write_json(self.spool / "quarantine" / f"{lease_id}.json", meta)
def bearer(headers: Any) -> str:
auth = headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return ""
return auth.split(" ", 1)[1].strip()
class Handler(BaseHTTPRequestHandler):
server_version = "AGILLM41LeaseHost/1"
@property
def store(self) -> LeaseStore:
return self.server.store # type: ignore[attr-defined]
def send_json(self, code: int, data: Any) -> None:
body = json.dumps(data, indent=2).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def read_json_body(self) -> dict[str, Any]:
n = int(self.headers.get("Content-Length", "0"))
if n > MAX_JSON:
raise ValueError("JSON body too large")
return json.loads(self.rfile.read(n) or b"{}")
def auth_lease(self, lease_id: str) -> dict[str, Any] | None:
lease = self.store.leased(lease_id)
if not lease:
self.send_json(404, {"error": "unknown lease"})
return None
if utc() > int(lease["expires_at"]):
self.send_json(410, {"error": "lease expired"})
return None
token = bearer(self.headers)
if not token or not self.store.verify_token(lease, token):
self.send_json(401, {"error": "bad token"})
return None
return lease
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
self.send_json(200, {"ok": True, "version": VERSION})
return
parts = parsed.path.strip("/").split("/")
if len(parts) == 5 and parts[:3] == ["api", "v1", "leases"]:
lease_id, kind = parts[3], parts[4]
if kind not in ("package", "frozen"):
self.send_json(404, {"error": "bad artifact"})
return
lease = self.auth_lease(lease_id)
if not lease:
return
if kind not in lease:
self.send_json(404, {"error": f"lease has no {kind}"})
return
path = Path(lease[kind]["path"])
if not path.exists():
self.send_json(404, {"error": "artifact missing"})
return
self.send_response(200)
self.send_header("Content-Type", "application/octet-stream")
self.send_header("Content-Length", str(path.stat().st_size))
self.send_header("X-Sha256", lease[kind]["sha256"])
self.end_headers()
with path.open("rb") as f:
shutil.copyfileobj(f, self.wfile, length=1024 * 1024)
return
self.send_json(404, {"error": "not found"})
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/api/v1/leases/request":
try:
body = self.read_json_body()
except Exception as exc:
self.send_json(400, {"error": str(exc)})
return
lease = self.store.request(str(body.get("node_id", "unknown")), body.get("capabilities", {}))
if lease is None:
self.send_response(204)
self.end_headers()
else:
self.send_json(200, lease)
return
parts = parsed.path.strip("/").split("/")
if len(parts) == 5 and parts[:3] == ["api", "v1", "leases"] and parts[4] == "submit":
lease_id = parts[3]
lease = self.auth_lease(lease_id)
if not lease:
return
n = int(self.headers.get("Content-Length", "0"))
max_bytes = int(lease.get("result", {}).get("max_bytes", 500_000_000))
if n <= 0 or n > max_bytes:
self.send_json(413, {"error": "result size out of bounds", "bytes": n, "max": max_bytes})
return
expected_sha = self.headers.get("X-Result-Sha256", "").lower()
out = self.store.spool / "quarantine" / f"{lease_id}.result"
h = hashlib.sha256()
remaining = n
with out.open("wb") as f:
while remaining:
chunk = self.rfile.read(min(1024 * 1024, remaining))
if not chunk:
break
remaining -= len(chunk)
h.update(chunk)
f.write(chunk)
actual = h.hexdigest()
if remaining != 0:
out.unlink(missing_ok=True)
self.send_json(400, {"error": "short upload"})
return
if expected_sha and expected_sha != actual:
out.unlink(missing_ok=True)
self.send_json(400, {"error": "sha256 mismatch", "actual": actual})
return
self.store.quarantine_result(lease, out, actual)
self.send_json(202, {"status": "quarantined", "lease_id": lease_id, "sha256": actual})
return
self.send_json(404, {"error": "not found"})
def log_message(self, fmt: str, *args: Any) -> None:
sys.stderr.write("[%s] %s\n" % (time.strftime("%FT%TZ", time.gmtime()), fmt % args))
def serve(args: argparse.Namespace) -> None:
public = args.public_base_url or f"http://{args.host}:{args.port}"
secret = load_secret(Path(args.secret_file))
store = LeaseStore(Path(args.spool), secret, public)
bind_public = args.host not in ("127.0.0.1", "localhost", "::1")
if bind_public and not (args.tls_cert and args.tls_key) and not args.allow_http:
raise SystemExit("refusing public HTTP without TLS; pass --tls-cert/--tls-key or --allow-http for testing")
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.store = store # type: ignore[attr-defined]
if args.tls_cert and args.tls_key:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(args.tls_cert, args.tls_key)
httpd.socket = ctx.wrap_socket(httpd.socket, server_side=True)
print(json.dumps({"event": "serve", "bind": [args.host, args.port], "public_base_url": public}), flush=True)
httpd.serve_forever()
def add_lease(args: argparse.Namespace) -> None:
secret = load_secret(Path(args.secret_file))
store = LeaseStore(Path(args.spool), secret, args.public_base_url or "http://127.0.0.1:8787")
worker_args = parse_kv(args.worker_arg)
if args.worker_args_json:
worker_args.update(json.loads(args.worker_args_json))
metadata = parse_kv(args.metadata)
data = store.add_lease(
Path(args.package),
args.ttl_sec,
worker_args,
metadata,
Path(args.frozen) if args.frozen else None,
args.max_result_bytes,
args.copy_artifacts,
)
print(json.dumps({"event": "lease_added", "package": data["package"]}, indent=2))
def list_spool(args: argparse.Namespace) -> None:
root = Path(args.spool)
for state in ("available", "leased", "quarantine", "accepted"):
files = sorted((root / state).glob("*.json"))
print(f"{state}: {len(files)}")
for path in files[: args.limit]:
data = read_json(path, {})
print(" ", path.name, data.get("lease_id", "-"), data.get("package", {}).get("name", "-"))
def main() -> int:
ap = argparse.ArgumentParser(description="AGILLM4.1 single-file lease coordinator")
sub = ap.add_subparsers(dest="cmd", required=True)
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--spool", default=os.environ.get("AGILLM41_LEASE_SPOOL") or os.environ.get("AGILLM35_LEASE_SPOOL", "./agillm41_lease_spool"))
common.add_argument("--secret-file", default=os.environ.get("AGILLM41_LEASE_SECRET_FILE") or os.environ.get("AGILLM35_LEASE_SECRET_FILE", "./agillm41_lease_spool/lease_secret.txt"))
common.add_argument("--public-base-url", default=os.environ.get("AGILLM41_PUBLIC_BASE_URL") or os.environ.get("AGILLM35_PUBLIC_BASE_URL", ""))
p = sub.add_parser("serve", parents=[common])
p.add_argument("--host", default="127.0.0.1")
p.add_argument("--port", type=int, default=8787)
p.add_argument("--tls-cert")
p.add_argument("--tls-key")
p.add_argument("--allow-http", action="store_true")
p.set_defaults(func=serve)
p = sub.add_parser("add-lease", parents=[common])
p.add_argument("--package", required=True)
p.add_argument("--frozen")
p.add_argument("--ttl-sec", type=int, default=900)
p.add_argument("--worker-arg", action="append", default=[])
p.add_argument("--worker-args-json", default="")
p.add_argument("--metadata", action="append", default=[])
p.add_argument("--max-result-bytes", type=int, default=500_000_000)
p.add_argument("--copy-artifacts", action="store_true")
p.set_defaults(func=add_lease)
p = sub.add_parser("list", parents=[common])
p.add_argument("--limit", type=int, default=20)
p.set_defaults(func=list_spool)
args = ap.parse_args()
args.func(args)
return 0
if __name__ == "__main__":
raise SystemExit(main())

Xet Storage Details

Size:
17.4 kB
·
Xet hash:
400684b618209631d40e02ea4b85620837b323895cd5f2c949605f6c934fde73

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.