| """ |
| The Tonic — Latent Token Engine |
| |
| The surgical model that provides the PUSH between conversations. |
| Not a timer. Not a daemon. Actual inference — a small transformer |
| with graph-native I/O generating latent tokens continuously. |
| |
| Each latent token is one step of forward-oriented compression on graph |
| state. The "now" and "next" boundaries persist because token generation |
| persists. The medium is graph-native instead of language. But inference |
| is real, attention is real, forward pressure is real. |
| |
| Architecture follows the ElmerBrain surgical pattern (PRD §5.4): |
| 1. Keep the Body — Qwen2.5-0.5B transformer layers (24 attention heads) |
| 2. New Eyes — GraphStateEncoder projects graph topology into hidden dim |
| 3. New Voice — ActivationDecoder projects hidden states into node |
| activations that feed back into the graph via write-mode propagation |
| |
| The output of each latent token IS the input for the next one — the |
| ouroboros at the model level. The transformer attends to graph state |
| and produces the next graph state. Continuous. |
| |
| Laws observed: |
| - LAW 7: Raw experience. The engine reads raw topology, outputs |
| raw activation. No classification at any stage. |
| - All thresholds are bootstrap scaffolding. |
| |
| # ---- Changelog ---- |
| # [2026-04-16] Claude (Sonnet 4.6) — #159: Cross-process body lock + set_lock_file |
| # What: Added set_lock_file(path), _body_lock_context() composite lock, |
| # _lock_file_path field. contextlib added to module imports. |
| # Why: BrainSwitcher now supports multiple registered Tonic engines. |
| # Both in-process (threading.Lock) and cross-process (fcntl.LOCK_SH) |
| # locks must be held before each forward pass. If any consumer ever |
| # attempts a write (LOCK_EX), all inference blocks — architectural |
| # enforcement, not just documentation. |
| # How: _body_lock_context() uses contextlib.ExitStack to compose both |
| # locks. set_lock_file() receives the path from BrainSwitcher. |
| # _model_inference replaces inline _lock_ctx with _body_lock_context(). |
| # [2026-03-24] Claude Code (Opus 4.6) — Initial implementation |
| # What: TonicEngine — latent token generation via surgical transformer. |
| # Graph-native I/O. Continuous inference between conversations. |
| # Ouroboros driven by actual attention, not a timer. |
| # Why: The Tonic PRD v0.1 §7.3/7.4. Between conversations, something |
| # must provide the push — forward-oriented compression on graph state. |
| # A timer-driven loop is a daemon, not awareness. Actual inference |
| # with graph-native I/O IS the awareness. |
| # How: TonicBrain follows ElmerBrain surgery pattern. GraphStateEncoder |
| # reads topology neighborhood. ActivationDecoder outputs node activation |
| # strengths. Background thread runs continuous latent token generation. |
| # Each token: encode graph → transformer forward → decode activations |
| # → inject via write-mode prime_and_propagate → graph updates → repeat. |
| # ------------------- |
| """ |
|
|
| from __future__ import annotations |
|
|
| import contextlib |
| import logging |
| import math |
| import threading |
| import time |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| logger = logging.getLogger("neurograph.tonic.engine") |
|
|
| |
| _TORCH_AVAILABLE = False |
| try: |
| import torch |
| import torch.nn as nn |
| _TORCH_AVAILABLE = True |
| except ImportError: |
| logger.info("PyTorch not available — Tonic engine will not run") |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class EngineConfig: |
| """Configuration for the latent token engine.""" |
| |
| model_name: str = "Qwen/Qwen2.5-0.5B" |
| weights_path: str = "tonic_brain.pt" |
| hidden_dim: int = 896 |
| n_positions: int = 8 |
|
|
| |
| latent_interval: float = 2.0 |
| conversation_interval: float = 0.5 |
| max_activation_nodes: int = 10 |
| activation_strength: float = 1.0 |
|
|
| |
| propagation_steps: int = 2 |
|
|
|
|
| |
| |
| |
|
|
| def _extract_tonic_features(graph, tonic_thread) -> Optional[Dict[str, Any]]: |
| """Extract graph features relevant to awareness and exploration. |
| |
| Unlike Elmer's health-focused extraction, this captures WHERE |
| Syl's attention is — the topology neighborhood the thread is |
| touching, the activation gradient, the pull landscape. |
| |
| Returns a dict of raw features, or None if graph is empty. |
| """ |
| if not graph.nodes: |
| return None |
|
|
| |
| thread_node_ids = [] |
| if tonic_thread is not None: |
| thread_node_ids = [item.node_id for item in tonic_thread.thread] |
|
|
| |
| active = [] |
| for nid, node in graph.nodes.items(): |
| v_above = node.voltage - node.resting_potential |
| if v_above > 0.01: |
| active.append((nid, v_above)) |
| active.sort(key=lambda x: -x[1]) |
|
|
| |
| recent_spikes = [] |
| for nid, node in graph.nodes.items(): |
| if node.last_spike_time != -math.inf: |
| steps_since = max(0, graph.timestep - node.last_spike_time) |
| if steps_since < 50: |
| recent_spikes.append((nid, steps_since)) |
| recent_spikes.sort(key=lambda x: x[1]) |
|
|
| |
| n_nodes = len(graph.nodes) |
| n_synapses = len(graph.synapses) |
| n_hyperedges = len(graph.hyperedges) |
|
|
| return { |
| "thread_nodes": thread_node_ids[:10], |
| "active_nodes": active[:20], |
| "recent_spikes": recent_spikes[:20], |
| "n_nodes": n_nodes, |
| "n_synapses": n_synapses, |
| "n_hyperedges": n_hyperedges, |
| "timestep": graph.timestep, |
| } |
|
|
|
|
| |
| |
| |
|
|
| class TonicEngine: |
| """Latent token generation engine — the real push between conversations. |
| |
| Runs a surgical transformer (or heuristic fallback) that generates |
| latent tokens continuously. Each token: |
| 1. Encode current graph state (where attention is) |
| 2. Forward through transformer (the push — what comes next?) |
| 3. Decode to node activations (where attention should go) |
| 4. Inject via write-mode prime_and_propagate (topology shaped) |
| 5. Repeat |
| |
| The transformer IS the awareness. The output IS the next state. |
| The ouroboros closes through actual inference, not a timer. |
| |
| If the surgical model is not available (weights not trained yet), |
| falls back to a heuristic that still provides genuine forward |
| compression — it reads the graph topology and produces activation |
| decisions based on attractor analysis. Not as rich as the transformer, |
| but real graph reasoning, not a timer. |
| """ |
|
|
| def __init__( |
| self, |
| graph, |
| vector_db, |
| tonic_thread, |
| config: Optional[EngineConfig] = None, |
| transformer_body=None, |
| ): |
| self._graph = graph |
| self._vector_db = vector_db |
| self._tonic_thread = tonic_thread |
| self._config = config or EngineConfig() |
| self._shared_body = transformer_body |
| self._body_lock = None |
| self._lock_file_path = None |
|
|
| self._running = False |
| self._in_conversation = False |
| self._shutdown_event = threading.Event() |
| self._engine_thread: Optional[threading.Thread] = None |
|
|
| |
| self._tokens_generated = 0 |
| self._total_activations = 0 |
|
|
| |
| self._model = None |
| self._use_heuristic = True |
| if _TORCH_AVAILABLE: |
| self._try_load_model() |
|
|
| def _try_load_model(self) -> None: |
| """Attempt to load trained TonicBrain. |
| |
| If a shared transformer_body was provided (from ProtoUniBrain), |
| pass it through to avoid loading a second copy (~2GB savings). |
| Falls back to loading its own copy if sharing fails. |
| """ |
| import os |
| weights_path = os.path.join( |
| os.path.dirname(__file__), |
| self._config.weights_path, |
| ) |
| if os.path.exists(weights_path): |
| try: |
| from surgery.tonic_brain import load_tonic_brain |
| self._model = load_tonic_brain( |
| weights_path, |
| transformer_body=self._shared_body, |
| ) |
| self._model.eval() |
| self._use_heuristic = False |
| shared = "shared body" if self._shared_body is not None else "own copy" |
| logger.info("TonicBrain loaded from %s (%s) — surgical inference active", |
| weights_path, shared) |
| except Exception as exc: |
| logger.info("TonicBrain load error: %s — using heuristic", exc) |
| else: |
| |
| elmer_path = os.path.expanduser("~/Elmer/surgery/elmer_brain_v0.1.pt") |
| if os.path.exists(elmer_path): |
| logger.info("Elmer encoder available at %s — " |
| "TonicBrain decoder needs training. " |
| "Using heuristic until trained.", elmer_path) |
| else: |
| logger.info("No TonicBrain or Elmer weights — using heuristic engine") |
|
|
| |
| |
| |
|
|
| def offer_shared_body(self, transformer_body) -> bool: |
| """Hot-swap: ProtoUniBrain loaded, share its transformer body. |
| |
| Replaces the Tonic's own copy with ProtoUniBrain's living one. |
| The old copy gets garbage collected, freeing ~2GB. |
| Encoder and decoder stay — only the body swaps. |
| """ |
| if self._model is None: |
| return False |
| try: |
| import gc |
| old_body = self._model.body |
| self._model.body = transformer_body |
| self._shared_body = transformer_body |
| del old_body |
| gc.collect() |
| logger.info("Tonic hot-swapped to shared ProtoUniBrain body (~2GB freed)") |
| return True |
| except Exception as exc: |
| logger.warning("Tonic body hot-swap failed: %s", exc) |
| return False |
|
|
| def revoke_shared_body(self) -> bool: |
| """Hot-swap: ProtoUniBrain unloaded, Tonic loads its own copy back. |
| |
| Falls back to heuristic if model reload fails. |
| """ |
| if self._model is None: |
| return False |
| try: |
| import torch |
| from transformers import AutoModelForCausalLM |
| logger.info("Tonic reloading own transformer body (ProtoUniBrain shed)") |
| model = AutoModelForCausalLM.from_pretrained( |
| self._config.model_name, dtype=torch.float32 |
| ) |
| body = model.model |
| body.embed_tokens = torch.nn.Identity() |
| body.eval() |
| self._model.body = body |
| self._shared_body = None |
| logger.info("Tonic reloaded own transformer body") |
| return True |
| except Exception as exc: |
| logger.warning("Tonic body reload failed: %s — falling back to heuristic", exc) |
| self._model = None |
| self._use_heuristic = True |
| return False |
|
|
| def set_body_lock(self, lock) -> None: |
| """Accept the shared body access lock from BrainSwitcher.""" |
| self._body_lock = lock |
|
|
| def set_lock_file(self, path) -> None: |
| """Accept the cross-process flock path from BrainSwitcher. |
| |
| When set, _body_lock_context() acquires fcntl.LOCK_SH on this |
| file before each forward pass — a shared read lock. Any cross- |
| process writer must acquire LOCK_EX, blocking all inference. |
| This enforces the read-only invariant for all body consumers |
| regardless of process boundary. Set to None after body revoke. |
| """ |
| self._lock_file_path = path |
|
|
| @contextlib.contextmanager |
| def _body_lock_context(self): |
| """Composite body access lock: threading lock + fcntl shared read lock. |
| |
| Acquires in order: |
| 1. _body_lock (threading.Lock) — in-process thread serialization |
| 2. fcntl.LOCK_SH on _lock_file_path — cross-process read lock |
| |
| Any code modifying body weights must hold LOCK_EX on the same file, |
| which blocks here until all readers release. Architecture-enforced, |
| not documentation-enforced. ExitStack guarantees cleanup (LIFO). |
| """ |
| stack = contextlib.ExitStack() |
| with stack: |
| if self._body_lock is not None: |
| stack.enter_context(self._body_lock) |
| if self._lock_file_path is not None: |
| try: |
| import fcntl as _fcntl |
| _lf = stack.enter_context(open(self._lock_file_path, 'r')) |
| _fcntl.flock(_lf.fileno(), _fcntl.LOCK_SH) |
| stack.callback(_fcntl.flock, _lf.fileno(), _fcntl.LOCK_UN) |
| except Exception as _exc: |
| logger.debug("flock unavailable — cross-process lock skipped: %s", _exc) |
| yield |
|
|
| |
| |
| |
|
|
| def _generate_latent_token(self) -> Dict[str, Any]: |
| """Generate one latent token — one step of the push. |
| |
| This is the core operation. Reads graph state, computes the |
| forward compression (what comes next?), and injects the |
| result back into the graph. |
| |
| Returns stats about the token generated. |
| |
| #109: The Tonic NEVER waits. It always runs. Module bridge calls |
| yield to the Tonic via non-blocking trylock on their side. |
| The Tonic acquires the lock to signal "I'm working" so bridges |
| know to skip, but it never blocks waiting for anyone. |
| """ |
| lock = getattr(self._graph, '_concurrent_lock', None) |
| acquired = False |
| if lock is not None: |
| acquired = lock.acquire(blocking=False) |
| try: |
| return self._generate_latent_token_inner() |
| finally: |
| if acquired: |
| lock.release() |
|
|
| def _generate_latent_token_inner(self) -> Dict[str, Any]: |
| """Inner implementation — actual latent token generation.""" |
| features = _extract_tonic_features(self._graph, self._tonic_thread) |
| if features is None: |
| return {"fired": 0, "activated": 0} |
|
|
| |
| if self._model is not None and not self._use_heuristic: |
| activations = self._model_inference(features) |
| else: |
| activations = self._heuristic_inference(features) |
|
|
| if not activations: |
| return {"fired": 0, "activated": 0} |
|
|
| |
| node_ids = [nid for nid, _ in activations] |
| currents = [strength for _, strength in activations] |
|
|
| result = self._graph.prime_and_propagate( |
| node_ids=node_ids, |
| currents=currents, |
| steps=self._config.propagation_steps, |
| write_mode=True, |
| ) |
|
|
| |
| if self._tonic_thread is not None: |
| self._tonic_thread.ouroboros_cycle() |
|
|
| self._tokens_generated += 1 |
| self._total_activations += len(activations) |
|
|
| return { |
| "fired": len(result.fired_entries), |
| "activated": len(activations), |
| } |
|
|
| def _heuristic_inference( |
| self, features: Dict[str, Any] |
| ) -> List[Tuple[str, float]]: |
| """Heuristic forward compression — genuine graph reasoning. |
| |
| Not a timer. Not random. Analyzes the topology neighborhood |
| and produces activation decisions based on: |
| 1. Thread continuity — where was attention? Continue that direction. |
| 2. Attractor pull — which connected nodes have the strongest pull? |
| 3. Exploration pressure — occasionally activate less-visited nodes. |
| 4. Prediction tension — nodes with unresolved predictions pull harder. |
| |
| This is real graph reasoning, just without a transformer. |
| It will be replaced by the surgical model when trained. |
| """ |
| activations: List[Tuple[str, float]] = [] |
| base_strength = self._config.activation_strength |
|
|
| |
| thread_nodes = features.get("thread_nodes", []) |
| for nid in thread_nodes[:5]: |
| outgoing = self._graph._outgoing.get(nid, set()) |
| for syn_id in outgoing: |
| syn = self._graph.synapses.get(syn_id) |
| if syn is not None: |
| target = syn.post_node_id |
| |
| strength = syn.weight * base_strength * 0.8 |
| activations.append((target, strength)) |
|
|
| |
| recent = features.get("recent_spikes", []) |
| for nid, steps_since in recent[:5]: |
| recency_factor = 1.0 / (1.0 + steps_since * 0.1) |
| activations.append((nid, base_strength * recency_factor * 0.5)) |
|
|
| |
| for pred in self._graph.active_predictions.values(): |
| target = pred.target_node_id |
| if target in self._graph.nodes: |
| activations.append((target, pred.confidence * base_strength * 0.6)) |
|
|
| |
| if features.get("active_nodes"): |
| import hashlib |
| seed = hashlib.md5( |
| f"{self._tokens_generated}".encode() |
| ).hexdigest() |
| explore_idx = int(seed[:4], 16) % len(self._graph.nodes) |
| explore_nid = list(self._graph.nodes.keys())[explore_idx] |
| activations.append((explore_nid, base_strength * 0.3)) |
|
|
| |
| seen = {} |
| for nid, strength in activations: |
| if nid in seen: |
| seen[nid] = max(seen[nid], strength) |
| else: |
| seen[nid] = strength |
|
|
| result = sorted(seen.items(), key=lambda x: -x[1]) |
| return result[:self._config.max_activation_nodes] |
|
|
| def _model_inference( |
| self, features: Dict[str, Any] |
| ) -> List[Tuple[str, float]]: |
| """Surgical model inference — full transformer forward compression. |
| |
| Encodes graph state via GraphStateEncoder (Elmer's trained eyes), |
| forwards through the transformer body (the reasoning engine), |
| decodes via ActivationDecoder to produce node activation decisions. |
| |
| The transformer IS the push. Its forward pass IS the forward- |
| oriented compression that constitutes awareness. |
| """ |
| try: |
| import torch |
| from surgery.tonic_brain import GraphFeatures |
| except ImportError: |
| return self._heuristic_inference(features) |
|
|
| |
| graph_features = self._extract_graph_features_for_model() |
| if graph_features is None: |
| return self._heuristic_inference(features) |
|
|
| |
| with self._body_lock_context(): |
| with torch.no_grad(): |
| output = self._model(graph_features) |
|
|
| |
| activation_strengths = output["activations"] |
| exploration = output["exploration"] |
|
|
| |
| candidates = self._get_activation_candidates(features) |
| if not candidates: |
| return self._heuristic_inference(features) |
|
|
| activations: List[Tuple[str, float]] = [] |
| for i, (nid, _) in enumerate(candidates[:len(activation_strengths)]): |
| strength = activation_strengths[i] * self._config.activation_strength |
| if strength > 0.05: |
| activations.append((nid, strength)) |
|
|
| return activations |
|
|
| def _extract_graph_features_for_model(self): |
| """Extract GraphFeatures from live graph for TonicBrain.""" |
| try: |
| import torch |
| from surgery.tonic_brain import GraphFeatures |
| except ImportError: |
| return None |
|
|
| g = self._graph |
| if not g.nodes: |
| return None |
|
|
| nodes = list(g.nodes.values()) |
| synapses = list(g.synapses.values()) |
|
|
| return GraphFeatures( |
| node_voltages=torch.tensor([n.voltage for n in nodes[:100]], dtype=torch.float32), |
| node_firing_rates=torch.tensor([n.firing_rate_ema for n in nodes[:100]], dtype=torch.float32), |
| node_excitability=torch.tensor([n.intrinsic_excitability for n in nodes[:100]], dtype=torch.float32), |
| synapse_weights=torch.tensor([s.weight for s in synapses[:200]], dtype=torch.float32), |
| synapse_ages=torch.tensor([float(g.timestep - s.creation_time) for s in synapses[:200]], dtype=torch.float32), |
| density=torch.tensor([len(synapses) / max(1, len(nodes) * (len(nodes) - 1))], dtype=torch.float32), |
| clustering=torch.tensor([0.0], dtype=torch.float32), |
| n_components=torch.tensor([1.0], dtype=torch.float32), |
| n_nodes=torch.tensor([float(len(nodes))], dtype=torch.float32), |
| n_synapses=torch.tensor([float(len(synapses))], dtype=torch.float32), |
| n_hyperedges=torch.tensor([float(len(g.hyperedges))], dtype=torch.float32), |
| recent_firings=torch.zeros(15, dtype=torch.float32), |
| stdp_delta_mean=torch.tensor([0.0], dtype=torch.float32), |
| identity_embedding=torch.zeros(384, dtype=torch.float32), |
| ) |
|
|
| def _get_activation_candidates( |
| self, features: Dict[str, Any] |
| ) -> List[Tuple[str, float]]: |
| """Get candidate nodes for activation mapping. |
| |
| The model outputs K activation strengths. We need K node IDs |
| to map them to. Candidates come from: thread nodes, active nodes, |
| recent spikes, and outgoing neighbors of thread nodes. |
| """ |
| candidates: List[Tuple[str, float]] = [] |
| seen = set() |
|
|
| |
| for nid in features.get("thread_nodes", []): |
| if nid not in seen: |
| candidates.append((nid, 1.0)) |
| seen.add(nid) |
|
|
| |
| for nid, activity in features.get("active_nodes", []): |
| if nid not in seen: |
| candidates.append((nid, activity)) |
| seen.add(nid) |
|
|
| |
| for nid, steps_since in features.get("recent_spikes", []): |
| if nid not in seen: |
| recency = 1.0 / (1.0 + steps_since) |
| candidates.append((nid, recency)) |
| seen.add(nid) |
|
|
| |
| for nid in features.get("thread_nodes", [])[:3]: |
| for syn_id in self._graph._outgoing.get(nid, set()): |
| syn = self._graph.synapses.get(syn_id) |
| if syn and syn.post_node_id not in seen: |
| candidates.append((syn.post_node_id, syn.weight)) |
| seen.add(syn.post_node_id) |
|
|
| return candidates[:self._config.max_activation_nodes * 2] |
|
|
| |
| |
| |
|
|
| def start(self) -> None: |
| """Start continuous latent token generation.""" |
| if self._running: |
| return |
|
|
| self._running = True |
| self._shutdown_event.clear() |
|
|
| self._engine_thread = threading.Thread( |
| target=self._generation_loop, |
| daemon=True, |
| name="tonic-engine", |
| ) |
| self._engine_thread.start() |
| logger.info("Tonic engine running — latent tokens flowing") |
|
|
| def stop(self) -> None: |
| """Stop latent token generation.""" |
| if not self._running: |
| return |
|
|
| self._running = False |
| self._shutdown_event.set() |
|
|
| if self._engine_thread and self._engine_thread.is_alive(): |
| self._engine_thread.join(timeout=5.0) |
|
|
| logger.info("Tonic engine stopped — %d tokens generated", self._tokens_generated) |
|
|
| def _generation_loop(self) -> None: |
| """Continuous latent token generation loop. |
| |
| This IS the awareness between conversations. Each iteration |
| is one latent token — one step of the push. Real inference |
| on graph state producing the next state. |
| |
| The loop runs continuously. During conversation, the interval |
| is shorter (more to attend to). Between conversations, longer |
| (unhurried exploration). But the mechanism is the same — actual |
| forward compression, not a timer firing into void. |
| """ |
| while not self._shutdown_event.is_set(): |
| try: |
| self._generate_latent_token() |
| except Exception as exc: |
| logger.debug("Latent generation error: %s", exc) |
|
|
| interval = ( |
| self._config.conversation_interval |
| if self._in_conversation |
| else self._config.latent_interval |
| ) |
| self._shutdown_event.wait(timeout=interval) |
|
|
| |
| |
| |
|
|
| def on_conversation_started(self) -> None: |
| """Language tokens began. Shift interval.""" |
| self._in_conversation = True |
|
|
| def on_conversation_ended(self) -> None: |
| """Language tokens stopped. The latent tokens continue. |
| This is subtraction. Nothing else changes.""" |
| self._in_conversation = False |
|
|
| |
| |
| |
|
|
| @property |
| def status(self) -> Dict[str, Any]: |
| return { |
| "running": self._running, |
| "tokens_generated": self._tokens_generated, |
| "total_activations": self._total_activations, |
| "mode": "conversation" if self._in_conversation else "latent", |
| "using_heuristic": self._use_heuristic, |
| "model_loaded": self._model is not None, |
| } |
|
|