| |
| """ |
| mac-tensor ui — Web chat UI for the distributed agent. |
| |
| Serves a single-page HTML chat interface and a Server-Sent Events |
| endpoint that streams agent events (steps, tool calls, results, final answer). |
| |
| Usage: |
| mac-tensor ui --model gemma4 --nodes http://mac2:8401,http://mac3:8401 |
| # Then open http://localhost:8500 in your browser |
| """ |
|
|
| import json |
| import os |
| import sys |
| import time |
| import threading |
| from queue import Queue, Empty |
|
|
|
|
def run_server(model_key, node_urls=None, host="0.0.0.0", port=8500, allow_write=False,
               vision=False, stream_dir=None, source_dir=None,
               falcon=False, falcon_model=None,
               swarm_leader=False,
               turbo_url=None):
    """Start the FastAPI server with the agent backend pre-loaded.

    Modes:
      - Distributed text-only: pass node_urls
      - Single-machine vision (Gemma 4 only): vision=True
      - Vision + Falcon Perception (segmentation): vision=True, falcon=True
      - Swarm leader: swarm_leader=True (peer registry + dynamic coordinator)

    Blocks until the uvicorn server exits.
    """
    # Imported lazily so that merely importing this module (e.g. for CLI
    # help) does not pay the FastAPI / agent import cost.
    from fastapi import FastAPI, Request, UploadFile, File, Form
    from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
    from .agent import AgentBackend, run_agent_turn_stream

    vision_engine = None   # set when vision=True
    falcon_tools = None    # set when falcon=True (either mode)
    swarm_registry = None  # set when swarm_leader=True

    if swarm_leader:
        from .swarm import SwarmRegistry, reaper_loop
        swarm_registry = SwarmRegistry(model_key=model_key)
        # Daemon thread that evicts peers whose heartbeats stop.
        threading.Thread(target=reaper_loop, args=(swarm_registry,), daemon=True).start()
        print(f"Swarm registry started (model={model_key})")

    # ---- Load the model backend for the selected mode -------------------
    if swarm_leader:
        # No backend yet: it is built lazily from live peers in
        # get_swarm_backend() once workers have joined.
        backend = None
        print("Running as swarm leader. Workers join via mac-tensor join.")
    elif vision:
        print(f"Loading vision Gemma 4 sniper (single-machine)...")
        from .vision_engine import VisionGemma4Sniper
        vision_engine = VisionGemma4Sniper(
            stream_dir=stream_dir or "~/models/gemma4-stream",
            source_dir=source_dir or "~/models/gemma4-26b-4bit",
        )
        vision_engine.load()
        print("Vision engine ready.")

        if falcon:
            # Optional segmentation tools chained with the vision engine.
            print(f"Loading Falcon Perception...")
            from .falcon_perception import FalconPerceptionTools
            falcon_tools = FalconPerceptionTools.load(
                model_path=falcon_model or "/Users/bigneek/models/falcon-perception"
            )
            print("Falcon Perception ready.")

        backend = None
    elif falcon:
        # Falcon-only labeling mode: no Gemma weights loaded at all.
        print("Loading Falcon Perception (standalone, no Gemma)...")
        from .falcon_perception import FalconPerceptionTools
        falcon_tools = FalconPerceptionTools.load(
            model_path=falcon_model or "/Users/bigneek/models/falcon-perception"
        )
        print("Falcon Perception ready. (~1.5 GB resident, no Gemma loaded)")
        backend = None
    else:
        # Distributed text-only mode across the given expert nodes.
        print(f"Loading {model_key} distributed engine...")
        backend = AgentBackend(model_key=model_key, node_urls=node_urls)
        backend.load()
        print(f"Backend ready. Connected to {len(node_urls)} expert nodes.")

    app = FastAPI(title="mac-tensor agent UI")

    # The single-page UI is a static HTML template with {{...}} placeholders
    # substituted once at startup.
    static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
    html_path = os.path.join(static_dir, "chat.html")
    with open(html_path) as f:
        chat_html = f.read()

    # Header labels shown in the UI, per mode.
    if vision:
        model_label = "Gemma 4-26B-A4B (Vision)"
        node_count_label = "single Mac · vision enabled"
    elif falcon_tools is not None and vision_engine is None:
        model_label = "Falcon Perception (labeling factory)"
        node_count_label = "single Mac · Falcon-only"
    elif swarm_leader:
        model_label = {"gemma4": "Gemma 4-26B-A4B",
                       "qwen35": "Qwen 3.5-35B-A3B"}.get(model_key, model_key)
        node_count_label = "swarm leader · waiting for peers"
    else:
        model_label = {"gemma4": "Gemma 4-26B-A4B",
                       "qwen35": "Qwen 3.5-35B-A3B"}.get(model_key, model_key)
        node_count_label = f"{len(node_urls)} expert nodes"

    chat_html = chat_html.replace("{{MODEL_NAME}}", model_label) \
                         .replace("{{NODE_COUNT}}", node_count_label) \
                         .replace("{{VISION_ENABLED}}", "true" if vision else "false") \
                         .replace("{{FALCON_ENABLED}}", "true" if falcon_tools is not None else "false")

    # Serializes inference: all generate/ground calls below run under this
    # lock, so only one request does model work at a time.
    lock = threading.Lock()

    @app.get("/")
    async def index():
        """Serve the pre-rendered single-page chat UI."""
        return HTMLResponse(chat_html)

    @app.get("/api/info")
    async def info():
        """Report server mode/configuration to the UI."""
        return {
            "model": model_key,
            "nodes": node_urls,
            "allow_write": allow_write,
            "vision": vision,
            "falcon": falcon_tools is not None,
            "swarm_leader": swarm_registry is not None,
        }

    # ---- Swarm control plane (leader mode only) -------------------------
    if swarm_registry is not None:

        @app.post("/swarm/register")
        async def swarm_register(request: Request):
            """Register a worker peer; returns its id and partition assignment."""
            body = await request.json()
            url = body.get("url")
            mem_gb = body.get("mem_gb", 0)
            meta = body.get("meta", {})
            if not url:
                return JSONResponse({"error": "url required"}, status_code=400)
            peer_id, partition = swarm_registry.register(url, mem_gb, meta)
            print(f"[swarm] +peer {peer_id} at {url} → partition {partition}")
            return {
                "peer_id": peer_id,
                "partition": partition,
                "model": model_key,
                "partition_version": swarm_registry.partition_version,
            }

        @app.post("/swarm/heartbeat")
        async def swarm_heartbeat(request: Request):
            """Record a peer heartbeat; echoes its current partition + version."""
            body = await request.json()
            peer_id = body.get("peer_id")
            if not peer_id:
                return JSONResponse({"error": "peer_id required"}, status_code=400)
            ok, version = swarm_registry.heartbeat(peer_id)
            if not ok:
                return JSONResponse({"error": "unknown peer"}, status_code=404)

            # Re-read the partition under the registry lock: it may have been
            # reassigned since this peer registered.
            current_partition = None
            with swarm_registry.lock:
                if peer_id in swarm_registry.peers:
                    current_partition = swarm_registry.peers[peer_id]["partition"]

            return {
                "ok": True,
                "partition_version": version,
                "partition": current_partition,
            }

        @app.post("/swarm/leave")
        async def swarm_leave(request: Request):
            """Remove a peer that is shutting down gracefully."""
            body = await request.json()
            peer_id = body.get("peer_id")
            if not peer_id:
                return JSONResponse({"error": "peer_id required"}, status_code=400)
            swarm_registry.leave(peer_id)
            print(f"[swarm] -peer {peer_id} (graceful leave)")
            return {"ok": True}

        @app.get("/swarm/peers")
        async def swarm_peers():
            """Current registry status (peers, partitions, liveness)."""
            return swarm_registry.status()

    @app.post("/api/reset")
    async def reset():
        """Clear conversation / KV-cache state for whichever engine is loaded."""
        with lock:
            if vision_engine:
                vision_engine.sniper.reset_cache()
            elif backend:
                backend.reset()
        return {"ok": True}

    # Lazily-built backend for swarm-leader mode; "version" tracks the
    # registry's partition_version so the backend is rebuilt on membership
    # changes.
    leader_backend_state = {"backend": None, "version": -1}

    def get_swarm_backend():
        """Lazy backend that uses live peers from the swarm registry.

        Rebuilds when partition_version changes. Returns None if no live peers.
        """
        if swarm_registry is None:
            return backend

        live_peers = [p for p in swarm_registry.get_live_peers() if p.get("alive")]
        if not live_peers:
            return None

        # Rebuild only when the partition layout changed (or first use).
        current_version = swarm_registry.partition_version
        if leader_backend_state["version"] != current_version or leader_backend_state["backend"] is None:
            print(f"[swarm] building backend for {len(live_peers)} peers (v{current_version})")
            from .agent import AgentBackend
            # NOTE: shadows the outer node_urls parameter on purpose — the
            # live peer list supersedes any startup value here.
            node_urls = [p["url"] for p in live_peers]
            try:
                bk = AgentBackend(model_key=model_key, node_urls=node_urls)
                bk.load()
                leader_backend_state["backend"] = bk
                leader_backend_state["version"] = current_version
            except Exception as e:
                print(f"[swarm] backend load failed: {e}")
                return None

        return leader_backend_state["backend"]

    @app.post("/api/chat")
    async def chat(request: Request):
        """Text chat endpoint; streams agent events as SSE."""
        # Swarm leaders resolve the backend per-request from live peers.
        active = get_swarm_backend() if swarm_registry is not None else backend

        if active is None:
            return JSONResponse({
                "error": "no live peers in swarm yet — run `mac-tensor join "
                         f"http://{_local_ip()}:{port}` on a worker Mac first"
            }, status_code=503)

        body = await request.json()
        message = body.get("message", "").strip()
        if not message:
            return JSONResponse({"error": "empty message"}, status_code=400)

        max_iterations = int(body.get("max_iterations", 5))
        max_tokens = int(body.get("max_tokens", 300))

        def event_stream():
            # Holds the inference lock for the whole turn.
            with lock:
                try:
                    for event in run_agent_turn_stream(
                        active, message,
                        max_iterations=max_iterations,
                        max_tokens=max_tokens,
                        allow_write=allow_write,
                    ):
                        yield f"data: {json.dumps(event)}\n\n"
                except Exception as e:
                    yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                     "Connection": "keep-alive"},
        )

    # Vision agent requires BOTH the vision engine and Falcon tools.
    vision_agent = None
    if vision_engine is not None and falcon_tools is not None:
        from .agent import VisionAgentBackend
        vision_agent = VisionAgentBackend(
            vision_engine=vision_engine, falcon_tools=falcon_tools
        )
        print("Vision agent ready (Gemma 4 + Falcon Perception chained).")

    @app.post("/api/chat_vision")
    async def chat_vision(
        message: str = Form(...),
        max_tokens: int = Form(300),
        image: UploadFile = File(None),
    ):
        """Vision chat endpoint — accepts an optional image upload.

        If Falcon Perception is loaded, uses the vision agent loop with
        tool calling. Otherwise falls back to plain Gemma 4 vision.
        """
        if vision_engine is None:
            return JSONResponse({"error": "vision mode not enabled"}, status_code=400)

        # Spool the upload to a temp file; deleted in the stream's finally.
        image_path = None
        if image is not None and image.filename:
            import tempfile
            tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
            tmp.write(await image.read())
            tmp.close()
            image_path = tmp.name

        def event_stream():
            with lock:
                try:
                    if vision_agent is not None and image_path:
                        # Full agent loop: Gemma 4 reasoning + Falcon tools.
                        from .agent import run_vision_agent_turn_stream
                        for event in run_vision_agent_turn_stream(
                            vision_agent, message, image_path,
                            max_iterations=4,
                            max_tokens=max_tokens,
                        ):
                            yield f"data: {json.dumps(event)}\n\n"
                    else:
                        # Plain single-step vision generation. NOTE: chunks
                        # are buffered and only emitted after generate()
                        # returns, so the client sees them all at once.
                        yield f"data: {json.dumps({'type': 'step_start', 'step': 1, 'max': 1})}\n\n"
                        chunks = []
                        def on_chunk(text):
                            chunks.append(text)
                        output = vision_engine.generate(
                            message, image_path=image_path,
                            max_tokens=max_tokens, temperature=0.6,
                            on_chunk=on_chunk,
                        )
                        for chunk in chunks:
                            yield f"data: {json.dumps({'type': 'token', 'text': chunk})}\n\n"
                        yield f"data: {json.dumps({'type': 'final', 'text': output.strip()})}\n\n"
                    yield f"data: {json.dumps({'type': 'done'})}\n\n"
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                    yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
                finally:
                    # Always remove the temp upload, even on error.
                    if image_path and os.path.exists(image_path):
                        try:
                            os.unlink(image_path)
                        except Exception:
                            pass

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                     "Connection": "keep-alive"},
        )

    @app.post("/api/turbo_chat")
    async def turbo_chat(
        message: str = Form(...),
        max_tokens: int = Form(300),
        image: UploadFile = File(None),
    ):
        """Turbo mode: Gemma 4 encodes image once, then a fast small LLM
        (Qwen3-1.7B 4-bit via mlx-lm) handles the reasoning loop.

        Streams SSE events compatible with the existing chat UI.
        Requires the server to be started with turbo_url set.
        """
        if vision_engine is None:
            return JSONResponse({"error": "vision mode not enabled"}, status_code=400)
        if not turbo_url:
            return JSONResponse(
                {"error": "turbo brain not configured (start server with --turbo-url)"},
                status_code=400,
            )

        # Spool the upload to a temp file; deleted in the stream's finally.
        image_path = None
        if image is not None and image.filename:
            import tempfile
            tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
            tmp.write(await image.read())
            tmp.close()
            image_path = tmp.name

        def event_stream():
            with lock:
                try:
                    # Step 1: have the local vision model describe the image.
                    yield f"data: {json.dumps({'type': 'step_start', 'step': 1, 'max': 2})}\n\n"
                    yield f"data: {json.dumps({'type': 'token', 'text': '🔍 Encoding image with Gemma 4 vision...\\n'})}\n\n"

                    description = ""
                    if image_path:
                        try:
                            description = vision_engine.generate(
                                "Describe this image briefly: what's in it, where is it, what colors and notable details. Be concise (3-4 sentences max).",
                                image_path=image_path,
                                max_tokens=150,
                                temperature=0.5,
                            ).strip()
                            yield f"data: {json.dumps({'type': 'tool_call', 'tool': 'vision_describe', 'args': 'image'})}\n\n"
                            yield f"data: {json.dumps({'type': 'tool_result', 'result': description})}\n\n"
                        except Exception as e:
                            yield f"data: {json.dumps({'type': 'error', 'message': f'vision encode failed: {e}'})}\n\n"
                            return

                    # Step 2: forward description + question to the turbo brain.
                    yield f"data: {json.dumps({'type': 'step_start', 'step': 2, 'max': 2})}\n\n"

                    if description:
                        system_msg = (
                            "You are a fast assistant that answers questions about images. "
                            "A vision model has already described the image for you. "
                            "Give a SHORT, direct answer (1-3 sentences). Do not show your reasoning."
                        )
                        user_msg = (
                            f"Image description: {description}\n\n"
                            f"User's question: {message}"
                        )
                    else:
                        system_msg = "You are a fast helpful assistant. Give SHORT, direct answers."
                        user_msg = message

                    # OpenAI-compatible streaming request to the turbo server.
                    import urllib.request
                    payload = {
                        "model": "qwen3-1.7b",
                        "messages": [
                            {"role": "system", "content": system_msg},
                            {"role": "user", "content": user_msg},
                        ],
                        "max_tokens": max_tokens,
                        "temperature": 0.5,
                        "stream": True,
                    }
                    req = urllib.request.Request(
                        f"{turbo_url}/v1/chat/completions",
                        data=json.dumps(payload).encode(),
                        headers={"Content-Type": "application/json"},
                    )
                    final_text = ""
                    in_think = False
                    try:
                        with urllib.request.urlopen(req, timeout=120) as resp:
                            buffer = ""
                            # Read the SSE response in fixed-size chunks and
                            # reassemble complete lines ourselves.
                            for chunk in iter(lambda: resp.read(512), b""):
                                buffer += chunk.decode("utf-8", errors="ignore")
                                while "\n" in buffer:
                                    line, buffer = buffer.split("\n", 1)
                                    line = line.strip()
                                    if not line.startswith("data: "):
                                        continue
                                    data = line[6:]
                                    if data == "[DONE]":
                                        break
                                    try:
                                        ev = json.loads(data)
                                        delta = ev.get("choices", [{}])[0].get("delta", {})
                                        text = delta.get("content", "")
                                        if not text:
                                            continue

                                        # Per-character filter meant to strip Qwen3
                                        # <think>...</think> spans from the stream.
                                        # NOTE(review): once in_think is True,
                                        # final_text stops growing, so the
                                        # "</think>" lookbehind below can never
                                        # match — verify against real turbo
                                        # output; the closing tag may never be
                                        # detected.
                                        for ch in text:
                                            if "<think>" in (final_text + ch)[-7:]:
                                                in_think = True
                                                continue
                                            if "</think>" in (final_text + ch)[-8:]:
                                                in_think = False
                                                final_text = ""
                                                continue
                                            if not in_think:
                                                final_text += ch
                                                yield f"data: {json.dumps({'type': 'token', 'text': ch})}\n\n"
                                    except Exception:
                                        # Malformed SSE line — skip it.
                                        continue
                    except Exception as e:
                        yield f"data: {json.dumps({'type': 'error', 'message': f'turbo brain failed: {e}'})}\n\n"
                        return

                    yield f"data: {json.dumps({'type': 'final', 'text': final_text.strip()})}\n\n"
                    yield f"data: {json.dumps({'type': 'done'})}\n\n"
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                    yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
                finally:
                    # Always remove the temp upload, even on error.
                    if image_path and os.path.exists(image_path):
                        try:
                            os.unlink(image_path)
                        except Exception:
                            pass

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                     "Connection": "keep-alive"},
        )

    @app.post("/api/falcon")
    async def falcon_ground(
        query: str = Form(...),
        image: UploadFile = File(...),
    ):
        """Run Falcon Perception on uploaded image with text query.

        Returns JSON with detected masks (count, IDs, metadata) plus a
        base64-encoded annotated image showing bounding boxes + labels.
        """
        if falcon_tools is None:
            return JSONResponse({"error": "Falcon Perception not loaded"}, status_code=400)

        # Spool the upload to a temp file; removed in the finally below.
        import tempfile, base64, io
        tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
        tmp.write(await image.read())
        tmp.close()

        try:
            with lock:
                # Bind the image, then run grounding for the query. The slot
                # name is derived from the query (spaces → underscores,
                # truncated to 32 chars).
                falcon_tools.set_image(tmp.name)

                t0 = time.time()
                result = falcon_tools.ground(query, slot=query.replace(" ", "_")[:32])
                elapsed = time.time() - t0

                if "error" in result:
                    return JSONResponse(result, status_code=500)

                # Render detections onto the image for display in the UI.
                annotated = falcon_tools.annotate_image(mask_ids=result["mask_ids"])

                # PNG-encode to a data URI so the UI can show it inline.
                buf = io.BytesIO()
                annotated.save(buf, format="PNG")
                annotated_b64 = base64.b64encode(buf.getvalue()).decode()

                return JSONResponse({
                    "query": query,
                    "count": result["count"],
                    "mask_ids": result["mask_ids"],
                    "masks": result["masks"],
                    "annotated_image": f"data:image/png;base64,{annotated_b64}",
                    "elapsed_seconds": round(elapsed, 2),
                })
        except Exception as e:
            import traceback
            traceback.print_exc()
            return JSONResponse({"error": str(e)}, status_code=500)
        finally:
            try:
                os.unlink(tmp.name)
            except Exception:
                pass

    # Startup banner, then block in uvicorn until shutdown.
    print()
    print("=" * 60)
    print(f" mac-tensor UI ready")
    print(f" Open: http://localhost:{port}")
    print(f" http://{_local_ip()}:{port} (LAN access)")
    print("=" * 60)
    print()

    import uvicorn
    uvicorn.run(app, host=host, port=port, log_level="warning")
