| """ |
| ng_embed — Centralized embedding service for the E-T Systems ecosystem. |
| |
| Singleton embedding engine used by every module. Provides: |
| 1. Unified embedding via Snowflake/snowflake-arctic-embed-m-v1.5 (ONNX) |
| 2. Dual-pass embedding (forest + trees) via TID concept extraction |
| 3. Thread-safe singleton — one model instance per process |
| 4. Hash fallback when ONNX model unavailable |
| |
| This is a VENDORED file. Canonical source: ~/NeuroGraph/ng_embed.py |
Do NOT modify vendored copies. Make changes in the canonical source, then re-vendor them everywhere.
| |
| Model: Snowflake/snowflake-arctic-embed-m-v1.5 |
| - 768-dim, CLS pooling, standard BERT architecture |
| - Query prefix: "Represent this sentence for searching relevant passages: " |
| - Documents: no prefix |
| - ONNX quantized (~110MB) via onnxruntime — no PyTorch dependency |
| |
| Dual-pass (Punchlist #81 — Josh's invention): |
| Pass 1 (Forest): Gestalt embedding of whole content. One node. |
| Pass 2 (Trees): LLM extracts concepts via TID. Each concept embedded |
| separately. Each tree linked to its forest via synapses. Cross-document |
| tree links form naturally through similarity association. |
| |
| # ---- Changelog ---- |
| # [2026-03-22] Claude (Opus 4.6) — Initial creation. |
| # What: Centralized embedding + dual-pass for entire ecosystem. |
| # Why: PRD §5 (Dual_Pass_Embedding_Implementation.md). Replaces 7+ |
| # identical _embed() functions. Prevents embedding dimension |
| # mismatch incidents. Upgrades model from bge-base-en-v1.5 to |
| # snowflake-arctic-embed-m-v1.5 (+1.89 retrieval MTEB). |
| # How: ONNX Runtime + tokenizers for embedding. TID for concept |
| # extraction. Substrate-learnable gate for Pass 2 value. |
| # ------------------- |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| import os |
| import threading |
| import time |
| from pathlib import Path |
| from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING |
|
|
| import numpy as np |
|
|
| if TYPE_CHECKING: |
| from ng_ecosystem import NGEcosystem |
|
|
# Module logger under the fixed name "ng_embed" (not __name__) — presumably so
# every vendored copy of this file logs under the same name; confirm intent.
logger = logging.getLogger("ng_embed")
|
|
| |
| |
| |
|
|
| _DEFAULT_CONFIG = { |
| |
| "model_id": "Snowflake/snowflake-arctic-embed-m-v1.5", |
| "onnx_filename": "onnx/model_quantized.onnx", |
| "embedding_dim": 768, |
| "pooling": "cls", |
| "query_prefix": "Represent this sentence for searching relevant passages: ", |
| "document_prefix": "", |
| "cache_dir": str(Path.home() / ".cache" / "ng_embed"), |
|
|
| |
| "tid_endpoint": "http://127.0.0.1:7437/v1/chat/completions", |
| "max_content_for_extraction": 2000, |
| "max_concepts": 20, |
| "forest_to_tree_weight": 0.4, |
| "tree_to_forest_ratio": 0.7, |
| "tid_timeout": 30, |
| "tid_model": "auto", |
| "tid_temperature": 0.2, |
| "tid_max_tokens": 500, |
| } |
|
|
| |
| |
| |
| |
# User-turn prompt sent to the TID chat endpoint by NGEmbed._extract_concepts.
# It is filled with str.format(content=...), so any literal braces added to
# this template must be doubled ({{ }}). The reply is parsed as a JSON array
# of strings by NGEmbed._parse_concepts.
_EXTRACTION_PROMPT = """Extract the key concepts, terms, and specific references from this text. Return them as a JSON array of short strings, each one a distinct concept or term mentioned in the text.

Focus on:
- Specific technical terms
- Named entities (people, tools, systems)
- Domain-specific concepts
- Action descriptions
- Relationships between things

Return ONLY a JSON array of strings. No explanation. Example: ["concept one", "concept two", "specific term"]

