| |
"""
mac-tensor CLI — distributed MoE inference across multiple Macs.

The idea: you run this from YOUR Mac (or laptop). It SSHes into
your remote Macs, sets everything up, and starts inference.

Workflow:
  1. mac-tensor init    — save your cluster config (IPs, credentials)
  2. mac-tensor deploy  — push code and download the model on all nodes
  3. mac-tensor up      — start expert nodes on all remotes
  4. mac-tensor chat    — chat with the model
  5. mac-tensor down    — stop all nodes
  6. mac-tensor status  — check what's running

Or all-in-one:
  mac-tensor run --model qwen35   — deploy + up + wait + chat in one command
"""
|
|
| import argparse |
| import json |
| import os |
| import subprocess |
| import sys |
| import time |
| import shutil |
|
|
|
|
| |
| |
| |
|
|
# Per-user state directory and the JSON file that holds the saved cluster.
CONFIG_DIR = os.path.expanduser("~/.mac-tensor")
CONFIG_FILE = os.path.join(CONFIG_DIR, "cluster.json")
# Parent of the directory containing this file; the node/coordinator scripts
# named in SUPPORTED_MODELS are looked up (and copied from) here.
SCRIPT_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), os.pardir)
|
|
# Models this CLI can deploy, keyed by the name used on the command line.
# Insertion order matters: `init` numbers the models in this order and its
# default choice [1] is the first entry.
#
# Per-model fields (as used elsewhere in this module):
#   name                display name
#   hf_id               Hugging Face repo fetched by `deploy` / `download`
#   num_experts         experts split into contiguous partitions by `up`
#   num_layers          informational; shown by `init` / `info`
#   node_script         expert-node script launched by `up` / `node`
#   coordinator_script  script run by `chat`
#   split_script        run by `deploy` on expert nodes when needs_split is True
#   needs_split         whether model_dir must be split into stream_dir
#   default_port        port expert nodes listen on (`up`, `down`, `node`)
#   model_dir           download target for `deploy` / `download`
#   stream_dir          passed as --model-dir to expert nodes (split output)
#   deploy_files        files copied into ~/expert-sniper-mlx by `deploy`
SUPPORTED_MODELS = {
    "qwen35": {
        "name": "Qwen 3.5-35B-A3B",
        "hf_id": "mlx-community/Qwen3.5-35B-A3B-4bit",
        "num_experts": 256,
        "num_layers": 40,
        "node_script": "expert_node_fast.py",
        "coordinator_script": "distributed_interactive.py",
        "split_script": "split_qwen.py",
        "needs_split": True,
        "default_port": 8301,
        "model_dir": "~/models/qwen35-4bit",
        "stream_dir": "~/models/qwen35-stream",
        "deploy_files": [
            "expert_node_fast.py",
            "distributed_interactive.py",
            "distributed_reader_fast.py",
            "split_qwen.py",
        ],
    },
    "gemma4": {
        "name": "Gemma 4-26B-A4B",
        "hf_id": "mlx-community/gemma-4-26b-a4b-it-4bit",
        "num_experts": 128,
        "num_layers": 30,
        "node_script": "gemma4_expert_node.py",
        "coordinator_script": "gemma4_distributed.py",
        # Not run by `deploy` while needs_split is False.
        "split_script": "split_gemma4.py",
        "needs_split": False,
        "default_port": 8401,
        "model_dir": "~/models/gemma4-4bit",
        # No split step: expert nodes load the downloaded directory directly.
        "stream_dir": "~/models/gemma4-4bit",
        "deploy_files": [
            "gemma4_expert_node.py",
            "gemma4_distributed.py",
            "distributed_reader_fast.py",
            "models_gemma4.py",
        ],
    },
}
|
|
| |
# Extra files `deploy` copies to a remote coordinator on top of the model's
# deploy_files. It contains the coordinator scripts of both supported models
# (distributed_interactive.py for qwen35, gemma4_distributed.py for gemma4)
# plus the modules they sit alongside.
COORDINATOR_FILES = [
    "distributed_reader_fast.py",
    "distributed_interactive.py",
    "gemma4_distributed.py",
    "models_gemma4.py",
]
|
|
|
|
def get_model(name):
    """Return the SUPPORTED_MODELS entry for *name*.

    If *name* is not a known model key, print it together with the list of
    supported keys and exit the process with status 1.
    """
    try:
        return SUPPORTED_MODELS[name]
    except KeyError:
        print(f"Error: Unknown model '{name}'")
        print(f"Supported: {', '.join(SUPPORTED_MODELS.keys())}")
        sys.exit(1)
|
|
|
|
def load_config():
    """Return the saved cluster config as a dict, or None if none has been saved."""
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE) as handle:
            return json.load(handle)
    return None
|
|
|
|
def save_config(cfg, path=None):
    """Write the cluster config as indented JSON, readable only by its owner.

    The config can hold plain-text SSH passwords (``cmd_init`` stores them
    per node), so the file is created with mode 0o600 and its directory with
    0o700 instead of inheriting the process umask, which is typically 0o022
    (world-readable).

    Args:
        cfg: JSON-serialisable cluster config dict.
        path: Destination file. Defaults to ``CONFIG_FILE``; its parent
            directory is created if missing.
    """
    if path is None:
        path = CONFIG_FILE
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, mode=0o700, exist_ok=True)
    # os.open's mode only takes effect when the file is created...
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)
    # ...so also tighten a pre-existing, possibly world-readable file.
    os.chmod(path, 0o600)
