NuWave / nuwave /substrate /ng_embed.py
Executor-Tyrant-Framework's picture
Initial commit
4c0cf4e
"""
ng_embed — Centralized embedding service for the E-T Systems ecosystem.
Singleton embedding engine used by every module. Provides:
1. Unified embedding via Snowflake/snowflake-arctic-embed-m-v1.5 (ONNX)
2. Dual-pass embedding (forest + trees) via TID concept extraction
3. Thread-safe singleton — one model instance per process
4. Hash fallback when ONNX model unavailable
This is a VENDORED file. Canonical source: ~/NeuroGraph/ng_embed.py
Do NOT modify vendored copies. Make changes in the canonical source, then re-vendor them everywhere.
Model: Snowflake/snowflake-arctic-embed-m-v1.5
- 768-dim, CLS pooling, standard BERT architecture
- Query prefix: "Represent this sentence for searching relevant passages: "
- Documents: no prefix
- ONNX quantized (~110MB) via onnxruntime — no PyTorch dependency
Dual-pass (Punchlist #81 — Josh's invention):
Pass 1 (Forest): Gestalt embedding of whole content. One node.
Pass 2 (Trees): LLM extracts concepts via TID. Each concept embedded
separately. Each tree linked to its forest via synapses. Cross-document
tree links form naturally through similarity association.
# ---- Changelog ----
# [2026-03-22] Claude (Opus 4.6) — Initial creation.
# What: Centralized embedding + dual-pass for entire ecosystem.
# Why: PRD §5 (Dual_Pass_Embedding_Implementation.md). Replaces 7+
# identical _embed() functions. Prevents embedding dimension
# mismatch incidents. Upgrades model from bge-base-en-v1.5 to
# snowflake-arctic-embed-m-v1.5 (+1.89 retrieval MTEB).
# How: ONNX Runtime + tokenizers for embedding. TID for concept
# extraction. Substrate-learnable gate for Pass 2 value.
# -------------------
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
    # Needed only for the "NGEcosystem" string annotations below; the guard
    # keeps ng_ecosystem from being imported at runtime.
    from ng_ecosystem import NGEcosystem

# Module-wide logger. The name is fixed to "ng_embed" (not __name__) so log
# records carry the same logger name regardless of the import path used.
logger = logging.getLogger("ng_embed")
# ---------------------------------------------------------------------------
# Configuration defaults — all values are bootstrap scaffolding
# ---------------------------------------------------------------------------
_DEFAULT_CONFIG: Dict[str, Any] = {
    # --- Embedding model -------------------------------------------------
    "model_id": "Snowflake/snowflake-arctic-embed-m-v1.5",
    "onnx_filename": "onnx/model_quantized.onnx",
    "embedding_dim": 768,
    "pooling": "cls",
    # Queries get this prefix prepended; documents are embedded as-is.
    "query_prefix": "Represent this sentence for searching relevant passages: ",
    "document_prefix": "",
    "cache_dir": str(Path.home().joinpath(".cache", "ng_embed")),
    # --- Dual-pass concept extraction (Punchlist #81) ---------------------
    "tid_endpoint": "http://127.0.0.1:7437/v1/chat/completions",
    # Number of leading characters of the content forwarded to TID.
    "max_content_for_extraction": 2000,
    # Upper bound on how many extracted concepts are kept.
    "max_concepts": 20,
    # Bootstrap strength used when cross-linking forest and tree.
    "forest_to_tree_weight": 0.4,
    # The opposite direction uses forest_to_tree_weight * this ratio.
    "tree_to_forest_ratio": 0.7,
    # HTTP timeout for the TID call, in seconds.
    "tid_timeout": 30,
    # "auto" lets TID route to an appropriate model.
    "tid_model": "auto",
    "tid_temperature": 0.2,
    "tid_max_tokens": 500,
}
# Prompt template for concept extraction (neither classification nor
# labeling): the LLM reads the content and lists the distinct concepts it
# contains. Extraction happens at the ingestion boundary — the LLM is a tool
# that helps the substrate receive richer raw experience (Law 7).
# The single "{content}" placeholder is filled in with str.format().
_EXTRACTION_PROMPT = (
    "Extract the key concepts, terms, and specific references from this text. "
    "Return them as a JSON array of short strings, each one a distinct concept "
    "or term mentioned in the text.\n"
    "Focus on:\n"
    "- Specific technical terms\n"
    "- Named entities (people, tools, systems)\n"
    "- Domain-specific concepts\n"
    "- Action descriptions\n"
    "- Relationships between things\n"
    "Return ONLY a JSON array of strings. No explanation. "
    'Example: ["concept one", "concept two", "specific term"]\n'
    "Text:\n"
    "{content}"
)
# ---------------------------------------------------------------------------
# NGEmbed — The singleton embedding service
# ---------------------------------------------------------------------------
class NGEmbed:
    """Centralized embedding engine for the E-T Systems ecosystem.

    Process-wide singleton (obtain it via :meth:`get_instance`): one ONNX
    model instance per process, shared by all modules. Singleton creation
    and model loading are each guarded by a lock. Provides single-pass
    embedding and dual-pass (forest + trees) recording via TID concept
    extraction. If the ONNX model cannot be loaded, every embedding call
    falls back to a deterministic, non-semantic hash-derived vector.

    Usage:
        from ng_embed import embed, embed_batch

        vec = embed("some text")                    # 768-dim document embedding
        vec = embed("query text", is_query=True)    # With query prefix
        vec = embed("text", normalize=True)         # L2-normalized (Praxis)
        vecs = embed_batch(["text1", "text2"])      # Batch embedding
    """

    _instance: Optional["NGEmbed"] = None
    _lock = threading.Lock()

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Build an engine from the defaults overlaid with ``config``.

        No model is loaded here; loading is deferred to the first embed call.

        Args:
            config: Optional overrides merged on top of ``_DEFAULT_CONFIG``.
        """
        self._config = dict(_DEFAULT_CONFIG)
        if config:
            self._config.update(config)

        self._session = None        # ONNX InferenceSession (lazy)
        self._tokenizer = None      # tokenizers.Tokenizer (lazy)
        self._model_loaded = False
        self._model_failed = False  # Sticky: a failed load is never retried
        self._model_lock = threading.Lock()

        # Dual-pass stats
        self._extractions = 0
        self._concepts_total = 0
        self._failures = 0

    # -- Singleton -----------------------------------------------------------

    @classmethod
    def get_instance(cls, config: Optional[Dict[str, Any]] = None) -> "NGEmbed":
        """Return the process-wide instance, creating it on first call.

        Args:
            config: Overrides applied ONLY when this call creates the
                instance. Once an instance exists the argument is ignored
                (a debug message is logged so the mismatch is discoverable).

        Returns:
            The shared ``NGEmbed`` instance.
        """
        instance = cls._instance
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have won the race.
                instance = cls._instance
                if instance is None:
                    instance = cls._instance = cls(config)
                    return instance
        if config:
            logger.debug(
                "ng_embed: singleton already exists; ignoring config override"
            )
        return instance

    @classmethod
    def reset_instance(cls) -> None:
        """Destroy singleton (testing only)."""
        with cls._lock:
            if cls._instance is not None:
                # Drop the model references so they can be garbage-collected
                # even if someone still holds the old instance.
                cls._instance._session = None
                cls._instance._tokenizer = None
            cls._instance = None

    # -- Model loading -------------------------------------------------------

    def _ensure_model(self) -> bool:
        """Lazy-load ONNX model + tokenizer on first use.

        Returns:
            True when the ONNX session and tokenizer are ready; False when
            loading failed (now or on an earlier call), in which case callers
            use the hash fallback. A failure is remembered and not retried.
        """
        if self._model_loaded:
            return True
        if self._model_failed:
            return False

        with self._model_lock:
            # Re-check: another thread may have finished (or failed) loading
            # while this one waited for the lock.
            if self._model_loaded:
                return True
            if self._model_failed:
                return False

            try:
                # Imported lazily so the module works (via the hash fallback)
                # when these optional packages are not installed.
                import onnxruntime as ort
                from huggingface_hub import hf_hub_download
                from tokenizers import Tokenizer

                model_id = self._config["model_id"]
                cache_dir = self._config["cache_dir"]
                os.makedirs(cache_dir, exist_ok=True)

                # Download ONNX model
                onnx_path = hf_hub_download(
                    repo_id=model_id,
                    filename=self._config["onnx_filename"],
                    cache_dir=cache_dir,
                )

                # Load ONNX session (CPU, optimized)
                sess_opts = ort.SessionOptions()
                sess_opts.graph_optimization_level = (
                    ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                )
                # os.cpu_count() may return None when the count is
                # undeterminable; treat that as a single CPU rather than
                # raising TypeError and needlessly disabling the model.
                sess_opts.intra_op_num_threads = max(
                    1, (os.cpu_count() or 1) // 2
                )
                session = ort.InferenceSession(
                    onnx_path,
                    sess_options=sess_opts,
                    providers=["CPUExecutionProvider"],
                )

                # Load tokenizer
                tokenizer = Tokenizer.from_pretrained(model_id)
                tokenizer.enable_padding(pad_id=0, pad_token="[PAD]")
                tokenizer.enable_truncation(max_length=512)
            except Exception as exc:
                logger.warning(
                    "ng_embed: model load failed, using hash fallback: %s", exc
                )
                self._model_failed = True
                return False

            # Publish only after BOTH pieces loaded, and flip the flag last,
            # so the lock-free fast path never sees a half-initialized engine.
            self._session = session
            self._tokenizer = tokenizer
            self._model_loaded = True
            logger.info(
                "ng_embed: loaded %s (ONNX, %d-dim, CLS pooling)",
                model_id, self._config["embedding_dim"],
            )
            return True

    # -- Embedding -----------------------------------------------------------

    def embed(
        self,
        text: str,
        normalize: bool = False,
        is_query: bool = False,
    ) -> np.ndarray:
        """Embed text → 768-dim float32 numpy array.

        Args:
            text: Raw text to embed.
            normalize: L2-normalize output (True for Praxis compatibility).
            is_query: Prepend query prefix (for recall/search operations).
                Ignored by the hash fallback.

        Returns:
            float32 numpy vector: the model embedding, or the hash fallback
            (``embedding_dim`` long) when the model is unavailable.
        """
        if self._ensure_model():
            return self._onnx_embed(text, normalize=normalize, is_query=is_query)
        return self._hash_embed(text, normalize=normalize)

    def embed_batch(
        self,
        texts: List[str],
        normalize: bool = False,
        is_query: bool = False,
    ) -> List[np.ndarray]:
        """Batch embedding for efficiency.

        Args:
            texts: List of texts to embed.
            normalize: L2-normalize outputs.
            is_query: Prepend query prefix to all texts.

        Returns:
            One float32 vector per input text, in input order; an empty list
            for empty input.
        """
        if not texts:
            return []
        if self._ensure_model():
            return self._onnx_embed_batch(
                texts, normalize=normalize, is_query=is_query,
            )
        return [self._hash_embed(t, normalize=normalize) for t in texts]

    def _with_prefix(self, text: str, is_query: bool) -> str:
        """Return ``text`` with the configured query/document prefix applied."""
        key = "query_prefix" if is_query else "document_prefix"
        prefix = self._config[key]
        return prefix + text if prefix else text

    @staticmethod
    def _l2_normalize(vec: np.ndarray) -> np.ndarray:
        """Scale ``vec`` to unit L2 norm; a zero vector is returned unchanged."""
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 0 else vec

    def _onnx_embed(
        self,
        text: str,
        normalize: bool = False,
        is_query: bool = False,
    ) -> np.ndarray:
        """Single text embedding via ONNX Runtime.

        Requires a successful :meth:`_ensure_model` call beforehand.
        """
        encoding = self._tokenizer.encode(self._with_prefix(text, is_query))
        feeds = {
            "input_ids": np.array([encoding.ids], dtype=np.int64),
            "attention_mask": np.array(
                [encoding.attention_mask], dtype=np.int64
            ),
        }
        outputs = self._session.run(None, feeds)

        # Output index 1 is taken as the model's pre-pooled sentence
        # embedding (per the original author's note) — TODO confirm against
        # the exported graph if the ONNX file is ever swapped.
        vec = outputs[1][0, :].astype(np.float32)
        return self._l2_normalize(vec) if normalize else vec

    def _onnx_embed_batch(
        self,
        texts: List[str],
        normalize: bool = False,
        is_query: bool = False,
    ) -> List[np.ndarray]:
        """Batch embedding via ONNX Runtime with padding.

        Requires a successful :meth:`_ensure_model` call beforehand.
        """
        prefixed = [self._with_prefix(text, is_query) for text in texts]

        # Batch tokenize, then right-pad every row with zeros up to the
        # longest sequence so the batch forms a rectangular array.
        encodings = self._tokenizer.encode_batch(prefixed)
        max_len = max(len(enc.ids) for enc in encodings)
        input_ids = np.zeros((len(encodings), max_len), dtype=np.int64)
        attention_mask = np.zeros((len(encodings), max_len), dtype=np.int64)
        for row, enc in enumerate(encodings):
            length = len(enc.ids)
            input_ids[row, :length] = enc.ids
            attention_mask[row, :length] = enc.attention_mask

        outputs = self._session.run(
            None,
            {"input_ids": input_ids, "attention_mask": attention_mask},
        )

        # Output index 1 is taken as the pre-pooled sentence embedding (see
        # _onnx_embed). Each row is copied so results are independent arrays.
        pooled = outputs[1]
        results = []
        for row in range(len(texts)):
            vec = pooled[row, :].astype(np.float32)
            results.append(self._l2_normalize(vec) if normalize else vec)
        return results

    def _hash_embed(
        self,
        text: str,
        normalize: bool = False,
    ) -> np.ndarray:
        """Deterministic hash-based fallback embedding.

        Produces a stable ``embedding_dim``-long float32 vector from the
        text's SHA-384 digest. Not semantically meaningful — it only ensures
        modules can operate when the ONNX model is unavailable.

        Args:
            text: Text to embed.
            normalize: L2-normalize the output.

        Returns:
            float32 numpy vector of length ``embedding_dim``.
        """
        dim = self._config["embedding_dim"]
        digest = hashlib.sha384(text.encode("utf-8")).digest()

        # Only the first 4 digest bytes seed the generator (RandomState takes
        # a 32-bit seed). Kept exactly as-is so vectors stay stable across
        # versions and remain comparable with already-stored embeddings.
        rng = np.random.RandomState(int.from_bytes(digest[:4], "little"))
        vec = rng.randn(dim).astype(np.float32)
        return self._l2_normalize(vec) if normalize else vec

    # -- Dual-pass (Punchlist #81) -------------------------------------------

    def dual_record_outcome(
        self,
        ecosystem: "NGEcosystem",
        content: str,
        embedding: np.ndarray,
        target_id: str,
        success: bool,
        strength: float = 1.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Dual-pass learning: forest embedding + tree concept extraction.

        Pass 1: Record the forest (gestalt) embedding via
            ecosystem.record_outcome(). Errors here propagate to the caller.
        Pass 2: Extract concepts via TID → embed each → record_outcome()
            per tree → cross-link forest and tree in the substrate.

        If TID is unavailable or anything in Pass 2 raises, the method falls
        back to whatever was recorded so far (at minimum the forest). Pass 2
        failure is never fatal.

        Args:
            ecosystem: The module's NGEcosystem instance.
            content: Raw text content (for concept extraction).
            embedding: Pre-computed forest embedding (Pass 1).
            target_id: Opaque string for what was decided.
            success: Whether the outcome was successful.
            strength: Caller-reported significance [0.0, 1.0].
            metadata: Additional metadata dict.

        Returns:
            {
                "forest_result": dict,   # record_outcome result for forest
                "tree_ids": [str],       # Target IDs for tree nodes
                "concepts": [str],       # Extracted concept strings
                "pass2_attempted": bool,
            }
        """
        # Pass 1: Forest — standard record_outcome with gestalt embedding
        forest_result = ecosystem.record_outcome(
            embedding, target_id, success,
            strength=strength, metadata=metadata,
        )
        result: Dict[str, Any] = {
            "forest_result": forest_result,
            "tree_ids": [],
            "concepts": [],
            "pass2_attempted": True,
        }

        # Pass 2: Trees — concept extraction via TID (never raises)
        concepts = self._extract_concepts(content)
        if not concepts:
            return result
        result["concepts"] = concepts

        try:
            tree_embeddings = self.embed_batch(concepts)
            for concept, tree_emb in zip(concepts, tree_embeddings):
                tree_meta = dict(metadata or {})
                tree_meta["_tree_concept"] = True
                tree_meta["_forest_target_id"] = target_id
                tree_meta["_concept"] = concept

                # Concept text is capped at 64 chars inside the target id.
                tree_target = f"{target_id}::tree::{concept[:64]}"
                tree_result = ecosystem.record_outcome(
                    tree_emb, tree_target, success,
                    strength=strength * 0.8,  # Trees slightly softer than forest
                    metadata=tree_meta,
                )
                if not tree_result:
                    continue
                result["tree_ids"].append(tree_target)

                # Explicit cross-links at bootstrap weight, on top of
                # whatever association the substrate forms on its own.
                self._create_substrate_link(
                    ecosystem, embedding, tree_emb,
                    target_id, tree_target,
                )
        except Exception as exc:
            # Honor the "Pass 2 is never fatal" contract: keep the forest
            # result and any trees recorded before the failure.
            self._failures += 1
            logger.warning(
                "Dual-pass: pass 2 aborted for forest=%s: %s",
                target_id[:32], exc,
            )

        self._extractions += 1
        self._concepts_total += len(result["tree_ids"])
        logger.debug(
            "Dual-pass: forest=%s, %d trees from %d concepts",
            target_id[:32], len(result["tree_ids"]), len(concepts),
        )
        return result

    def _create_substrate_link(
        self,
        ecosystem: "NGEcosystem",
        forest_emb: np.ndarray,
        tree_emb: np.ndarray,
        forest_target: str,
        tree_target: str,
    ) -> None:
        """Create forest↔tree link in the substrate via record_outcome.

        Uses cross-recording: the tree embedding is recorded against the
        forest target id, and the forest embedding against the tree target
        id. Each recording is best-effort — a failure is logged at debug
        level and does not stop the other one.
        """
        weight = self._config["forest_to_tree_weight"]
        ratio = self._config["tree_to_forest_ratio"]

        # NOTE(review): the link tagged "dual_pass_tree_to_forest" gets the
        # full `forest_to_tree_weight`, and the one tagged
        # "dual_pass_forest_to_tree" gets the attenuated weight * ratio. The
        # config key names suggest the reverse — confirm which direction is
        # intended. Behavior deliberately left unchanged.
        links = (
            # "when I see this tree, recall the forest"
            (tree_emb, forest_target, weight, "dual_pass_tree_to_forest"),
            # "when I see this forest, recall the tree"
            (forest_emb, tree_target, weight * ratio, "dual_pass_forest_to_tree"),
        )
        for emb, target, link_strength, label in links:
            try:
                ecosystem.record_outcome(
                    emb, target, True,
                    strength=link_strength,
                    metadata={"_link": label},
                )
            except Exception as exc:
                logger.debug(
                    "Dual-pass: %s link to %s failed: %s", label, target, exc,
                )

    def _extract_concepts(self, text: str) -> List[str]:
        """Extract concepts from text via TID LLM call.

        At most one LLM call per ingestion; blank input skips the call.

        Args:
            text: Content to mine; only the first
                ``max_content_for_extraction`` characters are sent.

        Returns:
            Up to ``max_concepts`` concept strings, or an empty list on any
            failure (missing ``requests``, network/HTTP error, unexpected
            response shape). Never raises; failures bump the failure counter.
        """
        content = (text or "")[: self._config["max_content_for_extraction"]]
        if not content.strip():
            # Nothing to extract from — don't spend an LLM call on it.
            return []
        prompt = _EXTRACTION_PROMPT.format(content=content)

        try:
            # Imported inside the try so that a missing optional dependency
            # degrades to "no concepts" instead of raising ImportError.
            import requests

            resp = requests.post(
                self._config["tid_endpoint"],
                json={
                    "model": self._config["tid_model"],
                    "messages": [
                        {
                            "role": "system",
                            "content": "You extract concepts from text. "
                                       "Return only a JSON array of strings.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    "temperature": self._config["tid_temperature"],
                    "max_tokens": self._config["tid_max_tokens"],
                },
                timeout=self._config["tid_timeout"],
            )
            resp.raise_for_status()
            response_text = (
                resp.json()["choices"][0]["message"]["content"].strip()
            )
            concepts = self._parse_concepts(response_text)
            return concepts[: self._config["max_concepts"]]
        except Exception as exc:
            logger.debug("Concept extraction failed: %s", exc)
            self._failures += 1
            return []

    @staticmethod
    def _parse_concepts(text: str) -> List[str]:
        """Parse a JSON array of concepts out of an LLM response.

        Tolerates markdown code fences, prose around the array, and an array
        wrapped in a JSON object (e.g. ``{"concepts": [...]}``).

        Args:
            text: Raw LLM response text.

        Returns:
            Stripped, non-empty concept strings in response order. ``null``
            entries and nested containers are skipped; other scalars are
            converted with ``str``. Empty list when no array can be parsed.
        """

        def bracket_span(blob: str) -> str:
            """Return the outermost ``[...]`` slice of ``blob``, or ''."""
            start = blob.find("[")
            end = blob.rfind("]") + 1
            return blob[start:end] if 0 <= start < end else ""

        raw = text.strip()
        body = raw
        if body.startswith("```"):
            kept = [
                line for line in body.split("\n")
                if not line.strip().startswith("```")
            ]
            body = "\n".join(kept).strip()

        # Tried in order: the de-fenced reply as a whole, the bracketed part
        # of it, then the bracketed part of the raw reply (this rescues a
        # fenced answer written on a single line, which de-fencing erases).
        for candidate in (body, bracket_span(body), bracket_span(raw)):
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if not isinstance(parsed, list):
                continue
            concepts = []
            for item in parsed:
                # str(None) would yield the bogus concept "None"; nested
                # containers would yield their repr. Skip both.
                if item is None or isinstance(item, (dict, list)):
                    continue
                label = str(item).strip()
                if label:
                    concepts.append(label)
            return concepts
        return []

    # -- Stats ---------------------------------------------------------------

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of model status and dual-pass counters.

        ``avg_concepts`` is recorded trees per extraction, rounded to one
        decimal place (0 before the first extraction).
        """
        return {
            "model_id": self._config["model_id"],
            "model_loaded": self._model_loaded,
            "embedding_dim": self._config["embedding_dim"],
            "pooling": self._config["pooling"],
            "dual_pass": {
                "extractions": self._extractions,
                "concepts_total": self._concepts_total,
                "failures": self._failures,
                "avg_concepts": (
                    round(self._concepts_total / self._extractions, 1)
                    if self._extractions > 0 else 0
                ),
            },
        }
# ---------------------------------------------------------------------------
# Module-level convenience functions
# ---------------------------------------------------------------------------
def embed(
    text: str,
    normalize: bool = False,
    is_query: bool = False,
) -> np.ndarray:
    """Embed one text with the shared NGEmbed singleton.

    Module-level shortcut for ``NGEmbed.get_instance().embed(...)``.

    Args:
        text: Raw text to embed.
        normalize: Forwarded to ``NGEmbed.embed`` (L2-normalize the output).
        is_query: Forwarded to ``NGEmbed.embed`` (apply the query prefix).

    Returns:
        The numpy vector produced by ``NGEmbed.embed``.
    """
    engine = NGEmbed.get_instance()
    return engine.embed(text, normalize=normalize, is_query=is_query)
def embed_batch(
    texts: List[str],
    normalize: bool = False,
    is_query: bool = False,
) -> List[np.ndarray]:
    """Embed several texts with the shared NGEmbed singleton.

    Module-level shortcut for ``NGEmbed.get_instance().embed_batch(...)``.

    Args:
        texts: Texts to embed.
        normalize: Forwarded to ``NGEmbed.embed_batch`` (L2-normalize outputs).
        is_query: Forwarded to ``NGEmbed.embed_batch`` (apply the query prefix).

    Returns:
        The list of numpy vectors produced by ``NGEmbed.embed_batch``.
    """
    engine = NGEmbed.get_instance()
    return engine.embed_batch(texts, normalize=normalize, is_query=is_query)