| """ |
| MLE Inference Module: Reasoning Engine |
| ======================================== |
| The complete reasoning pipeline that integrates all modules: |
| |
| Query β Encoding β Routing β Binding β Energy Minimization β Response |
| |
| The "chain of thought" is an internal trajectory through state space: |
| 1. ENCODE: Transform input query into a binary vector |
| 2. ROUTE: Activate relevant memory regions (top-500 beam) |
| 3. BIND: Construct composite representations from activated memories |
| 4. RELAX: Minimize energy to find coherent state |
| 5. DECODE: Extract response from final state |
| |
| Multiple reasoning iterations are possible (iterative deepening): |
| each iteration uses the previous result as context for the next. |
| """ |
|
|
| import numpy as np |
| from typing import Optional, List, Dict, Any, Tuple |
| import logging |
| import time |
| import hashlib |
|
|
| from ..memory.sparse_address_table import SparseAddressTable |
| from ..routing.recursive_jit_router import RecursiveJITRouter, RoutingResult |
| from ..binding.semantic_binding import BindingEngine, BinaryBinding |
| from ..energy.energy_model import EnergyModel, EnergyFunction, RelaxationDynamics, HopfieldDynamics |
| from ..utils.simd_ops import ( |
| N_BITS, N_WORDS, |
| random_binary_vector, random_binary_vectors, |
| hamming_distance, hamming_similarity, |
| xor_vectors, majority_vote |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class QueryEncoder: |
| """ |
| Encode text/symbolic queries into binary vectors. |
| |
| Encoding strategies: |
| 1. Named concept lookup (exact match in codebook) |
| 2. Hash-based encoding (deterministic binary vector from string) |
| 3. Composite encoding (bind multiple concepts together) |
| """ |
|
|
| def __init__(self, binding_engine: BindingEngine): |
| self.binding = binding_engine |
|
|
| def encode(self, query: Any) -> np.ndarray: |
| """Encode a query into a binary vector. |
| |
| Supports: |
| - str: hash-based or codebook lookup |
| - np.ndarray: pass-through (already a vector) |
| - list of str: composite binding of concepts |
| - dict: role-filler structure encoding |
| """ |
| if isinstance(query, np.ndarray): |
| return query.copy() |
| elif isinstance(query, str): |
| return self._encode_string(query) |
| elif isinstance(query, (list, tuple)): |
| return self._encode_composite(query) |
| elif isinstance(query, dict): |
| return self._encode_structure(query) |
| else: |
| raise TypeError(f"Cannot encode query of type {type(query)}") |
|
|
| def _encode_string(self, text: str) -> np.ndarray: |
| """Encode a string to binary vector. |
| First tries codebook lookup, then falls back to hash. |
| """ |
| |
| vec = self.binding.get_concept(text) |
| if vec is not None: |
| return vec.copy() |
|
|
| |
| return self._hash_to_vector(text) |
|
|
| def _hash_to_vector(self, text: str) -> np.ndarray: |
| """Deterministic hash-based encoding. |
| Uses SHA-512 repeatedly to fill 4096 bits. |
| """ |
| n_hashes = N_BITS // 512 + 1 |
| bits = [] |
| for i in range(n_hashes): |
| h = hashlib.sha512(f"{text}:{i}".encode()).digest() |
| bits.append(np.frombuffer(h, dtype=np.uint8)) |
| all_bytes = np.concatenate(bits)[:N_BITS // 8] |
| return all_bytes.view(np.uint64).copy() |
|
|
| def _encode_composite(self, concepts: List[str]) -> np.ndarray: |
| """Encode a composite query by bundling encoded concepts.""" |
| vectors = [self._encode_string(c) for c in concepts] |
| if len(vectors) == 1: |
| return vectors[0] |
| return BinaryBinding.bundle(*vectors) |
|
|
| def _encode_structure(self, structure: Dict[str, str]) -> np.ndarray: |
| """Encode a structured query (role:filler pairs). |
| Example: {"subject": "cat", "relation": "is_a"} β |
| bind(encode("subject"), encode("cat")) β bind(encode("relation"), encode("is_a")) |
| """ |
| pairs = [] |
| for role, filler in structure.items(): |
| role_vec = self._encode_string(role) |
| filler_vec = self._encode_string(filler) |
| pairs.append(BinaryBinding.bind(role_vec, filler_vec)) |
| return BinaryBinding.bundle(*pairs) if len(pairs) > 1 else pairs[0] |
|
|
|
|
| class ResponseDecoder: |
| """ |
| Decode final state vector into interpretable response. |
| |
| Strategies: |
| 1. Nearest neighbor: find closest named concept(s) in codebook |
| 2. Unbinding: recover specific fillers for given roles |
| 3. Activation readout: return most-activated memory entries |
| """ |
|
|
| def __init__(self, memory: SparseAddressTable, binding_engine: BindingEngine): |
| self.memory = memory |
| self.binding = binding_engine |
|
|
| def decode(self, state: np.ndarray, top_k: int = 5, |
| roles: Optional[List[str]] = None) -> Dict[str, Any]: |
| """Decode state vector into response. |
| |
| Returns dict with: |
| - nearest_concepts: closest named concepts with similarities |
| - nearest_memories: closest memory entries |
| - role_fillers: decoded role-filler pairs (if roles provided) |
| - confidence: overall decoding confidence |
| """ |
| result = {} |
|
|
| |
| concept_scores = [] |
| for name, vec in self.binding._codebook.items(): |
| sim = hamming_similarity(state, vec) |
| concept_scores.append((name, float(sim))) |
| concept_scores.sort(key=lambda x: x[1], reverse=True) |
| result['nearest_concepts'] = concept_scores[:top_k] |
|
|
| |
| memory_results = self.memory.query_nearest(state, k=top_k, use_lsh=True) |
| nearest_memories = [] |
| for idx, dist in memory_results: |
| meta = self.memory._metadata[idx] |
| name = meta.get('name', f'entry_{idx}') if meta else f'entry_{idx}' |
| sim = 1.0 - dist / N_BITS |
| nearest_memories.append({ |
| 'index': idx, |
| 'name': name, |
| 'distance': dist, |
| 'similarity': sim, |
| 'metadata': meta, |
| }) |
| result['nearest_memories'] = nearest_memories |
|
|
| |
| if roles: |
| role_fillers = {} |
| for role_name in roles: |
| role_vec = self.binding.get_concept(role_name) |
| if role_vec is None: |
| role_vec = QueryEncoder(self.binding)._hash_to_vector(role_name) |
| filler = BinaryBinding.unbind(state, role_vec) |
| |
| filler_scores = [] |
| for name, vec in self.binding._codebook.items(): |
| sim = hamming_similarity(filler, vec) |
| filler_scores.append((name, float(sim))) |
| filler_scores.sort(key=lambda x: x[1], reverse=True) |
| role_fillers[role_name] = filler_scores[:3] |
| result['role_fillers'] = role_fillers |
|
|
| |
| if concept_scores: |
| result['confidence'] = concept_scores[0][1] |
| else: |
| result['confidence'] = 0.0 |
|
|
| return result |
|
|
|
|
| class ReasoningStep: |
| """Record of a single reasoning step in the inference chain.""" |
| __slots__ = ['step_num', 'state', 'energy', 'energy_components', |
| 'routing_result', 'decoded', 'timestamp'] |
|
|
| def __init__(self, step_num: int): |
| self.step_num = step_num |
| self.state: Optional[np.ndarray] = None |
| self.energy: float = float('inf') |
| self.energy_components: Dict[str, float] = {} |
| self.routing_result: Optional[RoutingResult] = None |
| self.decoded: Optional[Dict] = None |
| self.timestamp: float = 0.0 |
|
|
|
|
| class ReasoningEngine: |
| """ |
| The MLE Reasoning Engine β core inference loop. |
| |
| Architecture: |
| ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ |
| β Query ββββββΆβ Router ββββββΆβ Binder ββββββΆβ Energy β |
| β Encoder β β (Beam) β β (Compose)β β (Relax) β |
| ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ |
| β β |
| β ββββββββββββ ββββββββββββ β |
| βββββββββββββ Response βββββββ Decode βββββββββββββ |
| β β β β |
| ββββββββββββ ββββββββββββ |
| |
| Reasoning loop: |
| 1. Encode query β binary vector |
| 2. Route β find relevant memories (beam search) |
| 3. Activate memories, construct bindings |
| 4. Minimize energy (Hopfield + binary relaxation) |
| 5. Decode final state β response |
| 6. [Optional] Use result as new query β iterate |
| """ |
|
|
| def __init__( |
| self, |
| memory: Optional[SparseAddressTable] = None, |
| beam_width: int = 500, |
| max_routing_depth: int = 3, |
| max_reasoning_steps: int = 5, |
| energy_mode: str = 'hybrid', |
| hopfield_beta: float = 8.0, |
| relaxation_iterations: int = 50, |
| ): |
| |
| self.memory = memory or SparseAddressTable() |
|
|
| |
| self.binding = BindingEngine(use_binary=True) |
|
|
| |
| self.router = RecursiveJITRouter( |
| memory=self.memory, |
| beam_width=beam_width, |
| max_depth=max_routing_depth, |
| ) |
|
|
| |
| energy_fn = EnergyFunction() |
| self.energy_model = EnergyModel( |
| mode=energy_mode, |
| energy_fn=energy_fn, |
| binary_dynamics=RelaxationDynamics( |
| energy_fn, |
| max_iterations=relaxation_iterations, |
| n_candidates=32, |
| ), |
| hopfield_dynamics=HopfieldDynamics( |
| beta=hopfield_beta, |
| max_iterations=20, |
| ), |
| ) |
|
|
| |
| self.encoder = QueryEncoder(self.binding) |
| self.decoder = ResponseDecoder(self.memory, self.binding) |
|
|
| |
| self.max_reasoning_steps = max_reasoning_steps |
|
|
| |
|
|
| def add_concept(self, name: str, metadata: Optional[Dict] = None) -> np.ndarray: |
| """Add a named concept to both memory and binding codebook.""" |
| vec = self.binding.register_concept(name) |
| self.memory.store_concept(name, content=vec, metadata=metadata) |
| return vec |
|
|
| def add_relation(self, subject: str, relation: str, obj: str, |
| metadata: Optional[Dict] = None) -> int: |
| """Store a semantic relation as a bound vector in memory.""" |
| |
| for name in [subject, relation, obj]: |
| if self.binding.get_concept(name) is None: |
| self.add_concept(name) |
|
|
| |
| s = self.binding.get_concept(subject) |
| r = self.binding.get_concept(relation) |
| o = self.binding.get_concept(obj) |
| bound = BinaryBinding.encode_triple(s, r, o) |
|
|
| |
| meta = metadata or {} |
| meta.update({ |
| 'type': 'relation', |
| 'subject': subject, |
| 'relation': relation, |
| 'object': obj, |
| 'name': f"{subject}_{relation}_{obj}", |
| }) |
|
|
| |
| address = BinaryBinding.bundle(s, r, o) |
| return self.memory.store(address, bound, metadata=meta, |
| name=f"{subject}_{relation}_{obj}") |
|
|
| |
|
|
| def reason(self, query: Any, max_steps: Optional[int] = None, |
| roles: Optional[List[str]] = None, |
| verbose: bool = False) -> Dict[str, Any]: |
| """ |
| Execute full reasoning pipeline. |
| |
| Args: |
| query: input query (string, vector, list of strings, or dict) |
| max_steps: override max reasoning iterations |
| roles: roles to decode in response |
| verbose: print step-by-step info |
| |
| Returns: |
| Dict with: |
| - response: decoded response dict |
| - reasoning_chain: list of ReasoningStep objects |
| - total_energy_reduction: cumulative energy decrease |
| - latency_ms: total time |
| """ |
| t0 = time.perf_counter() |
| steps = max_steps or self.max_reasoning_steps |
| chain: List[ReasoningStep] = [] |
|
|
| |
| query_vec = self.encoder.encode(query) |
| current_state = query_vec.copy() |
|
|
| if verbose: |
| logger.info(f"Query encoded. Starting reasoning with max {steps} steps.") |
|
|
| for step_num in range(steps): |
| step = ReasoningStep(step_num) |
| step.timestamp = time.perf_counter() - t0 |
|
|
| |
| routing = self.router.route_and_activate(current_state) |
| step.routing_result = routing |
|
|
| if verbose: |
| logger.info( |
| f"Step {step_num}: Routed to {len(routing.indices)} candidates " |
| f"(depth={routing.depth}, latency={routing.latency_ms:.1f}ms)" |
| ) |
|
|
| if len(routing.indices) == 0: |
| step.state = current_state |
| chain.append(step) |
| break |
|
|
| |
| active_indices = routing.indices[:min(100, len(routing.indices))] |
| context_vectors = np.ascontiguousarray( |
| self.memory._contents[active_indices] |
| ) |
| act_strengths = routing.activations[:len(active_indices)] |
|
|
| |
| bindings = self._extract_bindings(active_indices) |
|
|
| |
| energy_result = self.energy_model.minimize( |
| current_state, |
| context_vectors, |
| act_strengths, |
| bindings=bindings, |
| ) |
|
|
| new_state = energy_result['final_state'] |
| step.state = new_state |
| step.energy = energy_result['final_energy'] |
|
|
| if verbose: |
| logger.info( |
| f"Step {step_num}: Energy {step.energy:.4f} " |
| f"(iterations={energy_result.get('total_iterations', '?')})" |
| ) |
|
|
| chain.append(step) |
|
|
| |
| if step_num > 0: |
| state_change = hamming_distance(current_state, new_state) / N_BITS |
| energy_change = abs(chain[-2].energy - step.energy) |
| if state_change < 0.01 and energy_change < 1e-4: |
| if verbose: |
| logger.info(f"Reasoning converged at step {step_num}") |
| break |
|
|
| current_state = new_state |
|
|
| |
| final_state = chain[-1].state if chain else query_vec |
| response = self.decoder.decode(final_state, top_k=10, roles=roles) |
|
|
| |
| if chain: |
| chain[-1].decoded = response |
|
|
| total_time = (time.perf_counter() - t0) * 1000 |
|
|
| |
| energies = [s.energy for s in chain if s.energy != float('inf')] |
| energy_reduction = (energies[0] - energies[-1]) if len(energies) >= 2 else 0.0 |
|
|
| return { |
| 'response': response, |
| 'reasoning_chain': chain, |
| 'total_energy_reduction': energy_reduction, |
| 'latency_ms': total_time, |
| 'num_steps': len(chain), |
| 'final_state': final_state, |
| } |
|
|
| def _extract_bindings(self, indices: np.ndarray) -> List[Tuple]: |
| """Extract binding triples from activated memory entries for coherence checking.""" |
| bindings = [] |
| for idx in indices: |
| meta = self.memory._metadata[idx] |
| if meta and meta.get('type') == 'relation': |
| subj = meta.get('subject') |
| rel = meta.get('relation') |
| obj = meta.get('object') |
| if subj and rel and obj: |
| s_vec = self.binding.get_concept(subj) |
| r_vec = self.binding.get_concept(rel) |
| o_vec = self.binding.get_concept(obj) |
| if s_vec is not None and r_vec is not None and o_vec is not None: |
| bound = self.memory._contents[idx] |
| |
| role_composed = BinaryBinding.bind(s_vec, r_vec) |
| bindings.append((bound, role_composed, o_vec)) |
| return bindings[:20] |
|
|
| |
|
|
| def solve_analogy(self, a: str, b: str, c: str, |
| candidates: Optional[List[str]] = None) -> Dict[str, Any]: |
| """ |
| Solve analogy: a is to b as c is to ? |
| |
| Uses binding-based approach: |
| 1. Compute relation R = bind(a, b) [XOR extracts difference] |
| 2. Apply: query = bind(c, R) |
| 3. Route query through memory |
| 4. Decode nearest match |
| """ |
| t0 = time.perf_counter() |
|
|
| |
| for name in [a, b, c]: |
| if self.binding.get_concept(name) is None: |
| self.add_concept(name) |
|
|
| va = self.binding.get_concept(a) |
| vb = self.binding.get_concept(b) |
| vc = self.binding.get_concept(c) |
|
|
| |
| relation = BinaryBinding.bind(va, vb) |
| query = BinaryBinding.bind(vc, relation) |
|
|
| |
| result = self.reason(query, max_steps=2) |
|
|
| |
| codebook_results = self.binding.solve_analogy(a, b, c, candidates) |
|
|
| return { |
| 'query': f"{a} : {b} :: {c} : ?", |
| 'codebook_ranking': codebook_results[:10], |
| 'memory_response': result['response'], |
| 'latency_ms': (time.perf_counter() - t0) * 1000, |
| } |
|
|
| |
|
|
| def associate(self, concept: str, top_k: int = 5) -> List[Tuple[str, float]]: |
| """Find concepts most associated with the given concept.""" |
| vec = self.binding.get_concept(concept) |
| if vec is None: |
| vec = self.encoder.encode(concept) |
|
|
| |
| routing = self.router.route(vec, beam_width=100) |
| if len(routing.indices) == 0: |
| return [] |
|
|
| |
| associations = [] |
| seen = set() |
| for idx in routing.indices: |
| meta = self.memory._metadata[idx] |
| if meta: |
| name = meta.get('name', '') |
| if name and name != concept and name not in seen: |
| sim = 1.0 - routing.distances[len(associations)] / N_BITS \ |
| if len(associations) < len(routing.distances) else 0.0 |
| associations.append((name, float(sim))) |
| seen.add(name) |
| if len(associations) >= top_k: |
| break |
|
|
| return associations |
|
|
| |
|
|
| def compose(self, *concepts: str) -> Dict[str, Any]: |
| """Compose multiple concepts and find what the composition represents.""" |
| vectors = [] |
| for c in concepts: |
| vec = self.binding.get_concept(c) |
| if vec is None: |
| self.add_concept(c) |
| vec = self.binding.get_concept(c) |
| vectors.append(vec) |
|
|
| |
| if len(vectors) == 1: |
| composed = vectors[0] |
| else: |
| composed = BinaryBinding.bundle(*vectors) |
|
|
| |
| result = self.reason(composed, max_steps=2) |
| return { |
| 'input_concepts': concepts, |
| 'response': result['response'], |
| 'latency_ms': result['latency_ms'], |
| } |
|
|
| |
|
|
| def stats(self) -> Dict[str, Any]: |
| """Return engine statistics.""" |
| return { |
| 'memory': self.memory.stats(), |
| 'codebook_size': len(self.binding._codebook), |
| 'beam_width': self.router.beam_width, |
| 'energy_mode': self.energy_model.mode, |
| } |
|
|