|
|
|
|
def ssh_cmd(host, user, password, cmd, timeout=300):
    """Run a shell command on a remote Mac via SSH and capture its output.

    *cmd* is handed to ``ssh`` as a single argv element, so only the remote
    shell interprets it: quotes, pipes, ``&`` and ``~`` inside it reach the
    remote side intact. No local shell is involved, so a password or command
    containing quotes cannot break or inject into the local command line.

    Args:
        host: Remote hostname or IP.
        user: Remote login.
        password: SSH password, or a falsy value to use key-based auth.
            Passed to ``sshpass -e`` through the SSHPASS environment
            variable, keeping it out of the process list.
        cmd: Shell command to run remotely.
        timeout: Seconds to wait before giving up.

    Returns:
        ``(stdout, stderr, returncode)`` with surrounding whitespace stripped.
        A timeout yields ``("", "timed out after …", 124)``; a missing
        ``ssh``/``sshpass`` binary yields return code 127 (as a shell would).
    """
    argv = ["ssh", "-o", "StrictHostKeyChecking=no", f"{user}@{host}", cmd]
    env = None
    if password:
        argv = ["sshpass", "-e", *argv]
        env = dict(os.environ, SSHPASS=password)
    try:
        result = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout, env=env
        )
    except subprocess.TimeoutExpired:
        # TimeoutExpired's captured output is bytes even in text mode; drop it.
        return "", f"timed out after {timeout}s", 124
    except FileNotFoundError:
        return "", f"{argv[0]}: command not found", 127
    return result.stdout.strip(), result.stderr.strip(), result.returncode
