OpenTransformer's picture
download
raw
9.7 kB
#!/usr/bin/env python3
"""Single-file outbound AGILLM4.1 join worker.
The client only opens outbound HTTPS connections, verifies package hashes, runs
a local worker command, and submits the result with a short-lived lease token.
"""
from __future__ import annotations
import argparse
import hashlib
import http.client
import json
import os
from pathlib import Path
import socket
import ssl
import subprocess
import sys
import time
from typing import Any
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen
VERSION = "2026-06-02"
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def ssl_context(insecure: bool) -> ssl.SSLContext | None:
if insecure:
return ssl._create_unverified_context()
return ssl.create_default_context()
def http_json(url: str, payload: dict[str, Any], insecure: bool) -> tuple[int, dict[str, Any] | None]:
body = json.dumps(payload).encode("utf-8")
req = Request(url, data=body, method="POST", headers={"Content-Type": "application/json"})
try:
with urlopen(req, context=ssl_context(insecure), timeout=60) as r:
if r.status == 204:
return 204, None
return r.status, json.loads(r.read() or b"{}")
except Exception as exc:
raise RuntimeError(f"request failed: {url}: {exc}") from exc
def download(url: str, dest: Path, expected_sha: str, token: str, insecure: bool) -> None:
dest.parent.mkdir(parents=True, exist_ok=True)
req = Request(url, headers={"Authorization": f"Bearer {token}"})
tmp = dest.with_suffix(dest.suffix + ".part")
h = hashlib.sha256()
with urlopen(req, context=ssl_context(insecure), timeout=300) as r, tmp.open("wb") as f:
while True:
chunk = r.read(1024 * 1024)
if not chunk:
break
h.update(chunk)
f.write(chunk)
actual = h.hexdigest()
if actual.lower() != expected_sha.lower():
tmp.unlink(missing_ok=True)
raise RuntimeError(f"sha256 mismatch for {dest.name}: {actual} != {expected_sha}")
os.replace(tmp, dest)
def submit_file(url: str, path: Path, token: str, insecure: bool) -> dict[str, Any]:
parsed = urlparse(url)
result_sha = sha256_file(path)
conn_cls = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
kwargs: dict[str, Any] = {"timeout": 300}
if parsed.scheme == "https":
kwargs["context"] = ssl_context(insecure)
conn = conn_cls(parsed.hostname, parsed.port, **kwargs)
target = parsed.path + (("?" + parsed.query) if parsed.query else "")
conn.putrequest("POST", target)
conn.putheader("Authorization", f"Bearer {token}")
conn.putheader("Content-Type", "application/octet-stream")
conn.putheader("Content-Length", str(path.stat().st_size))
conn.putheader("X-Result-Sha256", result_sha)
conn.endheaders()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
conn.send(chunk)
resp = conn.getresponse()
data = resp.read()
conn.close()
if resp.status not in (200, 202):
raise RuntimeError(f"submit failed {resp.status}: {data[:500]!r}")
return json.loads(data or b"{}")
def cache_artifact(lease: dict[str, Any], key: str, cache_dir: Path, token: str, insecure: bool) -> Path | None:
spec = lease.get(key)
if not spec:
return None
dest = cache_dir / f"{spec['sha256'][:16]}_{spec['name']}"
if dest.exists() and sha256_file(dest).lower() == spec["sha256"].lower():
return dest
download(spec["url"], dest, spec["sha256"], token, insecure)
return dest
def default_worker_cmd(args: argparse.Namespace, lease: dict[str, Any], package: Path, frozen: Path | None, out: Path) -> list[str]:
worker_args = lease.get("worker_args", {})
if not args.worker_script:
raise RuntimeError("default mode requires --worker-script or --worker-cmd")
cmd = [
args.worker_python,
"-u",
args.worker_script,
"--package",
str(package),
"--out",
str(out),
]
if frozen:
cmd += ["--frozen", str(frozen)]
for name, default in (
("device", args.device),
("threads", args.threads),
("steps", args.steps),
("vchunk", args.vchunk),
):
value = worker_args.get(name, default)
if value is not None:
cmd += [f"--{name}", str(value)]
cmd += ["--log-every", str(args.log_every)]
return cmd
def run_worker(args: argparse.Namespace, lease: dict[str, Any], package: Path, frozen: Path | None, out: Path) -> None:
env = os.environ.copy()
worker_args = lease.get("worker_args", {})
threads = str(worker_args.get("threads", args.threads or 1))
env["OMP_NUM_THREADS"] = threads
env["MKL_NUM_THREADS"] = threads
env["OPENBLAS_NUM_THREADS"] = threads
env.setdefault("TOKENIZER_ID", "deepseek-ai/DeepSeek-V4-Pro")
if args.worker_cmd:
template_data = {
"package": str(package),
"out": str(out),
"frozen": str(frozen or ""),
"lease_id": lease["lease_id"],
**{k: str(v) for k, v in worker_args.items()},
}
cmd = args.worker_cmd.format(**template_data)
subprocess.check_call(cmd, shell=True, env=env)
else:
subprocess.check_call(default_worker_cmd(args, lease, package, frozen, out), env=env)
if not out.exists() or out.stat().st_size <= 0:
raise RuntimeError(f"worker did not create {out}")
def once(args: argparse.Namespace) -> bool:
workdir = Path(args.workdir)
cache_dir = workdir / "cache"
leases_dir = workdir / "leases"
cache_dir.mkdir(parents=True, exist_ok=True)
leases_dir.mkdir(parents=True, exist_ok=True)
req_url = urljoin(args.coordinator_url.rstrip("/") + "/", "api/v1/leases/request")
node_id = args.node_id or f"{socket.gethostname()}-{os.getpid()}"
status, lease = http_json(
req_url,
{
"node_id": node_id,
"version": VERSION,
"capabilities": {
"device": args.device,
"threads": args.threads,
"vchunk": args.vchunk,
"max_result_bytes": args.max_result_bytes,
},
},
args.insecure,
)
if status == 204 or not lease:
print(json.dumps({"event": "no_lease"}), flush=True)
return False
token = lease["token"]
lease_dir = leases_dir / lease["lease_id"]
lease_dir.mkdir(parents=True, exist_ok=True)
(lease_dir / "lease.json").write_text(json.dumps(lease, indent=2), encoding="utf-8")
package = cache_artifact(lease, "package", cache_dir, token, args.insecure)
frozen = cache_artifact(lease, "frozen", cache_dir, token, args.insecure)
if package is None:
raise RuntimeError("lease did not include a package")
out = lease_dir / "result.pt"
started = time.time()
run_worker(args, lease, package, frozen, out)
response = submit_file(lease["submit_url"], out, token, args.insecure)
print(
json.dumps(
{
"event": "submitted",
"lease_id": lease["lease_id"],
"bytes": out.stat().st_size,
"sec": round(time.time() - started, 3),
"response": response,
}
),
flush=True,
)
return True
def main() -> int:
ap = argparse.ArgumentParser(description="AGILLM4.1 outbound-only untrusted join worker")
ap.add_argument("--coordinator-url", default=os.environ.get("AGILLM41_COORDINATOR_URL") or os.environ.get("AGILLM35_COORDINATOR_URL", ""))
ap.add_argument("--workdir", default=os.environ.get("AGILLM41_JOIN_WORKDIR") or "./agillm41_join_work")
ap.add_argument("--node-id", default=os.environ.get("AGILLM41_NODE_ID") or os.environ.get("AGILLM35_NODE_ID", ""))
ap.add_argument("--device", default="cpu")
ap.add_argument("--threads", type=int, default=max(1, (os.cpu_count() or 2) // 2))
ap.add_argument("--steps", type=int, default=8)
ap.add_argument("--vchunk", type=int, default=4096)
ap.add_argument("--max-result-bytes", type=int, default=500_000_000)
ap.add_argument("--worker-python", default=sys.executable)
ap.add_argument("--worker-script", default="agillm35_slice_worker.py")
ap.add_argument("--worker-cmd", default="", help="optional shell command template using {package}, {out}, {frozen}")
ap.add_argument("--log-every", type=int, default=4)
ap.add_argument("--loop", action="store_true")
ap.add_argument("--sleep-sec", type=int, default=30)
ap.add_argument("--insecure", action="store_true", help="allow invalid TLS certs; test only")
args = ap.parse_args()
if not args.coordinator_url:
raise SystemExit("set --coordinator-url or AGILLM41_COORDINATOR_URL")
while True:
try:
got = once(args)
except Exception as exc:
print(json.dumps({"event": "error", "error": str(exc)}), flush=True)
got = False
if not args.loop:
return 0 if got else 1
time.sleep(args.sleep_sec if not got else 1)
if __name__ == "__main__":
raise SystemExit(main())

Xet Storage Details

Size:
9.7 kB
·
Xet hash:
b03c7a634208cc5ca463b7e96398bb393924b218089899e3b2d5a928f8fddce6

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.