waltgrace's picture
initial release: deploy code + split scripts
0e41b61 verified
#!/usr/bin/env python3
"""
mac-tensor ui — Web chat UI for the distributed agent.
Serves a single-page HTML chat interface and a Server-Sent Events
endpoint that streams agent events (steps, tool calls, results, final answer).
Usage:
mac-tensor ui --model gemma4 --nodes http://mac2:8401,http://mac3:8401
# Then open http://localhost:8500 in your browser
"""
import json
import os
import sys
import time
import threading
from queue import Queue, Empty
def run_server(model_key, node_urls=None, host="0.0.0.0", port=8500, allow_write=False,
vision=False, stream_dir=None, source_dir=None,
falcon=False, falcon_model=None,
swarm_leader=False,
turbo_url=None):
"""Start the FastAPI server with the agent backend pre-loaded.
Modes:
- Distributed text-only: pass node_urls
- Single-machine vision (Gemma 4 only): vision=True
- Vision + Falcon Perception (segmentation): vision=True, falcon=True
- Swarm leader: swarm_leader=True (peer registry + dynamic coordinator)
"""
from fastapi import FastAPI, Request, UploadFile, File, Form
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from .agent import AgentBackend, run_agent_turn_stream
vision_engine = None
falcon_tools = None
swarm_registry = None
if swarm_leader:
from .swarm import SwarmRegistry, reaper_loop
swarm_registry = SwarmRegistry(model_key=model_key)
threading.Thread(target=reaper_loop, args=(swarm_registry,), daemon=True).start()
print(f"Swarm registry started (model={model_key})")
if swarm_leader:
# Leader mode: no LLM backend loaded yet — peers register dynamically.
# The leader can ALSO load a backend on demand (later) when peers exist.
backend = None
print("Running as swarm leader. Workers join via mac-tensor join.")
elif vision:
print(f"Loading vision Gemma 4 sniper (single-machine)...")
from .vision_engine import VisionGemma4Sniper
vision_engine = VisionGemma4Sniper(
stream_dir=stream_dir or "~/models/gemma4-stream",
source_dir=source_dir or "~/models/gemma4-26b-4bit",
)
vision_engine.load()
print("Vision engine ready.")
if falcon:
print(f"Loading Falcon Perception...")
from .falcon_perception import FalconPerceptionTools
falcon_tools = FalconPerceptionTools.load(
model_path=falcon_model or "/Users/bigneek/models/falcon-perception"
)
print("Falcon Perception ready.")
backend = None # Not used in vision mode
elif falcon:
# Falcon-only mode: no Gemma, no distributed nodes. Used by the
# data labeling factory where we only need /api/falcon.
print("Loading Falcon Perception (standalone, no Gemma)...")
from .falcon_perception import FalconPerceptionTools
falcon_tools = FalconPerceptionTools.load(
model_path=falcon_model or "/Users/bigneek/models/falcon-perception"
)
print("Falcon Perception ready. (~1.5 GB resident, no Gemma loaded)")
backend = None
else:
print(f"Loading {model_key} distributed engine...")
backend = AgentBackend(model_key=model_key, node_urls=node_urls)
backend.load()
print(f"Backend ready. Connected to {len(node_urls)} expert nodes.")
app = FastAPI(title="mac-tensor agent UI")
# Read the static HTML file shipped alongside this server
static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
html_path = os.path.join(static_dir, "chat.html")
with open(html_path) as f:
chat_html = f.read()
# Inject backend info into the HTML so the UI can show it
if vision:
model_label = "Gemma 4-26B-A4B (Vision)"
node_count_label = "single Mac · vision enabled"
elif falcon_tools is not None and vision_engine is None:
model_label = "Falcon Perception (labeling factory)"
node_count_label = "single Mac · Falcon-only"
elif swarm_leader:
model_label = {"gemma4": "Gemma 4-26B-A4B",
"qwen35": "Qwen 3.5-35B-A3B"}.get(model_key, model_key)
node_count_label = "swarm leader · waiting for peers"
else:
model_label = {"gemma4": "Gemma 4-26B-A4B",
"qwen35": "Qwen 3.5-35B-A3B"}.get(model_key, model_key)
node_count_label = f"{len(node_urls)} expert nodes"
chat_html = chat_html.replace("{{MODEL_NAME}}", model_label) \
.replace("{{NODE_COUNT}}", node_count_label) \
.replace("{{VISION_ENABLED}}", "true" if vision else "false") \
.replace("{{FALCON_ENABLED}}", "true" if falcon_tools is not None else "false")
# Lock so only one chat request runs at a time (single MoE engine)
lock = threading.Lock()
@app.get("/")
async def index():
return HTMLResponse(chat_html)
@app.get("/api/info")
async def info():
return {
"model": model_key,
"nodes": node_urls,
"allow_write": allow_write,
"vision": vision,
"falcon": falcon_tools is not None,
"swarm_leader": swarm_registry is not None,
}
# ============================================================
# Swarm endpoints (only when running as leader)
# ============================================================
if swarm_registry is not None:
@app.post("/swarm/register")
async def swarm_register(request: Request):
body = await request.json()
url = body.get("url")
mem_gb = body.get("mem_gb", 0)
meta = body.get("meta", {})
if not url:
return JSONResponse({"error": "url required"}, status_code=400)
peer_id, partition = swarm_registry.register(url, mem_gb, meta)
print(f"[swarm] +peer {peer_id} at {url} → partition {partition}")
return {
"peer_id": peer_id,
"partition": partition,
"model": model_key,
"partition_version": swarm_registry.partition_version,
}
@app.post("/swarm/heartbeat")
async def swarm_heartbeat(request: Request):
body = await request.json()
peer_id = body.get("peer_id")
if not peer_id:
return JSONResponse({"error": "peer_id required"}, status_code=400)
ok, version = swarm_registry.heartbeat(peer_id)
if not ok:
return JSONResponse({"error": "unknown peer"}, status_code=404)
# Tell the peer if its partition has been reassigned
current_partition = None
with swarm_registry.lock:
if peer_id in swarm_registry.peers:
current_partition = swarm_registry.peers[peer_id]["partition"]
return {
"ok": True,
"partition_version": version,
"partition": current_partition,
}
@app.post("/swarm/leave")
async def swarm_leave(request: Request):
body = await request.json()
peer_id = body.get("peer_id")
if not peer_id:
return JSONResponse({"error": "peer_id required"}, status_code=400)
swarm_registry.leave(peer_id)
print(f"[swarm] -peer {peer_id} (graceful leave)")
return {"ok": True}
@app.get("/swarm/peers")
async def swarm_peers():
return swarm_registry.status()
@app.post("/api/reset")
async def reset():
with lock:
if vision_engine:
vision_engine.sniper.reset_cache()
elif backend:
backend.reset()
return {"ok": True}
# When running as leader, lazily build a backend from the swarm registry.
# We cache it and rebuild when the partition_version changes.
leader_backend_state = {"backend": None, "version": -1}
def get_swarm_backend():
"""Lazy backend that uses live peers from the swarm registry.
Rebuilds when partition_version changes. Returns None if no live peers.
"""
if swarm_registry is None:
return backend # static mode
live_peers = [p for p in swarm_registry.get_live_peers() if p.get("alive")]
if not live_peers:
return None
# Check if registry version changed → rebuild backend
current_version = swarm_registry.partition_version
if leader_backend_state["version"] != current_version or leader_backend_state["backend"] is None:
print(f"[swarm] building backend for {len(live_peers)} peers (v{current_version})")
from .agent import AgentBackend
node_urls = [p["url"] for p in live_peers]
try:
bk = AgentBackend(model_key=model_key, node_urls=node_urls)
bk.load()
leader_backend_state["backend"] = bk
leader_backend_state["version"] = current_version
except Exception as e:
print(f"[swarm] backend load failed: {e}")
return None
return leader_backend_state["backend"]
@app.post("/api/chat")
async def chat(request: Request):
# Use the static backend if not in swarm mode, else build from registry
active = get_swarm_backend() if swarm_registry is not None else backend
if active is None:
return JSONResponse({
"error": "no live peers in swarm yet — run `mac-tensor join "
f"http://{_local_ip()}:{port}` on a worker Mac first"
}, status_code=503)
body = await request.json()
message = body.get("message", "").strip()
if not message:
return JSONResponse({"error": "empty message"}, status_code=400)
max_iterations = int(body.get("max_iterations", 5))
max_tokens = int(body.get("max_tokens", 300))
def event_stream():
with lock:
try:
for event in run_agent_turn_stream(
active, message,
max_iterations=max_iterations,
max_tokens=max_tokens,
allow_write=allow_write,
):
yield f"data: {json.dumps(event)}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
"Connection": "keep-alive"},
)
# Build a vision agent backend if Falcon is loaded
vision_agent = None
if vision_engine is not None and falcon_tools is not None:
from .agent import VisionAgentBackend
vision_agent = VisionAgentBackend(
vision_engine=vision_engine, falcon_tools=falcon_tools
)
print("Vision agent ready (Gemma 4 + Falcon Perception chained).")
@app.post("/api/chat_vision")
async def chat_vision(
message: str = Form(...),
max_tokens: int = Form(300),
image: UploadFile = File(None),
):
"""Vision chat endpoint — accepts an optional image upload.
If Falcon Perception is loaded, uses the vision agent loop with
tool calling. Otherwise falls back to plain Gemma 4 vision.
"""
if vision_engine is None:
return JSONResponse({"error": "vision mode not enabled"}, status_code=400)
image_path = None
if image is not None and image.filename:
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
tmp.write(await image.read())
tmp.close()
image_path = tmp.name
def event_stream():
with lock:
try:
if vision_agent is not None and image_path:
# Chained mode: Gemma 4 + Falcon tool calls
from .agent import run_vision_agent_turn_stream
for event in run_vision_agent_turn_stream(
vision_agent, message, image_path,
max_iterations=4,
max_tokens=max_tokens,
):
yield f"data: {json.dumps(event)}\n\n"
else:
# Simple mode: just Gemma 4 vision (no tools)
yield f"data: {json.dumps({'type': 'step_start', 'step': 1, 'max': 1})}\n\n"
chunks = []
def on_chunk(text):
chunks.append(text)
output = vision_engine.generate(
message, image_path=image_path,
max_tokens=max_tokens, temperature=0.6,
on_chunk=on_chunk,
)
for chunk in chunks:
yield f"data: {json.dumps({'type': 'token', 'text': chunk})}\n\n"
yield f"data: {json.dumps({'type': 'final', 'text': output.strip()})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except Exception as e:
import traceback
traceback.print_exc()
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
finally:
if image_path and os.path.exists(image_path):
try:
os.unlink(image_path)
except Exception:
pass
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
"Connection": "keep-alive"},
)
@app.post("/api/turbo_chat")
async def turbo_chat(
message: str = Form(...),
max_tokens: int = Form(300),
image: UploadFile = File(None),
):
"""Turbo mode: Gemma 4 encodes image once, then a fast small LLM
(Qwen3-1.7B 4-bit via mlx-lm) handles the reasoning loop.
Streams SSE events compatible with the existing chat UI.
"""
if vision_engine is None:
return JSONResponse({"error": "vision mode not enabled"}, status_code=400)
if not turbo_url:
return JSONResponse(
{"error": "turbo brain not configured (start server with --turbo-url)"},
status_code=400,
)
image_path = None
if image is not None and image.filename:
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
tmp.write(await image.read())
tmp.close()
image_path = tmp.name
def event_stream():
with lock:
try:
# Step 1: Gemma 4 vision encodes the image ONCE
yield f"data: {json.dumps({'type': 'step_start', 'step': 1, 'max': 2})}\n\n"
yield f"data: {json.dumps({'type': 'token', 'text': '🔍 Encoding image with Gemma 4 vision...\\n'})}\n\n"
description = ""
if image_path:
try:
description = vision_engine.generate(
"Describe this image briefly: what's in it, where is it, what colors and notable details. Be concise (3-4 sentences max).",
image_path=image_path,
max_tokens=150,
temperature=0.5,
).strip()
yield f"data: {json.dumps({'type': 'tool_call', 'tool': 'vision_describe', 'args': 'image'})}\n\n"
yield f"data: {json.dumps({'type': 'tool_result', 'result': description})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': f'vision encode failed: {e}'})}\n\n"
return
# Step 2: fast turbo brain reasons over the description
yield f"data: {json.dumps({'type': 'step_start', 'step': 2, 'max': 2})}\n\n"
# Build the prompt for the turbo brain
if description:
system_msg = (
"You are a fast assistant that answers questions about images. "
"A vision model has already described the image for you. "
"Give a SHORT, direct answer (1-3 sentences). Do not show your reasoning."
)
user_msg = (
f"Image description: {description}\n\n"
f"User's question: {message}"
)
else:
system_msg = "You are a fast helpful assistant. Give SHORT, direct answers."
user_msg = message
# Stream from the turbo brain (mlx-lm OpenAI-compatible endpoint)
import urllib.request
payload = {
"model": "qwen3-1.7b",
"messages": [
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg},
],
"max_tokens": max_tokens,
"temperature": 0.5,
"stream": True,
}
req = urllib.request.Request(
f"{turbo_url}/v1/chat/completions",
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
final_text = ""
in_think = False
try:
with urllib.request.urlopen(req, timeout=120) as resp:
buffer = ""
for chunk in iter(lambda: resp.read(512), b""):
buffer += chunk.decode("utf-8", errors="ignore")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
break
try:
ev = json.loads(data)
delta = ev.get("choices", [{}])[0].get("delta", {})
text = delta.get("content", "")
if not text:
continue
# Strip out <think>...</think> reasoning blocks
for ch in text:
if "<think>" in (final_text + ch)[-7:]:
in_think = True
continue
if "</think>" in (final_text + ch)[-8:]:
in_think = False
final_text = ""
continue
if not in_think:
final_text += ch
yield f"data: {json.dumps({'type': 'token', 'text': ch})}\n\n"
except Exception:
continue
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': f'turbo brain failed: {e}'})}\n\n"
return
yield f"data: {json.dumps({'type': 'final', 'text': final_text.strip()})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except Exception as e:
import traceback
traceback.print_exc()
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
finally:
if image_path and os.path.exists(image_path):
try:
os.unlink(image_path)
except Exception:
pass
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
"Connection": "keep-alive"},
)
@app.post("/api/falcon")
async def falcon_ground(
query: str = Form(...),
image: UploadFile = File(...),
):
"""Run Falcon Perception on uploaded image with text query.
Returns JSON with detected masks (count, IDs, metadata) plus a
base64-encoded annotated image showing bounding boxes + labels.
"""
if falcon_tools is None:
return JSONResponse({"error": "Falcon Perception not loaded"}, status_code=400)
# Save uploaded image
import tempfile, base64, io
tmp = tempfile.NamedTemporaryFile(suffix="_" + image.filename, delete=False)
tmp.write(await image.read())
tmp.close()
try:
with lock:
# Set image in Falcon session
falcon_tools.set_image(tmp.name)
# Run grounding
t0 = time.time()
result = falcon_tools.ground(query, slot=query.replace(" ", "_")[:32])
elapsed = time.time() - t0
if "error" in result:
return JSONResponse(result, status_code=500)
# Annotate the image with bounding boxes
annotated = falcon_tools.annotate_image(mask_ids=result["mask_ids"])
# Encode annotated image as base64 PNG
buf = io.BytesIO()
annotated.save(buf, format="PNG")
annotated_b64 = base64.b64encode(buf.getvalue()).decode()
return JSONResponse({
"query": query,
"count": result["count"],
"mask_ids": result["mask_ids"],
"masks": result["masks"],
"annotated_image": f"data:image/png;base64,{annotated_b64}",
"elapsed_seconds": round(elapsed, 2),
})
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({"error": str(e)}, status_code=500)
finally:
try:
os.unlink(tmp.name)
except Exception:
pass
print()
print("=" * 60)
print(f" mac-tensor UI ready")
print(f" Open: http://localhost:{port}")
print(f" http://{_local_ip()}:{port} (LAN access)")
print("=" * 60)
print()
import uvicorn
uvicorn.run(app, host=host, port=port, log_level="warning")
def _local_ip():
"""Best-effort detection of the LAN IP."""
import socket
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "localhost"
def main(args):
vision = getattr(args, "vision", False)
falcon_only = getattr(args, "falcon_only", False)
if falcon_only:
# Falcon-only mode: load Falcon Perception, skip Gemma entirely.
# ~1.5 GB resident. Used by the data labeling factory.
run_server(
model_key="falcon",
node_urls=None,
host=args.host or "0.0.0.0",
port=args.port or 8500,
allow_write=False,
vision=False,
falcon=True,
falcon_model=getattr(args, "falcon_model", None),
)
return
if vision:
# Vision mode: single-machine, no distributed nodes needed
run_server(
model_key="gemma4",
node_urls=None,
host=args.host or "0.0.0.0",
port=args.port or 8500,
allow_write=getattr(args, "write", False),
vision=True,
stream_dir=getattr(args, "stream_dir", None),
source_dir=getattr(args, "source_dir", None),
falcon=getattr(args, "falcon", False),
falcon_model=getattr(args, "falcon_model", None),
)
else:
if not args.nodes:
print("Error: --nodes is required (or pass --vision for single-machine mode)")
sys.exit(1)
node_urls = [u.strip() for u in args.nodes.split(",")]
run_server(
model_key=args.model or "gemma4",
node_urls=node_urls,
host=args.host or "0.0.0.0",
port=args.port or 8500,
allow_write=args.write,
)