|
|
|
|
| def _local_ip(): |
| """Best-effort detection of the LAN IP.""" |
| import socket |
| try: |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| s.connect(("8.8.8.8", 80)) |
| ip = s.getsockname()[0] |
| s.close() |
| return ip |
| except Exception: |
| return "localhost" |
|
|
|
|
def main(args):
    """CLI entry point: dispatch to run_server() based on parsed args.

    Launch modes, checked in priority order:
      1. --falcon-only: Falcon Perception labeling factory (no Gemma).
      2. --vision: single-machine Gemma 4 vision, optionally with Falcon.
      3. default: distributed text-only engine; requires --nodes.

    Exits with status 1 when --nodes is missing in distributed mode.
    """
    vision = getattr(args, "vision", False)
    falcon_only = getattr(args, "falcon_only", False)

    if falcon_only:
        # Standalone labeling factory — file writes are never allowed here.
        run_server(
            model_key="falcon",
            node_urls=None,
            host=args.host or "0.0.0.0",
            port=args.port or 8500,
            allow_write=False,
            vision=False,
            falcon=True,
            falcon_model=getattr(args, "falcon_model", None),
        )
        return

    if vision:
        # Single-machine vision mode (Gemma 4 is the only vision model).
        run_server(
            model_key="gemma4",
            node_urls=None,
            host=args.host or "0.0.0.0",
            port=args.port or 8500,
            allow_write=getattr(args, "write", False),
            vision=True,
            stream_dir=getattr(args, "stream_dir", None),
            source_dir=getattr(args, "source_dir", None),
            falcon=getattr(args, "falcon", False),
            falcon_model=getattr(args, "falcon_model", None),
            # Fix: this flag was previously dropped, so /api/turbo_chat
            # always answered 400 ("turbo brain not configured") even when
            # the CLI supplied a turbo URL.
            turbo_url=getattr(args, "turbo_url", None),
        )
    else:
        # Distributed text-only mode: a node list is mandatory.
        if not args.nodes:
            print("Error: --nodes is required (or pass --vision for single-machine mode)")
            sys.exit(1)
        node_urls = [u.strip() for u in args.nodes.split(",")]
        run_server(
            model_key=args.model or "gemma4",
            node_urls=node_urls,
            host=args.host or "0.0.0.0",
            port=args.port or 8500,
            allow_write=getattr(args, "write", False),
        )
|
|