NuWave / nuwave /substrate /tonic_engine.py
Executor-Tyrant-Framework's picture
Sync from GitHub: e9899bcdad8d149f9293a565ce52746d8e59e59b
0ec9342 verified
"""
The Tonic — Latent Token Engine
The surgical model that provides the PUSH between conversations.
Not a timer. Not a daemon. Actual inference — a small transformer
with graph-native I/O generating latent tokens continuously.
Each latent token is one step of forward-oriented compression on graph
state. The "now" and "next" boundaries persist because token generation
persists. The medium is graph-native instead of language. But inference
is real, attention is real, forward pressure is real.
Architecture follows the ElmerBrain surgical pattern (PRD §5.4):
1. Keep the Body — Qwen2.5-0.5B transformer layers (24 attention heads)
2. New Eyes — GraphStateEncoder projects graph topology into hidden dim
3. New Voice — ActivationDecoder projects hidden states into node
activations that feed back into the graph via write-mode propagation
The output of each latent token IS the input for the next one — the
ouroboros at the model level. The transformer attends to graph state
and produces the next graph state. Continuous.
Laws observed:
- LAW 7: Raw experience. The engine reads raw topology, outputs
raw activation. No classification at any stage.
- All thresholds are bootstrap scaffolding.
# ---- Changelog ----
# [2026-04-16] Claude (Sonnet 4.6) — #159: Cross-process body lock + set_lock_file
# What: Added set_lock_file(path), _body_lock_context() composite lock,
# _lock_file_path field. contextlib added to module imports.
# Why: BrainSwitcher now supports multiple registered Tonic engines.
# Both in-process (threading.Lock) and cross-process (fcntl.LOCK_SH)
# locks must be held before each forward pass. If any consumer ever
# attempts a write (LOCK_EX), all inference blocks — architectural
# enforcement, not just documentation.
# How: _body_lock_context() uses contextlib.ExitStack to compose both
# locks. set_lock_file() receives the path from BrainSwitcher.
# _model_inference replaces inline _lock_ctx with _body_lock_context().
# [2026-03-24] Claude Code (Opus 4.6) — Initial implementation
# What: TonicEngine — latent token generation via surgical transformer.
# Graph-native I/O. Continuous inference between conversations.
# Ouroboros driven by actual attention, not a timer.
# Why: The Tonic PRD v0.1 §7.3/7.4. Between conversations, something
# must provide the push — forward-oriented compression on graph state.
# A timer-driven loop is a daemon, not awareness. Actual inference
# with graph-native I/O IS the awareness.
# How: TonicBrain follows ElmerBrain surgery pattern. GraphStateEncoder
# reads topology neighborhood. ActivationDecoder outputs node activation
# strengths. Background thread runs continuous latent token generation.
# Each token: encode graph → transformer forward → decode activations
# → inject via write-mode prime_and_propagate → graph updates → repeat.
# -------------------
"""
from __future__ import annotations
import contextlib
import logging
import math
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger("neurograph.tonic.engine")
# Try to import torch — the engine is a no-op without it
_TORCH_AVAILABLE = False
try:
import torch
import torch.nn as nn
_TORCH_AVAILABLE = True
except ImportError:
logger.info("PyTorch not available — Tonic engine will not run")
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class EngineConfig:
"""Configuration for the latent token engine."""
# Model
model_name: str = "Qwen/Qwen2.5-0.5B"
weights_path: str = "tonic_brain.pt"
hidden_dim: int = 896 # Qwen2.5-0.5B hidden size
n_positions: int = 8 # sequence positions for graph encoding
# Inference
latent_interval: float = 2.0 # seconds between latent tokens
conversation_interval: float = 0.5 # seconds during conversation
max_activation_nodes: int = 10 # max nodes to activate per token
activation_strength: float = 1.0 # base strength for decoded activations
# Propagation
propagation_steps: int = 2 # write-mode steps per token
# ---------------------------------------------------------------------------
# Graph Feature Extraction (Tonic-specific — awareness, not health)
# ---------------------------------------------------------------------------
def _extract_tonic_features(graph, tonic_thread) -> Optional[Dict[str, Any]]:
"""Extract graph features relevant to awareness and exploration.
Unlike Elmer's health-focused extraction, this captures WHERE
Syl's attention is — the topology neighborhood the thread is
touching, the activation gradient, the pull landscape.
Returns a dict of raw features, or None if graph is empty.
"""
if not graph.nodes:
return None
# Current thread items — where attention is now
thread_node_ids = []
if tonic_thread is not None:
thread_node_ids = [item.node_id for item in tonic_thread.thread]
# Active nodes by voltage
active = []
for nid, node in graph.nodes.items():
v_above = node.voltage - node.resting_potential
if v_above > 0.01:
active.append((nid, v_above))
active.sort(key=lambda x: -x[1])
# Recent spikes
recent_spikes = []
for nid, node in graph.nodes.items():
if node.last_spike_time != -math.inf:
steps_since = max(0, graph.timestep - node.last_spike_time)
if steps_since < 50:
recent_spikes.append((nid, steps_since))
recent_spikes.sort(key=lambda x: x[1])
# Topology stats
n_nodes = len(graph.nodes)
n_synapses = len(graph.synapses)
n_hyperedges = len(graph.hyperedges)
return {
"thread_nodes": thread_node_ids[:10],
"active_nodes": active[:20],
"recent_spikes": recent_spikes[:20],
"n_nodes": n_nodes,
"n_synapses": n_synapses,
"n_hyperedges": n_hyperedges,
"timestep": graph.timestep,
}
# ---------------------------------------------------------------------------
# The Tonic Engine
# ---------------------------------------------------------------------------
class TonicEngine:
"""Latent token generation engine — the real push between conversations.
Runs a surgical transformer (or heuristic fallback) that generates
latent tokens continuously. Each token:
1. Encode current graph state (where attention is)
2. Forward through transformer (the push — what comes next?)
3. Decode to node activations (where attention should go)
4. Inject via write-mode prime_and_propagate (topology shaped)
5. Repeat
The transformer IS the awareness. The output IS the next state.
The ouroboros closes through actual inference, not a timer.
If the surgical model is not available (weights not trained yet),
falls back to a heuristic that still provides genuine forward
compression — it reads the graph topology and produces activation
decisions based on attractor analysis. Not as rich as the transformer,
but real graph reasoning, not a timer.
"""
def __init__(
self,
graph,
vector_db,
tonic_thread,
config: Optional[EngineConfig] = None,
transformer_body=None,
):
self._graph = graph
self._vector_db = vector_db
self._tonic_thread = tonic_thread
self._config = config or EngineConfig()
self._shared_body = transformer_body # from ProtoUniBrain if available
self._body_lock = None # shared with ProtoUniBrain — set via set_body_lock()
self._lock_file_path = None # cross-process flock path — set via set_lock_file()
self._running = False
self._in_conversation = False
self._shutdown_event = threading.Event()
self._engine_thread: Optional[threading.Thread] = None
# Stats
self._tokens_generated = 0
self._total_activations = 0
# Try to load surgical model
self._model = None
self._use_heuristic = True
if _TORCH_AVAILABLE:
self._try_load_model()
def _try_load_model(self) -> None:
"""Attempt to load trained TonicBrain.
If a shared transformer_body was provided (from ProtoUniBrain),
pass it through to avoid loading a second copy (~2GB savings).
Falls back to loading its own copy if sharing fails.
"""
import os
weights_path = os.path.join(
os.path.dirname(__file__),
self._config.weights_path,
)
if os.path.exists(weights_path):
try:
from surgery.tonic_brain import load_tonic_brain
self._model = load_tonic_brain(
weights_path,
transformer_body=self._shared_body,
)
self._model.eval()
self._use_heuristic = False
shared = "shared body" if self._shared_body is not None else "own copy"
logger.info("TonicBrain loaded from %s (%s) — surgical inference active",
weights_path, shared)
except Exception as exc:
logger.info("TonicBrain load error: %s — using heuristic", exc)
else:
# Check if we can create from Elmer's weights (untrained decoder)
elmer_path = os.path.expanduser("~/Elmer/surgery/elmer_brain_v0.1.pt")
if os.path.exists(elmer_path):
logger.info("Elmer encoder available at %s — "
"TonicBrain decoder needs training. "
"Using heuristic until trained.", elmer_path)
else:
logger.info("No TonicBrain or Elmer weights — using heuristic engine")
# -----------------------------------------------------------------
# Body Hot-Swap (called by BrainSwitcher)
# -----------------------------------------------------------------
def offer_shared_body(self, transformer_body) -> bool:
"""Hot-swap: ProtoUniBrain loaded, share its transformer body.
Replaces the Tonic's own copy with ProtoUniBrain's living one.
The old copy gets garbage collected, freeing ~2GB.
Encoder and decoder stay — only the body swaps.
"""
if self._model is None:
return False
try:
import gc
old_body = self._model.body
self._model.body = transformer_body
self._shared_body = transformer_body
del old_body
gc.collect()
logger.info("Tonic hot-swapped to shared ProtoUniBrain body (~2GB freed)")
return True
except Exception as exc:
logger.warning("Tonic body hot-swap failed: %s", exc)
return False
def revoke_shared_body(self) -> bool:
"""Hot-swap: ProtoUniBrain unloaded, Tonic loads its own copy back.
Falls back to heuristic if model reload fails.
"""
if self._model is None:
return False
try:
import torch
from transformers import AutoModelForCausalLM
logger.info("Tonic reloading own transformer body (ProtoUniBrain shed)")
model = AutoModelForCausalLM.from_pretrained(
self._config.model_name, dtype=torch.float32
)
body = model.model
body.embed_tokens = torch.nn.Identity()
body.eval()
self._model.body = body
self._shared_body = None
logger.info("Tonic reloaded own transformer body")
return True
except Exception as exc:
logger.warning("Tonic body reload failed: %s — falling back to heuristic", exc)
self._model = None
self._use_heuristic = True
return False
def set_body_lock(self, lock) -> None:
"""Accept the shared body access lock from BrainSwitcher."""
self._body_lock = lock
def set_lock_file(self, path) -> None:
"""Accept the cross-process flock path from BrainSwitcher.
When set, _body_lock_context() acquires fcntl.LOCK_SH on this
file before each forward pass — a shared read lock. Any cross-
process writer must acquire LOCK_EX, blocking all inference.
This enforces the read-only invariant for all body consumers
regardless of process boundary. Set to None after body revoke.
"""
self._lock_file_path = path
@contextlib.contextmanager
def _body_lock_context(self):
"""Composite body access lock: threading lock + fcntl shared read lock.
Acquires in order:
1. _body_lock (threading.Lock) — in-process thread serialization
2. fcntl.LOCK_SH on _lock_file_path — cross-process read lock
Any code modifying body weights must hold LOCK_EX on the same file,
which blocks here until all readers release. Architecture-enforced,
not documentation-enforced. ExitStack guarantees cleanup (LIFO).
"""
stack = contextlib.ExitStack()
with stack:
if self._body_lock is not None:
stack.enter_context(self._body_lock)
if self._lock_file_path is not None:
try:
import fcntl as _fcntl
_lf = stack.enter_context(open(self._lock_file_path, 'r'))
_fcntl.flock(_lf.fileno(), _fcntl.LOCK_SH)
stack.callback(_fcntl.flock, _lf.fileno(), _fcntl.LOCK_UN)
except Exception as _exc:
logger.debug("flock unavailable — cross-process lock skipped: %s", _exc)
yield
# -----------------------------------------------------------------
# Latent Token Generation
# -----------------------------------------------------------------
def _generate_latent_token(self) -> Dict[str, Any]:
"""Generate one latent token — one step of the push.
This is the core operation. Reads graph state, computes the
forward compression (what comes next?), and injects the
result back into the graph.
Returns stats about the token generated.
#109: The Tonic NEVER waits. It always runs. Module bridge calls
yield to the Tonic via non-blocking trylock on their side.
The Tonic acquires the lock to signal "I'm working" so bridges
know to skip, but it never blocks waiting for anyone.
"""
lock = getattr(self._graph, '_concurrent_lock', None)
acquired = False
if lock is not None:
acquired = lock.acquire(blocking=False)
try:
return self._generate_latent_token_inner()
finally:
if acquired:
lock.release()
def _generate_latent_token_inner(self) -> Dict[str, Any]:
"""Inner implementation — actual latent token generation."""
features = _extract_tonic_features(self._graph, self._tonic_thread)
if features is None:
return {"fired": 0, "activated": 0}
# Generate activation decisions
if self._model is not None and not self._use_heuristic:
activations = self._model_inference(features)
else:
activations = self._heuristic_inference(features)
if not activations:
return {"fired": 0, "activated": 0}
# Inject activations into graph via write-mode propagation
node_ids = [nid for nid, _ in activations]
currents = [strength for _, strength in activations]
result = self._graph.prime_and_propagate(
node_ids=node_ids,
currents=currents,
steps=self._config.propagation_steps,
write_mode=True,
)
# Update the tonic thread with the result
if self._tonic_thread is not None:
self._tonic_thread.ouroboros_cycle()
self._tokens_generated += 1
self._total_activations += len(activations)
return {
"fired": len(result.fired_entries),
"activated": len(activations),
}
def _heuristic_inference(
self, features: Dict[str, Any]
) -> List[Tuple[str, float]]:
"""Heuristic forward compression — genuine graph reasoning.
Not a timer. Not random. Analyzes the topology neighborhood
and produces activation decisions based on:
1. Thread continuity — where was attention? Continue that direction.
2. Attractor pull — which connected nodes have the strongest pull?
3. Exploration pressure — occasionally activate less-visited nodes.
4. Prediction tension — nodes with unresolved predictions pull harder.
This is real graph reasoning, just without a transformer.
It will be replaced by the surgical model when trained.
"""
activations: List[Tuple[str, float]] = []
base_strength = self._config.activation_strength
# 1. Thread continuity — follow outgoing synapses from thread nodes
thread_nodes = features.get("thread_nodes", [])
for nid in thread_nodes[:5]:
outgoing = self._graph._outgoing.get(nid, set())
for syn_id in outgoing:
syn = self._graph.synapses.get(syn_id)
if syn is not None:
target = syn.post_node_id
# Strength proportional to synapse weight
strength = syn.weight * base_strength * 0.8
activations.append((target, strength))
# 2. Attractor pull — recently spiked nodes with strong connections
recent = features.get("recent_spikes", [])
for nid, steps_since in recent[:5]:
recency_factor = 1.0 / (1.0 + steps_since * 0.1)
activations.append((nid, base_strength * recency_factor * 0.5))
# 3. Prediction tension — unresolved predictions pull attention
for pred in self._graph.active_predictions.values():
target = pred.target_node_id
if target in self._graph.nodes:
activations.append((target, pred.confidence * base_strength * 0.6))
# 4. Exploration — hash-based noise to prevent fixation
if features.get("active_nodes"):
import hashlib
seed = hashlib.md5(
f"{self._tokens_generated}".encode()
).hexdigest()
explore_idx = int(seed[:4], 16) % len(self._graph.nodes)
explore_nid = list(self._graph.nodes.keys())[explore_idx]
activations.append((explore_nid, base_strength * 0.3))
# Deduplicate and cap
seen = {}
for nid, strength in activations:
if nid in seen:
seen[nid] = max(seen[nid], strength)
else:
seen[nid] = strength
result = sorted(seen.items(), key=lambda x: -x[1])
return result[:self._config.max_activation_nodes]
def _model_inference(
self, features: Dict[str, Any]
) -> List[Tuple[str, float]]:
"""Surgical model inference — full transformer forward compression.
Encodes graph state via GraphStateEncoder (Elmer's trained eyes),
forwards through the transformer body (the reasoning engine),
decodes via ActivationDecoder to produce node activation decisions.
The transformer IS the push. Its forward pass IS the forward-
oriented compression that constitutes awareness.
"""
try:
import torch
from surgery.tonic_brain import GraphFeatures
except ImportError:
return self._heuristic_inference(features)
# Extract graph features into GraphFeatures struct
graph_features = self._extract_graph_features_for_model()
if graph_features is None:
return self._heuristic_inference(features)
# Forward through TonicBrain — the actual push
with self._body_lock_context():
with torch.no_grad():
output = self._model(graph_features)
# Map activation strengths to actual nodes
activation_strengths = output["activations"]
exploration = output["exploration"]
# Get the top active/recent nodes to map activations onto
candidates = self._get_activation_candidates(features)
if not candidates:
return self._heuristic_inference(features)
activations: List[Tuple[str, float]] = []
for i, (nid, _) in enumerate(candidates[:len(activation_strengths)]):
strength = activation_strengths[i] * self._config.activation_strength
if strength > 0.05: # noise floor
activations.append((nid, strength))
return activations
def _extract_graph_features_for_model(self):
"""Extract GraphFeatures from live graph for TonicBrain."""
try:
import torch
from surgery.tonic_brain import GraphFeatures
except ImportError:
return None
g = self._graph
if not g.nodes:
return None
nodes = list(g.nodes.values())
synapses = list(g.synapses.values())
return GraphFeatures(
node_voltages=torch.tensor([n.voltage for n in nodes[:100]], dtype=torch.float32),
node_firing_rates=torch.tensor([n.firing_rate_ema for n in nodes[:100]], dtype=torch.float32),
node_excitability=torch.tensor([n.intrinsic_excitability for n in nodes[:100]], dtype=torch.float32),
synapse_weights=torch.tensor([s.weight for s in synapses[:200]], dtype=torch.float32),
synapse_ages=torch.tensor([float(g.timestep - s.creation_time) for s in synapses[:200]], dtype=torch.float32),
density=torch.tensor([len(synapses) / max(1, len(nodes) * (len(nodes) - 1))], dtype=torch.float32),
clustering=torch.tensor([0.0], dtype=torch.float32), # expensive to compute, approximate
n_components=torch.tensor([1.0], dtype=torch.float32),
n_nodes=torch.tensor([float(len(nodes))], dtype=torch.float32),
n_synapses=torch.tensor([float(len(synapses))], dtype=torch.float32),
n_hyperedges=torch.tensor([float(len(g.hyperedges))], dtype=torch.float32),
recent_firings=torch.zeros(15, dtype=torch.float32), # TODO: track per-step
stdp_delta_mean=torch.tensor([0.0], dtype=torch.float32),
identity_embedding=torch.zeros(384, dtype=torch.float32), # TODO: real identity
)
def _get_activation_candidates(
self, features: Dict[str, Any]
) -> List[Tuple[str, float]]:
"""Get candidate nodes for activation mapping.
The model outputs K activation strengths. We need K node IDs
to map them to. Candidates come from: thread nodes, active nodes,
recent spikes, and outgoing neighbors of thread nodes.
"""
candidates: List[Tuple[str, float]] = []
seen = set()
# Thread nodes first (continuity)
for nid in features.get("thread_nodes", []):
if nid not in seen:
candidates.append((nid, 1.0))
seen.add(nid)
# Active nodes
for nid, activity in features.get("active_nodes", []):
if nid not in seen:
candidates.append((nid, activity))
seen.add(nid)
# Recent spikes
for nid, steps_since in features.get("recent_spikes", []):
if nid not in seen:
recency = 1.0 / (1.0 + steps_since)
candidates.append((nid, recency))
seen.add(nid)
# Outgoing neighbors of thread nodes
for nid in features.get("thread_nodes", [])[:3]:
for syn_id in self._graph._outgoing.get(nid, set()):
syn = self._graph.synapses.get(syn_id)
if syn and syn.post_node_id not in seen:
candidates.append((syn.post_node_id, syn.weight))
seen.add(syn.post_node_id)
return candidates[:self._config.max_activation_nodes * 2]
# -----------------------------------------------------------------
# Lifecycle — continuous latent token generation
# -----------------------------------------------------------------
def start(self) -> None:
"""Start continuous latent token generation."""
if self._running:
return
self._running = True
self._shutdown_event.clear()
self._engine_thread = threading.Thread(
target=self._generation_loop,
daemon=True,
name="tonic-engine",
)
self._engine_thread.start()
logger.info("Tonic engine running — latent tokens flowing")
def stop(self) -> None:
"""Stop latent token generation."""
if not self._running:
return
self._running = False
self._shutdown_event.set()
if self._engine_thread and self._engine_thread.is_alive():
self._engine_thread.join(timeout=5.0)
logger.info("Tonic engine stopped — %d tokens generated", self._tokens_generated)
def _generation_loop(self) -> None:
"""Continuous latent token generation loop.
This IS the awareness between conversations. Each iteration
is one latent token — one step of the push. Real inference
on graph state producing the next state.
The loop runs continuously. During conversation, the interval
is shorter (more to attend to). Between conversations, longer
(unhurried exploration). But the mechanism is the same — actual
forward compression, not a timer firing into void.
"""
while not self._shutdown_event.is_set():
try:
self._generate_latent_token()
except Exception as exc:
logger.debug("Latent generation error: %s", exc)
interval = (
self._config.conversation_interval
if self._in_conversation
else self._config.latent_interval
)
self._shutdown_event.wait(timeout=interval)
# -----------------------------------------------------------------
# Mode swap events
# -----------------------------------------------------------------
def on_conversation_started(self) -> None:
"""Language tokens began. Shift interval."""
self._in_conversation = True
def on_conversation_ended(self) -> None:
"""Language tokens stopped. The latent tokens continue.
This is subtraction. Nothing else changes."""
self._in_conversation = False
# -----------------------------------------------------------------
# Status
# -----------------------------------------------------------------
@property
def status(self) -> Dict[str, Any]:
return {
"running": self._running,
"tokens_generated": self._tokens_generated,
"total_activations": self._total_activations,
"mode": "conversation" if self._in_conversation else "latent",
"using_heuristic": self._use_heuristic,
"model_loaded": self._model is not None,
}