"""Custom inference handler for Hugging Face Inference Endpoints. |
|
|
|
|
|
This module exposes :class:`EndpointHandler`, the entrypoint used by the |
|
|
Hugging Face serving stack when ``--task custom`` is selected. The handler |
|
|
loads the exported Noesis decoder ONNX graph and accepts symbolic intent |
|
|
vectors (``psi``) along with an optional ``slow_state`` memory tensor. The |
|
|
outputs mirror the values produced by the training runtime: |
|
|
|
|
|
* ``z_out`` – semantic embedding projected back into symbolic space. |
|
|
* ``choice``, ``pain``, ``memory`` and ``quality`` – diagnostic scalars. |
|
|
* ``slow_state`` – updated slow memory tensor suitable for recurrent usage. |
|
|
|
|
|
The handler is intentionally lightweight so it can run without the rest of the |
|
|
AletheiaEngine Python package being installed. |
|
|
""" |
|
|
from __future__ import annotations

import hashlib
import importlib
import importlib.util
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Mapping, MutableMapping, Optional, Sequence, Tuple

import numpy as np

_WORD_RE = re.compile(r"\w+", re.UNICODE)

_INTENT_VOCAB = [
    "clarity",
    "empathy",
    "analysis",
    "evidence",
    "caution",
    "curiosity",
    "context",
    "precision",
    "ethics",
    "resilience",
    "coherence",
    "safety",
    "humility",
    "breadth",
    "depth",
    "innovation",
    "structure",
    "rigour",
    "balance",
    "confidence",
]

_DEFAULT_PROVIDER = "aletheia-noesis"
_DEFAULT_MODEL = "noesis-transformer-onnx"


class _TextEncoder:
    """Deterministic text → vector encoder.

    The Hugging Face Inference Endpoints runtime frequently passes user
    prompts as strings via the ``inputs`` field. The Noesis decoder, however,
    expects a symbolic vector (``psi``) as input. To provide a graceful
    fallback, the handler lazily converts short text prompts into a stable
    float32 vector by hashing tokens onto a hypersphere. This mirrors the
    lightweight ``TextEncoder256`` implementation bundled with the full
    AletheiaEngine package while avoiding a heavy import dependency inside
    the endpoint container.
    """

    def __init__(self, dim: int) -> None:
        self.dim = dim

    @staticmethod
    def _tokens(text: str) -> list[str]:
        return [tok.lower() for tok in _WORD_RE.findall(text)]

    @staticmethod
    def _seed(tok: str) -> int:
        # 32-bit FNV-1a hash of the token bytes, used as a deterministic RNG
        # seed so the same token always receives the same embedding.
        value = 2166136261
        for byte in tok.encode("utf-8"):
            value ^= byte
            value = (value * 16777619) & 0xFFFFFFFF
        return int(value)

    def encode(self, text: str) -> np.ndarray:
        tokens = self._tokens(text)
        if not tokens:
            return np.zeros((1, self.dim), dtype=np.float32)

        # One unit-norm embedding per token, drawn from a seeded RNG, then
        # mean-pooled and re-normalised back onto the hypersphere.
        vecs = []
        for tok in tokens:
            rs = np.random.RandomState(self._seed(tok))
            embedding = rs.normal(0.0, 1.0, size=(self.dim,)).astype(np.float32)
            norm = float(np.linalg.norm(embedding)) or 1.0
            vecs.append(embedding / norm)

        stacked = np.stack(vecs, axis=0)
        pooled = stacked.mean(axis=0, dtype=np.float32, keepdims=True)
        pooled_norm = float(np.linalg.norm(pooled)) or 1.0
return pooled / pooled_norm |
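
# Determinism sketch for the fallback encoder (illustrative, dim=4):
#
#     >>> enc = _TextEncoder(4)
#     >>> a = enc.encode("hello world")
#     >>> bool(np.allclose(a, enc.encode("hello world"))), a.shape
#     (True, (1, 4))
#
# The same prompt always yields the same unit-norm vector because every
# token embedding is seeded from the FNV-1a hash of its bytes.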
|
|
class _SimpleTokenizer:
    """Minimal character-level tokenizer mirroring the reference Noesis runtime."""

    def __init__(self) -> None:
        special_tokens = ["<pad>", "<bos>", "<eos>", "<unk>"]
        alphabet = list("abcdefghijklmnopqrstuvwxyz0123456789 .,;:'\"!?-\n")
        self._tokens = special_tokens + alphabet
        self._token_to_id = {token: idx for idx, token in enumerate(self._tokens)}

    @property
    def pad_token_id(self) -> int:
        return 0

    @property
    def bos_token_id(self) -> int:
        return 1

    @property
    def eos_token_id(self) -> int:
        return 2

    @property
    def unk_token_id(self) -> int:
        return 3

    def encode(self, text: str) -> list[int]:
        tokens = [self.bos_token_id]
        for char in text:
            tokens.append(self._token_to_id.get(char.lower(), self.unk_token_id))
        tokens.append(self.eos_token_id)
        return tokens

    def decode(self, token_ids: Sequence[int]) -> str:
        """Convert token IDs back into a text string."""

        characters: list[str] = []
        for idx in token_ids:
            if idx == self.eos_token_id:
                break
            if idx in {self.pad_token_id, self.bos_token_id}:
                continue

            if 0 <= idx < len(self._tokens):
                token = self._tokens[idx]
                if token not in {"<pad>", "<bos>", "<eos>", "<unk>"}:
                    characters.append(token)
                else:
                    characters.append("?")
            else:
                characters.append("?")

return "".join(characters) |
|
|
def _summarise_intent(psi: Sequence[float], top_k: int = 4) -> list[str]:
    """Convert strongest symbolic dimensions into descriptors."""

    vector = np.asarray(list(psi), dtype=np.float32).reshape(-1)
    if vector.size == 0:
        return []

    k = min(top_k, vector.size)
    magnitudes = np.abs(vector)
    top_indices = magnitudes.argsort()[::-1][:k]
    summary: list[str] = []
    for index in top_indices.tolist():
        descriptor = _INTENT_VOCAB[index % len(_INTENT_VOCAB)]
        direction = "elevated" if vector[index] >= 0 else "attenuated"
        summary.append(f"{descriptor} ({direction}, |ψ|={magnitudes[index]:.2f})")
return summary |
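
# For example, with a hypothetical three-dimensional psi the two strongest
# components are reported, indexed into the vocabulary modulo its size:
#
#     >>> _summarise_intent([0.9, -0.5, 0.1], top_k=2)
#     ['clarity (elevated, |ψ|=0.90)', 'empathy (attenuated, |ψ|=0.50)']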
|
|
@dataclass(frozen=True)
class _DecodingParams:
    beam_size: int = 6
    temperature: float = 0.8
    top_p: float = 0.9
    max_new_tokens: int = 1024
    min_new_tokens: int = 16
    stop_quality: float = 0.6

    @classmethod
    def from_payload(cls, payload: Mapping[str, Any]) -> "_DecodingParams":
        source: Mapping[str, Any] | None = None
        if "decoding" in payload and isinstance(payload["decoding"], Mapping):
            source = payload["decoding"]
        elif "parameters" in payload and isinstance(payload["parameters"], Mapping):
            candidate = payload["parameters"].get("decoding")
            if isinstance(candidate, Mapping):
                source = candidate

        if not source:
            return cls()

        kwargs: dict[str, Any] = {}
        for field in cls.__dataclass_fields__.keys():
            if field in source:
                try:
                    # Coerce to the field's default type; skip values that
                    # cannot be converted rather than failing the request.
                    kwargs[field] = type(getattr(cls(), field))(source[field])
                except (TypeError, ValueError):
                    continue
        return cls(**kwargs)

    def to_dict(self) -> dict[str, Any]:
return {field: getattr(self, field) for field in self.__dataclass_fields__.keys()} |
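
# Callers may override any subset of these knobs; unknown or badly typed
# values silently fall back to the defaults (a sketch with made-up numbers):
#
#     >>> params = _DecodingParams.from_payload(
#     ...     {"parameters": {"decoding": {"temperature": 0.2, "beam_size": 2}}}
#     ... )
#     >>> (params.temperature, params.beam_size, params.top_p)
#     (0.2, 2, 0.9)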
|
|
@dataclass(frozen=True)
class _ModelIO:
    """Snapshot of ONNX input and output metadata."""

    inputs: tuple[Any, ...]
    outputs: tuple[Any, ...]


class EndpointHandler:
    """Callable endpoint used by Hugging Face to drive inference."""

    def __init__(self, path: str | None = None) -> None:
        self.model_dir = Path(path or Path(__file__).parent)
        self.session = self._load_session()
        self.io = self._capture_io()

        self.primary_input = self.io.inputs[0].name
        self.slow_input = self._find_input("slow_state")
        self.tokens_input = self._find_input("tokens")
        self._primary_dim = self._infer_primary_dim()
        self._text_encoder = _TextEncoder(self._primary_dim)
        self._tokenizer = _SimpleTokenizer()

        # Pre-compute zero-filled defaults for every auxiliary input so a
        # request only has to supply ``psi`` (and optionally ``slow_state``).
        self._defaults: dict[str, np.ndarray] = {}
        skip_inputs = {self.primary_input}
        if self.slow_input is not None:
            skip_inputs.add(self.slow_input)
        if self.tokens_input is not None:
            skip_inputs.add(self.tokens_input)
        for node in self.io.inputs:
            if node.name in skip_inputs:
                continue
            self._defaults[node.name] = self._zeros_like(node)
        if self.slow_input is not None:
            self._slow_fallback = self._zeros_like(self._input_map[self.slow_input])
        else:
            self._slow_fallback = None
        if self.tokens_input is not None:
            token_node = self._input_map[self.tokens_input]
            self._token_sequence_length = self._infer_sequence_length(token_node)
            self._token_dtype = self._dtype_for(token_node)
        else:
            self._token_sequence_length = 0
            self._token_dtype = np.int64

    def _load_session(self):
        """Load the ONNX session, tolerating alternate filenames."""

        ort = self._import_onnxruntime()
        preferred_names = ("model.onnx", "model_infer.onnx")
        for name in preferred_names:
            candidate = self.model_dir / name
            if candidate.exists():
                return ort.InferenceSession(str(candidate), providers=["CPUExecutionProvider"])

        # Fall back to a lone *.onnx file with a non-standard name.
        available = sorted(p.name for p in self.model_dir.glob("*.onnx"))
        if len(available) == 1:
            return ort.InferenceSession(str(self.model_dir / available[0]), providers=["CPUExecutionProvider"])

        choices = ", ".join(available) or "<none>"
        raise FileNotFoundError(
            "Could not locate any of %s in %s (available: %s)"
            % (", ".join(preferred_names), self.model_dir, choices)
) |
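
    # Expected repository layout (illustrative; the Inference Endpoints
    # custom-handler convention is a ``handler.py`` module exposing
    # ``EndpointHandler`` beside the exported graph):
    #
    #     repository/
    #     ├── handler.py          (this module)
    #     ├── model.onnx          (preferred name, see above)
    #     └── requirements.txt    (should pin onnxruntime)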
|
    @staticmethod
    def _import_onnxruntime():
        """Import :mod:`onnxruntime`, providing a helpful error if unavailable."""

        spec = importlib.util.find_spec("onnxruntime")
        if spec is None:
            raise ModuleNotFoundError(
                "onnxruntime is required to load Noesis decoder ONNX graphs. "
                "Install it with 'pip install onnxruntime'."
            )
        return importlib.import_module("onnxruntime")

    @property
    def _input_map(self) -> Mapping[str, Any]:
        return {node.name: node for node in self.io.inputs}

    def _capture_io(self) -> _ModelIO:
        return _ModelIO(inputs=tuple(self.session.get_inputs()), outputs=tuple(self.session.get_outputs()))

    def _find_input(self, target: str) -> Optional[str]:
        target = target.lower()
        for node in self.io.inputs:
            if node.name.lower() == target:
                return node.name
        return None

    def _infer_primary_dim(self) -> int:
        node = self._input_map[self.primary_input]
        for dim in reversed(node.shape):
            if isinstance(dim, int) and dim > 0:
                return dim
        # Every dim is dynamic: fall back to 256, matching the
        # ``TextEncoder256``-style fallback encoder described above.
        return 256

    def _infer_sequence_length(self, node: Any) -> int:
        for dim in reversed(getattr(node, "shape", [])):
            if isinstance(dim, int) and dim > 0:
                return dim
        return 1

    @staticmethod
    def _onnx_type_to_numpy(type_str: str | None) -> np.dtype:
        mapping = {
            "tensor(float)": np.float32,
            "tensor(float16)": np.float16,
            "tensor(double)": np.float64,
            "tensor(int64)": np.int64,
            "tensor(int32)": np.int32,
            "tensor(int16)": np.int16,
            "tensor(int8)": np.int8,
            "tensor(uint8)": np.uint8,
            "tensor(bool)": np.bool_,
        }
        return mapping.get(type_str, np.float32)

    def _dtype_for(self, node: Any) -> np.dtype:
        return self._onnx_type_to_numpy(getattr(node, "type", None))

    def _zeros_like(self, node: Any) -> np.ndarray:
        # Replace dynamic (symbolic or negative) dimensions with 1 so the
        # zero tensor forms a valid minimal batch.
        shape: list[int] = []
        for dim in node.shape:
            if isinstance(dim, int) and dim > 0:
                shape.append(dim)
            else:
                shape.append(1)
        dtype = self._dtype_for(node)
        return np.zeros(shape, dtype=dtype)

    def _coerce_array(self, value: Any, *, node: Any, allow_empty: bool = False) -> np.ndarray:
        dtype = self._dtype_for(node)
        array = np.asarray(value, dtype=dtype)
        if array.size == 0 and not allow_empty:
            raise ValueError("Received an empty array; provide at least one value.")
        if array.ndim == 1:
            # Promote a flat vector to a batch of one.
            array = np.expand_dims(array, axis=0)
        elif array.ndim > 2:
            raise ValueError("Expected a 1D or batched 2D array; received shape %s" % (array.shape,))
        if array.dtype != dtype:
            array = array.astype(dtype, copy=False)
        return array

    def _prepare_inputs(self, payload: Mapping[str, Any]) -> MutableMapping[str, np.ndarray]:
        # ``psi`` may arrive under several aliases; plain strings are routed
        # through the fallback text encoder by ``_vector_from_payload``.
        psi = payload.get("psi")
        if psi is None:
            psi = (
                payload.get("vector")
                or payload.get("psi_s")
                or payload.get("inputs")
                or payload.get("prompt")
                or payload.get("text")
            )
        if psi is None:
            raise KeyError("Payload must include a 'psi' field containing the symbolic vector.")

        primary_node = self._input_map[self.primary_input]
        inputs: MutableMapping[str, np.ndarray] = {
            self.primary_input: self._vector_from_payload(psi, node=primary_node)
        }

        if self.slow_input is not None:
            slow_value = payload.get("slow_state") or payload.get("slow") or payload.get("state")
            if slow_value is None:
                inputs[self.slow_input] = self._slow_fallback.copy()
            else:
                inputs[self.slow_input] = self._coerce_array(
                    slow_value,
                    node=self._input_map[self.slow_input],
                    allow_empty=True,
                )

        for name, default in self._defaults.items():
            inputs[name] = default.copy()

        return inputs

    def _vector_from_payload(self, value: Any, *, node: Any) -> np.ndarray:
        if isinstance(value, str):
            encoded = self._text_encoder.encode(value)
            return self._coerce_array(encoded, node=node)

        if isinstance(value, (list, tuple)) and value and all(isinstance(v, str) for v in value):
            encoded = self._text_encoder.encode(" ".join(value))
            return self._coerce_array(encoded, node=node)

        return self._coerce_array(value, node=node)

    @staticmethod
    def _candidate_seed(psi: np.ndarray) -> int:
        # Derive a stable per-request seed from the psi bytes so repeated
        # requests with the same vector sample the same candidate beams.
        digest = hashlib.sha1(psi.tobytes()).digest()
        return int.from_bytes(digest[:4], "little", signed=False)

    def _token_array_from_ids(self, token_ids: Sequence[int]) -> np.ndarray:
        ids = list(token_ids)
        if self._token_sequence_length <= 0:
            # Dynamic sequence axis: feed the raw ids as a batch of one.
            return np.asarray([ids], dtype=self._token_dtype)

        # Fixed sequence axis: right-pad (or truncate) to the exported length.
        padded = np.full(
            (1, self._token_sequence_length),
            fill_value=self._tokenizer.pad_token_id,
            dtype=self._token_dtype,
        )
        length = min(len(ids), self._token_sequence_length)
        if length > 0:
            padded[0, :length] = np.asarray(ids[:length], dtype=self._token_dtype)
return padded |
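
    # Padding sketch (illustrative): with an exported sequence length of 6,
    # ids [1, 11, 2] become [[1, 11, 2, 0, 0, 0]], where 0 is ``<pad>``.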
|
    def _run_candidate(self, base_feed: Mapping[str, np.ndarray], tokens: Sequence[int]) -> list[tuple[Any, np.ndarray]]:
        # Copy the feed so per-candidate mutation cannot leak across beams.
        feed = {
            name: (value.copy() if isinstance(value, np.ndarray) else value)
            for name, value in base_feed.items()
        }
        if self.tokens_input is not None:
            feed[self.tokens_input] = self._token_array_from_ids(tokens)
        outputs = self.session.run(None, feed)
        return list(zip(self.io.outputs, outputs))

    @staticmethod
    def _extract_logits(outputs: Sequence[tuple[Any, np.ndarray]]) -> Optional[np.ndarray]:
        for node, value in outputs:
            if getattr(node, "name", "").lower() == "logits":
                return np.asarray(value, dtype=np.float32)
        if outputs:
            # No output named "logits": assume the first output carries them.
            return np.asarray(outputs[0][1], dtype=np.float32)
        return None

    @staticmethod
    def _sample_next_token(
        logits: np.ndarray,
        decoding: _DecodingParams,
        rng: np.random.Generator,
    ) -> int:
        vector = np.asarray(logits, dtype=np.float64).reshape(-1)
        temperature = max(float(decoding.temperature), 1e-5)
        top_p = float(decoding.top_p)

        # Degenerate cases fall back to greedy decoding.
        if temperature <= 1e-5 or not np.isfinite(vector).any():
            return int(np.argmax(vector))

        # Temperature-scaled softmax with max-subtraction for stability.
        stabilized = vector / temperature
        stabilized -= np.max(stabilized)
        probs = np.exp(stabilized)
        probs = np.nan_to_num(probs, nan=0.0, posinf=0.0, neginf=0.0)
        total = probs.sum()
        if total <= 0.0:
            return int(np.argmax(vector))
        probs /= total

        if top_p <= 0.0:
            return int(np.argmax(probs))

        if 0.0 < top_p < 1.0:
            # Nucleus (top-p) filtering: keep the smallest prefix of the
            # sorted distribution whose cumulative mass stays within top_p.
            sorted_indices = np.argsort(-probs)
            sorted_probs = probs[sorted_indices]
            cumulative = np.cumsum(sorted_probs)
            mask = cumulative <= top_p
            if mask.size > 0:
                mask[0] = True  # always keep at least the most likely token
            filtered_indices = sorted_indices[mask]
            filtered_probs = sorted_probs[mask]
            filtered_total = filtered_probs.sum()
            if filtered_total <= 0.0:
                filtered_indices = sorted_indices
                filtered_probs = sorted_probs
                filtered_total = filtered_probs.sum()
            filtered_probs = filtered_probs / filtered_total
            choice = rng.choice(len(filtered_indices), p=filtered_probs)
            return int(filtered_indices[int(choice)])

        choice = rng.choice(len(probs), p=probs)
return int(choice) |
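
    # Sampling sketch: with the defaults (temperature 0.8, top_p 0.9) a
    # strongly dominant logit wins deterministically, because the nucleus
    # collapses to that single token:
    #
    #     >>> rng = np.random.default_rng(0)
    #     >>> EndpointHandler._sample_next_token(
    #     ...     np.array([10.0, 0.0, 0.0]), _DecodingParams(), rng
    #     ... )
    #     0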
|
    def _generate_sequence(
        self,
        base_feed: Mapping[str, np.ndarray],
        *,
        decoding: _DecodingParams,
        seed: int,
    ) -> Optional[Tuple[str, list[int], float, list[tuple[Any, np.ndarray]], int]]:
        if self.tokens_input is None:
            return None

        rng = np.random.default_rng(seed)
        token_ids: list[int] = [self._tokenizer.bos_token_id]
        quality = float("-inf")
        formatted_outputs: list[tuple[Any, np.ndarray]] | None = None
        steps = 0

        # Note: each step reruns the full graph on the growing token prefix,
        # so generation cost grows quadratically with sequence length.
        max_steps = max(decoding.max_new_tokens, 1)
        for _ in range(max_steps):
            outputs = self._run_candidate(base_feed, token_ids)
            logits = self._extract_logits(outputs)
            if logits is None:
                break
            last_index = min(len(token_ids) - 1, logits.shape[1] - 1)
            next_logits = logits[0, last_index].copy()

            # Discourage <eos> until the minimum length has been produced.
            if steps < decoding.min_new_tokens:
                next_logits[self._tokenizer.eos_token_id] -= 10.0

            next_token = self._sample_next_token(next_logits, decoding, rng)
            token_ids.append(int(next_token))
            steps += 1

            # If <eos> still slipped through too early, swap it for a space.
            if token_ids[-1] == self._tokenizer.eos_token_id and steps < decoding.min_new_tokens:
                space_token_id = self._tokenizer._token_to_id.get(" ", self._tokenizer._token_to_id.get("a", self._tokenizer.unk_token_id))
                token_ids[-1] = space_token_id

            outputs = self._run_candidate(base_feed, token_ids)
            formatted_outputs = outputs
            quality = self._extract_q_hat(outputs)

            if token_ids[-1] == self._tokenizer.eos_token_id and steps >= decoding.min_new_tokens:
                break
            if self._token_sequence_length > 0 and len(token_ids) >= self._token_sequence_length:
                break

        if formatted_outputs is None:
            return None

        text = self._tokenizer.decode(token_ids)
        return text, token_ids, float(quality), formatted_outputs, steps

    @staticmethod
    def _extract_q_hat(outputs: Sequence[tuple[Any, np.ndarray]]) -> float:
        # Prefer an output named exactly "q_hat"; otherwise fall back to the
        # first output whose name contains a "q".
        for node, value in outputs:
            if getattr(node, "name", "").lower() == "q_hat":
                return float(np.squeeze(np.asarray(value, dtype=np.float32)))

        for node, value in outputs:
            if "q" in getattr(node, "name", "").lower():
                return float(np.squeeze(np.asarray(value, dtype=np.float32)))
        return float("-inf")

    @staticmethod
    def _format_output(name: str, value: np.ndarray) -> Any:
        value = np.asarray(value, dtype=np.float32)
        value = np.nan_to_num(value, nan=0.0, posinf=0.0, neginf=0.0)
        squeezed = np.squeeze(value)
        if squeezed.ndim == 0:
            return float(squeezed)
return squeezed.tolist() |
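
    # Formatting sketch: np.array([[0.5]]) collapses to the scalar 0.5, while
    # np.array([[1.0, 2.0, 3.0]]) becomes the flat list [1.0, 2.0, 3.0].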
|
    def __call__(self, data: Mapping[str, Any]) -> Mapping[str, Any]:
        payload = data.get("inputs", data)
        if not isinstance(payload, Mapping):
            payload = {"psi": payload}

        feed = self._prepare_inputs(payload)
        psi_vector = np.asarray(feed[self.primary_input], dtype=np.float32).reshape(-1)
        state_constraints = payload.get("constraints")
        if not isinstance(state_constraints, Mapping):
            state_constraints = None
        decoding = _DecodingParams.from_payload(payload)
        system_prompt = payload.get("system_prompt")
        user_prompt = payload.get("user_prompt")

        descriptors = _summarise_intent(psi_vector)
        summary = ", ".join(descriptors) if descriptors else "balanced intent"

        best_candidate: Optional[Tuple[str, list[int], float, list[tuple[Any, np.ndarray]], int]] = None
        seeds: list[int] = []

        # Sample up to ``beam_size`` candidates with consecutive seeds, keep
        # the one with the highest predicted quality and stop early once
        # ``stop_quality`` is reached.
        if self.tokens_input is not None:
            beams = max(decoding.beam_size, 1)
            base_seed = self._candidate_seed(psi_vector)
            for beam_idx in range(beams):
                seed = base_seed + beam_idx
                seeds.append(seed)
                candidate = self._generate_sequence(
                    feed,
                    decoding=decoding,
                    seed=seed,
                )
                if candidate is None:
                    continue
                quality = candidate[2]
                if best_candidate is None or quality > best_candidate[2]:
                    best_candidate = candidate
                if quality >= decoding.stop_quality:
                    break

        if best_candidate is None:
            # No token head (or generation failed): run the graph once and
            # fall back to a textual synopsis of the symbolic intent.
            outputs = self.session.run(None, feed)
            formatted_outputs = list(zip(self.io.outputs, outputs))
            quality = self._extract_q_hat(formatted_outputs)
            text = f"Symbolic synopsis → {summary}."
            token_ids: list[int] = []
            steps = 0
        else:
            text, token_ids, quality, formatted_outputs, steps = best_candidate

        formatted = {
            node.name: self._format_output(node.name, value)
            for node, value in formatted_outputs
        }

        if not np.isfinite(quality):
            quality = 0.0
        quality = float(quality)

        metadata = {
            "summary": summary,
            "descriptors": descriptors,
            "constraints": state_constraints or {},
            "decoding": decoding.to_dict(),
            "seeds": seeds,
            "steps": steps,
            "system_prompt": system_prompt if isinstance(system_prompt, str) else None,
            "user_prompt": user_prompt if isinstance(user_prompt, str) else None,
        }

        response = {
            "text": text,
            "tokens": token_ids,
            "quality": quality,
            "q_hat": quality,
            "provider": _DEFAULT_PROVIDER,
            "model": _DEFAULT_MODEL,
            "metadata": metadata,
        }
        response.update(formatted)
        return response


__all__ = ["EndpointHandler"] |
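

if __name__ == "__main__":
    # Local smoke test, a sketch only: it assumes a Noesis graph (for example
    # ``model.onnx``) sits next to this file and that onnxruntime is
    # installed; neither is guaranteed outside the endpoint container.
    handler = EndpointHandler()
    result = handler({"inputs": {"psi": "a short, cautious analysis"}})
    print(result["text"])
    print("quality:", result["quality"])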