|
|
"""Custom inference handler for Hugging Face Inference Endpoints.
|
|
|
|
|
|
This module exposes :class:`EndpointHandler`, the entrypoint used by the
|
|
|
Hugging Face serving stack when ``--task custom`` is selected. The handler
|
|
|
loads the exported Noesis decoder ONNX graph and accepts symbolic intent
|
|
|
vectors (``psi``) along with an optional ``slow_state`` memory tensor. The
|
|
|
outputs mirror the values produced by the training runtime:
|
|
|
|
|
|
* ``z_out`` – semantic embedding projected back into symbolic space.
|
|
|
* ``choice``, ``pain``, ``memory`` and ``quality`` – diagnostic scalars.
|
|
|
* ``slow_state`` – updated slow memory tensor suitable for recurrent usage.
|
|
|
|
|
|
The handler is intentionally lightweight so it can run without the rest of the
|
|
|
AletheiaEngine Python package being installed.
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import importlib
|
|
|
import importlib.util
|
|
|
from dataclasses import dataclass
|
|
|
from pathlib import Path
|
|
|
import hashlib
|
|
|
import re
|
|
|
from typing import Any, Mapping, MutableMapping, Optional, Sequence, Tuple
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
_WORD_RE = re.compile(r"\w+", re.UNICODE)
|
|
|
|
|
|
_INTENT_VOCAB = [
|
|
|
"clarity",
|
|
|
"empathy",
|
|
|
"analysis",
|
|
|
"evidence",
|
|
|
"caution",
|
|
|
"curiosity",
|
|
|
"context",
|
|
|
"precision",
|
|
|
"ethics",
|
|
|
"resilience",
|
|
|
"coherence",
|
|
|
"safety",
|
|
|
"humility",
|
|
|
"breadth",
|
|
|
"depth",
|
|
|
"innovation",
|
|
|
"structure",
|
|
|
"rigour",
|
|
|
"balance",
|
|
|
"confidence",
|
|
|
]
|
|
|
|
|
|
_DEFAULT_PROVIDER = "aletheia-noesis"
|
|
|
_DEFAULT_MODEL = "noesis-transformer-onnx"
|
|
|
|
|
|
|
|
|
class _TextEncoder:
|
|
|
"""Deterministic text → vector encoder.
|
|
|
|
|
|
The Hugging Face Inference Endpoints frequently pass user prompts as
|
|
|
strings via the ``inputs`` field. The Noesis decoder, however, expects a
|
|
|
symbolic vector (``psi``) as input. To provide a graceful fallback the
|
|
|
handler lazily converts short text prompts into a stable float32 vector by
|
|
|
hashing tokens onto a hypersphere. This mirrors the lightweight
|
|
|
``TextEncoder256`` implementation bundled with the full AletheiaEngine
|
|
|
package while avoiding a heavy import dependency inside the endpoint
|
|
|
container.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, dim: int) -> None:
|
|
|
self.dim = dim
|
|
|
|
|
|
@staticmethod
|
|
|
def _tokens(text: str) -> list[str]:
|
|
|
return [tok.lower() for tok in _WORD_RE.findall(text)]
|
|
|
|
|
|
@staticmethod
|
|
|
def _seed(tok: str) -> int:
|
|
|
|
|
|
value = 2166136261
|
|
|
for byte in tok.encode("utf-8"):
|
|
|
value ^= byte
|
|
|
value = (value * 16777619) & 0xFFFFFFFF
|
|
|
return int(value)
|
|
|
|
|
|
def encode(self, text: str) -> np.ndarray:
|
|
|
tokens = self._tokens(text)
|
|
|
if not tokens:
|
|
|
return np.zeros((1, self.dim), dtype=np.float32)
|
|
|
|
|
|
vecs = []
|
|
|
for tok in tokens:
|
|
|
rs = np.random.RandomState(self._seed(tok))
|
|
|
embedding = rs.normal(0.0, 1.0, size=(self.dim,)).astype(np.float32)
|
|
|
norm = float(np.linalg.norm(embedding)) or 1.0
|
|
|
vecs.append(embedding / norm)
|
|
|
|
|
|
stacked = np.stack(vecs, axis=0)
|
|
|
pooled = stacked.mean(axis=0, dtype=np.float32, keepdims=True)
|
|
|
pooled_norm = float(np.linalg.norm(pooled)) or 1.0
|
|
|
return pooled / pooled_norm
|
|
|
|
|
|
|
|
|
class _SimpleTokenizer:
|
|
|
"""Minimal tokenizer mirroring the reference Noesis runtime."""
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
special_tokens = ["<pad>", "<bos>", "<eos>", "<unk>"]
|
|
|
alphabet = list("abcdefghijklmnopqrstuvwxyz0123456789 .,;:'\"!?-\n")
|
|
|
self._tokens = special_tokens + alphabet
|
|
|
self._token_to_id = {token: idx for idx, token in enumerate(self._tokens)}
|
|
|
|
|
|
@property
|
|
|
def pad_token_id(self) -> int:
|
|
|
return 0
|
|
|
|
|
|
@property
|
|
|
def bos_token_id(self) -> int:
|
|
|
return 1
|
|
|
|
|
|
@property
|
|
|
def eos_token_id(self) -> int:
|
|
|
return 2
|
|
|
|
|
|
@property
|
|
|
def unk_token_id(self) -> int:
|
|
|
return 3
|
|
|
|
|
|
def encode(self, text: str) -> list[int]:
|
|
|
tokens = [self.bos_token_id]
|
|
|
for char in text:
|
|
|
tokens.append(self._token_to_id.get(char.lower(), self.unk_token_id))
|
|
|
tokens.append(self.eos_token_id)
|
|
|
return tokens
|
|
|
|
|
|
|
|
|
def _summarise_intent(psi: Sequence[float], top_k: int = 4) -> list[str]:
|
|
|
"""Convert strongest symbolic dimensions into descriptors."""
|
|
|
|
|
|
vector = np.asarray(list(psi), dtype=np.float32).reshape(-1)
|
|
|
if vector.size == 0:
|
|
|
return []
|
|
|
|
|
|
k = min(top_k, vector.size)
|
|
|
magnitudes = np.abs(vector)
|
|
|
top_indices = magnitudes.argsort()[::-1][:k]
|
|
|
summary: list[str] = []
|
|
|
for index in top_indices.tolist():
|
|
|
descriptor = _INTENT_VOCAB[index % len(_INTENT_VOCAB)]
|
|
|
direction = "elevated" if vector[index] >= 0 else "attenuated"
|
|
|
summary.append(f"{descriptor} ({direction}, |ψ|={magnitudes[index]:.2f})")
|
|
|
return summary
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
class _DecodingParams:
|
|
|
beam_size: int = 6
|
|
|
temperature: float = 0.8
|
|
|
top_p: float = 0.9
|
|
|
max_new_tokens: int = 256
|
|
|
stop_quality: float = 0.6
|
|
|
|
|
|
@classmethod
|
|
|
def from_payload(cls, payload: Mapping[str, Any]) -> "_DecodingParams":
|
|
|
source: Mapping[str, Any] | None = None
|
|
|
if "decoding" in payload and isinstance(payload["decoding"], Mapping):
|
|
|
source = payload["decoding"]
|
|
|
elif "parameters" in payload and isinstance(payload["parameters"], Mapping):
|
|
|
candidate = payload["parameters"].get("decoding")
|
|
|
if isinstance(candidate, Mapping):
|
|
|
source = candidate
|
|
|
|
|
|
if not source:
|
|
|
return cls()
|
|
|
|
|
|
kwargs: dict[str, Any] = {}
|
|
|
for field in cls.__dataclass_fields__.keys():
|
|
|
if field in source:
|
|
|
try:
|
|
|
kwargs[field] = type(getattr(cls(), field))(source[field])
|
|
|
except (TypeError, ValueError):
|
|
|
continue
|
|
|
return cls(**kwargs)
|
|
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
|
return {field: getattr(self, field) for field in self.__dataclass_fields__.keys()}
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
class _ModelIO:
|
|
|
"""Snapshot of ONNX input and output metadata."""
|
|
|
|
|
|
inputs: tuple[Any, ...]
|
|
|
outputs: tuple[Any, ...]
|
|
|
|
|
|
|
|
|
class EndpointHandler:
|
|
|
"""Callable endpoint used by Hugging Face to drive inference."""
|
|
|
|
|
|
def __init__(self, path: str | None = None) -> None:
|
|
|
self.model_dir = Path(path or Path(__file__).parent)
|
|
|
self.session = self._load_session()
|
|
|
self.io = self._capture_io()
|
|
|
|
|
|
self.primary_input = self.io.inputs[0].name
|
|
|
self.slow_input = self._find_input("slow_state")
|
|
|
self.tokens_input = self._find_input("tokens")
|
|
|
self._primary_dim = self._infer_primary_dim()
|
|
|
self._text_encoder = _TextEncoder(self._primary_dim)
|
|
|
self._tokenizer = _SimpleTokenizer()
|
|
|
self._defaults = {}
|
|
|
skip_inputs = {self.primary_input}
|
|
|
if self.slow_input is not None:
|
|
|
skip_inputs.add(self.slow_input)
|
|
|
if self.tokens_input is not None:
|
|
|
skip_inputs.add(self.tokens_input)
|
|
|
for node in self.io.inputs:
|
|
|
if node.name in skip_inputs:
|
|
|
continue
|
|
|
self._defaults[node.name] = self._zeros_like(node)
|
|
|
if self.slow_input is not None:
|
|
|
self._slow_fallback = self._zeros_like(self._input_map[self.slow_input])
|
|
|
else:
|
|
|
self._slow_fallback = None
|
|
|
if self.tokens_input is not None:
|
|
|
token_node = self._input_map[self.tokens_input]
|
|
|
self._token_sequence_length = self._infer_sequence_length(token_node)
|
|
|
self._token_dtype = self._dtype_for(token_node)
|
|
|
else:
|
|
|
self._token_sequence_length = 0
|
|
|
self._token_dtype = np.int64
|
|
|
|
|
|
def _load_session(self):
|
|
|
"""Load the ONNX session, tolerating alternate filenames."""
|
|
|
|
|
|
ort = self._import_onnxruntime()
|
|
|
preferred_names = ("model.onnx", "model_infer.onnx")
|
|
|
for name in preferred_names:
|
|
|
candidate = self.model_dir / name
|
|
|
if candidate.exists():
|
|
|
return ort.InferenceSession(str(candidate), providers=["CPUExecutionProvider"])
|
|
|
|
|
|
available = sorted(str(p.name) for p in self.model_dir.glob("*.onnx"))
|
|
|
if len(available) == 1:
|
|
|
|
|
|
return ort.InferenceSession(str(self.model_dir / available[0]), providers=["CPUExecutionProvider"])
|
|
|
|
|
|
choices = ", ".join(available) or "<none>"
|
|
|
raise FileNotFoundError(
|
|
|
"Could not locate any of %s in %s (available: %s)"
|
|
|
% (", ".join(preferred_names), self.model_dir, choices)
|
|
|
)
|
|
|
|
|
|
@staticmethod
|
|
|
def _import_onnxruntime():
|
|
|
"""Import :mod:`onnxruntime`, providing a helpful error if unavailable."""
|
|
|
|
|
|
spec = importlib.util.find_spec("onnxruntime")
|
|
|
if spec is None:
|
|
|
raise ModuleNotFoundError(
|
|
|
"onnxruntime is required to load Noesis decoder ONNX graphs. "
|
|
|
"Install it with 'pip install onnxruntime'."
|
|
|
)
|
|
|
return importlib.import_module("onnxruntime")
|
|
|
|
|
|
@property
|
|
|
def _input_map(self) -> Mapping[str, Any]:
|
|
|
return {node.name: node for node in self.io.inputs}
|
|
|
|
|
|
def _capture_io(self) -> _ModelIO:
|
|
|
return _ModelIO(inputs=tuple(self.session.get_inputs()), outputs=tuple(self.session.get_outputs()))
|
|
|
|
|
|
def _find_input(self, target: str) -> Optional[str]:
|
|
|
target = target.lower()
|
|
|
for node in self.io.inputs:
|
|
|
if node.name.lower() == target:
|
|
|
return node.name
|
|
|
return None
|
|
|
|
|
|
def _infer_primary_dim(self) -> int:
|
|
|
node = self._input_map[self.primary_input]
|
|
|
for dim in reversed(node.shape):
|
|
|
if isinstance(dim, int) and dim > 0:
|
|
|
return dim
|
|
|
|
|
|
return 256
|
|
|
|
|
|
def _infer_sequence_length(self, node: Any) -> int:
|
|
|
for dim in reversed(getattr(node, "shape", [])):
|
|
|
if isinstance(dim, int) and dim > 0:
|
|
|
return dim
|
|
|
return 1
|
|
|
|
|
|
@staticmethod
|
|
|
def _onnx_type_to_numpy(type_str: str | None) -> np.dtype:
|
|
|
mapping = {
|
|
|
"tensor(float)": np.float32,
|
|
|
"tensor(float16)": np.float16,
|
|
|
"tensor(double)": np.float64,
|
|
|
"tensor(int64)": np.int64,
|
|
|
"tensor(int32)": np.int32,
|
|
|
"tensor(int16)": np.int16,
|
|
|
"tensor(int8)": np.int8,
|
|
|
"tensor(uint8)": np.uint8,
|
|
|
"tensor(bool)": np.bool_,
|
|
|
}
|
|
|
return mapping.get(type_str, np.float32)
|
|
|
|
|
|
def _dtype_for(self, node: Any) -> np.dtype:
|
|
|
return self._onnx_type_to_numpy(getattr(node, "type", None))
|
|
|
|
|
|
def _zeros_like(self, node: Any) -> np.ndarray:
|
|
|
shape: list[int] = []
|
|
|
for dim in node.shape:
|
|
|
if isinstance(dim, int) and dim > 0:
|
|
|
shape.append(dim)
|
|
|
else:
|
|
|
shape.append(1)
|
|
|
dtype = self._dtype_for(node)
|
|
|
return np.zeros(shape, dtype=dtype)
|
|
|
|
|
|
def _coerce_array(self, value: Any, *, node: Any, allow_empty: bool = False) -> np.ndarray:
|
|
|
dtype = self._dtype_for(node)
|
|
|
array = np.asarray(value, dtype=dtype)
|
|
|
if array.size == 0 and not allow_empty:
|
|
|
raise ValueError("Received an empty array; provide at least one value.")
|
|
|
if array.ndim == 1:
|
|
|
array = np.expand_dims(array, axis=0)
|
|
|
elif array.ndim > 2:
|
|
|
raise ValueError("Expected a 1D or batched 2D array; received shape %s" % (array.shape,))
|
|
|
if array.dtype != dtype:
|
|
|
array = array.astype(dtype, copy=False)
|
|
|
return array
|
|
|
|
|
|
def _prepare_inputs(self, payload: Mapping[str, Any]) -> MutableMapping[str, np.ndarray]:
|
|
|
psi = payload.get("psi")
|
|
|
if psi is None:
|
|
|
psi = (
|
|
|
payload.get("vector")
|
|
|
or payload.get("psi_s")
|
|
|
or payload.get("inputs")
|
|
|
or payload.get("prompt")
|
|
|
or payload.get("text")
|
|
|
)
|
|
|
if psi is None:
|
|
|
raise KeyError("Payload must include a 'psi' field containing the symbolic vector.")
|
|
|
|
|
|
primary_node = self._input_map[self.primary_input]
|
|
|
inputs: MutableMapping[str, np.ndarray] = {
|
|
|
self.primary_input: self._vector_from_payload(psi, node=primary_node)
|
|
|
}
|
|
|
|
|
|
if self.slow_input is not None:
|
|
|
slow_value = payload.get("slow_state") or payload.get("slow") or payload.get("state")
|
|
|
if slow_value is None:
|
|
|
inputs[self.slow_input] = self._slow_fallback.copy()
|
|
|
else:
|
|
|
inputs[self.slow_input] = self._coerce_array(
|
|
|
slow_value,
|
|
|
node=self._input_map[self.slow_input],
|
|
|
allow_empty=True,
|
|
|
)
|
|
|
|
|
|
for name, default in self._defaults.items():
|
|
|
inputs[name] = default.copy()
|
|
|
|
|
|
return inputs
|
|
|
|
|
|
def _vector_from_payload(self, value: Any, *, node: Any) -> np.ndarray:
|
|
|
if isinstance(value, str):
|
|
|
encoded = self._text_encoder.encode(value)
|
|
|
return self._coerce_array(encoded, node=node)
|
|
|
|
|
|
if isinstance(value, (list, tuple)) and value and all(isinstance(v, str) for v in value):
|
|
|
encoded = self._text_encoder.encode(" ".join(value))
|
|
|
return self._coerce_array(encoded, node=node)
|
|
|
|
|
|
return self._coerce_array(value, node=node)
|
|
|
|
|
|
@staticmethod
|
|
|
def _candidate_seed(psi: np.ndarray) -> int:
|
|
|
digest = hashlib.sha1(psi.tobytes()).digest()
|
|
|
return int.from_bytes(digest[:4], "little", signed=False)
|
|
|
|
|
|
def _token_array_from_ids(self, token_ids: Sequence[int]) -> np.ndarray:
|
|
|
ids = list(token_ids)
|
|
|
if self._token_sequence_length <= 0:
|
|
|
return np.asarray([ids], dtype=self._token_dtype)
|
|
|
|
|
|
padded = np.full(
|
|
|
(1, self._token_sequence_length),
|
|
|
fill_value=self._tokenizer.pad_token_id,
|
|
|
dtype=self._token_dtype,
|
|
|
)
|
|
|
length = min(len(ids), self._token_sequence_length)
|
|
|
if length > 0:
|
|
|
padded[0, :length] = np.asarray(ids[:length], dtype=self._token_dtype)
|
|
|
return padded
|
|
|
|
|
|
def _run_candidate(self, base_feed: Mapping[str, np.ndarray], tokens: Sequence[int]) -> list[tuple[Any, np.ndarray]]:
|
|
|
feed = {
|
|
|
name: (value.copy() if isinstance(value, np.ndarray) else value)
|
|
|
for name, value in base_feed.items()
|
|
|
}
|
|
|
if self.tokens_input is not None:
|
|
|
feed[self.tokens_input] = self._token_array_from_ids(tokens)
|
|
|
outputs = self.session.run(None, feed)
|
|
|
return list(zip(self.io.outputs, outputs))
|
|
|
|
|
|
@staticmethod
|
|
|
def _extract_logits(outputs: Sequence[tuple[Any, np.ndarray]]) -> Optional[np.ndarray]:
|
|
|
for node, value in outputs:
|
|
|
if getattr(node, "name", "").lower() == "logits":
|
|
|
return np.asarray(value, dtype=np.float32)
|
|
|
if outputs:
|
|
|
return np.asarray(outputs[0][1], dtype=np.float32)
|
|
|
return None
|
|
|
|
|
|
@staticmethod
|
|
|
def _sample_next_token(
|
|
|
logits: np.ndarray,
|
|
|
decoding: _DecodingParams,
|
|
|
rng: np.random.Generator,
|
|
|
) -> int:
|
|
|
vector = np.asarray(logits, dtype=np.float64).reshape(-1)
|
|
|
temperature = max(float(decoding.temperature), 1e-5)
|
|
|
top_p = float(decoding.top_p)
|
|
|
|
|
|
if temperature <= 1e-5 or not np.isfinite(vector).any():
|
|
|
return int(int(np.argmax(vector)))
|
|
|
|
|
|
stabilized = vector / temperature
|
|
|
stabilized -= np.max(stabilized)
|
|
|
probs = np.exp(stabilized)
|
|
|
probs = np.nan_to_num(probs, nan=0.0, posinf=0.0, neginf=0.0)
|
|
|
total = probs.sum()
|
|
|
if total <= 0.0:
|
|
|
return int(np.argmax(vector))
|
|
|
probs /= total
|
|
|
|
|
|
if top_p <= 0.0:
|
|
|
return int(np.argmax(probs))
|
|
|
|
|
|
if 0.0 < top_p < 1.0:
|
|
|
sorted_indices = np.argsort(-probs)
|
|
|
sorted_probs = probs[sorted_indices]
|
|
|
cumulative = np.cumsum(sorted_probs)
|
|
|
mask = cumulative <= top_p
|
|
|
if mask.size > 0:
|
|
|
mask[0] = True
|
|
|
filtered_indices = sorted_indices[mask]
|
|
|
filtered_probs = sorted_probs[mask]
|
|
|
filtered_total = filtered_probs.sum()
|
|
|
if filtered_total <= 0.0:
|
|
|
filtered_indices = sorted_indices
|
|
|
filtered_probs = sorted_probs
|
|
|
filtered_total = filtered_probs.sum()
|
|
|
filtered_probs = filtered_probs / filtered_total
|
|
|
choice = rng.choice(len(filtered_indices), p=filtered_probs)
|
|
|
return int(filtered_indices[int(choice)])
|
|
|
|
|
|
choice = rng.choice(len(probs), p=probs)
|
|
|
return int(choice)
|
|
|
|
|
|
def _generate_sequence(
|
|
|
self,
|
|
|
base_feed: Mapping[str, np.ndarray],
|
|
|
*,
|
|
|
decoding: _DecodingParams,
|
|
|
seed: int,
|
|
|
) -> Optional[Tuple[str, list[int], float, list[tuple[Any, np.ndarray]], int]]:
|
|
|
if self.tokens_input is None:
|
|
|
return None
|
|
|
|
|
|
rng = np.random.default_rng(seed)
|
|
|
token_ids: list[int] = [self._tokenizer.bos_token_id]
|
|
|
quality = float("-inf")
|
|
|
formatted_outputs: list[tuple[Any, np.ndarray]] | None = None
|
|
|
steps = 0
|
|
|
|
|
|
max_steps = max(decoding.max_new_tokens, 1)
|
|
|
for _ in range(max_steps):
|
|
|
outputs = self._run_candidate(base_feed, token_ids)
|
|
|
logits = self._extract_logits(outputs)
|
|
|
if logits is None:
|
|
|
break
|
|
|
last_index = min(len(token_ids) - 1, logits.shape[1] - 1)
|
|
|
next_logits = logits[0, last_index]
|
|
|
next_token = self._sample_next_token(next_logits, decoding, rng)
|
|
|
token_ids.append(int(next_token))
|
|
|
steps += 1
|
|
|
|
|
|
outputs = self._run_candidate(base_feed, token_ids)
|
|
|
formatted_outputs = outputs
|
|
|
quality = self._extract_q_hat(outputs)
|
|
|
|
|
|
if token_ids[-1] == self._tokenizer.eos_token_id:
|
|
|
break
|
|
|
if self._token_sequence_length > 0 and len(token_ids) >= self._token_sequence_length:
|
|
|
break
|
|
|
|
|
|
if formatted_outputs is None:
|
|
|
return None
|
|
|
|
|
|
text = self._tokenizer.decode(token_ids)
|
|
|
return text, token_ids, float(quality), formatted_outputs, steps
|
|
|
|
|
|
@staticmethod
|
|
|
def _extract_q_hat(outputs: Sequence[tuple[Any, np.ndarray]]) -> float:
|
|
|
for node, value in outputs:
|
|
|
if getattr(node, "name", "").lower() == "q_hat":
|
|
|
return float(np.squeeze(np.asarray(value, dtype=np.float32)))
|
|
|
|
|
|
for node, value in outputs:
|
|
|
if "q" in getattr(node, "name", "").lower():
|
|
|
return float(np.squeeze(np.asarray(value, dtype=np.float32)))
|
|
|
return float("-inf")
|
|
|
|
|
|
@staticmethod
|
|
|
def _format_output(name: str, value: np.ndarray) -> Any:
|
|
|
value = np.asarray(value, dtype=np.float32)
|
|
|
value = np.nan_to_num(value, nan=0.0, posinf=0.0, neginf=0.0)
|
|
|
squeezed = np.squeeze(value)
|
|
|
if squeezed.ndim == 0:
|
|
|
return float(squeezed)
|
|
|
return squeezed.tolist()
|
|
|
|
|
|
def __call__(self, data: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
|
payload = data.get("inputs", data)
|
|
|
if not isinstance(payload, Mapping):
|
|
|
payload = {"psi": payload}
|
|
|
|
|
|
feed = self._prepare_inputs(payload)
|
|
|
psi_vector = np.asarray(feed[self.primary_input], dtype=np.float32).reshape(-1)
|
|
|
state_constraints = payload.get("constraints")
|
|
|
if not isinstance(state_constraints, Mapping):
|
|
|
state_constraints = None
|
|
|
decoding = _DecodingParams.from_payload(payload)
|
|
|
system_prompt = payload.get("system_prompt")
|
|
|
user_prompt = payload.get("user_prompt")
|
|
|
|
|
|
descriptors = _summarise_intent(psi_vector)
|
|
|
summary = ", ".join(descriptors) if descriptors else "balanced intent"
|
|
|
|
|
|
best_candidate: Optional[Tuple[str, list[int], float, list[tuple[Any, np.ndarray]], int]] = None
|
|
|
seeds: list[int] = []
|
|
|
|
|
|
if self.tokens_input is not None:
|
|
|
beams = max(decoding.beam_size, 1)
|
|
|
base_seed = self._candidate_seed(psi_vector)
|
|
|
for beam_idx in range(beams):
|
|
|
seed = base_seed + beam_idx
|
|
|
seeds.append(seed)
|
|
|
candidate = self._generate_sequence(
|
|
|
feed,
|
|
|
decoding=decoding,
|
|
|
seed=seed,
|
|
|
)
|
|
|
if candidate is None:
|
|
|
continue
|
|
|
text, token_ids, quality, outputs, steps = candidate
|
|
|
if (
|
|
|
best_candidate is None
|
|
|
or quality > best_candidate[2]
|
|
|
):
|
|
|
best_candidate = candidate
|
|
|
if quality >= decoding.stop_quality:
|
|
|
break
|
|
|
|
|
|
if best_candidate is None:
|
|
|
outputs = self.session.run(None, feed)
|
|
|
formatted_outputs = list(zip(self.io.outputs, outputs))
|
|
|
quality = self._extract_q_hat(formatted_outputs)
|
|
|
text = f"Symbolic synopsis → {summary}."
|
|
|
token_ids: list[int] = []
|
|
|
steps = 0
|
|
|
else:
|
|
|
text, token_ids, quality, formatted_outputs, steps = best_candidate
|
|
|
|
|
|
formatted = {
|
|
|
node.name: self._format_output(node.name, value)
|
|
|
for node, value in formatted_outputs
|
|
|
}
|
|
|
|
|
|
if not np.isfinite(quality):
|
|
|
quality = 0.0
|
|
|
quality = float(quality)
|
|
|
|
|
|
metadata = {
|
|
|
"summary": summary,
|
|
|
"descriptors": descriptors,
|
|
|
"constraints": state_constraints or {},
|
|
|
"decoding": decoding.to_dict(),
|
|
|
"seeds": seeds,
|
|
|
"steps": steps,
|
|
|
"system_prompt": system_prompt if isinstance(system_prompt, str) else None,
|
|
|
"user_prompt": user_prompt if isinstance(user_prompt, str) else None,
|
|
|
}
|
|
|
|
|
|
response = {
|
|
|
"text": text,
|
|
|
"tokens": token_ids,
|
|
|
"quality": quality,
|
|
|
"q_hat": quality,
|
|
|
"provider": _DEFAULT_PROVIDER,
|
|
|
"model": _DEFAULT_MODEL,
|
|
|
"metadata": metadata,
|
|
|
}
|
|
|
response.update(formatted)
|
|
|
return response
|
|
|
|
|
|
|
|
|
__all__ = ["EndpointHandler"]
|
|
|
|