"""
CASCADE Hyperlattice - Interactive 3D Agent Visualization
L40S GPU-POWERED. Graph-first. Click to drill. Auto-evolve.
"""
# MUST BE FIRST - Patch cascade-lattice before any imports
import cascade_patch # noqa: F401
import streamlit as st
from streamlit_autorefresh import st_autorefresh
import plotly.graph_objects as go
import numpy as np
import time
import colorsys
# Rich TUI for REAL terminal rendering
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from rich.style import Style
from io import StringIO
from lattice import Hyperlattice
from swarm import QuineSwarm
from ipfs_sync import CollectiveMemory, SyncManager
from cascade_bridge import CascadeBridge
from champion_loader import (
get_champion_info, load_champion_module,
get_champion_diagnostics, get_replicated_agents_info, get_download_status
)
# ═══════════════════════════════════════════════════════════════════════════════
# GPU DETECTION
# ═══════════════════════════════════════════════════════════════════════════════
# Probe for PyTorch/CUDA once at import time and publish the result as
# module-level constants (TORCH_AVAILABLE, GPU_AVAILABLE, GPU_NAME, GPU_MEMORY).
# Any failure degrades to "CPU mode" instead of aborting the app.
try:
    import torch
    TORCH_AVAILABLE = True
except (ImportError, OSError):
    # OSError covers a torch install whose native libraries fail to load.
    TORCH_AVAILABLE = False

GPU_AVAILABLE = False
GPU_NAME = None
GPU_MEMORY = 0  # total memory of device 0 in whole GiB; 0 when no GPU is usable

if TORCH_AVAILABLE:
    try:
        if torch.cuda.is_available():
            GPU_NAME = torch.cuda.get_device_name(0)
            GPU_MEMORY = torch.cuda.get_device_properties(0).total_memory // (1024**3)
            # Only flip the flag once both queries succeeded, so the three
            # GPU_* constants are always mutually consistent.
            GPU_AVAILABLE = True
    except (RuntimeError, AssertionError):
        # Device query failed (e.g. driver/runtime mismatch): report CPU mode.
        GPU_AVAILABLE = False
        GPU_NAME = None
        GPU_MEMORY = 0
# Optional cascade-lattice integration.  Everything imported here is also
# given a fallback binding below, so the rest of the file can reference any of
# these names without a NameError when the package is missing or fails to
# initialise.
try:
    from cascade import Tracer, CausationGraph, sdk_observe, init as cascade_init
    from cascade import store as cascade_store
    from cascade import discover_models, discover_datasets
    from cascade.hold import Hold, CausationHold, HoldSession
    from cascade.analysis import MetricsEngine
    from cascade.diagnostics import diagnose, BugDetector
    from cascade.forensics import DataForensics, GhostLog
    from cascade.viz import PlaybackBuffer, find_latest_tape, load_tape_file
    CASCADE_AVAILABLE = True
    # Initialize cascade observation layer
    cascade_init(project="hyperlattice")
except Exception as e:
    # Deliberately broad: a failure anywhere in the optional stack (import or
    # init) must not take the UI down.  All names are reset together so a
    # half-completed import never leaves a mix of real and missing objects.
    print(f"[CASCADE] Import failed: {e}")
    CASCADE_AVAILABLE = False
    cascade_init = None
    sdk_observe = None
    cascade_store = None
    Tracer = None
    CausationGraph = None
    Hold = None
    CausationHold = None
    HoldSession = None
    MetricsEngine = None
    diagnose = None
    BugDetector = None
    DataForensics = None
    GhostLog = None
    PlaybackBuffer = None
    find_latest_tape = None
    load_tape_file = None

    def discover_models():
        """Fallback used when cascade is unavailable: nothing to discover."""
        return []

    def discover_datasets():
        """Fallback used when cascade is unavailable: nothing to discover."""
        return []
# ═══════════════════════════════════════════════════════════════════════════════
# PAGE CONFIG
# ═══════════════════════════════════════════════════════════════════════════════
# Page-level Streamlit settings: browser tab title/icon, wide layout, and the
# sidebar expanded on first load.
# NOTE(review): Streamlit documents set_page_config as having to run before
# other st.* commands — keep this ahead of any new Streamlit call.
st.set_page_config(
page_title="CASCADE Hyperlattice",
page_icon="🌌",
layout="wide",
initial_sidebar_state="expanded" # Sidebar open by default for controls
)
# ═══════════════════════════════════════════════════════════════════════════════
# ═══════════════════════════════════════════════════════════════════════════════
# EMERGENCY STOP CHECK - Run FIRST before anything else
# ═══════════════════════════════════════════════════════════════════════════════
# If a previous run left a pending stop request (the sidebar STOP button
# increments `stop_requested`), switch auto-run off and clear the counter
# before anything else in this run looks at `auto_run`.
# `.get(..., 0)` keeps this safe on the very first run, before the session
# defaults below have been created.
if st.session_state.get('stop_requested', 0) > 0:
st.session_state.auto_run = False
st.session_state.stop_requested = 0
# ═══════════════════════════════════════════════════════════════════════════════
# SESSION STATE
# ═══════════════════════════════════════════════════════════════════════════════
# Seed st.session_state exactly once per browser session.  The presence of the
# 'initialized' key is the marker that the defaults have already been written.
if 'initialized' not in st.session_state:
    _session_defaults = {
        # Simulation objects (created by init_simulation)
        'initialized': False,
        'lattice': None,
        'swarm': None,
        'cascade': None,
        'step_count': 0,
        'events': [],
        # Current selection in the 3D graph
        'selected_agent': None,
        'selected_gate': None,       # Null gate selection
        'selected_trail': None,      # Trail/movement selection
        'selection_type': None,      # 'agent', 'gate', 'trail'
        # Tunables exposed as sidebar sliders
        'lattice_size': 8,
        'num_agents': 10,
        'max_agents': 15,            # Population cap
        'num_gates': 6,
        # Run control
        'auto_run': False,
        'stop_requested': 0,         # Counter for aggressive stop
        'experience_chain': [],
        'camera_angle': 0,
        # CHEESE - the goal the agents are hunting for
        'cheese_position': None,
        'cheese_found': False,
        'cheese_finder': None,
        # HOLD system - pause simulation for graph interaction
        'hold_active': False,
        'hold_camera': None,         # Store camera state during hold
    }
    for _state_key, _state_default in _session_defaults.items():
        st.session_state[_state_key] = _state_default
