| """ |
| MAC Worker Agent |
| ================ |
| Run this on every worker PC to join it to the MAC cluster. |
| |
| Usage: |
| pip install httpx psutil pynvml |
| python worker_agent.py |
| |
| Environment variables (or edit DEFAULTS below): |
| MAC_MASTER_URL e.g. http://192.168.1.100:8000 |
| MAC_ENROLL_TOKEN one-time token from admin (/cluster/enroll-token) |
| MAC_WORKER_NAME display name for this node (default: hostname) |
| MAC_VLLM_PORT port where vLLM is running (default: 8001) |
| MAC_NOTEBOOK_PORT port for Jupyter kernel gateway (optional) |
| MAC_TAGS comma-separated: llm,notebook,embedding |
| MAC_HEARTBEAT_SEC heartbeat interval seconds (default: 10) |
| """ |
|
|
| import os |
| import sys |
| import time |
| import socket |
| import asyncio |
| import logging |
| import subprocess |
| from typing import Optional |
|
|
| import httpx |
|
|
| log = logging.getLogger("mac-worker") |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(message)s", |
| datefmt="%H:%M:%S", |
| ) |
|
|
| |
|
|
| MASTER_URL = os.environ.get("MAC_MASTER_URL", "http://localhost:8000").rstrip("/") |
| ENROLL_TOKEN = os.environ.get("MAC_ENROLL_TOKEN", "") |
| WORKER_NAME = os.environ.get("MAC_WORKER_NAME", socket.gethostname()) |
| VLLM_PORT = int(os.environ.get("MAC_VLLM_PORT", "8001")) |
| NOTEBOOK_PORT = int(os.environ.get("MAC_NOTEBOOK_PORT", "0")) or None |
| TAGS = os.environ.get("MAC_TAGS", "llm") |
| HEARTBEAT_SEC = int(os.environ.get("MAC_HEARTBEAT_SEC", "10")) |
|
|
| API = f"{MASTER_URL}/api/v1" |
|
|
|
|
| |
|
|
| def _local_ip() -> str: |
| """Best-guess local LAN IP.""" |
| try: |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| s.connect(("8.8.8.8", 80)) |
| ip = s.getsockname()[0] |
| s.close() |
| return ip |
| except OSError: |
| return "127.0.0.1" |
|
|
|
|
| def _gpu_metrics() -> dict: |
| """Read GPU metrics via pynvml (optional β degrades gracefully if not installed).""" |
| try: |
| import pynvml |
| pynvml.nvmlInit() |
| handle = pynvml.nvmlDeviceGetHandleByIndex(0) |
| util = pynvml.nvmlDeviceGetUtilizationRates(handle) |
| mem = pynvml.nvmlDeviceGetMemoryInfo(handle) |
| name = pynvml.nvmlDeviceGetName(handle) |
| if isinstance(name, bytes): |
| name = name.decode() |
| return { |
| "gpu_name": name, |
| "gpu_vram_mb": mem.total // (1024 * 1024), |
| "gpu_util_pct": float(util.gpu), |
| "gpu_vram_used_mb": mem.used // (1024 * 1024), |
| } |
| except Exception: |
| return {} |
|
|
|
|
| def _cpu_ram_metrics() -> dict: |
| """Read CPU / RAM metrics via psutil (optional).""" |
| try: |
| import psutil |
| cpu = psutil.cpu_percent(interval=None) |
| ram = psutil.virtual_memory() |
| cores = psutil.cpu_count(logical=False) or psutil.cpu_count() |
| return { |
| "cpu_util_pct": float(cpu), |
| "ram_total_mb": ram.total // (1024 * 1024), |
| "ram_used_mb": ram.used // (1024 * 1024), |
| "cpu_cores": cores, |
| } |
| except Exception: |
| return {} |
|
|
|
|
| def _active_models(vllm_port: int) -> list[str]: |
| """Query local vLLM /v1/models to find what's loaded.""" |
| try: |
| import httpx as _httpx |
| r = _httpx.get(f"http://localhost:{vllm_port}/v1/models", timeout=3) |
| if r.status_code == 200: |
| data = r.json().get("data", []) |
| return [m["id"] for m in data] |
| except Exception: |
| pass |
| return [] |
|
|
|
|
| |
|
|
| async def register(client: httpx.AsyncClient) -> tuple[str, str]: |
| """Register with master. Returns (node_id, node_token).""" |
| if not ENROLL_TOKEN: |
| log.error("MAC_ENROLL_TOKEN is not set. Get one from the admin panel.") |
| sys.exit(1) |
|
|
| gpu = _gpu_metrics() |
| sys_info = _cpu_ram_metrics() |
| ip = _local_ip() |
|
|
| payload = { |
| "enrollment_token": ENROLL_TOKEN, |
| "name": WORKER_NAME, |
| "hostname": socket.gethostname(), |
| "ip_address": ip, |
| "port": VLLM_PORT, |
| "notebook_port": NOTEBOOK_PORT, |
| "tags": TAGS, |
| **{k: gpu.get(k) for k in ("gpu_name", "gpu_vram_mb")}, |
| **{k: sys_info.get(k) for k in ("ram_total_mb", "cpu_cores")}, |
| } |
|
|
| log.info(f"Registering with master at {API} as '{WORKER_NAME}' ({ip}:{VLLM_PORT}) ...") |
| resp = await client.post(f"{API}/cluster/register", json=payload, timeout=15) |
|
|
| if resp.status_code == 401: |
| log.error("Invalid or expired enrollment token. Generate a new one from the admin panel.") |
| sys.exit(1) |
|
|
| resp.raise_for_status() |
| data = resp.json() |
| node_id = data["node_id"] |
| node_token = data.get("node_token") or _node_token_from_env() |
|
|
| log.info(f"Registered: node_id={node_id} status={data['status']}") |
| if data["status"] == "pending": |
| log.info("Waiting for admin to approve this node in the MAC admin panel ...") |
| return node_id, node_token |
|
|
|
|
| def _node_token_from_env() -> str: |
| """Fallback: read token from env (for re-registration after reboot).""" |
| t = os.environ.get("MAC_NODE_TOKEN", "") |
| if not t: |
| log.error( |
| "No node_token returned from master and MAC_NODE_TOKEN not set.\n" |
| "On first registration the token is the sha256 of your enrollment token.\n" |
| "Set MAC_NODE_TOKEN in your environment after the first run." |
| ) |
| sys.exit(1) |
| return t |
|
|
|
|
| |
|
|
| async def heartbeat_loop(client: httpx.AsyncClient, node_id: str, node_token: str): |
| """Send heartbeats every HEARTBEAT_SEC seconds.""" |
| consecutive_failures = 0 |
|
|
| while True: |
| try: |
| gpu = _gpu_metrics() |
| sys_info = _cpu_ram_metrics() |
| models = _active_models(VLLM_PORT) |
|
|
| payload = { |
| "node_id": node_id, |
| "node_token": node_token, |
| "active_models": models, |
| "queue_depth": 0, |
| **{k: gpu.get(k) for k in ("gpu_util_pct", "gpu_vram_used_mb")}, |
| **{k: sys_info.get(k) for k in ("cpu_util_pct", "ram_used_mb")}, |
| } |
|
|
| resp = await client.post(f"{API}/cluster/heartbeat", json=payload, timeout=10) |
|
|
| if resp.status_code == 403: |
| |
| await asyncio.sleep(HEARTBEAT_SEC * 3) |
| continue |
|
|
| if resp.status_code == 401: |
| log.error("Heartbeat auth failed β node_token may be wrong.") |
| sys.exit(1) |
|
|
| resp.raise_for_status() |
| data = resp.json() |
| if data.get("status") != "active": |
| log.warning(f"Node status: {data.get('status')}") |
|
|
| consecutive_failures = 0 |
|
|
| except httpx.RequestError as e: |
| consecutive_failures += 1 |
| log.warning(f"Heartbeat failed ({consecutive_failures}): {e}") |
| if consecutive_failures >= 12: |
| log.error("Master unreachable for 2 minutes. Check network.") |
|
|
| except Exception as e: |
| log.warning(f"Heartbeat error: {e}") |
|
|
| await asyncio.sleep(HEARTBEAT_SEC) |
|
|
|
|
| |
|
|
| async def main(): |
| log.info(f"MAC Worker Agent starting β master={MASTER_URL} name={WORKER_NAME}") |
| log.info(f"vLLM port={VLLM_PORT} notebook_port={NOTEBOOK_PORT} tags={TAGS}") |
|
|
| async with httpx.AsyncClient() as client: |
| node_id, node_token = await register(client) |
| log.info("Starting heartbeat loop ...") |
| await heartbeat_loop(client, node_id, node_token) |
|
|
|
|
| if __name__ == "__main__": |
| try: |
| asyncio.run(main()) |
| except KeyboardInterrupt: |
| log.info("Worker agent stopped.") |
|
|