"""Custom inference handler for Hugging Face Inference Endpoints. This module exposes :class:`EndpointHandler`, the entrypoint used by the Hugging Face serving stack when ``--task custom`` is selected. The handler loads the exported Noesis decoder ONNX graph and accepts symbolic intent vectors (``psi``) along with an optional ``slow_state`` memory tensor. The outputs mirror the values produced by the training runtime: * ``z_out`` – semantic embedding projected back into symbolic space. * ``choice``, ``pain``, ``memory`` and ``quality`` – diagnostic scalars. * ``slow_state`` – updated slow memory tensor suitable for recurrent usage. The handler is intentionally lightweight so it can run without the rest of the AletheiaEngine Python package being installed. """ from __future__ import annotations import importlib import importlib.util from dataclasses import dataclass from pathlib import Path import hashlib import random import re from typing import Any, Mapping, MutableMapping, Optional, Sequence import numpy as np _WORD_RE = re.compile(r"\w+", re.UNICODE) _INTENT_VOCAB = [ "clarity", "empathy", "analysis", "evidence", "caution", "curiosity", "context", "precision", "ethics", "resilience", "coherence", "safety", "humility", "breadth", "depth", "innovation", "structure", "rigour", "balance", "confidence", ] _DEFAULT_PROVIDER = "aletheia-noesis" _DEFAULT_MODEL = "noesis-transformer-onnx" class _TextEncoder: """Deterministic text → vector encoder. The Hugging Face Inference Endpoints frequently pass user prompts as strings via the ``inputs`` field. The Noesis decoder, however, expects a symbolic vector (``psi``) as input. To provide a graceful fallback the handler lazily converts short text prompts into a stable float32 vector by hashing tokens onto a hypersphere. This mirrors the lightweight ``TextEncoder256`` implementation bundled with the full AletheiaEngine package while avoiding a heavy import dependency inside the endpoint container. 
""" def __init__(self, dim: int) -> None: self.dim = dim @staticmethod def _tokens(text: str) -> list[str]: return [tok.lower() for tok in _WORD_RE.findall(text)] @staticmethod def _seed(tok: str) -> int: # FNV-1a hash for determinism across processes/platforms. value = 2166136261 for byte in tok.encode("utf-8"): value ^= byte value = (value * 16777619) & 0xFFFFFFFF return int(value) def encode(self, text: str) -> np.ndarray: tokens = self._tokens(text) if not tokens: return np.zeros((1, self.dim), dtype=np.float32) vecs = [] for tok in tokens: rs = np.random.RandomState(self._seed(tok)) embedding = rs.normal(0.0, 1.0, size=(self.dim,)).astype(np.float32) norm = float(np.linalg.norm(embedding)) or 1.0 vecs.append(embedding / norm) stacked = np.stack(vecs, axis=0) pooled = stacked.mean(axis=0, dtype=np.float32, keepdims=True) pooled_norm = float(np.linalg.norm(pooled)) or 1.0 return pooled / pooled_norm class _SimpleTokenizer: """Minimal tokenizer mirroring the reference Noesis runtime.""" def __init__(self) -> None: special_tokens = ["", "", "", ""] alphabet = list("abcdefghijklmnopqrstuvwxyz0123456789 .,;:'\"!?-\n") self._tokens = special_tokens + alphabet self._token_to_id = {token: idx for idx, token in enumerate(self._tokens)} @property def pad_token_id(self) -> int: return 0 @property def bos_token_id(self) -> int: return 1 @property def eos_token_id(self) -> int: return 2 @property def unk_token_id(self) -> int: return 3 def encode(self, text: str) -> list[int]: tokens = [self.bos_token_id] for char in text: tokens.append(self._token_to_id.get(char.lower(), self.unk_token_id)) tokens.append(self.eos_token_id) return tokens def _summarise_intent(psi: Sequence[float], top_k: int = 4) -> list[str]: """Convert strongest symbolic dimensions into descriptors.""" vector = np.asarray(list(psi), dtype=np.float32).reshape(-1) if vector.size == 0: return [] k = min(top_k, vector.size) magnitudes = np.abs(vector) top_indices = magnitudes.argsort()[::-1][:k] summary: 
list[str] = [] for index in top_indices.tolist(): descriptor = _INTENT_VOCAB[index % len(_INTENT_VOCAB)] direction = "elevated" if vector[index] >= 0 else "attenuated" summary.append(f"{descriptor} ({direction}, |ψ|={magnitudes[index]:.2f})") return summary @dataclass(frozen=True) class _DecodingParams: beam_size: int = 6 temperature: float = 0.8 top_p: float = 0.9 max_new_tokens: int = 256 stop_quality: float = 0.6 @classmethod def from_payload(cls, payload: Mapping[str, Any]) -> "_DecodingParams": source: Mapping[str, Any] | None = None if "decoding" in payload and isinstance(payload["decoding"], Mapping): source = payload["decoding"] elif "parameters" in payload and isinstance(payload["parameters"], Mapping): candidate = payload["parameters"].get("decoding") if isinstance(candidate, Mapping): source = candidate if not source: return cls() kwargs: dict[str, Any] = {} for field in cls.__dataclass_fields__.keys(): # type: ignore[attr-defined] if field in source: try: kwargs[field] = type(getattr(cls(), field))(source[field]) except (TypeError, ValueError): continue return cls(**kwargs) def to_dict(self) -> dict[str, Any]: return {field: getattr(self, field) for field in self.__dataclass_fields__.keys()} # type: ignore[attr-defined] @dataclass(frozen=True) class _ModelIO: """Snapshot of ONNX input and output metadata.""" inputs: tuple[Any, ...] outputs: tuple[Any, ...] 
class EndpointHandler:
    """Callable endpoint used by Hugging Face to drive inference.

    Construction loads the ONNX session from the model directory, snapshots
    the graph's input/output metadata, and precomputes zero-filled defaults
    for every graph input other than the primary ``psi`` vector, the optional
    ``slow_state`` tensor and the optional ``tokens`` sequence.
    """

    def __init__(self, path: str | None = None) -> None:
        # ``path`` is the model repository directory; defaults to the
        # directory containing this handler file (HF Endpoints convention).
        self.model_dir = Path(path or Path(__file__).parent)
        self.session = self._load_session()
        self.io = self._capture_io()
        # First graph input is treated as the primary symbolic vector.
        self.primary_input = self.io.inputs[0].name
        self.slow_input = self._find_input("slow_state")
        self.tokens_input = self._find_input("tokens")
        self._primary_dim = self._infer_primary_dim()
        self._text_encoder = _TextEncoder(self._primary_dim)
        self._tokenizer = _SimpleTokenizer()
        # Zero-filled feeds for every remaining input so the graph can run
        # even when the caller supplies only ``psi``.
        self._defaults: dict[str, np.ndarray] = {}
        skip_inputs = {self.primary_input}
        if self.slow_input is not None:
            skip_inputs.add(self.slow_input)
        if self.tokens_input is not None:
            skip_inputs.add(self.tokens_input)
        for node in self.io.inputs:
            if node.name in skip_inputs:
                continue
            self._defaults[node.name] = self._zeros_like(node)
        # Fallback slow-state tensor used when the payload omits one.
        if self.slow_input is not None:
            self._slow_fallback = self._zeros_like(self._input_map[self.slow_input])
        else:
            self._slow_fallback = None
        if self.tokens_input is not None:
            token_node = self._input_map[self.tokens_input]
            self._token_sequence_length = self._infer_sequence_length(token_node)
            self._token_dtype = self._dtype_for(token_node)
        else:
            # No token input: sequence length 0 signals "unpadded" downstream.
            self._token_sequence_length = 0
            self._token_dtype = np.int64

    def _load_session(self):
        """Load the ONNX session, tolerating alternate filenames."""
        ort = self._import_onnxruntime()
        preferred_names = ("model.onnx", "model_infer.onnx")
        for name in preferred_names:
            candidate = self.model_dir / name
            if candidate.exists():
                return ort.InferenceSession(str(candidate), providers=["CPUExecutionProvider"])
        available = sorted(str(p.name) for p in self.model_dir.glob("*.onnx"))
        if len(available) == 1:
            # Fall back to the lone ONNX artefact if it has a non-standard name.
            return ort.InferenceSession(str(self.model_dir / available[0]), providers=["CPUExecutionProvider"])
        # NOTE(review): ``or ""`` is a no-op here; it looks like a garbled
        # placeholder (e.g. ``or "<none>"``) — confirm against the original.
        choices = ", ".join(available) or ""
        raise FileNotFoundError(
            "Could not locate any of %s in %s (available: %s)"
            % (", ".join(preferred_names), self.model_dir, choices)
        )

    @staticmethod
    def _import_onnxruntime():
        """Import :mod:`onnxruntime`, providing a helpful error if unavailable."""
        spec = importlib.util.find_spec("onnxruntime")
        if spec is None:
            raise ModuleNotFoundError(
                "onnxruntime is required to load Noesis decoder ONNX graphs. "
                "Install it with 'pip install onnxruntime'."
            )
        return importlib.import_module("onnxruntime")

    @property
    def _input_map(self) -> Mapping[str, Any]:
        # Rebuilt on every access; cheap for the handful of graph inputs.
        return {node.name: node for node in self.io.inputs}

    def _capture_io(self) -> _ModelIO:
        """Snapshot the session's input/output node metadata."""
        return _ModelIO(inputs=tuple(self.session.get_inputs()), outputs=tuple(self.session.get_outputs()))

    def _find_input(self, target: str) -> Optional[str]:
        """Return the graph input whose name matches *target* (case-insensitive)."""
        target = target.lower()
        for node in self.io.inputs:
            if node.name.lower() == target:
                return node.name
        return None

    def _infer_primary_dim(self) -> int:
        """Infer the feature dimension of the primary input.

        Scans the shape from the last axis backwards for the first concrete
        (positive int) dimension; symbolic/dynamic axes are skipped.
        """
        node = self._input_map[self.primary_input]
        for dim in reversed(node.shape):
            if isinstance(dim, int) and dim > 0:
                return dim
        # Conservative default matching TextEncoder256.
        return 256

    def _infer_sequence_length(self, node: Any) -> int:
        """Infer the token-sequence length from *node*'s shape (last concrete axis)."""
        for dim in reversed(getattr(node, "shape", [])):
            if isinstance(dim, int) and dim > 0:
                return dim
        return 1

    @staticmethod
    def _onnx_type_to_numpy(type_str: str | None) -> np.dtype:
        """Map an ONNX type string (e.g. ``"tensor(float)"``) to a NumPy dtype.

        Unknown or missing type strings default to float32.
        """
        mapping = {
            "tensor(float)": np.float32,
            "tensor(float16)": np.float16,
            "tensor(double)": np.float64,
            "tensor(int64)": np.int64,
            "tensor(int32)": np.int32,
            "tensor(int16)": np.int16,
            "tensor(int8)": np.int8,
            "tensor(uint8)": np.uint8,
            "tensor(bool)": np.bool_,
        }
        return mapping.get(type_str, np.float32)

    def _dtype_for(self, node: Any) -> np.dtype:
        """Return the NumPy dtype for an ONNX graph *node*."""
        return self._onnx_type_to_numpy(getattr(node, "type", None))

    def _zeros_like(self, node: Any) -> np.ndarray:
        """Build a zero tensor shaped like *node*, using 1 for dynamic axes."""
        shape: list[int] = []
        for dim in node.shape:
            if isinstance(dim, int) and dim > 0:
                shape.append(dim)
            else:
                shape.append(1)
        dtype = self._dtype_for(node)
        return np.zeros(shape, dtype=dtype)

    def _coerce_array(self, value: Any, *, node: Any, allow_empty: bool = False) -> np.ndarray:
        """Coerce *value* into a batched 2D array of *node*'s dtype.

        1D input gains a leading batch axis; >2D input raises ``ValueError``,
        as does empty input unless ``allow_empty`` is set.
        """
        dtype = self._dtype_for(node)
        array = np.asarray(value, dtype=dtype)
        if array.size == 0 and not allow_empty:
            raise ValueError("Received an empty array; provide at least one value.")
        if array.ndim == 1:
            array = np.expand_dims(array, axis=0)
        elif array.ndim > 2:
            raise ValueError("Expected a 1D or batched 2D array; received shape %s" % (array.shape,))
        if array.dtype != dtype:
            array = array.astype(dtype, copy=False)
        return array

    def _prepare_inputs(self, payload: Mapping[str, Any]) -> MutableMapping[str, np.ndarray]:
        """Assemble the full ONNX feed dict from a request *payload*.

        Accepts the symbolic vector under several aliases; text values are
        encoded via the fallback text encoder.  Raises ``KeyError`` when no
        vector-bearing field is present.
        """
        psi = payload.get("psi")
        if psi is None:
            # NOTE(review): the ``or``-chain skips falsy-but-present values
            # (e.g. an empty string under "prompt") — confirm intended.
            psi = (
                payload.get("vector")
                or payload.get("psi_s")
                or payload.get("inputs")
                or payload.get("prompt")
                or payload.get("text")
            )
        if psi is None:
            raise KeyError("Payload must include a 'psi' field containing the symbolic vector.")
        primary_node = self._input_map[self.primary_input]
        inputs: MutableMapping[str, np.ndarray] = {
            self.primary_input: self._vector_from_payload(psi, node=primary_node)
        }
        if self.slow_input is not None:
            slow_value = payload.get("slow_state") or payload.get("slow") or payload.get("state")
            if slow_value is None:
                # Copy so callers can't mutate the cached fallback tensor.
                inputs[self.slow_input] = self._slow_fallback.copy()
            else:
                inputs[self.slow_input] = self._coerce_array(
                    slow_value,
                    node=self._input_map[self.slow_input],
                    allow_empty=True,
                )
        for name, default in self._defaults.items():
            inputs[name] = default.copy()
        return inputs

    def _vector_from_payload(self, value: Any, *, node: Any) -> np.ndarray:
        """Turn *value* (text, list of strings, or numeric data) into a feed array."""
        if isinstance(value, str):
            encoded = self._text_encoder.encode(value)
            return self._coerce_array(encoded, node=node)
        if isinstance(value, (list, tuple)) and value and all(isinstance(v, str) for v in value):
            # List of strings: join and encode as one prompt.
            encoded = self._text_encoder.encode(" ".join(value))
            return self._coerce_array(encoded, node=node)
        return self._coerce_array(value, node=node)

    def _encode_tokens(self, text: str) -> tuple[np.ndarray, list[int]]:
        """Tokenise *text* and return ``(batched array, token id list)``.

        When the graph declares a fixed sequence length the ids are truncated
        and right-padded with the pad token; otherwise they are used as-is.
        """
        token_ids = self._tokenizer.encode(text)
        if self._token_sequence_length <= 0:
            array = np.asarray([token_ids], dtype=self._token_dtype)
            return array, token_ids
        length = min(len(token_ids), self._token_sequence_length)
        padded = np.full(
            (1, self._token_sequence_length),
            fill_value=self._tokenizer.pad_token_id,
            dtype=self._token_dtype,
        )
        padded[0, :length] = np.asarray(token_ids[:length], dtype=self._token_dtype)
        return padded, token_ids[:length]

    @staticmethod
    def _candidate_seed(psi: np.ndarray) -> int:
        """Derive a deterministic 32-bit RNG seed from the psi vector's bytes."""
        digest = hashlib.sha1(psi.tobytes()).digest()
        return int.from_bytes(digest[:4], "little", signed=False)

    def _build_candidates(
        self,
        psi_vector: np.ndarray,
        *,
        user_prompt: str | None,
        system_prompt: str | None,
        constraints: Mapping[str, Any] | None,
    ) -> tuple[list[str], str, list[str]]:
        """Build candidate response texts plus the intent summary/descriptors.

        Candidates are shuffled with a psi-derived seed so ordering is
        deterministic per input vector.
        """
        descriptors = _summarise_intent(psi_vector)
        summary = ", ".join(descriptors) if descriptors else "balanced intent"
        observations = [
            f"Interpretation: the symbolic intent emphasises {summary}.",
            f"Symbolic synopsis → {summary}.",
        ]
        if user_prompt:
            observations.append(f"{user_prompt.strip()}\nInsight: {summary}.")
        if system_prompt:
            observations.append(f"{system_prompt.strip()}\nDirective: honour {summary}.")
        if constraints:
            formatted = ", ".join(f"{key}={value}" for key, value in constraints.items())
            observations.append(f"Constraints observed: {formatted}.")
        seed = self._candidate_seed(psi_vector.astype(np.float32, copy=False))
        rng = random.Random(seed)
        rng.shuffle(observations)
        # NOTE(review): unreachable in practice — ``observations`` always
        # starts with two entries; kept as a defensive guard.
        if not observations:
            observations = [f"Symbolic synopsis → {summary}."]
        return observations, summary, descriptors

    def _run_candidate(self, base_feed: Mapping[str, np.ndarray], tokens: np.ndarray) -> list[tuple[Any, np.ndarray]]:
        """Run the graph once with *tokens* spliced into a copy of *base_feed*.

        Returns ``(output node metadata, value)`` pairs.
        """
        feed = {name: value for name, value in base_feed.items()}
        if self.tokens_input is not None:
            feed[self.tokens_input] = tokens
        outputs = self.session.run(None, feed)
        return list(zip(self.io.outputs, outputs))

    @staticmethod
    def _extract_q_hat(outputs: Sequence[tuple[Any, np.ndarray]]) -> float:
        """Extract the quality estimate from *outputs*.

        Prefers an output named exactly ``q_hat``; otherwise falls back to
        any output containing ``q`` in its name; otherwise ``-inf``.
        """
        for node, value in outputs:
            if getattr(node, "name", "").lower() == "q_hat":
                return float(np.squeeze(np.asarray(value, dtype=np.float32)))
        # Fallback if the node name differs slightly.
        for node, value in outputs:
            if "q" in getattr(node, "name", "").lower():
                return float(np.squeeze(np.asarray(value, dtype=np.float32)))
        return float("-inf")

    @staticmethod
    def _format_output(name: str, value: np.ndarray) -> Any:
        """Convert an output tensor to a JSON-friendly scalar or nested list.

        NaN/inf are mapped to 0.0.  ``name`` is currently unused but kept for
        interface symmetry with the call sites.
        """
        value = np.asarray(value, dtype=np.float32)
        value = np.nan_to_num(value, nan=0.0, posinf=0.0, neginf=0.0)
        squeezed = np.squeeze(value)
        if squeezed.ndim == 0:
            return float(squeezed)
        return squeezed.tolist()

    def __call__(self, data: Mapping[str, Any]) -> Mapping[str, Any]:
        """Handle one inference request and return the response mapping.

        Builds candidate texts, scores up to ``beam_size`` of them through
        the graph (early-stopping at ``stop_quality``), and merges the best
        run's formatted outputs into the response.
        """
        # HF convention: the real payload may be nested under "inputs".
        payload = data.get("inputs", data)
        if not isinstance(payload, Mapping):
            payload = {"psi": payload}
        feed = self._prepare_inputs(payload)
        psi_vector = np.asarray(feed[self.primary_input], dtype=np.float32).reshape(-1)
        state_constraints = payload.get("constraints")
        if not isinstance(state_constraints, Mapping):
            state_constraints = None
        decoding = _DecodingParams.from_payload(payload)
        system_prompt = payload.get("system_prompt")
        user_prompt = payload.get("user_prompt")
        candidates, summary, descriptors = self._build_candidates(
            psi_vector,
            user_prompt=user_prompt if isinstance(user_prompt, str) else None,
            system_prompt=system_prompt if isinstance(system_prompt, str) else None,
            constraints=state_constraints,
        )
        best_text: str | None = None
        best_tokens: list[int] = []
        best_outputs: list[tuple[Any, np.ndarray]] | None = None
        best_quality = float("-inf")
        limit = min(len(candidates), max(decoding.beam_size, 1))
        for candidate in candidates[:limit]:
            if self.tokens_input is None:
                # Graph takes no token input: candidate scoring is moot.
                break
            token_array, token_ids = self._encode_tokens(candidate)
            outputs = self._run_candidate(feed, token_array)
            quality = self._extract_q_hat(outputs)
            if quality > best_quality:
                best_quality = quality
                best_text = candidate
                best_tokens = token_ids
                best_outputs = outputs
            if quality >= decoding.stop_quality:
                # Early exit once a candidate clears the quality bar.
                break
        if best_outputs is None:
            # Fall back to a single pass using the prepared feed.
            outputs = self.session.run(None, feed)
            best_outputs = list(zip(self.io.outputs, outputs))
        if best_text is None:
            best_text = f"Symbolic synopsis → {summary}."
        if best_quality == float("-inf"):
            best_quality = self._extract_q_hat(best_outputs)
        formatted = {
            node.name: self._format_output(node.name, value)
            for node, value in best_outputs
        }
        if not np.isfinite(best_quality):
            best_quality = 0.0
        best_quality = float(best_quality)
        # NOTE(review): dead check — best_text was already defaulted above.
        if best_text is None:
            best_text = f"Symbolic synopsis → {summary}."
        response = {
            "text": best_text,
            "tokens": best_tokens,
            "quality": best_quality,
            "q_hat": best_quality,
            "provider": _DEFAULT_PROVIDER,
            "model": _DEFAULT_MODEL,
            "metadata": {
                "summary": summary,
                "descriptors": descriptors,
                "constraints": state_constraints or {},
                "decoding": decoding.to_dict(),
            },
        }
        # Raw (formatted) graph outputs override any colliding keys above.
        response.update(formatted)
        return response


__all__ = ["EndpointHandler"]