#!/usr/bin/env python3
"""
mac-tensor join — register this Mac as a worker peer and start an expert node.
Usage:
mac-tensor join http://leader-ip:8500 \\
--model qwen35 \\
--model-dir ~/models/qwen35-stream \\
--port 9301
Workflow:
1. POST /swarm/register to the leader → get assigned partition + peer_id
2. Start the expert node with that partition
3. Send heartbeats every 20s
4. On Ctrl-C: POST /swarm/leave and shut down the node
5. If leader changes our partition (other peer joined), restart node
with new partition (NOT YET — phase 2 work)
"""
import argparse
import json
import os
import signal
import sys
import socket
import subprocess
import threading
import time
import urllib.request
import urllib.error
HEARTBEAT_INTERVAL = 20.0 # seconds
def get_local_ip():
    """Best-effort: figure out our outward-facing IP.

    Connecting a UDP socket sends no packets; it only makes the OS pick
    the source address it would route to a public host, which we read back.

    Note: returns the public/LAN IP, not the Tailscale IP. In Tailscale
    userspace mode, local processes can't bind to or connect to tailnet
    IPs directly without going through the SOCKS5 proxy, so the leader
    must reach us via our regular public address.

    Returns:
        The local IP as a string, or "localhost" if detection fails.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    except Exception:
        return "localhost"
    finally:
        # Original closed the socket only on the success path, leaking the
        # fd whenever connect() raised; always close it.
        s.close()
def get_mem_gb():
    """Return total system RAM in GB (macOS sysctl), or 16 if unreadable."""
    try:
        raw = subprocess.check_output(["sysctl", "-n", "hw.memsize"])
        return round(int(raw.decode().strip()) / 1e9, 1)
    except Exception:
        # sysctl missing (non-macOS) or unparsable — conservative guess.
        return 16
def post_json(url, body, timeout=10):
    """POST *body* to *url* as JSON and return the decoded JSON response."""
    payload = json.dumps(body).encode()
    headers = {"Content-Type": "application/json"}
    request = urllib.request.Request(url, data=payload, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
def start_expert_node(model_key, partition, model_dir, port, scripts_dir,
                      memory_limit_gb=14):
    """Spawn the expert node subprocess.

    Args:
        model_key: key into SUPPORTED_MODELS selecting the node script.
        partition: layer-partition string assigned by the leader.
        model_dir: directory holding the model files (``~`` is expanded).
        port: TCP port the node listens on.
        scripts_dir: directory containing the per-model node scripts.
        memory_limit_gb: RAM cap forwarded to the node; defaults to the
            previously hard-coded 14 GB, so existing callers are unchanged.

    Returns:
        The subprocess.Popen handle for the spawned node.

    Raises:
        FileNotFoundError: if the model's node script is not on disk.
    """
    from .cli import SUPPORTED_MODELS

    model = SUPPORTED_MODELS[model_key]
    script = os.path.join(scripts_dir, model["node_script"])
    if not os.path.exists(script):
        raise FileNotFoundError(f"Expert node script not found: {script}")
    cmd = [
        "python3", script,
        "--partition", partition,
        "--model-dir", os.path.expanduser(model_dir),
        "--port", str(port),
        "--memory-limit-gb", str(memory_limit_gb),
    ]
    print(f" Spawning: {' '.join(cmd)}")
    return subprocess.Popen(cmd)
def wait_for_node_ready(url, timeout=180):
    """Poll ``{url}/health`` until the expert node reports status "ok".

    Returns True once the node answers with {"status": "ok"}, or False
    if *timeout* seconds elapse first.
    """
    print(f" Waiting for {url}/health to come up...", flush=True)
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(f"{url}/health", timeout=2) as r:
                if json.loads(r.read()).get("status") == "ok":
                    return True
        except Exception:
            # Node not up yet (refused / timed out / bad body) — keep polling.
            pass
        time.sleep(2)
    return False
def heartbeat_loop(leader_url, peer_id, stop_event):
    """Background thread: keep sending heartbeats until *stop_event* is set."""
    endpoint = f"{leader_url}/swarm/heartbeat"
    while not stop_event.is_set():
        try:
            # The leader may report a partition change in its response —
            # currently ignored (Phase 2: trigger graceful re-partition).
            post_json(endpoint, {"peer_id": peer_id}, timeout=5)
        except Exception as e:
            print(f" [heartbeat] failed: {e}", flush=True)
        stop_event.wait(HEARTBEAT_INTERVAL)
def main(args):
    """Join the swarm: register with the leader, start the expert node,
    then heartbeat until interrupted.

    Args:
        args: argparse.Namespace with ``leader``, ``model``, ``model_dir``
            (optional), and ``port`` attributes.

    Exits the process: status 1 if registration or node startup fails,
    status 0 on a clean Ctrl-C / SIGTERM shutdown.
    """
    leader_url = args.leader.rstrip("/")
    model_key = args.model
    # Default model dir mirrors the layout shown in the module docstring.
    model_dir = args.model_dir or f"~/models/{model_key}-stream"
    port = args.port
    print("=" * 60)
    print(f"mac-tensor swarm join")
    print(f" Leader: {leader_url}")
    print(f" Model: {model_key}")
    print(f" Local IP: {get_local_ip()}")
    print(f" Port: {port}")
    print(f" RAM: {get_mem_gb()} GB")
    print("=" * 60)
    # Step 1: Register with the leader
    print("\n[1/3] Registering with leader...")
    # The leader must be able to reach us at this URL (see the Tailscale
    # caveat on get_local_ip).
    self_url = f"http://{get_local_ip()}:{port}"
    try:
        resp = post_json(f"{leader_url}/swarm/register", {
            "url": self_url,
            "mem_gb": get_mem_gb(),
            "meta": {
                "hostname": socket.gethostname(),
                "model": model_key,
            },
        }, timeout=10)
    except Exception as e:
        print(f" ERROR: could not register: {e}")
        sys.exit(1)
    # The leader assigns us an identity and a layer partition to serve.
    peer_id = resp["peer_id"]
    partition = resp["partition"]
    print(f" Peer ID: {peer_id}")
    print(f" Partition: {partition}")
    # Step 2: Start the expert node with the assigned partition
    print(f"\n[2/3] Starting expert node...")
    # Node scripts live one directory above this module.
    scripts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
    node_proc = start_expert_node(model_key, partition, model_dir, port, scripts_dir)
    if not wait_for_node_ready(self_url, timeout=300):
        print(" ERROR: expert node did not come up in time")
        node_proc.terminate()
        # Best-effort deregistration so the leader doesn't keep routing to us.
        try:
            post_json(f"{leader_url}/swarm/leave", {"peer_id": peer_id}, timeout=5)
        except Exception:
            pass
        sys.exit(1)
    print(f" Expert node ready at {self_url}")
    # Step 3: Heartbeat loop until interrupted
    print(f"\n[3/3] Sending heartbeats every {HEARTBEAT_INTERVAL}s. Ctrl-C to leave.")
    stop_event = threading.Event()
    # Daemon thread: must not keep the process alive after shutdown.
    hb_thread = threading.Thread(
        target=heartbeat_loop, args=(leader_url, peer_id, stop_event), daemon=True
    )
    hb_thread.start()

    def graceful_exit(signum=None, frame=None):
        # Invoked from the signal handlers below, or from the
        # KeyboardInterrupt fallback at the bottom of main().
        print("\n Sending leave signal to leader...")
        stop_event.set()
        try:
            post_json(f"{leader_url}/swarm/leave", {"peer_id": peer_id}, timeout=5)
        except Exception as e:
            # Leaving is best-effort; the leader's heartbeat timeout will
            # eventually evict us anyway.
            print(f" Warning: leave failed: {e}")
        print(" Stopping expert node...")
        node_proc.terminate()
        try:
            node_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Node ignored SIGTERM — force-kill so we don't hang on exit.
            node_proc.kill()
        print(" Bye.")
        sys.exit(0)

    signal.signal(signal.SIGINT, graceful_exit)
    signal.signal(signal.SIGTERM, graceful_exit)
    # Block on the node process
    try:
        node_proc.wait()
    except KeyboardInterrupt:
        # The SIGINT handler normally fires first; this covers the race.
        graceful_exit()