# ═══════════════════════════════════════════════════════════════════════════════
# HOLD SYSTEM - cascade-lattice integration
# ═══════════════════════════════════════════════════════════════════════════════
# Handle to the cascade HOLD object, or None when cascade-lattice (or its Hold
# class) could not be imported.  Callers must check for None before use.
hold_system = Hold.get() if (CASCADE_AVAILABLE and Hold is not None) else None
# ═══════════════════════════════════════════════════════════════════════════════
# SIMULATION
# ═══════════════════════════════════════════════════════════════════════════════
def check_hold():
    """Force auto-run off while HOLD is engaged.

    Returns:
        True when ``hold_active`` is set in session state (auto-run has then
        been switched off as a side effect), otherwise False.
    """
    if not st.session_state.get('hold_active', False):
        return False
    st.session_state.auto_run = False
    return True
def init_simulation():
    """Build a fresh lattice, swarm and cascade bridge and publish them.

    Reads ``lattice_size``, ``num_agents`` and ``num_gates`` from session
    state, constructs the simulation objects, places the cheese on a random
    lattice node and resets the step counter and event log.

    All objects are built into locals first and only written to
    ``st.session_state`` once every constructor has succeeded.  A failed
    (re-)initialisation therefore leaves the previous simulation fully intact
    instead of pairing a new lattice with an old swarm.

    Failures are reported with ``st.error`` and a printed traceback; nothing
    is raised to the caller.  A failure while logging the genesis observation
    to cascade is only printed and does not undo the initialisation.
    """
    try:
        size = st.session_state.lattice_size
        agents = st.session_state.num_agents
        gates = st.session_state.num_gates

        # --- build everything before touching session state ---------------
        lattice = Hyperlattice(dimensions=(size, size, size), num_null_gates=gates)
        swarm = QuineSwarm(lattice, initial_agents=agents)
        bridge = CascadeBridge(session_id=f"stream_{int(time.time())}")

        # Place the CHEESE randomly in the lattice (None if it has no nodes,
        # so a stale position from a previous run can never survive).
        import random
        all_nodes = list(lattice.nodes.keys())
        cheese_position = random.choice(all_nodes) if all_nodes else None

        # --- commit ------------------------------------------------------
        st.session_state.lattice = lattice
        st.session_state.swarm = swarm
        st.session_state.cascade = bridge
        st.session_state.step_count = 0
        st.session_state.events = []
        st.session_state.cheese_position = cheese_position
        st.session_state.cheese_found = False
        st.session_state.cheese_finder = None
        st.session_state.initialized = True

        # Log genesis to cascade (best effort)
        if CASCADE_AVAILABLE and cascade_store:
            try:
                receipt = cascade_store.observe(
                    model_id="hyperlattice",
                    data={
                        "event": "genesis",
                        "lattice_size": size,
                        "agents": agents,
                        "gates": gates,
                        "session_id": bridge.session_id,
                    },
                )
                print(f"[CASCADE] Genesis logged: {receipt.cid}")
            except Exception as e:
                print(f"[CASCADE] Genesis log failed: {e}")
    except Exception as e:
        st.error(f"Init failed: {e}")
        import traceback
        traceback.print_exc()
def _record_cheese_found(agent, cheese_node):
    """Mark the cheese as found by *agent*, stop auto-run and log the event."""
    st.session_state.cheese_found = True
    st.session_state.cheese_finder = agent.agent_id
    st.session_state.auto_run = False  # Stop the simulation!
    st.session_state.events.append({
        "step": st.session_state.step_count,
        "type": "🧀 CHEESE FOUND!",
        "agent": agent.agent_id,
        "data": {"position": cheese_node[:12], "generation": agent.generation}
    })


def _find_agent_near(lattice, agents, target_node, radius=0.1):
    """Return the first agent whose 3D position is within *radius* of *target_node*.

    Position lookups that fail are skipped rather than aborting the search;
    returns None when no agent is close enough or the target cannot be located.
    """
    lookup_errors = (KeyError, AttributeError, TypeError)
    try:
        target_3d = lattice.get_node_position(target_node)
    except lookup_errors:
        return None
    for agent in agents:
        try:
            agent_3d = lattice.get_node_position(agent.position)
            dist = sum((a - b) ** 2 for a, b in zip(agent_3d, target_3d)) ** 0.5
        except lookup_errors:
            continue
        if dist < radius:
            return agent
    return None


def step_simulation():
    """Advance the simulation by one step and record what happened.

    Initialises the simulation on demand; if that fails (the failure has
    already been reported by ``init_simulation``) the step is skipped instead
    of crashing on a missing swarm.

    Per step: run the swarm, fork agents at null gates, prune the population
    to ``max_agents``, forward each experience to the cascade bridge, append
    move/fork events to the session event log, and check whether any agent
    reached the cheese (exact node match first, then 3D proximity).
    """
    if not st.session_state.initialized:
        init_simulation()
        if not st.session_state.initialized:
            return

    swarm = st.session_state.swarm
    events = swarm.step()
    st.session_state.step_count += 1
    step = st.session_state.step_count

    # Fork at null gates occasionally
    new_agents = swarm.fork_at_null_gates()
    # Prune if too many
    swarm.prune_weak(keep_top=st.session_state.max_agents)

    for event in events:
        st.session_state.cascade.log_event(event)
        # event is an Experience object, not a dict; fall back gracefully if
        # it lacks the expected attributes.
        st.session_state.events.append({
            "step": step,
            "type": getattr(event, 'action', "move"),
            "agent": event.agent_id if hasattr(event, 'agent_id') else str(event)[:20],
            "data": event.to_dict() if hasattr(event, 'to_dict') else {"raw": str(event)}
        })

    # Track forks
    for agent in new_agents:
        st.session_state.events.append({
            "step": step,
            "type": "fork",
            "agent": agent.agent_id,
            "data": {"parent": agent.parent_hash, "gen": agent.generation}
        })

    # 🧀 CHEESE HUNT! Check if any agent found the cheese
    cheese_node = st.session_state.cheese_position  # node ID
    if cheese_node and not st.session_state.cheese_found:
        agents = list(swarm.agents.values())
        # agent.position is the agent's current node ID
        finder = next((a for a in agents if a.position == cheese_node), None)
        if finder is None:
            # Fallback: accept an agent that is geometrically on top of the cheese.
            finder = _find_agent_near(st.session_state.lattice, agents, cheese_node)
        if finder is not None:
            _record_cheese_found(finder, cheese_node)

    # Keep the full history, capped only to bound memory in very long sessions.
    max_events = 10000
    if len(st.session_state.events) > max_events:
        st.session_state.events = st.session_state.events[-max_events:]