Text:
{content}"""
|
|
|
|
| |
| |
| |
|
|
class NGEmbed:
    """Centralized embedding engine for the E-T Systems ecosystem.

    Process-wide singleton (see ``get_instance``). The ONNX session and
    tokenizer are loaded lazily on first use and shared by every caller.
    Provides single-pass embedding plus dual-pass (forest + trees)
    recording via TID concept extraction. If the model cannot be loaded,
    a deterministic hash-based vector is returned instead so callers keep
    working (without semantic meaning).

    Usage:
        from ng_embed import embed, embed_batch

        vec = embed("some text")                   # 768-dim document embedding
        vec = embed("query text", is_query=True)   # With query prefix
        vec = embed("text", normalize=True)        # L2-normalized (Praxis)

        vecs = embed_batch(["text1", "text2"])     # Batch embedding
    """

    _instance: Optional["NGEmbed"] = None
    _lock = threading.Lock()

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Build an engine from ``_DEFAULT_CONFIG`` overlaid with ``config``.

        No model is loaded here (see ``_ensure_model``). Prefer
        ``get_instance()`` so the whole process shares one engine.
        """
        self._config = dict(_DEFAULT_CONFIG)
        if config:
            self._config.update(config)

        # Lazily populated model state; writes are guarded by _model_lock.
        self._session = None
        self._tokenizer = None
        self._input_names: frozenset = frozenset()
        self._model_loaded = False
        self._model_failed = False
        self._model_lock = threading.Lock()

        # Dual-pass counters, reported by ``stats``.
        self._extractions = 0
        self._concepts_total = 0
        self._failures = 0

    # ------------------------------------------------------------------
    # Singleton management
    # ------------------------------------------------------------------

    @classmethod
    def get_instance(cls, config: Optional[Dict[str, Any]] = None) -> "NGEmbed":
        """Return the process-wide engine, creating it on the first call.

        ``config`` is applied only when the instance is created. A later
        call whose ``config`` disagrees with the live configuration logs a
        warning instead of being silently ignored.
        """
        instance = cls._instance
        if instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls(config)
                    return cls._instance
                instance = cls._instance

        if config and any(instance._config.get(k) != v for k, v in config.items()):
            logger.warning(
                "ng_embed: singleton already initialized; ignoring config keys %s",
                sorted(config),
            )
        return instance

    @classmethod
    def reset_instance(cls) -> None:
        """Destroy the singleton (testing only).

        Drops the session/tokenizer references so the next
        ``get_instance()`` builds a fresh engine.
        """
        with cls._lock:
            if cls._instance is not None:
                cls._instance._session = None
                cls._instance._tokenizer = None
                cls._instance = None

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------

    def _ensure_model(self) -> bool:
        """Lazy-load the ONNX session and tokenizer on first use.

        Returns:
            True when the model is ready. False when loading failed, now or
            on an earlier call; the hash fallback is then used. A failure
            is remembered, so loading is attempted at most once.
        """
        if self._model_loaded:
            return True
        if self._model_failed:
            return False

        with self._model_lock:
            # Re-check: another thread may have finished loading while we waited.
            if self._model_loaded:
                return True
            if self._model_failed:
                return False

            try:
                import onnxruntime as ort
                from huggingface_hub import hf_hub_download
                from tokenizers import Tokenizer

                model_id = self._config["model_id"]
                cache_dir = self._config["cache_dir"]
                os.makedirs(cache_dir, exist_ok=True)

                onnx_path = hf_hub_download(
                    repo_id=model_id,
                    filename=self._config["onnx_filename"],
                    cache_dir=cache_dir,
                )

                sess_opts = ort.SessionOptions()
                sess_opts.graph_optimization_level = (
                    ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                )
                # os.cpu_count() may return None; never go below one thread.
                sess_opts.intra_op_num_threads = max(1, (os.cpu_count() or 1) // 2)
                session = ort.InferenceSession(
                    onnx_path,
                    sess_options=sess_opts,
                    providers=["CPUExecutionProvider"],
                )
                input_names = frozenset(inp.name for inp in session.get_inputs())

                # Fetch tokenizer.json through the same cache_dir as the model.
                # Tokenizer.from_pretrained would ignore cache_dir.
                tokenizer_path = hf_hub_download(
                    repo_id=model_id,
                    filename="tokenizer.json",
                    cache_dir=cache_dir,
                )
                tokenizer = Tokenizer.from_file(tokenizer_path)
                tokenizer.enable_padding(pad_id=0, pad_token="[PAD]")
                tokenizer.enable_truncation(
                    max_length=int(self._config.get("max_seq_length", 512)),
                )
            except Exception as exc:
                logger.warning("ng_embed: model load failed, using hash fallback: %s", exc)
                self._model_failed = True
                return False

            # Publish state only after everything loaded, so a failure never
            # leaves a half-initialized session/tokenizer pair behind.
            self._session = session
            self._tokenizer = tokenizer
            self._input_names = input_names
            self._model_loaded = True
            logger.info(
                "ng_embed: loaded %s (ONNX, %d-dim, CLS pooling)",
                model_id, self._config["embedding_dim"],
            )
            return True

    # ------------------------------------------------------------------
    # Public embedding API
    # ------------------------------------------------------------------

    def embed(
        self,
        text: str,
        normalize: bool = False,
        is_query: bool = False,
    ) -> np.ndarray:
        """Embed text into a float32 numpy vector (768-dim by default).

        Args:
            text: Raw text to embed.
            normalize: L2-normalize the output (True for Praxis compatibility).
            is_query: Prepend the query prefix (for recall/search operations).

        Returns:
            A float32 vector. When the model is unavailable, a deterministic
            hash-based vector of ``embedding_dim`` length is returned instead.
        """
        if self._ensure_model():
            return self._onnx_embed(text, normalize=normalize, is_query=is_query)
        return self._hash_embed(text, normalize=normalize)

    def embed_batch(
        self,
        texts: List[str],
        normalize: bool = False,
        is_query: bool = False,
    ) -> List[np.ndarray]:
        """Embed several texts in one model call.

        Args:
            texts: Texts to embed.
            normalize: L2-normalize each output.
            is_query: Prepend the query prefix to every text.

        Returns:
            One float32 vector per input, in input order. An empty input
            returns an empty list.
        """
        if not texts:
            return []
        if self._ensure_model():
            return self._onnx_embed_batch(texts, normalize=normalize, is_query=is_query)
        return [self._hash_embed(t, normalize=normalize) for t in texts]

    # ------------------------------------------------------------------
    # Embedding internals
    # ------------------------------------------------------------------

    def _apply_prefix(self, text: str, is_query: bool) -> str:
        """Prepend the configured query or document prefix to ``text``."""
        prefix = self._config["query_prefix" if is_query else "document_prefix"]
        return prefix + text if prefix else text

    @staticmethod
    def _l2_normalize(vec: np.ndarray) -> np.ndarray:
        """Scale ``vec`` to unit L2 norm; an all-zero vector is returned as-is."""
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 0 else vec

    def _onnx_embed(
        self,
        text: str,
        normalize: bool = False,
        is_query: bool = False,
    ) -> np.ndarray:
        """Embed a single text via ONNX Runtime, as a batch of one."""
        return self._onnx_embed_batch([text], normalize=normalize, is_query=is_query)[0]

    def _onnx_embed_batch(
        self,
        texts: List[str],
        normalize: bool = False,
        is_query: bool = False,
    ) -> List[np.ndarray]:
        """Embed ``texts`` via ONNX Runtime, padding to the longest sequence."""
        if not texts:
            return []

        prefixed = [self._apply_prefix(t, is_query) for t in texts]
        encodings = self._tokenizer.encode_batch(prefixed)

        # The tokenizer already pads a batch to its longest member. Building
        # the arrays explicitly keeps this correct even if padding is off.
        max_len = max(len(enc.ids) for enc in encodings)
        input_ids = np.zeros((len(encodings), max_len), dtype=np.int64)
        attention_mask = np.zeros_like(input_ids)
        for row, enc in enumerate(encodings):
            length = len(enc.ids)
            input_ids[row, :length] = enc.ids
            attention_mask[row, :length] = enc.attention_mask

        feeds = {"input_ids": input_ids, "attention_mask": attention_mask}
        # Some BERT-style ONNX exports declare token_type_ids as a required
        # input. Single-segment text uses segment id 0 throughout.
        if "token_type_ids" in self._input_names:
            feeds["token_type_ids"] = np.zeros_like(input_ids)

        outputs = self._session.run(None, feeds)

        # NOTE(review): output index 1 is taken as the pooled sentence vector.
        # The module docs describe CLS pooling, which for a raw encoder export
        # would be outputs[0][:, 0]. Confirm against self._session.get_outputs()
        # before changing: a different output changes the embedding space of
        # vectors that are already stored.
        pooled = outputs[1]
        vectors = [pooled[row, :].astype(np.float32) for row in range(len(texts))]
        if normalize:
            vectors = [self._l2_normalize(v) for v in vectors]
        return vectors

    def _hash_embed(
        self,
        text: str,
        normalize: bool = False,
    ) -> np.ndarray:
        """Deterministic hash-based fallback embedding.

        Seeds a NumPy ``RandomState`` from the first 4 bytes of the text's
        SHA-384 digest and draws ``embedding_dim`` standard-normal values.
        The result is stable across runs but not semantically meaningful.
        It only keeps modules operating when the ONNX model is unavailable.
        """
        dim = self._config["embedding_dim"]
        digest = hashlib.sha384(text.encode("utf-8")).digest()
        rng = np.random.RandomState(int.from_bytes(digest[:4], "little"))
        vec = rng.randn(dim).astype(np.float32)
        return self._l2_normalize(vec) if normalize else vec

    # ------------------------------------------------------------------
    # Dual-pass (forest + trees)
    # ------------------------------------------------------------------

    def dual_record_outcome(
        self,
        ecosystem: "NGEcosystem",
        content: str,
        embedding: np.ndarray,
        target_id: str,
        success: bool,
        strength: float = 1.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Dual-pass learning: forest embedding plus tree concept extraction.

        Pass 1: record the forest (gestalt) embedding via
        ``ecosystem.record_outcome()``. Errors here propagate to the caller.

        Pass 2: extract concepts via TID, embed each one, call
        ``record_outcome()`` per tree at ``strength * tree_strength_ratio``,
        and cross-link forest and tree. Pass 2 is skipped for empty content.
        Any Pass 2 failure (TID, embedding, or a tree record) is logged and
        never fatal.

        Args:
            ecosystem: The module's NGEcosystem instance.
            content: Raw text content (for concept extraction).
            embedding: Pre-computed forest embedding (Pass 1).
            target_id: Opaque string for what was decided.
            success: Whether the outcome was successful.
            strength: Caller-reported significance [0.0, 1.0].
            metadata: Additional metadata dict (copied, never mutated).

        Returns:
            {
                "forest_result": dict,   # record_outcome result for forest
                "tree_ids": [str],       # Target IDs for recorded tree nodes
                "concepts": [str],       # Extracted concept strings
                "pass2_attempted": bool,
            }
        """
        forest_result = ecosystem.record_outcome(
            embedding, target_id, success,
            strength=strength, metadata=metadata,
        )
        result: Dict[str, Any] = {
            "forest_result": forest_result,
            "tree_ids": [],
            "concepts": [],
            "pass2_attempted": False,
        }

        # Nothing to extract from empty content; skip the TID round-trip.
        if not content or not content.strip():
            return result

        concepts = self._extract_concepts(content)
        result["pass2_attempted"] = True
        if not concepts:
            return result
        result["concepts"] = concepts

        try:
            tree_embeddings = self.embed_batch(concepts)
        except Exception as exc:
            logger.debug("Dual-pass: tree embedding failed: %s", exc)
            self._failures += 1
            return result

        tree_strength = strength * self._config.get("tree_strength_ratio", 0.8)
        for concept, tree_emb in zip(concepts, tree_embeddings):
            tree_meta = dict(metadata or {})
            tree_meta["_tree_concept"] = True
            tree_meta["_forest_target_id"] = target_id
            tree_meta["_concept"] = concept

            tree_target = f"{target_id}::tree::{concept[:64]}"
            try:
                tree_result = ecosystem.record_outcome(
                    tree_emb, tree_target, success,
                    strength=tree_strength,
                    metadata=tree_meta,
                )
            except Exception as exc:
                logger.debug("Dual-pass: recording tree %r failed: %s", tree_target, exc)
                continue

            if not tree_result:
                continue
            result["tree_ids"].append(tree_target)
            self._create_substrate_link(
                ecosystem, embedding, tree_emb,
                target_id, tree_target,
            )

        self._extractions += 1
        self._concepts_total += len(result["tree_ids"])

        logger.debug(
            "Dual-pass: forest=%s, %d trees from %d concepts",
            target_id[:32], len(result["tree_ids"]), len(concepts),
        )
        return result

    def _create_substrate_link(
        self,
        ecosystem: "NGEcosystem",
        forest_emb: np.ndarray,
        tree_emb: np.ndarray,
        forest_target: str,
        tree_target: str,
    ) -> None:
        """Cross-record forest and tree so each is associated with the other.

        Records the tree embedding against the forest target at
        ``forest_to_tree_weight``. Then records the forest embedding against
        the tree target at ``forest_to_tree_weight * tree_to_forest_ratio``.
        Both are recorded as successes. Each call is best-effort: a failure
        is logged at debug level and does not stop the other.
        """
        weight = self._config["forest_to_tree_weight"]
        ratio = self._config["tree_to_forest_ratio"]

        links = (
            (tree_emb, forest_target, weight, "dual_pass_tree_to_forest"),
            (forest_emb, tree_target, weight * ratio, "dual_pass_forest_to_tree"),
        )
        for emb, target, link_strength, link_kind in links:
            try:
                ecosystem.record_outcome(
                    emb, target, True,
                    strength=link_strength,
                    metadata={"_link": link_kind},
                )
            except Exception as exc:
                logger.debug("Dual-pass: %s link to %r failed: %s", link_kind, target, exc)

    def _extract_concepts(self, text: str) -> List[str]:
        """Extract concepts from text via one TID LLM call.

        Returns:
            Up to ``max_concepts`` distinct concept strings. Returns an empty
            list on any failure, including ``requests`` not being installed;
            failures are counted in ``stats`` and never raised.
        """
        if not text or not text.strip():
            return []

        content = text[:self._config["max_content_for_extraction"]]
        prompt = _EXTRACTION_PROMPT.format(content=content)

        try:
            # Imported lazily: HTTP support is optional and its absence must
            # only disable Pass 2, not crash the caller.
            import requests

            resp = requests.post(
                self._config["tid_endpoint"],
                json={
                    "model": self._config["tid_model"],
                    "messages": [
                        {
                            "role": "system",
                            "content": "You extract concepts from text. "
                                       "Return only a JSON array of strings.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    "temperature": self._config["tid_temperature"],
                    "max_tokens": self._config["tid_max_tokens"],
                },
                timeout=self._config["tid_timeout"],
            )
            resp.raise_for_status()
            response_text = resp.json()["choices"][0]["message"]["content"].strip()
            concepts = self._parse_concepts(response_text)
        except Exception as exc:
            logger.debug("Concept extraction failed: %s", exc)
            self._failures += 1
            return []

        return concepts[:self._config["max_concepts"]]

    @staticmethod
    def _parse_concepts(text: str) -> List[str]:
        """Parse a JSON array of concepts from an LLM reply.

        Handles markdown code fences and prose around the array, and objects
        that wrap it (e.g. ``{"concepts": [...]}``). ``null`` entries and
        blank strings are dropped. Duplicates are removed, keeping first-seen
        order.
        """
        text = text.strip()
        if text.startswith("```"):
            text = "\n".join(
                line for line in text.split("\n")
                if not line.strip().startswith("```")
            ).strip()

        # Try the whole reply first, then the outermost [...] span inside it.
        candidates = [text]
        start = text.find("[")
        end = text.rfind("]") + 1
        if start >= 0 and end > start:
            candidates.append(text[start:end])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, list):
                cleaned = (str(item).strip() for item in parsed if item is not None)
                return list(dict.fromkeys(c for c in cleaned if c))

        return []

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of model configuration and dual-pass counters."""
        return {
            "model_id": self._config["model_id"],
            "model_loaded": self._model_loaded,
            "embedding_dim": self._config["embedding_dim"],
            "pooling": self._config["pooling"],
            "dual_pass": {
                "extractions": self._extractions,
                "concepts_total": self._concepts_total,
                "failures": self._failures,
                "avg_concepts": (
                    round(self._concepts_total / self._extractions, 1)
                    if self._extractions > 0 else 0
                ),
            },
        }
|
|
|
|
| |
| |
| |
|
|
def embed(
    text: str,
    normalize: bool = False,
    is_query: bool = False,
) -> np.ndarray:
    """Module-level shortcut for ``NGEmbed.get_instance().embed(...)``.

    Embeds ``text`` with the process-wide engine and returns its float32
    vector (768-dim with the default model).
    """
    engine = NGEmbed.get_instance()
    return engine.embed(text, normalize=normalize, is_query=is_query)
|
|
|
|
def embed_batch(
    texts: List[str],
    normalize: bool = False,
    is_query: bool = False,
) -> List[np.ndarray]:
    """Module-level shortcut for ``NGEmbed.get_instance().embed_batch(...)``.

    Returns one float32 vector per input text, in input order. An empty
    input list yields an empty list.
    """
    engine = NGEmbed.get_instance()
    return engine.embed_batch(texts, normalize=normalize, is_query=is_query)
|
|