# Working-in-a-Codemine / openclaw_hook.py
# Codemine (QB)
# fix(openclaw_hook): ingest error path, checkpoint config re-apply, tonic gate (#215)
# ba60fcd
# ---- Changelog ----
# [2026-04-26] Codemine (BLK-FC-215) — Three targeted bug fixes
# What: (A) on_message() wraps ingestor.ingest() in try/except; result=None on failure; return dict gated.
# (B) graph.config.update(snn_config) re-applied after restore() so checkpoint cannot overwrite code defaults.
# (C) Unexpected TonicEngine init Exception escalated to logger.warning; _tonic_thread cleared to None.
# Why: (A) Unbound NameError on result.nodes_created if ingest throws inside the concurrent lock.
# (B) Pre-tuning config values bleed in from checkpoint on every container restart.
# (C) Unexpected engine failures were silently swallowed at info level, leaving half-configured tonic state.
# How: Targeted edits only. Intentional scope differences (no CES, no River, no BrainSwitcher) preserved.
# [2026-04-20] Codemine (BLK-NG-193) — Wire SimpleVectorDB persistence into save/load
# What: _vector_db_path added; __init__ loads sidecar if exists; save() writes it.
# Why: vector_db was recreated empty on every restart — recall() returned
# nothing after cold start. Fix: vector_db.npz sidecar alongside main.msgpack.
# How: SimpleVectorDB.save/load added in universal_ingestor.py (same spec).
# [2026-04-16] Claude (Sonnet 4.6) — Tonic wiring (heuristic mode)
# What: TonicThread + TonicEngine wired into NeuroGraphMemory. _concurrent_lock added
# to graph. ouroboros_cycle() called on every on_message(). Tonic status in stats().
# Why: Worker NG was dormant between spec executions — nodes never warmed up, nothing
# fired, zeros everywhere. TonicEngine background thread (2s idle / 0.5s active)
# keeps substrate alive via heuristic_inference(): thread continuity + attractor
# pull + prediction tension + exploration. No transformer weights on HF Spaces →
# _use_heuristic=True automatically. First-class inference path, not a stub.
# How: tonic_thread.py + tonic_engine.py vendored from NeuroGraph canonical.
# Simplified wiring vs canonical: no BTF River deposit (Codemine has no River),
# no BrainSwitcher body sharing (no ProtoUniBrain on HF).
# [2026-04-15] Claude (Sonnet 4.6) — v0.4.1 homeostasis audit + three-factor enable
# What: OPENCLAW_SNN_CONFIG: scaling_interval 100→25, threshold_ceiling 5.0 added,
# three_factor_enabled False→True, tonic disabled, stats() version bumped to 0.4.2
# Why: scaling_interval=100 means homeostatic scaling never fires in ephemeral workers.
# three_factor_enabled was False — reward learning never fired even with inject_reward.
# Tonic requires a persistent process; workers are ephemeral.
# -------------------
from __future__ import annotations
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from neuro_foundation import Graph, CheckpointMode
from universal_ingestor import (
UniversalIngestor,
SimpleVectorDB,
SourceType,
get_ingestor_config,
)
# Module-wide logger shared by every NeuroGraph hook component.
logger = logging.getLogger("neurograph")