# ═══════════════════════════════════════════════════════════════════════════════
# RICH TUI RENDERER - Real terminal rendering with SVG export
# ═══════════════════════════════════════════════════════════════════════════════
def _format_event_details(evt_type, data, agent):
    """Return the text of the 'Details' cell for one event row."""
    if evt_type == 'MOVE':
        return f"{data.get('from_node', '?')} → {data.get('to_node', '?')}"
    if evt_type == 'NULL_TRANSIT':
        return f"⚡ {data.get('from_node', '?')} ⇒ {data.get('to_node', '?')}"
    if evt_type == 'FORK':
        # Fork events recorded by step_simulation carry the parent hash in
        # `data['parent']` and the child id in the event's `agent` field;
        # `child_id` is honoured too in case another producer supplies it.
        child = data.get('child_id') or agent or '?'
        parent = data.get('parent')
        parent_label = str(parent)[:12] if parent else 'parent'
        return f"🧬 {parent_label}→{child}"
    if evt_type == 'SPAWN':
        return "✨ new agent"
    if evt_type == 'PRUNE':
        fit = data.get('fitness', 0)
        return f"💀 fitness={fit:.3f}"
    if evt_type == 'DECISION':
        return f"🎮 {data.get('action', '?')}"
    if evt_type == 'HOLD_RESOLUTION':
        override = data.get('was_override', False)
        return f"{'🛑' if override else '✅'} override={override}"
    return str(data)[:80] if data else ""


def render_rich_tui(events, cascade_stats, lineage_data, cascade_store=None):
    """Render the dashboard with Rich and return it as an SVG string.

    Args:
        events: list of event dicts with 'step', 'type', 'agent' and 'data'
            keys; rendered newest first, without truncation.
        cascade_stats: dict read for 'total_observations' and 'genesis_root'.
        lineage_data: mapping of agent id -> dict read for 'parent_hash',
            'quine_hash', 'generation' and 'fitness'.  Falsy skips the tree.
        cascade_store: optional object with ``query(limit=...)``; when given,
            up to 50 recent observations are listed.  Query errors are printed
            into the output instead of being raised.

    Returns:
        The SVG document produced by ``Console.export_svg``.
    """
    console = Console(record=True, force_terminal=True, width=120, color_system="truecolor")

    # ── Header panel ────────────────────────────────────────────────────
    obs_count = cascade_stats.get('total_observations', 0)
    genesis = cascade_stats.get('genesis_root', '????????????????')
    header_text = Text()
    header_text.append("CASCADE HYPERLATTICE v0.6.2\n", style="bold cyan")
    header_text.append("Genesis: ", style="white")
    header_text.append(f"{genesis}\n", style="bold green")
    header_text.append("Lattice Observations: ", style="white")
    header_text.append(f"{obs_count:,}", style="bold yellow")
    console.print(Panel(header_text, title="🌌 HYPERLATTICE", border_style="cyan"))

    # ── Events table - FULL HISTORY, NO TRUNCATION ─────────────────────
    events_table = Table(
        title="🔥 LIVE EVENT STREAM",
        show_header=True,
        header_style="bold magenta",
        border_style="green",
        title_style="bold green"
    )
    events_table.add_column("Step", style="cyan", justify="right", width=6)
    events_table.add_column("Agent", style="yellow", width=24)
    events_table.add_column("Event", style="magenta", width=14)
    events_table.add_column("Details", style="white")
    for evt in reversed(events):
        step = str(evt.get('step', '?'))
        agent = str(evt.get('agent', '??????'))
        # `type`/`data` may be present but None; normalise before use.
        evt_type = str(evt.get('type') or 'unknown').upper()
        data = evt.get('data') or {}
        if isinstance(data, dict):
            details = _format_event_details(evt_type, data, agent)
        else:
            details = str(data)[:80]
        events_table.add_row(step, agent, evt_type, details)
    console.print(events_table)

    # ── Agent lineage tree - FULL, NO TRUNCATION ───────────────────────
    if lineage_data:
        tree = Tree("🧬 [bold cyan]AGENT LINEAGE[/bold cyan]", guide_style="dim")

        # Resolve a parent_hash to a live agent id.  Agent ids win; quine
        # hashes are accepted as well because the lineage-edge drawing code
        # in this file matches parent_hash against quine_hash.
        parent_lookup = {}
        for aid, d in lineage_data.items():
            quine = d.get('quine_hash')
            if quine:
                parent_lookup.setdefault(quine, aid)
        for aid in lineage_data:
            parent_lookup[aid] = aid

        roots = []
        children_map = {}
        for aid, d in lineage_data.items():
            parent = d.get('parent_hash')
            parent_id = None
            if parent and parent != 'GENESIS':
                parent_id = parent_lookup.get(parent)
            if parent_id is None or parent_id == aid:
                # Genesis agents, and agents whose parent is no longer in
                # lineage_data, are shown at top level so nobody is dropped.
                roots.append((aid, d))
            else:
                children_map.setdefault(parent_id, []).append((aid, d))

        # Iterative walk: siblings are added in order, depth is unbounded.
        pending = []
        for root_id, root_data in roots:
            gen = root_data.get('generation', 0)
            fit = root_data.get('fitness', 0)
            root_node = tree.add(f"[bold green]{root_id}[/bold green] G{gen} F{fit:.2f}")
            pending.append((root_node, root_id))
        while pending:
            node, agent_id = pending.pop()
            for child_id, child_data in children_map.get(agent_id, []):
                gen = child_data.get('generation', 0)
                fit = child_data.get('fitness', 0)
                child_node = node.add(f"[yellow]{child_id}[/yellow] G{gen} F{fit:.2f}")
                pending.append((child_node, child_id))
        console.print(tree)

    # ── Cascade store observations (optional) ──────────────────────────
    if cascade_store:
        try:
            recent_obs = cascade_store.query(limit=50)
            if recent_obs:
                obs_table = Table(
                    title="🔗 LATTICE OBSERVATIONS",
                    show_header=True,
                    header_style="bold blue",
                    border_style="blue"
                )
                obs_table.add_column("CID", style="cyan", width=18)
                obs_table.add_column("Model", style="yellow", width=30)
                obs_table.add_column("Timestamp", style="dim")
                for obs in recent_obs:
                    cid = getattr(obs, 'cid', None) or "N/A"
                    model = getattr(obs, 'model_id', "?")
                    ts = str(obs.timestamp)[:19] if hasattr(obs, 'timestamp') else "?"
                    obs_table.add_row(str(cid), str(model), ts)
                console.print(obs_table)
        except Exception as e:
            console.print(f"[dim red]Cascade query: {e}[/dim red]")

    # Export to SVG - embeds cleanly in Streamlit
    return console.export_svg(title="CASCADE TUI")
