MLE-Morpho-Logic-Engine / mle /inference /reasoning_engine.py
Harry00's picture
feat: complete MLE engine implementation
ebaf2ce verified
"""
MLE Inference Module: Reasoning Engine
========================================
The complete reasoning pipeline that integrates all modules:
Query → Encoding → Routing → Binding → Energy Minimization → Response
The "chain of thought" is an internal trajectory through state space:
1. ENCODE: Transform input query into a binary vector
2. ROUTE: Activate relevant memory regions (top-500 beam)
3. BIND: Construct composite representations from activated memories
4. RELAX: Minimize energy to find coherent state
5. DECODE: Extract response from final state
Multiple reasoning iterations are possible (iterative deepening):
each iteration uses the previous result as context for the next.
"""
import numpy as np
from typing import Optional, List, Dict, Any, Tuple
import logging
import time
import hashlib
from ..memory.sparse_address_table import SparseAddressTable
from ..routing.recursive_jit_router import RecursiveJITRouter, RoutingResult
from ..binding.semantic_binding import BindingEngine, BinaryBinding
from ..energy.energy_model import EnergyModel, EnergyFunction, RelaxationDynamics, HopfieldDynamics
from ..utils.simd_ops import (
N_BITS, N_WORDS,
random_binary_vector, random_binary_vectors,
hamming_distance, hamming_similarity,
xor_vectors, majority_vote
)
# Module-level logger; ReasoningEngine.reason() emits its step-by-step trace
# through it at INFO level when called with verbose=True.
logger = logging.getLogger(__name__)
class QueryEncoder:
    """
    Encode text/symbolic queries into binary vectors.

    Encoding strategies:
    1. Named concept lookup (exact match in codebook)
    2. Hash-based encoding (deterministic binary vector from string)
    3. Composite encoding (bundle several encoded concepts together)
    4. Structure encoding (bind each role to its filler, then bundle the pairs)
    """

    def __init__(self, binding_engine: "BindingEngine"):
        """Keep a reference to the binding engine used for codebook lookups."""
        self.binding = binding_engine

    def encode(self, query: Any) -> np.ndarray:
        """Encode a query into a binary vector.

        Supports:
        - np.ndarray: pass-through (a copy of the given vector is returned)
        - str: codebook lookup, falling back to hash-based encoding
        - list/tuple of str: bundle of the encoded concepts
        - dict: role-filler structure encoding

        Raises:
            TypeError: if the query is of any other type.
            ValueError: if a list/tuple/dict query is empty.
        """
        if isinstance(query, np.ndarray):
            return query.copy()
        if isinstance(query, str):
            return self._encode_string(query)
        if isinstance(query, (list, tuple)):
            return self._encode_composite(query)
        if isinstance(query, dict):
            return self._encode_structure(query)
        raise TypeError(f"Cannot encode query of type {type(query)}")

    def _encode_string(self, text: str) -> np.ndarray:
        """Encode a string to a binary vector.

        First tries the codebook; unknown strings fall back to the
        deterministic hash encoding. A copy is returned so callers cannot
        modify the codebook entry in place.
        """
        vec = self.binding.get_concept(text)
        if vec is not None:
            return vec.copy()
        return self._hash_to_vector(text)

    def _hash_to_vector(self, text: str) -> np.ndarray:
        """Deterministic hash-based encoding.

        Concatenates SHA-512 digests of ``"{text}:{i}"`` for i = 0, 1, ...
        until N_BITS bits are available, truncates to N_BITS // 8 bytes and
        reinterprets them as uint64 words.
        """
        digest_bits = 512  # SHA-512 yields 512 bits per digest
        # Ceiling division: exactly as many digests as needed to cover N_BITS.
        n_hashes = -(-N_BITS // digest_bits)
        chunks = [
            np.frombuffer(hashlib.sha512(f"{text}:{i}".encode()).digest(), dtype=np.uint8)
            for i in range(n_hashes)
        ]
        all_bytes = np.concatenate(chunks)[:N_BITS // 8]
        return all_bytes.view(np.uint64).copy()

    def _encode_composite(self, concepts: List[str]) -> np.ndarray:
        """Encode a composite query by bundling the encoded concepts.

        A single concept is returned as-is (no bundling needed).

        Raises:
            ValueError: if ``concepts`` is empty.
        """
        if not concepts:
            raise ValueError("Cannot encode an empty composite query")
        vectors = [self._encode_string(c) for c in concepts]
        if len(vectors) == 1:
            return vectors[0]
        return BinaryBinding.bundle(*vectors)

    def _encode_structure(self, structure: Dict[str, str]) -> np.ndarray:
        """Encode a structured query (role:filler pairs).

        Example: {"subject": "cat", "relation": "is_a"} →
            bundle(bind(encode("subject"), encode("cat")),
                   bind(encode("relation"), encode("is_a")))

        A single pair is returned as the bound vector itself.

        Raises:
            ValueError: if ``structure`` is empty.
        """
        if not structure:
            raise ValueError("Cannot encode an empty structured query")
        pairs = [
            BinaryBinding.bind(self._encode_string(role), self._encode_string(filler))
            for role, filler in structure.items()
        ]
        return BinaryBinding.bundle(*pairs) if len(pairs) > 1 else pairs[0]
class ResponseDecoder:
    """
    Turn a final state vector into an interpretable response.

    Strategies:
    1. Nearest neighbor: rank the named concepts of the codebook by similarity
    2. Unbinding: recover the filler for each requested role
    3. Activation readout: report the closest memory entries
    """

    def __init__(self, memory: SparseAddressTable, binding_engine: BindingEngine):
        self.memory = memory
        self.binding = binding_engine

    def _rank_codebook(self, vector: np.ndarray) -> List[Tuple[str, float]]:
        """Score every codebook concept against ``vector``, best match first."""
        scored = [
            (label, float(hamming_similarity(vector, concept_vec)))
            for label, concept_vec in self.binding._codebook.items()
        ]
        return sorted(scored, key=lambda pair: pair[1], reverse=True)

    def decode(self, state: np.ndarray, top_k: int = 5,
               roles: Optional[List[str]] = None) -> Dict[str, Any]:
        """Decode a state vector into a response dict.

        Args:
            state: the vector to interpret.
            top_k: how many concepts / memory entries to report.
            roles: optional role names whose fillers should be recovered.

        Returns:
            Dict with:
            - nearest_concepts: top_k (name, similarity) pairs from the codebook
            - nearest_memories: one dict per memory hit (index, name,
              distance, similarity, metadata)
            - role_fillers: per role, the 3 best (name, similarity) candidates
              (only present when roles are given)
            - confidence: similarity of the best codebook match, 0.0 when the
              codebook is empty
        """
        # 1. Codebook ranking (the full ranking is kept for the confidence readout)
        ranked_concepts = self._rank_codebook(state)
        response: Dict[str, Any] = {'nearest_concepts': ranked_concepts[:top_k]}

        # 2. Closest memory entries
        hits = self.memory.query_nearest(state, k=top_k, use_lsh=True)
        memories = []
        for idx, dist in hits:
            meta = self.memory._metadata[idx]
            if meta:
                label = meta.get('name', f'entry_{idx}')
            else:
                label = f'entry_{idx}'
            memories.append({
                'index': idx,
                'name': label,
                'distance': dist,
                'similarity': 1.0 - dist / N_BITS,
                'metadata': meta,
            })
        response['nearest_memories'] = memories

        # 3. Role-filler recovery
        if roles:
            fillers = {}
            for role_name in roles:
                role_vec = self.binding.get_concept(role_name)
                if role_vec is None:
                    # Unknown role: use the same deterministic hash encoding as queries
                    role_vec = QueryEncoder(self.binding)._hash_to_vector(role_name)
                recovered = BinaryBinding.unbind(state, role_vec)
                # Keep the three codebook concepts closest to the recovered filler
                fillers[role_name] = self._rank_codebook(recovered)[:3]
            response['role_fillers'] = fillers

        # 4. Confidence = similarity of the single best codebook match
        response['confidence'] = ranked_concepts[0][1] if ranked_concepts else 0.0
        return response
class ReasoningStep:
    """One entry of the reasoning chain: what a single iteration produced.

    Attributes (all start at "nothing recorded yet" defaults):
        step_num: index of the step within the chain.
        state: state vector recorded for the step, or None.
        energy: energy recorded for the step; +inf until one is assigned.
        energy_components: per-component energy values (empty by default).
        routing_result: routing result recorded for the step, or None.
        decoded: decoded response attached to the step, or None.
        timestamp: time value recorded for the step (0.0 by default).
    """

    __slots__ = [
        'step_num',
        'state',
        'energy',
        'energy_components',
        'routing_result',
        'decoded',
        'timestamp',
    ]

    def __init__(self, step_num: int):
        self.step_num = step_num
        # Results, filled in by the reasoning loop
        self.state: Optional[np.ndarray] = None
        self.routing_result: Optional[RoutingResult] = None
        self.decoded: Optional[Dict] = None
        # Energy bookkeeping; +inf marks "no energy computed for this step"
        self.energy: float = float('inf')
        self.energy_components: Dict[str, float] = {}
        self.timestamp: float = 0.0
class ReasoningEngine:
    """
    The MLE Reasoning Engine — core inference loop.

    Architecture:
        +----------+     +----------+     +-----------+     +----------+
        |  Query   |---->|  Router  |---->|  Binder   |---->|  Energy  |
        | Encoder  |     |  (Beam)  |     | (Compose) |     | (Relax)  |
        +----------+     +----------+     +-----------+     +----------+
             |                                                   |
             |           +----------+     +-----------+          |
             +-----------| Response |<----|  Decode   |<---------+
                         +----------+     +-----------+

    Reasoning loop:
    1. Encode query → binary vector
    2. Route → find relevant memories (beam search)
    3. Activate memories, construct bindings
    4. Minimize energy (Hopfield + binary relaxation)
    5. Decode final state → response
    6. [Optional] Use result as new query → iterate
    """

    def __init__(
        self,
        memory: "Optional[SparseAddressTable]" = None,
        beam_width: int = 500,
        max_routing_depth: int = 3,
        max_reasoning_steps: int = 5,
        energy_mode: str = 'hybrid',
        hopfield_beta: float = 8.0,
        relaxation_iterations: int = 50,
    ):
        """Wire up memory, binding, routing, energy and encoder/decoder parts.

        Args:
            memory: address table to reason over; a fresh SparseAddressTable
                is created when None.
            beam_width: beam width handed to the router.
            max_routing_depth: ``max_depth`` handed to the router.
            max_reasoning_steps: default iteration cap used by reason().
            energy_mode: ``mode`` handed to the EnergyModel.
            hopfield_beta: ``beta`` handed to HopfieldDynamics.
            relaxation_iterations: ``max_iterations`` handed to
                RelaxationDynamics.
        """
        # Memory. Compare against None explicitly: `memory or ...` would
        # silently discard a caller-supplied table that evaluates falsy
        # (e.g. an empty one, if the table type defines __len__).
        self.memory = memory if memory is not None else SparseAddressTable()
        # Binding engine
        self.binding = BindingEngine(use_binary=True)
        # Router
        self.router = RecursiveJITRouter(
            memory=self.memory,
            beam_width=beam_width,
            max_depth=max_routing_depth,
        )
        # Energy model (binary relaxation and Hopfield dynamics share one energy function)
        energy_fn = EnergyFunction()
        self.energy_model = EnergyModel(
            mode=energy_mode,
            energy_fn=energy_fn,
            binary_dynamics=RelaxationDynamics(
                energy_fn,
                max_iterations=relaxation_iterations,
                n_candidates=32,
            ),
            hopfield_dynamics=HopfieldDynamics(
                beta=hopfield_beta,
                max_iterations=20,
            ),
        )
        # Encoder/Decoder
        self.encoder = QueryEncoder(self.binding)
        self.decoder = ResponseDecoder(self.memory, self.binding)
        # Config
        self.max_reasoning_steps = max_reasoning_steps

    # ── Knowledge Management ──────────────────────────────────────────────
    def add_concept(self, name: str, metadata: Optional[Dict] = None) -> np.ndarray:
        """Add a named concept to both memory and binding codebook.

        Returns the vector produced by the binding engine for ``name``.
        """
        vec = self.binding.register_concept(name)
        self.memory.store_concept(name, content=vec, metadata=metadata)
        return vec

    def add_relation(self, subject: str, relation: str, obj: str,
                     metadata: Optional[Dict] = None) -> int:
        """Store a semantic relation as a bound vector in memory.

        Concepts that are not yet in the codebook are added first.

        Args:
            subject, relation, obj: concept names forming the triple.
            metadata: optional extra metadata; it is copied, never modified.

        Returns:
            Whatever ``memory.store`` returns for the new entry.
        """
        # Ensure all concepts exist
        for name in (subject, relation, obj):
            if self.binding.get_concept(name) is None:
                self.add_concept(name)
        # Create bound representation
        s = self.binding.get_concept(subject)
        r = self.binding.get_concept(relation)
        o = self.binding.get_concept(obj)
        bound = BinaryBinding.encode_triple(s, r, o)
        entry_name = f"{subject}_{relation}_{obj}"
        # Copy before updating so the caller's metadata dict is left untouched
        meta = dict(metadata) if metadata else {}
        meta.update({
            'type': 'relation',
            'subject': subject,
            'relation': relation,
            'object': obj,
            'name': entry_name,
        })
        # Address = bundle of subject, relation and object (used for routing)
        address = BinaryBinding.bundle(s, r, o)
        return self.memory.store(address, bound, metadata=meta, name=entry_name)

    # ── Core Reasoning ────────────────────────────────────────────────────
    def reason(self, query: Any, max_steps: Optional[int] = None,
               roles: Optional[List[str]] = None,
               verbose: bool = False) -> Dict[str, Any]:
        """
        Execute full reasoning pipeline.

        Args:
            query: input query (string, vector, list of strings, or dict)
            max_steps: override max reasoning iterations (None = engine
                default; 0 = encode and decode only)
            roles: roles to decode in response
            verbose: log step-by-step info at INFO level

        Returns:
            Dict with:
            - response: decoded response dict
            - reasoning_chain: list of ReasoningStep objects
            - total_energy_reduction: first minus last recorded step energy
              (0.0 when fewer than two steps recorded an energy)
            - latency_ms: total time
            - num_steps: number of steps in the chain
            - final_state: state vector that was decoded
        """
        t0 = time.perf_counter()
        # `is None` rather than `or`, so an explicit max_steps=0 is honoured
        steps = self.max_reasoning_steps if max_steps is None else max_steps
        chain: List[ReasoningStep] = []

        # ── Step 0: Encode query ──────────────────────────────────────
        query_vec = self.encoder.encode(query)
        current_state = query_vec.copy()
        if verbose:
            logger.info(f"Query encoded. Starting reasoning with max {steps} steps.")

        for step_num in range(steps):
            step = ReasoningStep(step_num)
            step.timestamp = time.perf_counter() - t0

            # ── Step 1: Route — find relevant memories ────────────────
            routing = self.router.route_and_activate(current_state)
            step.routing_result = routing
            if verbose:
                logger.info(
                    f"Step {step_num}: Routed to {len(routing.indices)} candidates "
                    f"(depth={routing.depth}, latency={routing.latency_ms:.1f}ms)"
                )
            if len(routing.indices) == 0:
                # Nothing to relax against: record the unchanged state and stop
                step.state = current_state
                chain.append(step)
                break

            # ── Step 2: Gather context from activated memories ────────
            active_indices = routing.indices[:100]  # at most 100 context entries
            context_vectors = np.ascontiguousarray(
                self.memory._contents[active_indices]
            )
            act_strengths = routing.activations[:len(active_indices)]

            # ── Step 3: Construct bindings for coherence checking ─────
            bindings = self._extract_bindings(active_indices)

            # ── Step 4: Energy minimization ───────────────────────────
            energy_result = self.energy_model.minimize(
                current_state,
                context_vectors,
                act_strengths,
                bindings=bindings,
            )
            new_state = energy_result['final_state']
            step.state = new_state
            step.energy = energy_result['final_energy']
            if verbose:
                logger.info(
                    f"Step {step_num}: Energy {step.energy:.4f} "
                    f"(iterations={energy_result.get('total_iterations', '?')})"
                )
            chain.append(step)

            # ── Check convergence ─────────────────────────────────────
            if step_num > 0:
                state_change = hamming_distance(current_state, new_state) / N_BITS
                # chain[-2] is the previous step (this step was just appended)
                energy_change = abs(chain[-2].energy - step.energy)
                if state_change < 0.01 and energy_change < 1e-4:
                    if verbose:
                        logger.info(f"Reasoning converged at step {step_num}")
                    break
            current_state = new_state

        # ── Decode final state ────────────────────────────────────────
        final_state = chain[-1].state if chain else query_vec
        response = self.decoder.decode(final_state, top_k=10, roles=roles)
        # Add decoding to last step
        if chain:
            chain[-1].decoded = response
        total_time = (time.perf_counter() - t0) * 1000

        # Energy trajectory (steps that never computed an energy stay at +inf)
        energies = [s.energy for s in chain if s.energy != float('inf')]
        energy_reduction = (energies[0] - energies[-1]) if len(energies) >= 2 else 0.0
        return {
            'response': response,
            'reasoning_chain': chain,
            'total_energy_reduction': energy_reduction,
            'latency_ms': total_time,
            'num_steps': len(chain),
            'final_state': final_state,
        }

    def _extract_bindings(self, indices: np.ndarray,
                          max_bindings: int = 20) -> List[Tuple]:
        """Extract binding triples from activated memory entries for coherence checking.

        Only entries whose metadata marks them as a relation, and whose three
        concepts are all in the codebook, contribute. Each result is a tuple
        ``(bound, bind(subject, relation), object)``.

        Args:
            indices: memory indices of the activated entries.
            max_bindings: stop after this many triples (limit for performance).
        """
        bindings: List[Tuple] = []
        for idx in indices:
            if len(bindings) >= max_bindings:
                break  # limit reached; skip the remaining entries
            meta = self.memory._metadata[idx]
            if not meta or meta.get('type') != 'relation':
                continue
            subj = meta.get('subject')
            rel = meta.get('relation')
            obj = meta.get('object')
            if not (subj and rel and obj):
                continue
            s_vec = self.binding.get_concept(subj)
            r_vec = self.binding.get_concept(rel)
            o_vec = self.binding.get_concept(obj)
            if s_vec is None or r_vec is None or o_vec is None:
                continue
            bound = self.memory._contents[idx]
            # Check: unbind(bound, bind(s, r)) should ≈ o
            role_composed = BinaryBinding.bind(s_vec, r_vec)
            bindings.append((bound, role_composed, o_vec))
        return bindings

    # ── Analogy Solving ───────────────────────────────────────────────
    def solve_analogy(self, a: str, b: str, c: str,
                      candidates: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Solve analogy: a is to b as c is to ?

        Uses binding-based approach:
        1. Compute relation R = bind(a, b)
        2. Apply: query = bind(c, R)
        3. Route query through memory (two reasoning steps)
        4. Decode nearest match

        Concepts missing from the codebook are added first.

        Returns:
            Dict with the query string, the first 10 entries of the
            binding engine's codebook ranking, the memory-based response
            and the latency in milliseconds.
        """
        t0 = time.perf_counter()
        # Get or create concept vectors
        for name in (a, b, c):
            if self.binding.get_concept(name) is None:
                self.add_concept(name)
        va = self.binding.get_concept(a)
        vb = self.binding.get_concept(b)
        vc = self.binding.get_concept(c)
        # Compute analogy query
        relation = BinaryBinding.bind(va, vb)
        query = BinaryBinding.bind(vc, relation)
        # Route and find matches
        result = self.reason(query, max_steps=2)
        # Also do direct codebook comparison
        codebook_results = self.binding.solve_analogy(a, b, c, candidates)
        return {
            'query': f"{a} : {b} :: {c} : ?",
            'codebook_ranking': codebook_results[:10],
            'memory_response': result['response'],
            'latency_ms': (time.perf_counter() - t0) * 1000,
        }

    # ── Association ───────────────────────────────────────────────────
    def associate(self, concept: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """Find concepts most associated with the given concept.

        Routes the concept vector through memory and reports the names of
        the routed entries (skipping unnamed entries, the concept itself and
        duplicates), in routing order.

        Returns:
            Up to ``top_k`` pairs ``(name, similarity)`` where similarity is
            ``1 - distance / N_BITS`` for that entry's routing distance, or
            0.0 when no distance is available for the entry.
        """
        vec = self.binding.get_concept(concept)
        if vec is None:
            vec = self.encoder.encode(concept)
        # Route through memory
        routing = self.router.route(vec, beam_width=100)
        if len(routing.indices) == 0:
            return []
        distances = routing.distances
        n_distances = len(distances)
        # Collect associated concepts
        associations: List[Tuple[str, float]] = []
        seen = set()
        for pos, idx in enumerate(routing.indices):
            if len(associations) >= top_k:
                break
            meta = self.memory._metadata[idx]
            if not meta:
                continue
            name = meta.get('name', '')
            if not name or name == concept or name in seen:
                continue
            # Use the distance at this entry's own routing position; indexing
            # by the number of results collected so far would drift as soon
            # as any entry is skipped.
            # NOTE(review): assumes routing.distances is parallel to routing.indices.
            sim = 1.0 - distances[pos] / N_BITS if pos < n_distances else 0.0
            associations.append((name, float(sim)))
            seen.add(name)
        return associations

    # ── Composition ───────────────────────────────────────────────────
    def compose(self, *concepts: str) -> Dict[str, Any]:
        """Compose multiple concepts and find what the composition represents.

        Unknown concepts are added to the engine first. The concept vectors
        are bundled (a single concept is used as-is) and the result is run
        through two reasoning steps.

        Raises:
            ValueError: if no concepts are given.
        """
        if not concepts:
            raise ValueError("compose() requires at least one concept")
        vectors = []
        for c in concepts:
            vec = self.binding.get_concept(c)
            if vec is None:
                self.add_concept(c)
                vec = self.binding.get_concept(c)
            vectors.append(vec)
        # Bundle all concepts
        if len(vectors) == 1:
            composed = vectors[0]
        else:
            composed = BinaryBinding.bundle(*vectors)
        # Decode what this composition means
        result = self.reason(composed, max_steps=2)
        return {
            'input_concepts': concepts,
            'response': result['response'],
            'latency_ms': result['latency_ms'],
        }

    # ── Stats ─────────────────────────────────────────────────────────
    def stats(self) -> Dict[str, Any]:
        """Return engine statistics: memory stats, codebook size, beam width, energy mode."""
        return {
            'memory': self.memory.stats(),
            'codebook_size': len(self.binding._codebook),
            'beam_width': self.router.beam_width,
            'energy_mode': self.energy_model.mode,
        }