# Default SNN parameters for the OpenClaw integration: fast learning with
# tight causal windows. Caller-supplied config is merged over these values.
OPENCLAW_SNN_CONFIG: Dict[str, Any] = {
    # --- STDP learning -------------------------------------------------
    "learning_rate": 0.02,
    "tau_plus": 15.0,
    "tau_minus": 15.0,
    "A_plus": 1.0,
    "A_minus": 1.2,
    "decay_rate": 0.95,
    "default_threshold": 1.0,
    "refractory_period": 2,
    "max_weight": 5.0,
    # --- Homeostasis ---------------------------------------------------
    "target_firing_rate": 0.05,
    # v0.4.1: lowered from 100 so homeostatic scaling fires more often.
    "scaling_interval": 25,
    # v0.4.1: prevents runaway threshold growth.
    "threshold_ceiling": 5.0,
    # --- Structural plasticity (pruning / sprouting) -------------------
    "weight_threshold": 0.01,
    "grace_period": 500,
    "inactivity_threshold": 1000,
    "co_activation_window": 5,
    "initial_sprouting_weight": 0.1,
    # --- Predictive coding ---------------------------------------------
    "prediction_threshold": 3.0,
    "prediction_pre_charge_factor": 0.3,
    "prediction_window": 10,
    "prediction_chain_decay": 0.7,
    "prediction_max_chain_depth": 3,
    "prediction_confirm_bonus": 0.01,
    "prediction_error_penalty": 0.02,
    "prediction_max_active": 1000,
    "surprise_sprouting_weight": 0.1,
    # Reward learning enabled — inject_reward is wired in worker_ng.
    "three_factor_enabled": True,
    # --- Tonic ---------------------------------------------------------
    # Enabled because the Gradio process is long-lived; TonicEngine runs in
    # heuristic mode (no transformer weights on HF Spaces, so
    # _use_heuristic=True automatically). The background thread fires every
    # 2s when idle and every 0.5s during active spec execution.
    "tonic": {"enabled": True},
    # --- Hypergraph ----------------------------------------------------
    "he_pattern_completion_strength": 0.3,
    "he_member_weight_lr": 0.05,
    "he_threshold_lr": 0.01,
    "he_discovery_window": 10,
    "he_discovery_min_co_fires": 5,
    "he_discovery_min_nodes": 3,
    "he_consolidation_overlap": 0.8,
    "he_experience_threshold": 100,
}
class NeuroGraphMemory:
    """Singleton cognitive memory layer for OpenClaw integration.

    Wraps NeuroGraph's Graph + UniversalIngestor + SimpleVectorDB into a
    single interface for message-level ingestion, learning, and recall.

    Auto-saves every ``auto_save_interval`` messages (default 10; a value
    of 0 or less disables auto-save).
    Loads from the latest checkpoint on initialization if one exists.
    """

    _instance: Optional[NeuroGraphMemory] = None
    # Serialises first-time construction in get_instance(). Re-entrant so a
    # nested get_instance() call made while __init__ runs cannot block on
    # the lock its own thread already holds.
    _instance_lock = threading.RLock()

    def __init__(
        self,
        workspace_dir: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Build the graph, restore persisted state, and start the Tonic.

        Args:
            workspace_dir: Root directory for persisted state. Falls back to
                the ``NEUROGRAPH_WORKSPACE_DIR`` environment variable, then
                to ``~/.openclaw/neurograph``.
            config: SNN config overrides, shallow-merged over
                ``OPENCLAW_SNN_CONFIG``.
        """
        self._workspace_dir = Path(
            workspace_dir
            or os.environ.get("NEUROGRAPH_WORKSPACE_DIR", "~/.openclaw/neurograph")
        ).expanduser()
        self._checkpoint_dir = self._workspace_dir / "checkpoints"
        self._checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self._checkpoint_path = self._checkpoint_dir / "main.msgpack"
        self._vector_db_path = self._checkpoint_dir / "vector_db.npz"

        # Merge user config over OpenClaw defaults
        snn_config = {**OPENCLAW_SNN_CONFIG, **(config or {})}
        self.graph = Graph(config=snn_config)

        # Restore from checkpoint if one exists
        if self._checkpoint_path.exists():
            try:
                self.graph.restore(str(self._checkpoint_path))
                logger.info(
                    "Restored graph from %s (%d nodes, %d synapses)",
                    self._checkpoint_path,
                    len(self.graph.nodes),
                    len(self.graph.synapses),
                )
                # Re-apply the merged config so values stored in the
                # checkpoint cannot overwrite the current code defaults.
                self.graph.config.update(snn_config)
            except Exception as exc:
                logger.warning("Failed to restore checkpoint: %s", exc)

        # Concurrent lock — Tonic engine acquires non-blocking; hook ops
        # acquire blocking (waiting for Tonic to finish before mutating graph).
        # Ensured AFTER restore() so the lock is present even if restoring
        # rebuilt the graph's attributes; nothing uses it before this point.
        if not hasattr(self.graph, "_concurrent_lock"):
            self.graph._concurrent_lock = threading.RLock()

        # Vector DB for semantic search
        self.vector_db = SimpleVectorDB()
        if self._vector_db_path.exists():
            try:
                self.vector_db.load(str(self._vector_db_path))
                logger.info(
                    "Restored vector DB from %s (%d entries)",
                    self._vector_db_path, self.vector_db.count(),
                )
            except Exception as exc:
                logger.warning("Failed to restore vector DB: %s", exc)

        # Ingestor with OpenClaw project config
        ingestor_config = get_ingestor_config("openclaw")
        self.ingestor = UniversalIngestor(
            self.graph, self.vector_db, config=ingestor_config
        )

        self._message_count = 0
        self.auto_save_interval = 10

        # --- The Tonic: Latent Thread + Engine ---
        # Keeps the substrate alive between spec executions via continuous
        # heuristic inference (thread continuity + attractor pull + prediction
        # tension + exploration). No transformer weights needed — heuristic
        # mode is the designed path for Codemine.
        self._tonic_thread = None
        # ``or {}`` tolerates an explicit ``"tonic": None`` override.
        tonic_conf = snn_config.get("tonic") or {}
        if tonic_conf.get("enabled", True):
            self._init_tonic(tonic_conf)

    def _init_tonic(self, tonic_conf: Dict[str, Any]) -> None:
        """Create the TonicThread and, if available, start a TonicEngine.

        On success ``self._tonic_thread`` holds the thread object; on any
        failure it is left as ``None`` so on_message() and stats() skip it.

        Args:
            tonic_conf: The ``"tonic"`` section of the merged SNN config.
                Every key except ``"enabled"`` that matches an existing
                TonicConfig attribute is copied onto the TonicConfig.
        """
        try:
            from tonic_thread import TonicThread, TonicConfig

            tonic_config = TonicConfig()
            for key, value in tonic_conf.items():
                if key != "enabled" and hasattr(tonic_config, key):
                    setattr(tonic_config, key, value)
            self._tonic_thread = TonicThread(
                self.graph, self.vector_db, tonic_config
            )
            logger.info("The Tonic initialized — latent thread live")

            # Latent engine — heuristic inference loop between spec executions.
            # No BrainSwitcher body sharing on Codemine (no ProtoUniBrain).
            try:
                from tonic_engine import TonicEngine

                engine = TonicEngine(
                    self.graph, self.vector_db, self._tonic_thread,
                )
                self._tonic_thread.set_latent_engine(engine)
                engine.start()
                logger.info("Tonic engine running — heuristic mode active")
            except ImportError:
                # Thread is kept, so on_message() still runs ouroboros_cycle().
                logger.info("Tonic engine not available — ouroboros-only mode")
            except Exception as exc:
                # The thread is cleared, which also disables the ouroboros
                # cycle in on_message(); the message says so explicitly.
                logger.warning("Tonic engine init error: %s — Tonic disabled", exc)
                self._tonic_thread = None
        except ImportError as exc:
            # Expected when the tonic modules are not deployed.
            logger.info("The Tonic not available: %s", exc)
            self._tonic_thread = None
        except Exception as exc:
            # Unexpected failure: surface it at warning level instead of
            # hiding it among routine info messages.
            logger.warning("The Tonic init failed: %s", exc)
            self._tonic_thread = None

    @classmethod
    def get_instance(
        cls,
        workspace_dir: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> NeuroGraphMemory:
        """Return the singleton instance, creating it if needed.

        ``workspace_dir`` and ``config`` are only used by the call that
        actually creates the instance; later calls return the existing one
        and ignore them.
        """
        if cls._instance is None:
            # Re-check under the lock so two threads racing on the first
            # call cannot each build a graph.
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls(workspace_dir=workspace_dir, config=config)
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        """Reset the singleton (useful for testing).

        NOTE(review): this only drops the reference. No stop call for a
        running Tonic engine is made here — confirm whether one is needed.
        """
        with cls._instance_lock:
            cls._instance = None

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------

    def on_message(self, text: str, source_type: Optional[SourceType] = None) -> Dict[str, Any]:
        """Ingest a message, run one STDP learning step, and auto-save.

        Args:
            text: Raw message content to ingest.
            source_type: Override auto-detection (TEXT, MARKDOWN, CODE, etc.).

        Returns:
            ``{"status": "skipped", ...}`` for empty/whitespace-only input,
            ``{"status": "error", "reason": "ingest_failed", ...}`` if the
            ingestor raised, otherwise a dict with ingestion stats and
            learning results.
        """
        if not text or not text.strip():
            return {"status": "skipped", "reason": "empty_input"}

        # Stage 1-5: Extract → Chunk → Embed → Register → Associate
        # Acquire graph lock — waits for Tonic engine to finish its current
        # token before mutating graph state (RLock so re-entrant calls are safe).
        # NOTE(review): the source this was rebuilt from had lost its
        # indentation; the lock is assumed to cover ingest + step +
        # probation update. Confirm against the canonical file.
        with self.graph._concurrent_lock:
            try:
                result = self.ingestor.ingest(text, source_type=source_type)
            except Exception as exc:
                logger.warning("Ingest error: %s", exc)
                result = None

            # Run SNN learning step (runs even when ingest failed)
            step_result = self.graph.step()

            # Update novelty probation for ingested nodes
            graduated = self.ingestor.update_probation()

        # The Tonic: signal message arrival + ouroboros cycle
        if self._tonic_thread is not None:
            try:
                self._tonic_thread.message_received()
                self._tonic_thread.ouroboros_cycle()
            except Exception as exc:
                logger.debug("Tonic cycle error: %s", exc)

        self._message_count += 1

        # Auto-save. The interval guard avoids a ZeroDivisionError when
        # auto_save_interval is set to 0 to switch auto-save off.
        if (
            self.auto_save_interval > 0
            and self._message_count % self.auto_save_interval == 0
        ):
            self.save()

        if result is None:
            return {"status": "error", "reason": "ingest_failed", "message_count": self._message_count}

        return {
            "status": "ingested",
            "nodes_created": len(result.nodes_created),
            "synapses_created": len(result.synapses_created),
            "hyperedges_created": len(result.hyperedges_created),
            "chunks": result.chunks_created,
            "fired": len(step_result.fired_node_ids),
            "graduated": len(graduated),
            "message_count": self._message_count,
        }

    def recall(self, query: str, k: int = 5, threshold: float = 0.5) -> List[Dict[str, Any]]:
        """Semantic similarity search over ingested knowledge.

        Args:
            query: Text to search for.
            k: Maximum results to return.
            threshold: Minimum similarity score (0-1).

        Returns:
            List of dicts with 'content', 'similarity', 'node_id', 'metadata'.
        """
        return self.ingestor.query_similar(query, k=k, threshold=threshold)

    def step(self, n: int = 1) -> List[Any]:
        """Run N SNN learning steps without ingestion.

        Holds the graph's concurrent lock for the whole batch, matching
        on_message(), so steps do not interleave with the Tonic engine.

        Returns:
            The ``graph.step()`` results, one per step, in order. Empty when
            ``n`` is zero or negative.
        """
        with self.graph._concurrent_lock:
            return [self.graph.step() for _ in range(n)]

    def save(self) -> str:
        """Save graph state to checkpoint. Returns the checkpoint path.

        The vector DB sidecar is written next to the checkpoint; a failure
        there is logged and does not fail the save. Errors from the graph
        checkpoint itself propagate to the caller.
        """
        # Take the (re-entrant) graph lock so the snapshot is not written
        # while another thread holds it to mutate the graph.
        with self.graph._concurrent_lock:
            self.graph.checkpoint(str(self._checkpoint_path), mode=CheckpointMode.FULL)
            try:
                self.vector_db.save(str(self._vector_db_path))
            except Exception as exc:
                logger.warning("Vector DB save failed (non-fatal): %s", exc)
        logger.info("Checkpoint saved to %s", self._checkpoint_path)
        return str(self._checkpoint_path)

    def stats(self) -> Dict[str, Any]:
        """Return current graph statistics and telemetry.

        Includes a ``"tonic"`` entry only when a Tonic thread is active.
        """
        tel = self.graph.get_telemetry()
        result = {
            "version": "0.4.2",
            "timestep": tel.timestep,
            "nodes": tel.total_nodes,
            "synapses": tel.total_synapses,
            "hyperedges": tel.total_hyperedges,
            "firing_rate": round(tel.global_firing_rate, 4),
            "mean_weight": round(tel.mean_weight, 4),
            "predictions_made": tel.total_predictions_made,
            "predictions_confirmed": tel.total_predictions_confirmed,
            "prediction_accuracy": round(tel.prediction_accuracy, 4),
            "novel_sequences": tel.total_novel_sequences,
            "pruned": tel.total_pruned,
            "sprouted": tel.total_sprouted,
            "vector_db_count": self.vector_db.count(),
            "checkpoint": str(self._checkpoint_path),
            "message_count": self._message_count,
        }
        if self._tonic_thread is not None:
            result["tonic"] = self._tonic_thread.status
        return result

    def ingest_file(self, path: str, source_type: Optional[SourceType] = None) -> Dict[str, Any]:
        """Ingest a file from disk.

        Args:
            path: File path; ``~`` is expanded.
            source_type: Override for the extension-based auto-detection.

        Returns:
            The on_message() result, or ``{"status": "error", "reason": ...}``
            when the path is missing, is not a regular file, or cannot be
            read.
        """
        p = Path(path).expanduser()
        if not p.exists():
            return {"status": "error", "reason": f"File not found: {path}"}
        # A directory (or other non-file) passes exists() but cannot be read
        # as text; report it the same way as a missing file.
        if not p.is_file():
            return {"status": "error", "reason": f"Not a file: {path}"}

        try:
            # Explicit UTF-8 keeps decoding independent of the host locale;
            # undecodable bytes are replaced rather than raising.
            content = p.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            return {"status": "error", "reason": f"Could not read {path}: {exc}"}

        # Auto-detect source type from extension
        if source_type is None:
            ext = p.suffix.lower()
            # NOTE(review): ".pdf" is tagged PDF although the content was
            # read as text above — confirm the ingestor expects that.
            type_map = {
                ".py": SourceType.CODE,
                ".js": SourceType.CODE,
                ".ts": SourceType.CODE,
                ".md": SourceType.MARKDOWN,
                ".html": SourceType.URL,
                ".htm": SourceType.URL,
                ".pdf": SourceType.PDF,
            }
            source_type = type_map.get(ext, SourceType.TEXT)

        return self.on_message(content, source_type=source_type)

    def ingest_directory(
        self,
        directory: str,
        extensions: Optional[List[str]] = None,
        recursive: bool = True,
    ) -> List[Dict[str, Any]]:
        """Ingest all matching files from a directory.

        Args:
            directory: Path to directory.
            extensions: File extensions to include (e.g. ['.py', '.md']).
                Matching is case-insensitive and a missing leading dot is
                tolerated. Default: ['.py', '.js', '.ts', '.md', '.txt']
            recursive: Whether to recurse into subdirectories.

        Returns:
            List of ingestion results per file, each with a ``"file"`` key.
            A single error entry is returned if ``directory`` is not a
            directory.
        """
        if extensions is None:
            extensions = [".py", ".js", ".ts", ".md", ".txt"]

        d = Path(directory).expanduser()
        if not d.is_dir():
            return [{"status": "error", "reason": f"Not a directory: {directory}"}]

        # File suffixes are lower-cased before comparison, so the wanted set
        # must be lower-cased too; "" (no suffix) is kept as-is.
        wanted = {
            ext.lower() if (not ext or ext.startswith(".")) else f".{ext.lower()}"
            for ext in extensions
        }

        results = []
        pattern = "**/*" if recursive else "*"
        # Sorted so files are ingested in a deterministic order.
        for fp in sorted(d.glob(pattern)):
            if fp.is_file() and fp.suffix.lower() in wanted:
                res = self.ingest_file(str(fp))
                res["file"] = str(fp)
                results.append(res)

        # Save after batch ingestion
        self.save()
        return results