# ═══════════════════════════════════════════════════════════════════════════════
# CSS - Mobile-first responsive design
# ═══════════════════════════════════════════════════════════════════════════════
# Injects raw HTML/CSS into the page (unsafe_allow_html=True).
# NOTE(review): the payload is empty in this copy of the file — the section
# header suggests the responsive stylesheet belongs here; confirm against the
# upstream source before relying on any CSS class names used further down.
st.markdown("""
""", unsafe_allow_html=True)
# ═══════════════════════════════════════════════════════════════════════════════
# GATHER DATA
# ═══════════════════════════════════════════════════════════════════════════════
# Champion diagnostics, split into the four sections the panels read.
# A missing section becomes an empty dict.
diagnostics = get_champion_diagnostics()
identity, arch, caps, traits_data = (
    diagnostics.get(section, {})
    for section in ('identity', 'architecture', 'capabilities', 'traits')
)

# Live simulation data; stays empty until the simulation has been initialised.
lineage_data, swarm_stats, provenance_receipt = {}, {}, {}
if st.session_state.initialized:
    lineage_data = st.session_state.swarm.get_agent_lineage_data()
    swarm_stats = st.session_state.swarm.get_swarm_stats()
    if st.session_state.cascade:
        provenance_receipt = st.session_state.cascade.get_session_receipt()
# ═══════════════════════════════════════════════════════════════════════════════
# SIDEBAR - Controls & Settings (VISIBLE BY DEFAULT)
# ═══════════════════════════════════════════════════════════════════════════════
# Sidebar: run controls, HOLD toggle, settings sliders, system info, the
# selected-agent drill-down and cascade-lattice statistics.
# NOTE(review): the two HTML wrapper literals around the auto-run button were
# unterminated (empty, spanning a newline) in this copy.  They are restored as
# a <div> carrying `auto_class`, which is the only use that variable can have —
# confirm the exact markup against the upstream file.
with st.sidebar:
    st.markdown("# 🎮 CONTROLS")

    # BIG obvious buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🚀 START", key="sidebar_start"):
            with st.spinner("Initializing..."):
                init_simulation()
            st.rerun()
    with col2:
        if st.button("⚡ STEP", use_container_width=True):
            step_simulation()
            st.rerun()

    # Auto-run toggle - AGGRESSIVE STOP
    st.markdown("---")
    auto_label = "🛑 STOP AUTO" if st.session_state.auto_run else "▶️ AUTO RUN"
    auto_class = "auto-btn-active" if st.session_state.auto_run else "auto-btn"
    st.markdown(f'<div class="{auto_class}">', unsafe_allow_html=True)
    if st.button(auto_label, use_container_width=True):
        if st.session_state.auto_run:
            # AGGRESSIVE STOP - set flag immediately, no toggle.  The counter
            # is picked up by the emergency-stop check at the top of the script.
            st.session_state.auto_run = False
            st.session_state.stop_requested += 1
        else:
            st.session_state.auto_run = True
            st.session_state.stop_requested = 0
        st.rerun()
    st.markdown('</div>', unsafe_allow_html=True)

    # 🔒 HOLD TOGGLE - Uses cascade-lattice HOLD system
    st.markdown("---")
    hold_label = "🔓 RELEASE HOLD" if st.session_state.get('hold_active', False) else "🔒 HOLD GRAPH"
    hold_help = "HOLD pauses simulation & lets you rotate/pan the 3D graph freely"
    if st.button(hold_label, use_container_width=True, help=hold_help):
        if st.session_state.get('hold_active', False):
            # Release hold - resume auto-run only if it was running before.
            st.session_state.hold_active = False
            if st.session_state.get('auto_run_before_hold', False):
                st.session_state.auto_run = True
            st.session_state.auto_run_before_hold = False
            if hold_system:
                hold_system.accept()  # Accept/release the hold point
        else:
            # Engage hold - remember the auto-run state, then pause.
            st.session_state.hold_active = True
            st.session_state.auto_run_before_hold = st.session_state.auto_run
            st.session_state.auto_run = False
            if hold_system:
                # Log hold engagement to cascade (action_probs must be numpy array)
                hold_system.yield_point(
                    action_probs=np.array([1.0]),
                    value=0.0,
                    observation={"event": "graph_hold", "step": st.session_state.step_count},
                    brain_id="hyperlattice_ui"
                )
        st.rerun()

    if st.session_state.get('hold_active', False):
        st.info("🔒 **HOLD ACTIVE** - Graph interaction enabled. Simulation paused.")

    # Check for accumulated stop requests
    if st.session_state.get('stop_requested', 0) > 0:
        st.session_state.auto_run = False

    # Check hold state - if active, force auto_run off
    check_hold()

    # Settings: values take effect on the next (re)initialisation, except
    # max_agents which step_simulation reads on every step.
    st.markdown("---")
    st.markdown("## ⚙️ Settings")
    st.session_state.lattice_size = st.slider("🌐 Lattice Size", 4, 12, st.session_state.lattice_size)
    st.session_state.num_agents = st.slider("👥 Agents", 3, 25, st.session_state.num_agents)
    st.session_state.max_agents = st.slider("💀 Max Pop (cull)", 5, 50, st.session_state.get('max_agents', 15))
    st.session_state.num_gates = st.slider("🔮 Null Gates", 0, 15, st.session_state.num_gates)
    if st.button("🔄 REINIT", use_container_width=True):
        init_simulation()
        st.rerun()

    # System info
    st.markdown("---")
    st.markdown("## 🖥️ System")
    if GPU_AVAILABLE:
        st.success(f"✅ {GPU_NAME}")
        st.caption(f"{GPU_MEMORY}GB VRAM")
    else:
        st.warning("⚠️ CPU Mode")

    # Selected agent drill-down (only while the agent still exists)
    if st.session_state.selected_agent and st.session_state.selected_agent in lineage_data:
        st.markdown("---")
        st.markdown("## 👤 Selected Agent")
        d = lineage_data[st.session_state.selected_agent]
        st.code(st.session_state.selected_agent, language=None)
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Gen", d['generation'])
            st.metric("Children", d['num_children'])
        with col2:
            st.metric("Fitness", f"{d['fitness']:.4f}")
        # Experience chain - FULL, NO TRUNCATION
        if st.session_state.experience_chain:
            with st.expander(f"📜 Experience Chain ({len(st.session_state.experience_chain)})", expanded=True):
                for i, exp in enumerate(st.session_state.experience_chain):
                    st.caption(f"#{i+1} → {exp.get('to_node', '?')} (R:{exp.get('reward', 0):.3f})")
        if st.button("❌ Deselect", use_container_width=True):
            st.session_state.selected_agent = None
            st.session_state.experience_chain = []
            st.rerun()

    # ═══════════════════════════════════════════════════════════════
    # CASCADE-LATTICE STATS - Full package integration
    # ═══════════════════════════════════════════════════════════════
    st.markdown("---")
    st.markdown("## 🔗 CASCADE LATTICE")
    if CASCADE_AVAILABLE and cascade_store:
        try:
            lattice_stats = cascade_store.stats()
            st.metric("📊 Observations", f"{lattice_stats.get('total_observations', 0):,}")
            st.metric("📌 Pinned", lattice_stats.get('pinned_observations', 0))
            # Genesis root
            genesis = lattice_stats.get('genesis_root', 'unknown')
            st.code(f"Genesis: {genesis}", language=None)
            # Model count
            models = lattice_stats.get('models', {})
            st.caption(f"{len(models)} model types tracked")
            # Show ALL models in expander, most-observed first - NO TRUNCATION
            with st.expander(f"📁 Models ({len(models)} total)", expanded=False):
                sorted_models = sorted(models.items(), key=lambda x: -x[1])
                for model_id, count in sorted_models:
                    st.text(f"{model_id}: {count}")
            # HOLD stats
            if Hold:
                hold = Hold.get()
                hold_stats = hold.stats
                st.caption(f"HOLD: {hold_stats.get('total_holds', 0)} holds, {hold_stats.get('overrides', 0)} overrides")
        except Exception as e:
            # Stats are informational only; show the error inline.
            st.caption(f"Stats error: {e}")
    else:
        st.caption("cascade-lattice not available")
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN AREA - Graph Left (~2/3), Data Right (~1/3)
# ═══════════════════════════════════════════════════════════════════════════════
# Page title and a one-line status summary (step counter and live agent count).
st.markdown("# CASCADE Hyperlattice")
_header_agent_count = len(lineage_data) if st.session_state.initialized else 0
st.markdown(f"**Step: {st.session_state.step_count}** | Agents: {_header_agent_count}")
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN LAYOUT: two columns in a 2:1 ratio - Graph (left) | Data Panels (right)
# ═══════════════════════════════════════════════════════════════════════════════
graph_col, data_col = st.columns([2, 1], gap="medium")
with graph_col:
if not st.session_state.initialized:
st.markdown("""
""", unsafe_allow_html=True)
else:
# Build 3D graph
def get_lineage_color(root_id, generation, fitness, max_gen):
    """Return a CSS ``rgb(r,g,b)`` colour for an agent.

    Hue is derived from the lineage root, saturation fades with generation
    (relative to *max_gen*) and lightness grows with fitness.

    Both ratios are clamped to [0, 1] so that negative fitness or a
    generation beyond *max_gen* can no longer produce channel values outside
    0-255.  The hue uses an MD5 digest rather than the built-in ``hash()``,
    whose string hashes are salted per process, so a lineage keeps its colour
    across restarts (same approach as ``hash_to_hsl``).
    """
    import hashlib

    digest = hashlib.md5(str(root_id).encode()).hexdigest()
    hue = (int(digest[:8], 16) % 360) / 360.0

    gen_ratio = min(max(generation / max(max_gen, 1), 0.0), 1.0)
    saturation = max(0.3, 1.0 - gen_ratio * 0.7)

    fitness_ratio = min(max(fitness / 5.0, 0.0), 1.0)
    lightness = 0.3 + fitness_ratio * 0.5

    r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
    return f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