|
|
|
|
def scp_file(host, user, password, local_path, remote_path):
    """Copy a local file to a remote Mac with ``scp``.

    Arguments go to ``scp`` as an argv list (no local shell), so local paths
    containing spaces or quotes work and the password cannot inject shell
    syntax. *remote_path* is passed through unchanged (e.g. a leading ``~``
    is resolved on the remote side).

    Args:
        host: Remote hostname or IP.
        user: Remote login.
        password: SSH password (sent via ``sshpass -e`` / SSHPASS), or a
            falsy value for key-based auth.
        local_path: File to upload.
        remote_path: Destination path on the remote host.

    Returns:
        True if scp exited with status 0. False on failure, after 120 s
        without finishing, or when ``scp``/``sshpass`` is not installed.
    """
    argv = [
        "scp", "-o", "StrictHostKeyChecking=no",
        local_path, f"{user}@{host}:{remote_path}",
    ]
    env = None
    if password:
        argv = ["sshpass", "-e", *argv]
        env = dict(os.environ, SSHPASS=password)
    try:
        result = subprocess.run(
            argv, capture_output=True, text=True, timeout=120, env=env
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return result.returncode == 0
|
|
|
|
def require_config():
    """Return the saved cluster config; exit with a hint to run `init` if there is none."""
    config = load_config()
    if config:
        return config
    print("No cluster configured. Run 'mac-tensor init' first.")
    sys.exit(1)
|
|
|
|
def print_step(step, msg):
    """Print a progress header: a blank line, then ``[step] msg``."""
    header = "\n [{}] {}".format(step, msg)
    print(header)
|
|
|
|
| |
| |
| |
|
|
|
|
def _parse_node_entry(entry):
    """Split ``user@host`` or ``user@host:password`` into ``(user, host, password)``.

    The password is everything after the first ':' following the host, so it
    may itself contain ':' or '@'. An empty password is returned as None
    (key-based auth). Returns None if there is no '@' or the user or host is
    empty.
    """
    at = entry.find("@")
    if at <= 0:
        return None
    user = entry[:at]
    host, _, password = entry[at + 1:].partition(":")
    if not host:
        return None
    return user, host, password or None


def cmd_init(args):
    """Configure your cluster — save node IPs and credentials.

    Interactively collects expert nodes (each is SSH-tested and skipped if
    unreachable), an optional remote coordinator, and the model, then saves
    the result with ``save_config``. Passwords, when given, are stored in the
    config as plain text.
    """
    print("mac-tensor cluster setup\n")
    print("You need 2-3 Macs: 1 coordinator (this Mac or a remote) + 1-2 expert nodes.")
    print("The expert nodes hold the model weights. The coordinator runs the chat.\n")

    # Never clobber an existing cluster without confirmation.
    existing = load_config()
    if existing:
        print(f"Existing config found with {len(existing.get('nodes', []))} nodes.")
        resp = input("Overwrite? [y/N] ").strip().lower()
        if resp != "y":
            print("Keeping existing config.")
            return

    nodes = []
    print("Add your expert nodes (the Macs that will hold model weights):")
    print("Format: user@ip:password (or user@ip if using SSH keys)\n")

    while True:
        entry = input(f" Node {len(nodes) + 1} (or 'done'): ").strip()
        if entry.lower() in ("done", "d", ""):
            if not nodes:
                print(" Need at least 1 node.")
                continue
            break

        parsed = _parse_node_entry(entry)
        if parsed is None:
            print(" Format: user@ip or user@ip:password")
            continue
        user, host, password = parsed

        # Only keep nodes we can actually reach.
        print(f" Testing SSH to {user}@{host}...", end="", flush=True)
        try:
            out, err, rc = ssh_cmd(host, user, password, "echo ok", timeout=10)
        except subprocess.TimeoutExpired:
            # An unreachable host can outlast the 10 s budget: a failed test, not a crash.
            out, err, rc = "", "timed out", 1
        if rc == 0 and "ok" in out:
            print(" connected!")
            nodes.append({"host": host, "user": user, "password": password})
        else:
            print(f" FAILED: {err[:80]}")
            print(" Check IP, username, and password. Skipping.")

    print("\nCoordinator (where you'll run 'mac-tensor chat'):")
    print(" [1] This Mac (default)")
    print(" [2] A remote Mac")
    coord_choice = input(" Choice [1]: ").strip()

    coordinator = None
    if coord_choice == "2":
        # Re-prompt on malformed input instead of crashing; blank keeps this Mac.
        while True:
            entry = input(" Coordinator user@ip:password: ").strip()
            if not entry:
                break
            parsed = _parse_node_entry(entry)
            if parsed is None:
                print(" Format: user@ip or user@ip:password")
                continue
            user, host, password = parsed
            coordinator = {"host": host, "user": user, "password": password}
            break

    print("\nModel to run:")
    model_keys = list(SUPPORTED_MODELS)
    for i, key in enumerate(model_keys):
        m = SUPPORTED_MODELS[key]
        print(f" [{i+1}] {key} β {m['name']} ({m['num_experts']} experts, {m['num_layers']} layers)")
    model_choice = input(" Choice [1]: ").strip()
    try:
        model_idx = int(model_choice) - 1
    except ValueError:
        model_idx = 0  # blank or non-numeric -> default [1]
    # Clamp both ends so "0" (index -1) cannot wrap around to the last model.
    model_idx = max(0, min(model_idx, len(model_keys) - 1))
    model_key = model_keys[model_idx]

    cfg = {
        "model": model_key,
        "nodes": nodes,
        "coordinator": coordinator,
        "created": time.strftime("%Y-%m-%d %H:%M"),
    }
    save_config(cfg)

    model = SUPPORTED_MODELS[model_key]
    print(f"\nCluster saved to {CONFIG_FILE}")
    print(f" Model: {model['name']}")
    print(f" Nodes: {len(nodes)} expert node(s)")
    if coordinator:
        print(f" Coordinator: {coordinator['user']}@{coordinator['host']}")
    else:
        print(" Coordinator: this Mac")
    print("\nNext: run 'mac-tensor deploy' to push code and download the model.")
|
|
|
|
def _deploy_ssh(host, user, pw, cmd, timeout=300):
    """Run ``ssh_cmd``, reporting a timeout as a failed result (rc 124) instead of raising."""
    try:
        return ssh_cmd(host, user, pw, cmd, timeout=timeout)
    except subprocess.TimeoutExpired:
        return "", f"timed out after {timeout}s", 124


def _deploy_copy_files(host, user, pw, model, is_coordinator):
    """Copy the model's deploy files (plus COORDINATOR_FILES for the coordinator) into ~/expert-sniper-mlx."""
    files = list(model["deploy_files"])
    if is_coordinator:
        files += COORDINATOR_FILES
    # dict.fromkeys de-duplicates while keeping a deterministic copy order.
    for fname in dict.fromkeys(files):
        local = os.path.join(SCRIPT_DIR, fname)
        if not os.path.exists(local):
            print(f" Skipped {fname} (not found in {SCRIPT_DIR})")
            continue
        try:
            ok = scp_file(host, user, pw, local, f"~/expert-sniper-mlx/{fname}")
        except subprocess.TimeoutExpired:
            ok = False
        if ok:
            print(f" Copied {fname}")
        else:
            print(f" FAILED to copy {fname}")


def _deploy_download(host, user, pw, model):
    """Download the model snapshot into model_dir unless its config.json already exists."""
    check_dir = model["model_dir"]
    out, _, _ = _deploy_ssh(
        host, user, pw, f"ls {check_dir}/config.json 2>/dev/null && echo EXISTS"
    )
    if "EXISTS" in out:
        print(f" Model already downloaded at {check_dir}")
        return
    print(f" Downloading {model['hf_id']}... (this takes a few minutes)")
    # The Python program is single-quoted and contains no double quotes, so it
    # reaches the remote shell intact even if the whole command is wrapped in
    # "..." for ssh. Repo id and target dir travel as argv; the remote shell
    # expands the leading ~ of the unquoted target dir, and expanduser() is a
    # harmless fallback if it did not.
    dl_cmd = (
        "python3 -c 'import os, sys; "
        "from huggingface_hub import snapshot_download; "
        "snapshot_download(sys.argv[1], local_dir=os.path.expanduser(sys.argv[2]))' "
        f"{model['hf_id']} {check_dir}"
    )
    # Allow up to an hour: these are multi-GB checkpoints, and hitting the
    # timeout mid-download would abort this step.
    _, err, rc = _deploy_ssh(host, user, pw, dl_cmd, timeout=3600)
    if rc == 0:
        print(" Download complete!")
    else:
        print(f" Download failed: {err[:120]}")


def _deploy_split(host, user, pw, model):
    """Run the model's split script into stream_dir unless pinned.safetensors already exists."""
    check_dir = model["model_dir"]
    stream_dir = model["stream_dir"]
    out, _, _ = _deploy_ssh(
        host, user, pw, f"ls {stream_dir}/pinned.safetensors 2>/dev/null && echo EXISTS"
    )
    if "EXISTS" in out:
        print(f" Model already split at {stream_dir}")
        return
    print(" Splitting model...")
    split_cmd = (
        f"cd ~/expert-sniper-mlx && python3 {model['split_script']} "
        f"--input {check_dir} --output {stream_dir}"
    )
    _, err, rc = _deploy_ssh(host, user, pw, split_cmd, timeout=600)
    if rc == 0:
        print(" Split complete!")
    else:
        print(f" Split failed: {err[:120]}")


def cmd_deploy(args):
    """Deploy code + download model on all nodes.

    For every expert node, and the remote coordinator if one is configured:
    create ~/expert-sniper-mlx, copy the code files, and download the model
    unless it is already present. Expert nodes of models with ``needs_split``
    are then split into ``stream_dir``. Failures are reported per target and
    do not stop the others. Afterwards the model key and a ``deployed`` flag
    are saved to the config.
    """
    cfg = require_config()
    model_key = args.model or cfg.get("model", "qwen35")
    model = get_model(model_key)
    nodes = cfg["nodes"]
    coordinator = cfg.get("coordinator")

    print(f"Deploying {model['name']} to {len(nodes)} node(s)...")

    # Carry the role explicitly. Comparing dicts with == would treat an expert
    # node as the coordinator whenever the same Mac is configured for both
    # roles, and would then skip that node's split.
    targets = [(f"node {i + 1}", node, False) for i, node in enumerate(nodes)]
    if coordinator:
        targets.append(("coordinator", coordinator, True))

    for role, node, is_coordinator in targets:
        host, user, pw = node["host"], node["user"], node.get("password")
        print_step(role, f"Deploying to {user}@{host}")

        _, err, rc = _deploy_ssh(host, user, pw, "mkdir -p ~/expert-sniper-mlx")
        if rc != 0:
            # Every later step needs this host; don't pile up more failures.
            print(f" Setup failed on {user}@{host}: {err[:120]}. Skipping.")
            continue

        _deploy_copy_files(host, user, pw, model, is_coordinator)
        _deploy_download(host, user, pw, model)
        if model["needs_split"] and not is_coordinator:
            _deploy_split(host, user, pw, model)

    cfg["model"] = model_key
    cfg["deployed"] = True
    save_config(cfg)
    print("\nDeploy complete! Run 'mac-tensor up' to start expert nodes.")
|
|
|
|
def cmd_up(args):
    """Start expert nodes on all remote Macs.

    Experts ``0 .. num_experts-1`` are split into contiguous ranges, one per
    configured node, and the last node also takes the remainder. On each
    node, any process listening on the model's port is killed first, then
    the node script is launched in the background with output in
    /tmp/mac-tensor-node.log. The node URLs and model key are saved for
    ``status``, ``chat`` and ``down``.
    """
    cfg = require_config()
    model_key = args.model or cfg.get("model", "qwen35")
    model = get_model(model_key)
    nodes = cfg.get("nodes", [])
    n_experts = model["num_experts"]
    port = model["default_port"]

    # Guard the partition arithmetic: no nodes would divide by zero, and more
    # nodes than experts would give per_node == 0 and ranges like "0--1".
    if not nodes:
        print("No expert nodes configured. Run 'mac-tensor init' first.")
        sys.exit(1)
    if len(nodes) > n_experts:
        print(f"Error: {len(nodes)} nodes but only {n_experts} experts to partition.")
        sys.exit(1)

    per_node = n_experts // len(nodes)
    model_dir = model["stream_dir"]
    node_urls = []
    failed = []

    for i, node in enumerate(nodes):
        host, user, pw = node["host"], node["user"], node.get("password")

        p_start = i * per_node
        p_end = (i + 1) * per_node - 1 if i < len(nodes) - 1 else n_experts - 1
        partition = f"{p_start}-{p_end}"

        print_step(i + 1, f"Starting {user}@{host} β partition {partition}")

        # Free the port from any previous run, then give the OS a moment to release it.
        ssh_cmd(host, user, pw, f"lsof -i :{port} -t 2>/dev/null | xargs kill -9 2>/dev/null")
        time.sleep(1)

        start_cmd = (
            f"cd ~/expert-sniper-mlx && "
            f"nohup python3 {model['node_script']} "
            f"--partition {partition} --model-dir {model_dir} --port {port} "
            f"> /tmp/mac-tensor-node.log 2>&1 &"
        )
        _, err, rc = ssh_cmd(host, user, pw, start_cmd)
        # Record the URL even on failure so the partition layout stays complete
        # and `status` keeps reporting the missing node.
        node_urls.append(f"http://{host}:{port}")
        if rc == 0:
            print(f" Started on port {port}")
        else:
            failed.append(f"{user}@{host}")
            print(f" FAILED to start: {err[:120]}")

    cfg["node_urls"] = node_urls
    cfg["model"] = model_key
    save_config(cfg)

    if failed:
        print(f"\nWarning: could not start {', '.join(failed)}.")
    print("\nExpert nodes starting... they need ~90 seconds to load weights.")
    print("Run 'mac-tensor status' to check, then 'mac-tensor chat' when ready.")
|
|
|
|
def cmd_down(args):
    """Stop every expert node by killing whatever listens on the model's port.

    The saved node URLs are then removed from the config.
    """
    cfg = require_config()
    port = get_model(cfg.get("model", "qwen35"))["default_port"]
    kill_cmd = f"lsof -i :{port} -t 2>/dev/null | xargs kill -9 2>/dev/null"

    for node in cfg["nodes"]:
        host = node["host"]
        user = node["user"]
        print(f" Stopping {user}@{host}...")
        ssh_cmd(host, user, node.get("password"), kill_cmd)

    cfg.pop("node_urls", None)
    save_config(cfg)
    print("All nodes stopped.")
|
|
|
|
def cmd_status(args):
    """Check cluster status — are nodes loaded and ready?

    Queries ``<url>/health`` on every node URL saved by ``up``. A node counts
    as ready only when it answers with ``"status": "ok"``, the same test
    ``run`` applies while waiting for nodes to load. No answer, invalid JSON
    or any other status is reported as still loading.
    """
    import urllib.request

    cfg = require_config()
    node_urls = cfg.get("node_urls", [])
    if not node_urls:
        print("No nodes running. Run 'mac-tensor up' first.")
        return

    print(f"Checking {len(node_urls)} node(s)...\n")

    all_ok = True
    total_mem = 0
    for url in node_urls:
        try:
            # `with` closes the HTTP response instead of leaking the socket.
            with urllib.request.urlopen(f"{url}/health", timeout=5) as resp:
                data = json.loads(resp.read())
        except Exception as e:  # refused, timed out, bad JSON, ... -> still loading
            print(f" [WAIT] {url}")
            print(f" Not ready yet ({e.__class__.__name__})")
            all_ok = False
            continue

        if not isinstance(data, dict):
            data = {}
        status = data.get("status", "?")
        if status != "ok":
            print(f" [WAIT] {url}")
            print(f" Not ready yet (status: {status})")
            all_ok = False
            continue

        partition = data.get("partition", "?")
        mem = data.get("memory_gb", 0)
        reqs = data.get("compute_requests", 0)
        # Only sum numeric values so one malformed field can't break the total.
        if isinstance(mem, (int, float)):
            total_mem += mem

        print(f" [OK] {url}")
        print(f" Partition: {partition} | RAM: {mem} GB | Requests: {reqs}")

    print()
    if all_ok:
        print(f"All nodes ready! Total: {total_mem:.1f} GB RAM across {len(node_urls)} nodes.")
        print("\nRun: mac-tensor chat")
    else:
        print("Some nodes still loading. Wait ~90 seconds and try again.")
|
|
|
|
def cmd_chat(args):
    """Start interactive chat — connects to running expert nodes.

    Two modes:
    1. With --nodes: connect directly without needing a saved cluster
    2. Without --nodes: use the saved cluster config (init/up flow)

    With a saved remote coordinator, the coordinator script runs there over
    an interactive SSH session. Otherwise it is exec'd locally, replacing
    this process. Defaults: 300 max tokens, temperature 0.7.
    """
    cfg = load_config() or {}

    if args.nodes:
        # Explicit node URLs: ignore any saved cluster and run locally.
        node_urls = args.nodes
        model_key = args.model or "qwen35"
        coordinator = None
    else:
        if not cfg:
            print("No cluster configured and no --nodes provided.")
            print("Either run 'mac-tensor init' first, or pass --nodes directly:")
            print(" mac-tensor chat --model gemma4 --nodes http://mac2:8401,http://mac3:8401")
            sys.exit(1)
        node_urls = ",".join(cfg.get("node_urls", []))
        if not node_urls:
            print("No nodes running. Run 'mac-tensor up' first.")
            sys.exit(1)
        model_key = args.model or cfg.get("model", "qwen35")
        coordinator = cfg.get("coordinator")

    model = get_model(model_key)
    script = model["coordinator_script"]
    max_tokens = args.max_tokens or 300
    # `is None` rather than `or`: an explicit --temperature 0 (greedy) must not become 0.7.
    temperature = 0.7 if args.temperature is None else args.temperature

    if coordinator:
        host, user, pw = coordinator["host"], coordinator["user"], coordinator.get("password")
        print(f"Starting chat on coordinator {user}@{host}...")
        remote_cmd = (
            f"cd ~/expert-sniper-mlx && python3 {script} "
            f"--nodes {node_urls} "
            f"--max-tokens {max_tokens} --temperature {temperature}"
        )
        # argv list, no local shell: quotes in the password can't break or
        # inject into the command, and sshpass -e reads it from $SSHPASS
        # instead of the process list. -t keeps the remote session interactive.
        argv = ["ssh", "-t", "-o", "StrictHostKeyChecking=no", f"{user}@{host}", remote_cmd]
        env = None
        if pw:
            argv = ["sshpass", "-e", *argv]
            env = dict(os.environ, SSHPASS=pw)
        try:
            subprocess.call(argv, env=env)
        except FileNotFoundError:
            print(f"Error: '{argv[0]}' not found on PATH.")
            sys.exit(1)
    else:
        local_script = os.path.join(SCRIPT_DIR, script)
        if not os.path.exists(local_script):
            print(f"Error: Coordinator script not found: {local_script}")
            sys.exit(1)

        cmd = [
            "python3", local_script,
            "--nodes", node_urls,
            "--max-tokens", str(max_tokens),
            "--temperature", str(temperature),
        ]
        if args.model_dir:
            cmd.extend(["--model-dir", os.path.expanduser(args.model_dir)])
        os.execvp("python3", cmd)
|
|
|
|
def cmd_run(args):
    """All-in-one: deploy + up + wait + chat.

    After deploying and starting the nodes, polls every node's ``/health``
    up to 30 times, 10 s apart (about 5 minutes). It proceeds to chat as soon
    as all nodes report ``"status": "ok"``, or after the last poll with a
    warning.
    """
    cfg = require_config()
    model_key = args.model or cfg.get("model", "qwen35")

    print("=== Step 1: Deploy ===")
    # Pin the model so deploy, up and chat all agree on it.
    args.model = model_key
    cmd_deploy(args)

    print("\n=== Step 2: Start Nodes ===")
    cmd_up(args)

    print("\n=== Step 3: Waiting for nodes to load ===")
    import urllib.request

    # cmd_up saved the URLs of the nodes it started; re-read them.
    node_urls = (load_config() or {}).get("node_urls", [])

    for attempt in range(30):
        time.sleep(10)
        ready = 0
        for url in node_urls:
            try:
                with urllib.request.urlopen(f"{url}/health", timeout=5) as resp:
                    data = json.loads(resp.read())
            except Exception:
                continue  # deliberate best effort: not listening / still loading
            if isinstance(data, dict) and data.get("status") == "ok":
                ready += 1

        print(f" {ready}/{len(node_urls)} nodes ready... ({(attempt+1)*10}s)")
        if ready == len(node_urls):
            break
    else:
        print("Warning: Not all nodes loaded in time. Trying anyway...")

    print("\n=== Step 4: Chat ===")
    args.nodes = None  # use the saved cluster config
    args.max_tokens = args.max_tokens or 300
    # `is None` rather than `or`, so an explicit --temperature 0 survives.
    if args.temperature is None:
        args.temperature = 0.7
    args.model_dir = None
    cmd_chat(args)
|
|
|
|
def cmd_info(args):
    """Print the saved cluster summary (if any), the model table and the workflow."""
    print("mac-tensor β Distributed MoE Inference\n")

    config = load_config()
    if config:
        members = config.get("nodes", [])
        urls = config.get("node_urls", [])
        print(f" Current cluster:")
        print(f" Model: {config.get('model', '?')}")
        print(f" Nodes: {len(members)}")
        for member in members:
            print(f" {member['user']}@{member['host']}")
        if urls:
            state = f" Status: nodes running ({', '.join(urls)})"
        else:
            state = f" Status: not started"
        print(state)
        print()

    print(" Supported Models:\n")
    for key, spec in SUPPORTED_MODELS.items():
        print(f" {key:10s} {spec['name']:30s} {spec['num_experts']} experts, {spec['num_layers']} layers")
    print()

    usage = (
        " Workflow:",
        " mac-tensor init Configure cluster (IPs, credentials)",
        " mac-tensor deploy Push code + download model on all nodes",
        " mac-tensor up Start expert nodes",
        " mac-tensor status Check if nodes are ready",
        " mac-tensor chat Interactive chat",
        " mac-tensor down Stop all nodes",
        " mac-tensor run All-in-one (deploy + up + wait + chat)",
    )
    for line in usage:
        print(line)
|
|
|
|
| |
| |
| |
|
|
|
|
def cmd_node_local(args):
    """Run an expert node on this Mac by replacing the process with the node script.

    Defaults when the corresponding flag is omitted: the model's default
    port, experts ``0 .. num_experts/2 - 1`` as the partition, the model's
    stream_dir as the weights directory, and a 14.0 GB memory limit.
    """
    spec = get_model(args.model)
    port = args.port if args.port else spec["default_port"]
    weights_dir = os.path.expanduser(args.model_dir if args.model_dir else spec["stream_dir"])

    partition = args.partition
    if not partition:
        # No explicit range given: take the first half of the experts.
        partition = "0-{}".format(spec["num_experts"] // 2 - 1)

    script = os.path.join(SCRIPT_DIR, spec["node_script"])
    if not os.path.exists(script):
        print(f"Error: {script} not found")
        sys.exit(1)

    print(f"Starting {spec['name']} expert node locally")
    print(f" Partition: {partition} | Port: {port} | Model: {weights_dir}")

    argv = [
        "python3", script,
        "--partition", partition,
        "--model-dir", weights_dir,
        "--port", str(port),
        "--memory-limit-gb", str(args.memory_limit or 14.0),
    ]
    os.execvp("python3", argv)
|
|
|
|
def cmd_download_local(args):
    """Download the model snapshot from Hugging Face onto this Mac.

    The target is ``--output`` if given, otherwise the model's model_dir
    (``~`` expanded).
    """
    spec = get_model(args.model)
    target = os.path.expanduser(args.output if args.output else spec["model_dir"])

    print(f"Downloading {spec['name']} to {target}...")
    # Imported lazily, after the banner, exactly as before.
    from huggingface_hub import snapshot_download
    snapshot_download(spec["hf_id"], local_dir=target)
    print(f"Done!")
|
|
|
|
| |
| |
| |
|
|
|
|
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Registers one argparse subcommand per action: the remote-cluster
    workflow, local node/download helpers, and the agent/UI/swarm/bench
    entry points. With no subcommand it prints help and exits with status 0.
    """
    parser = argparse.ArgumentParser(
        prog="mac-tensor",
        description="Distributed MoE inference across multiple Macs",
        # Keep the epilog's line breaks as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Workflow (remote cluster):
mac-tensor init Save cluster config (IPs, creds)
mac-tensor deploy Push code + model to all nodes
mac-tensor up Start expert nodes
mac-tensor status Check if nodes are loaded
mac-tensor chat Start chatting
mac-tensor down Stop all nodes

All-in-one:
mac-tensor run --model qwen35 Deploy + start + wait + chat

Local commands (run on the Mac itself):
mac-tensor node --model qwen35 -p 0-127 Start a local expert node
mac-tensor download --model qwen35 Download model locally
""",
    )
    # The chosen subcommand name lands in args.command (None if omitted).
    # Subparser registration order is the order shown in --help.
    sub = parser.add_subparsers(dest="command")

    # --- Remote cluster workflow ---
    sub.add_parser("init", help="Configure cluster (IPs, credentials)")

    p_dep = sub.add_parser("deploy", help="Push code + download model on all nodes")
    p_dep.add_argument("--model", choices=SUPPORTED_MODELS.keys())

    p_up = sub.add_parser("up", help="Start expert nodes on all remotes")
    p_up.add_argument("--model", choices=SUPPORTED_MODELS.keys())

    sub.add_parser("down", help="Stop all expert nodes")
    sub.add_parser("status", help="Check cluster status")

    p_chat = sub.add_parser("chat", help="Interactive chat")
    p_chat.add_argument("--model", choices=SUPPORTED_MODELS.keys())
    p_chat.add_argument("--nodes", help="Override node URLs (comma-separated)")
    p_chat.add_argument("--max-tokens", type=int)
    p_chat.add_argument("--temperature", type=float)
    p_chat.add_argument("--model-dir", help="Override model directory")

    p_run = sub.add_parser("run", help="All-in-one: deploy + up + wait + chat")
    p_run.add_argument("--model", choices=SUPPORTED_MODELS.keys())
    p_run.add_argument("--max-tokens", type=int)
    p_run.add_argument("--temperature", type=float)

    sub.add_parser("info", help="Show models and cluster config")

    # --- Local commands (run on the Mac itself) ---
    p_nd = sub.add_parser("node", help="Start a local expert node")
    p_nd.add_argument("--model", required=True, choices=SUPPORTED_MODELS.keys())
    p_nd.add_argument("--partition", "-p")
    p_nd.add_argument("--port", type=int)
    p_nd.add_argument("--model-dir")
    p_nd.add_argument("--memory-limit", type=float)

    p_dl = sub.add_parser("download", help="Download model locally")
    p_dl.add_argument("--model", required=True, choices=SUPPORTED_MODELS.keys())
    p_dl.add_argument("--output", "-o")

    # --- Config-free health check ---
    p_hl = sub.add_parser("health", help="Check node health (standalone)")
    p_hl.add_argument("--nodes", required=True, help="Comma-separated URLs")

    # --- Agent REPL ---
    p_ag = sub.add_parser("agent", help="Interactive agentic REPL with tools")
    p_ag.add_argument("--model", choices=SUPPORTED_MODELS.keys(), default="gemma4")
    p_ag.add_argument("--nodes", required=True, help="Comma-separated expert node URLs")
    p_ag.add_argument("--max-iterations", type=int, default=8,
                      help="Max tool-call iterations per question (default 8)")
    p_ag.add_argument("--max-tokens", type=int, default=400,
                      help="Max tokens per model response (default 400)")
    p_ag.add_argument("--write", action="store_true",
                      help="Allow file writes and destructive shell commands")

    # --- Web UI ---
    p_ui = sub.add_parser("ui", help="Web chat UI for the distributed agent")
    p_ui.add_argument("--model", choices=SUPPORTED_MODELS.keys(), default="gemma4")
    p_ui.add_argument("--nodes", help="Comma-separated expert node URLs (omit if --vision)")
    p_ui.add_argument("--host", default="0.0.0.0", help="Bind host (default 0.0.0.0)")
    p_ui.add_argument("--port", type=int, default=8500, help="Port (default 8500)")
    p_ui.add_argument("--write", action="store_true",
                      help="Allow file writes and destructive shell commands")
    p_ui.add_argument("--vision", action="store_true",
                      help="Single-machine vision mode (Gemma 4 + image upload)")
    p_ui.add_argument("--stream-dir", help="Vision: path to gemma4-stream split dir")
    p_ui.add_argument("--source-dir", help="Vision: path to gemma4-26b-4bit source dir")
    p_ui.add_argument("--falcon", action="store_true",
                      help="Vision: also load Falcon Perception for grounded segmentation")
    p_ui.add_argument("--falcon-model", help="Vision: path to Falcon Perception model dir")
    p_ui.add_argument("--falcon-only", action="store_true",
                      help="Falcon-only mode: load Falcon Perception, skip Gemma. "
                           "Used by the data labeling factory (~1.5 GB resident).")

    # --- Swarm: leader ---
    p_ld = sub.add_parser("leader", help="Run as swarm leader (peer registry + chat UI)")
    p_ld.add_argument("--model", choices=SUPPORTED_MODELS.keys(), default="gemma4")
    p_ld.add_argument("--host", default="0.0.0.0")
    p_ld.add_argument("--port", type=int, default=8500)

    # --- Swarm: worker ---
    p_jn = sub.add_parser("join", help="Join a swarm as a worker peer")
    p_jn.add_argument("leader", help="Leader URL, e.g. http://leader-ip:8500")
    p_jn.add_argument("--model", choices=SUPPORTED_MODELS.keys(), default="gemma4")
    p_jn.add_argument("--model-dir", help="Path to streaming model dir (default ~/models/<model>-stream)")
    p_jn.add_argument("--port", type=int, default=9301, help="Port for the local expert node")

    # --- Swarm: inspect ---
    p_pr = sub.add_parser("peers", help="List peers in a swarm")
    p_pr.add_argument("leader", help="Leader URL, e.g. http://leader-ip:8500")

    # --- Benchmark ---
    p_bn = sub.add_parser("bench", help="Throughput benchmark β concurrent clients hitting a UI server")
    p_bn.add_argument("--server", default="http://localhost:8500", help="UI server URL")
    p_bn.add_argument("--concurrent", "-c", type=int, default=4, help="Concurrent clients")
    p_bn.add_argument("--requests", "-r", type=int, default=3, help="Requests per client")
    p_bn.add_argument("--max-tokens", type=int, default=80, help="Tokens per request")
    p_bn.add_argument("--hourly-cost", type=float, default=0.40,
                      help="Cluster $/hr for cost calc (default 0.40)")

    args = parser.parse_args()

    # No subcommand given: show usage and exit successfully.
    if args.command is None:
        parser.print_help()
        sys.exit(0)

    # Thin wrappers around sibling modules. They import lazily, presumably so
    # commands that don't need them skip the import cost and dependencies.
    # The relative imports require this module to run as part of its package.
    def cmd_agent(args):
        from .agent import main as agent_main
        agent_main(args)

    def cmd_ui(args):
        from .server import main as ui_main
        ui_main(args)

    def cmd_bench(args):
        from .bench import main as bench_main
        bench_main(args)

    def cmd_leader(args):
        # Start the UI server as swarm leader with no fixed node URLs.
        # Presumably peers arrive via `join`; confirm in server.run_server.
        from .server import run_server
        run_server(
            model_key=args.model,
            node_urls=None,
            host=args.host,
            port=args.port,
            swarm_leader=True,
        )

    def cmd_join(args):
        from .join import main as join_main
        join_main(args)

    def cmd_peers(args):
        """Fetch <leader>/swarm/peers and print one line per registered peer."""
        import urllib.request
        url = args.leader.rstrip("/") + "/swarm/peers"
        try:
            with urllib.request.urlopen(url, timeout=5) as r:
                data = json.loads(r.read())
        except Exception as e:
            print(f"Error fetching peers from {args.leader}: {e}")
            sys.exit(1)

        # NOTE(review): the keys below are read unguarded; a leader response
        # missing any of them raises KeyError. `partition` must be a str for
        # the :>10s format spec.
        print(f"Swarm @ {args.leader}")
        print(f" Model: {data['model']}")
        print(f" Peers: {data['peer_count']}")
        print(f" Version: {data['partition_version']}")
        print()
        if not data["peers"]:
            print(" (no peers registered)")
            return
        for p in data["peers"]:
            status = "OK" if p["alive"] else "DEAD"
            print(f" [{status}] {p['id']} partition={p['partition']:>10s} "
                  f"ram={p['mem_gb']}GB uptime={p['uptime_s']:.0f}s "
                  f" hb={p['last_heartbeat_s_ago']:.0f}s {p['url']}")

    # Subcommand name -> handler. Every subparser registered above has an
    # entry. "health" resolves cmd_health_standalone (a module-level function
    # defined after main) when the lambda is called.
    commands = {
        "init": cmd_init,
        "deploy": cmd_deploy,
        "up": cmd_up,
        "down": cmd_down,
        "status": cmd_status,
        "chat": cmd_chat,
        "run": cmd_run,
        "info": cmd_info,
        "node": cmd_node_local,
        "download": cmd_download_local,
        "health": lambda a: cmd_health_standalone(a),
        "agent": cmd_agent,
        "ui": cmd_ui,
        "bench": cmd_bench,
        "leader": cmd_leader,
        "join": cmd_join,
        "peers": cmd_peers,
    }

    commands[args.command](args)
|
|
|
|
def cmd_health_standalone(args):
    """Standalone health check — no config needed.

    Queries ``<url>/health`` for each comma-separated URL in ``args.nodes``
    and prints one line per node. Whitespace and trailing slashes are
    stripped (as ``peers`` does for its leader URL), and blank entries, e.g.
    from a trailing comma, are ignored. Any error for a node, including
    unreachable hosts or malformed JSON, is reported as ``[FAIL]``.
    """
    import urllib.request

    nodes = [n.strip().rstrip("/") for n in args.nodes.split(",")]
    nodes = [n for n in nodes if n]
    print(f"Checking {len(nodes)} node(s)...\n")
    for url in nodes:
        try:
            # `with` closes the HTTP response instead of leaking the socket.
            with urllib.request.urlopen(f"{url}/health", timeout=5) as resp:
                data = json.loads(resp.read())
            partition = data.get("partition", "?")
            mem = data.get("memory_gb", "?")
            reqs = data.get("compute_requests", 0)
            print(f" [OK] {url} β partition {partition}, {mem} GB, {reqs} reqs")
        except Exception as e:
            print(f" [FAIL] {url} β {e}")
|
|
|
|
# Entry point when this module is executed directly.
# NOTE(review): main() uses relative imports (.agent, .server, .bench, .join)
# for some commands, so those only work when run as part of the package
# (e.g. `python -m <package>.<module>`), not as a bare script.
if __name__ == "__main__":
    main()
|
|