max_gen = max((d['generation'] for d in lineage_data.values()), default=1)
x_vals, y_vals, z_vals = [], [], []
colors, sizes, texts = [], [], []
agent_ids = list(lineage_data.keys())
for aid, d in lineage_data.items():
pos = d['position']
x_vals.append(pos[0])
y_vals.append(pos[1])
z_vals.append(pos[2])
color = get_lineage_color(d['root_lineage'], d['generation'], d['fitness'], max_gen)
colors.append(color)
size = 20 if st.session_state.selected_agent == aid else 12
sizes.append(size)
short_id = aid[-8:] if len(aid) > 8 else aid
texts.append(f"{short_id}
Gen: {d['generation']}
Fit: {d['fitness']:.4f}")
fig = go.Figure()
# ═══════════════════════════════════════════════════════════════════
# QUINE CONFLUENCE TRAILS - Each agent has unique color from Merkle hash
# Temporal gradient: solid at present → faded at origin
# Direction cones show traversal direction
# ═══════════════════════════════════════════════════════════════════
def hash_to_hsl(id_str: str) -> tuple:
    """Map a string (e.g. an agent id) to a stable ``(hue, sat, light)`` triple.

    The first four bytes of the string's MD5 digest are used: bytes 0-1 pick
    the hue (0-359), byte 2 the saturation (60-89 %) and byte 3 the lightness
    (45-64 %).  An empty/falsy id yields the default cyan ``(180, 70, 50)``.
    """
    if not id_str:
        return (180, 70, 50)  # Default cyan

    import hashlib
    raw = hashlib.md5(id_str.encode()).digest()

    hue = int.from_bytes(raw[:2], "big") % 360
    saturation = 60 + raw[2] % 30
    lightness = 45 + raw[3] % 20
    return (hue, saturation, lightness)
def hsl_to_rgb(h, s, l):
    """Convert HSL (hue in degrees, saturation/lightness in percent) to RGB.

    Returns a tuple of three ints, each truncated (not rounded) to 0-255 for
    in-range input.  Hues that fall below none of the 60° thresholds use the
    last sector.
    """
    sat = s / 100
    light = l / 100
    chroma = (1 - abs(2 * light - 1)) * sat
    second = chroma * (1 - abs((h / 60) % 2 - 1))
    offset = light - chroma / 2

    # (exclusive upper hue bound, un-offset channel triple) per 60° sector
    sectors = (
        (60, (chroma, second, 0)),
        (120, (second, chroma, 0)),
        (180, (0, chroma, second)),
        (240, (0, second, chroma)),
        (300, (second, 0, chroma)),
    )
    channels = (chroma, 0, second)  # 300° and above
    for upper_bound, candidate in sectors:
        if h < upper_bound:
            channels = candidate
            break

    return tuple(int((value + offset) * 255) for value in channels)
# Group movements by agent
agent_trails = {} # agent_id -> list of (from_node, to_node, step, is_null_transit)
max_step = st.session_state.step_count or 1
for e in st.session_state.events:
if e["type"] in ["move", "null_transit"]:
data = e.get("data", {})
agent_id = e.get("agent", "unknown")
from_node = data.get("from_node") or ""
to_node = data.get("to_node") or ""
step = e.get("step", 0)
is_null = e["type"] == "null_transit"
if from_node and to_node:
if agent_id not in agent_trails:
agent_trails[agent_id] = []
agent_trails[agent_id].append((from_node, to_node, step, is_null))
# Get agent colors from STABLE identifier (agent_id, not quine_hash which evolves)
# agent_id is the birth hash - it never changes
agent_colors = {}
for agent_id in agent_trails.keys():
# Use the agent_id itself - it's the stable birth hash
agent_colors[agent_id] = hash_to_hsl(agent_id)
# Draw each agent's trail - connect ALL segments in step order
# The path should be continuous since to_node[n] == from_node[n+1] in normal movement
for agent_id, moves in agent_trails.items():
if not moves:
continue
# Sort by step to ensure correct path order
moves.sort(key=lambda m: m[2])
h, s, l = agent_colors.get(agent_id, (180, 70, 50))
r, g, b = hsl_to_rgb(h, s, l)
# Build continuous path - just chain all positions together
path_x, path_y, path_z = [], [], []
hover_texts = []
null_segment_indices = [] # Track segment start indices for null transits
for i, (from_node, to_node, step, is_null) in enumerate(moves):
if from_node not in st.session_state.lattice.nodes or to_node not in st.session_state.lattice.nodes:
continue
p1 = st.session_state.lattice.get_node_position(from_node)
p2 = st.session_state.lattice.get_node_position(to_node)
# Always add from_node for each segment (creates overlapping points but ensures continuity)
path_x.append(p1[0])
path_y.append(p1[1])
path_z.append(p1[2])
hover_texts.append(f"{agent_id[:8]} step {step}")
# Add destination point
path_x.append(p2[0])
path_y.append(p2[1])
path_z.append(p2[2])
hover_texts.append(f"{agent_id[:8]} step {step}")
# Track null transit segment for dashed overlay
if is_null:
null_segment_indices.append(len(path_x) - 2) # Index of from point
# Draw the path with breaks
if len(path_x) >= 2:
opacity = 0.85
width = 2.5
fig.add_trace(go.Scatter3d(
x=path_x, y=path_y, z=path_z,
mode='lines+markers',
line=dict(color=f'rgba({r},{g},{b},{opacity:.2f})', width=width),
marker=dict(size=2, color=f'rgba({r},{g},{b},{opacity:.2f})'),
hoverinfo='text',
hovertext=hover_texts,
showlegend=False,
connectgaps=False # Don't connect across None values
))
# Overlay dashed lines for null transit segments
for seg_idx in null_segment_indices:
if seg_idx + 1 < len(path_x) and path_x[seg_idx] is not None and path_x[seg_idx + 1] is not None:
fig.add_trace(go.Scatter3d(
x=[path_x[seg_idx], path_x[seg_idx + 1]],
y=[path_y[seg_idx], path_y[seg_idx + 1]],
z=[path_z[seg_idx], path_z[seg_idx + 1]],
mode='lines',
line=dict(color=f'rgba({r},{g},{b},0.9)', width=3, dash='dash'),
hoverinfo='skip',
showlegend=False
))
# NULL GATE CONNECTIONS - Glowing wormhole tunnels
null_pairs = st.session_state.lattice.get_null_gate_pairs()
gate_conn_x, gate_conn_y, gate_conn_z = [], [], []
for n1, n2 in null_pairs:
p1 = st.session_state.lattice.get_node_position(n1)
p2 = st.session_state.lattice.get_node_position(n2)
gate_conn_x.extend([p1[0], p2[0], None])
gate_conn_y.extend([p1[1], p2[1], None])
gate_conn_z.extend([p1[2], p2[2], None])
if gate_conn_x:
# Outer glow
fig.add_trace(go.Scatter3d(
x=gate_conn_x, y=gate_conn_y, z=gate_conn_z,
mode='lines',
line=dict(color='rgba(255,100,255,0.3)', width=8),
hoverinfo='none',
showlegend=False
))
# Core beam
fig.add_trace(go.Scatter3d(
x=gate_conn_x, y=gate_conn_y, z=gate_conn_z,
mode='lines',
line=dict(color='rgba(255,0,255,0.7)', width=3),
hoverinfo='none',
name='Wormholes'
))
# AGENTS - 🐭 Mouse visualization to chase the cheese!
# Outer glow ring (fitness-based intensity)
glow_sizes = []
glow_colors = []
for aid, d in lineage_data.items():
fitness = d['fitness']
# Glow size based on fitness (more fit = bigger aura)
glow_size = 25 + min(fitness * 10, 30)
glow_sizes.append(glow_size)
# Glow color with transparency
base_color = colors[list(lineage_data.keys()).index(aid)]
glow_colors.append(base_color.replace('rgb(', 'rgba(').replace(')', ',0.25)'))
# Outer glow layer for mice
fig.add_trace(go.Scatter3d(
x=x_vals, y=y_vals, z=z_vals,
mode='markers',
marker=dict(size=glow_sizes, color=glow_colors, opacity=0.3,
line=dict(width=0)),
hoverinfo='none',
showlegend=False
))
# 🐭 Mouse emoji markers for quine agents!
fig.add_trace(go.Scatter3d(
x=x_vals, y=y_vals, z=z_vals,
mode='markers+text',
marker=dict(size=sizes, color=colors, opacity=0.9,
symbol='circle',
line=dict(width=2, color='#fff')),
text=['🐭'] * len(x_vals),
textposition='middle center',
textfont=dict(size=14, color='#ffffff'),
hovertext=texts,
hoverinfo='text',
customdata=agent_ids,
name='Mice'
))
# SELECTION HIGHLIGHT - Glowing ring around selected agent
if st.session_state.selected_agent and st.session_state.selected_agent in lineage_data:
sel_d = lineage_data[st.session_state.selected_agent]
sel_pos = sel_d['position']
# Add multiple rings for glow effect
for ring_size, ring_alpha in [(35, 0.3), (28, 0.5), (22, 0.8)]:
fig.add_trace(go.Scatter3d(
x=[sel_pos[0]], y=[sel_pos[1]], z=[sel_pos[2]],
mode='markers',
marker=dict(
size=ring_size,
color=f'rgba(0,255,255,{ring_alpha})',
symbol='circle-open',
line=dict(width=3, color='#00ffff')
),
hoverinfo='none',
name='Selection',
showlegend=False
))
# Lineage edges
edge_x, edge_y, edge_z = [], [], []
for aid, d in lineage_data.items():
if d['parent_hash']:
for pid, pd in lineage_data.items():
if pd['quine_hash'] == d['parent_hash']:
pos1 = pd['position']
pos2 = d['position']
edge_x.extend([pos1[0], pos2[0], None])
edge_y.extend([pos1[1], pos2[1], None])
edge_z.extend([pos1[2], pos2[2], None])
break
if edge_x:
fig.add_trace(go.Scatter3d(
x=edge_x, y=edge_y, z=edge_z,
mode='lines',
line=dict(color='rgba(80,80,80,0.5)', width=2),
hoverinfo='none',
name='Lineage'
))
# Null gates - PORTAL visualization with glowing rings
gate_ids = []
if st.session_state.lattice.null_gates:
null_x, null_y, null_z = [], [], []
gate_hovers = []
for gate_key, node_id in st.session_state.lattice.null_gates.items():
try:
pos = st.session_state.lattice.get_node_position(node_id)
null_x.append(pos[0])
null_y.append(pos[1])
null_z.append(pos[2])
gate_ids.append(gate_key)
# Find paired gate
pair = st.session_state.lattice.get_gate_pair(gate_key)
pair_str = pair[:8] if pair else "N/A"
gate_hovers.append(f"⚡ PORTAL·Gate: {gate_key[:8]}·Pair: {pair_str}·Node: {node_id[:8]}")
except (KeyError, AttributeError):
pass
if null_x:
# Outer glow ring
fig.add_trace(go.Scatter3d(
x=null_x, y=null_y, z=null_z,
mode='markers',
marker=dict(size=28, color='rgba(255,100,255,0.2)',
symbol='circle-open',
line=dict(width=4, color='rgba(255,50,255,0.4)')),
hoverinfo='none',
showlegend=False
))
# Middle ring
fig.add_trace(go.Scatter3d(
x=null_x, y=null_y, z=null_z,
mode='markers',
marker=dict(size=18, color='rgba(200,50,255,0.5)',
symbol='circle-open',
line=dict(width=3, color='#ff44ff')),
hoverinfo='none',
showlegend=False
))
# Core portal marker - just the lightning bolt, no X
fig.add_trace(go.Scatter3d(
x=null_x, y=null_y, z=null_z,
mode='markers+text',
marker=dict(size=12, color='#ff00ff', opacity=0.8,
symbol='circle',
line=dict(width=2, color='#ff88ff')),
text=['⚡' for _ in null_x],
textposition='middle center',
textfont=dict(size=18, color='#ffffff'),
hovertext=gate_hovers,
hoverinfo='text',
customdata=gate_ids,
name='Portals'
))
# 🧀 THE CHEESE - Render LAST so it's on top of everything!
if st.session_state.cheese_position and not st.session_state.cheese_found:
try:
cheese_pos = st.session_state.lattice.get_node_position(st.session_state.cheese_position)
cx, cy, cz = cheese_pos[0], cheese_pos[1], cheese_pos[2]
# Big glowing cheese marker - larger than gates, distinctive color
fig.add_trace(go.Scatter3d(
x=[cx], y=[cy], z=[cz],
mode='markers+text',
marker=dict(size=20, color='#FFD700', opacity=1.0,
symbol='circle',
line=dict(width=3, color='#FF8C00')),
text=['🧀'],
textposition='top center',
textfont=dict(size=24, color='#FFD700'),
hovertext='🧀 THE CHEESE! Find me!',
hoverinfo='text',
name='🧀 Cheese'
))
except (KeyError, AttributeError):
pass
# Camera - reuse the eye position saved in session state when there is one,
# otherwise fall back to the default viewpoint.
camera_eye = st.session_state.get('hold_camera') or dict(x=1.5, y=1.5, z=1.0)

# HOLD state is read once and drives both the uirevision key and the drag mode.
hold_on = st.session_state.get('hold_active', False)

# uirevision key handed to the layout; a different key is used during HOLD.
ui_rev = 'hold_locked' if hold_on else 'camera_lock'

# All three axes share the same look: faint grid only, no background,
# zero line, title or tick labels.
axis_style = {
    'showbackground': False,
    'showgrid': True,
    'gridcolor': '#222',
    'zeroline': False,
    'title': '',
    'showticklabels': False,
}

# Build scene config (each axis gets its own copy of the shared style).
scene_config = {
    'xaxis': dict(axis_style),
    'yaxis': dict(axis_style),
    'zaxis': dict(axis_style),
    'bgcolor': '#0a0a0a',
    'camera': {'eye': camera_eye},
    'aspectmode': 'cube',  # Lock aspect ratio
    'dragmode': 'orbit' if hold_on else 'pan',  # Orbit when HOLD active
}

fig.update_layout(
    scene=scene_config,
    paper_bgcolor='#0a0a0a',
    plot_bgcolor='#0a0a0a',
    showlegend=False,
    margin={'l': 0, 'r': 0, 't': 0, 'b': 0},
    width=800,          # Fixed size so the figure keeps the same dimensions
    height=700,
    autosize=False,     # Explicit width/height above are used as-is
    uirevision=ui_rev,  # Key chosen above; differs while HOLD is active
    scene_camera={'projection': {'type': 'perspective'}},
)
# 🔒 HOLD indicator, emitted just before the graph while HOLD is active.
# NOTE(review): unsafe_allow_html=True is passed but the text carries no
# markup - confirm whether wrapper HTML is expected here.
if st.session_state.get('hold_active', False):
    hold_banner = "\n🔒 HOLD ACTIVE - Drag to rotate/pan\n"
    st.markdown(hold_banner, unsafe_allow_html=True)
# 🧀 CHEESE VICTORY CELEBRATION - balloons plus a banner naming the finder.
if st.session_state.cheese_found:
    st.balloons()
    victory_message = (
        "🧀🎉 **THE CHEESE HAS BEEN FOUND!** 🎉🧀\n\n"
        f"**Champion:** `{st.session_state.cheese_finder}`\n\n"
        "*Press Reset Sim to play again!*"
    )
    st.success(victory_message)
# Render graph - NO selection callback during HOLD (allows free interaction)
if st.session_state.get('hold_active', False):
    # During HOLD: no on_select to prevent reruns, allow free rotation/pan
    event = st.plotly_chart(fig, key="main_graph_hold", width="content")
else:
    # Normal mode: selection callback enabled
    event = st.plotly_chart(
        fig,
        key="main_graph",
        on_select="rerun",
        selection_mode="points",
        width="content",
    )

# Handle selection from graph click
if event and hasattr(event, 'selection'):
    sel = event.selection
    # Streamlit selection is a dict with 'points' key when user clicks
    # Skip if it's not a dict (could be a method reference or other object)
    if isinstance(sel, dict) and sel.get('points'):
        point = sel['points'][0]
        trace_idx = point.get('curve_number', point.get('curveNumber', -1))
        pt_idx = point.get('point_number', point.get('pointNumber', -1))

        # Resolve the clicked trace by name rather than by position, because
        # the trace order depends on how many traces were added.
        # The index is range-checked explicitly: the -1 "missing" default
        # would otherwise silently address the LAST trace via negative
        # indexing.
        trace_name = ""
        if isinstance(trace_idx, int) and 0 <= trace_idx < len(fig.data):
            trace_name = fig.data[trace_idx].name or ""

        # A point index that is missing or not an integer is never a valid click.
        valid_point = isinstance(pt_idx, int) and pt_idx >= 0

        if trace_name == 'Agents' and valid_point and pt_idx < len(agent_ids):
            clicked_agent = agent_ids[pt_idx]
            st.session_state.selected_agent = clicked_agent
            st.session_state.selection_type = 'agent'
            st.session_state.experience_chain = st.session_state.swarm.get_experience_chain(clicked_agent)
            st.rerun()
        # The gate trace (the one carrying gate_ids as customdata) is added
        # with name='Portals'; 'Gates' is still accepted for compatibility.
        elif trace_name in ('Gates', 'Portals') and valid_point and pt_idx < len(gate_ids):
            clicked_gate = gate_ids[pt_idx]
            st.session_state.selected_gate = clicked_gate
            st.session_state.selection_type = 'gate'
            st.rerun()
# ═══════════════════════════════════════════════════════════════
# 🧬 LINEAGE SUNBURST - Interactive genealogical tree
# ═══════════════════════════════════════════════════════════════
st.markdown("---")
st.markdown("### 🧬 LINEAGE TREE")

# Build sunburst data structure: parallel lists, one entry per agent.
sb_ids = []
sb_parents = []
sb_values = []
sb_labels = []
sb_colors = []

# Map quine_hash -> agent_id for parent lookup
hash_to_id = {d['quine_hash']: aid for aid, d in lineage_data.items()}

for aid, d in lineage_data.items():
    sb_ids.append(aid)

    # Find parent by quine_hash. An agent whose parent is unknown becomes a
    # root (""). So does one whose parent hash resolves back to itself,
    # since a node cannot be placed under itself in the hierarchy.
    parent_hash = d.get('parent_hash')
    parent_id = hash_to_id.get(parent_hash) if parent_hash else None
    if parent_id is not None and parent_id != aid:
        sb_parents.append(parent_id)
    else:
        sb_parents.append("")  # Root node

    # Value = fitness (minimum 0.1 for visibility)
    sb_values.append(max(0.1, d['fitness']))

    # Label = last six characters of the ID + generation, on two lines.
    # Plotly text uses <br> for line breaks; a raw newline inside the
    # literal is not valid Python.
    short_id = aid[-6:]
    sb_labels.append(f"{short_id}<br>G{d['generation']}")

    # Color by generation
    sb_colors.append(d['generation'])

if sb_ids:
    # Create sunburst chart
    # NOTE(review): branchvalues="total" expects each parent's value to be at
    # least the sum of its children's values, while the values here are
    # per-agent fitness - confirm multi-generation lineages render.
    sunburst_fig = go.Figure(go.Sunburst(
        ids=sb_ids,
        labels=sb_labels,
        parents=sb_parents,
        values=sb_values,
        branchvalues="total",
        marker=dict(
            colors=sb_colors,
            colorscale='Viridis',
            line=dict(width=2, color='#111'),
        ),
        hovertemplate="%{label}<br>Fitness: %{value:.2f}",
        textfont=dict(size=11, color='white'),
        insidetextorientation='radial',
    ))
    sunburst_fig.update_layout(
        margin=dict(t=10, l=10, r=10, b=10),
        paper_bgcolor='#0a0a0a',
        height=280,
        font=dict(color='white'),
        clickmode='event+select',  # Enable click events
    )
    # Render chart
    st.plotly_chart(sunburst_fig, key="lineage_sunburst", width="stretch")
# Agent selector with radio buttons: lists every agent in lineage_data,
# fittest first, preceded by a "none" entry.
sorted_agents = sorted(lineage_data.items(), key=lambda x: -x[1]['fitness'])

# Display label per option (agent id -> short id, generation, fitness).
agent_labels = {aid: f"🧬 {aid[-6:]} G{d['generation']} F{d['fitness']:.1f}" for aid, d in sorted_agents}
agent_labels[None] = "— none —"

# The option list is the single source of truth for ordering. The initial
# index must be computed from this list: agent_labels holds None as its LAST
# key, while the options put None FIRST, so indexing into the dict keys
# preselects the wrong entry.
radio_options = [None] + [aid for aid, _ in sorted_agents]

# Fall back to "none" when the stored selection is not one of the options.
current_sel = st.session_state.selected_agent if st.session_state.selected_agent in agent_labels else None

# NOTE(review): the widget is keyed, so its stored value may differ from a
# selection made by clicking the graph - confirm the two stay in sync.
new_selection = st.radio(
    "Select Agent:",
    options=radio_options,
    format_func=lambda x: agent_labels.get(x, str(x)),
    index=radio_options.index(current_sel),
    key="agent_radio",
    horizontal=True,
    label_visibility="collapsed",
)

# Propagate a changed choice into session state and rerun so the rest of the
# page reflects it.
if new_selection != st.session_state.selected_agent:
    st.session_state.selected_agent = new_selection
    if new_selection:
        st.session_state.experience_chain = st.session_state.swarm.get_experience_chain(new_selection)
        st.toast(f"✅ {new_selection[-8:]}", icon="🧬")
    st.rerun()
# ═══════════════════════════════════════════════════════════════
# 💻 REAL RICH TUI TERMINAL - Genuine Rich library rendering
# ═══════════════════════════════════════════════════════════════
st.markdown("### 💻 RICH TUI TERMINAL")

# Get cascade stats; an empty dict is used when cascade is unavailable or
# the stats call fails.
cascade_stats = {}
if CASCADE_AVAILABLE and cascade_store:
    try:
        cascade_stats = cascade_store.stats()
    except Exception:
        # Best effort: a failing stats backend must not break the page.
        # `except Exception` (rather than a bare except) lets
        # KeyboardInterrupt, SystemExit and other BaseException-derived
        # control-flow signals propagate.
        cascade_stats = {}

# Render the Rich TUI; the result is kept in rich_svg for embedding below.
rich_svg = render_rich_tui(
    events=st.session_state.events,  # ALL events, no truncation
    cascade_stats=cascade_stats,
    lineage_data=lineage_data,
    cascade_store=cascade_store if CASCADE_AVAILABLE else None,
)
# Use st.components.html() which properly renders SVG with embedded
{rich_svg}