""" MLE Inference Module: Reasoning Engine ======================================== The complete reasoning pipeline that integrates all modules: Query → Encoding → Routing → Binding → Energy Minimization → Response The "chain of thought" is an internal trajectory through state space: 1. ENCODE: Transform input query into a binary vector 2. ROUTE: Activate relevant memory regions (top-500 beam) 3. BIND: Construct composite representations from activated memories 4. RELAX: Minimize energy to find coherent state 5. DECODE: Extract response from final state Multiple reasoning iterations are possible (iterative deepening): each iteration uses the previous result as context for the next. """ import numpy as np from typing import Optional, List, Dict, Any, Tuple import logging import time import hashlib from ..memory.sparse_address_table import SparseAddressTable from ..routing.recursive_jit_router import RecursiveJITRouter, RoutingResult from ..binding.semantic_binding import BindingEngine, BinaryBinding from ..energy.energy_model import EnergyModel, EnergyFunction, RelaxationDynamics, HopfieldDynamics from ..utils.simd_ops import ( N_BITS, N_WORDS, random_binary_vector, random_binary_vectors, hamming_distance, hamming_similarity, xor_vectors, majority_vote ) logger = logging.getLogger(__name__) class QueryEncoder: """ Encode text/symbolic queries into binary vectors. Encoding strategies: 1. Named concept lookup (exact match in codebook) 2. Hash-based encoding (deterministic binary vector from string) 3. Composite encoding (bind multiple concepts together) """ def __init__(self, binding_engine: BindingEngine): self.binding = binding_engine def encode(self, query: Any) -> np.ndarray: """Encode a query into a binary vector. Supports: - str: hash-based or codebook lookup - np.ndarray: pass-through (already a vector) - list of str: composite binding of concepts - dict: role-filler structure encoding """ if isinstance(query, np.ndarray): return query.copy() elif isinstance(query, str): return self._encode_string(query) elif isinstance(query, (list, tuple)): return self._encode_composite(query) elif isinstance(query, dict): return self._encode_structure(query) else: raise TypeError(f"Cannot encode query of type {type(query)}") def _encode_string(self, text: str) -> np.ndarray: """Encode a string to binary vector. First tries codebook lookup, then falls back to hash. """ # Try codebook lookup vec = self.binding.get_concept(text) if vec is not None: return vec.copy() # Hash-based encoding: deterministic binary vector from string return self._hash_to_vector(text) def _hash_to_vector(self, text: str) -> np.ndarray: """Deterministic hash-based encoding. Uses SHA-512 repeatedly to fill 4096 bits. """ n_hashes = N_BITS // 512 + 1 # SHA-512 = 512 bits bits = [] for i in range(n_hashes): h = hashlib.sha512(f"{text}:{i}".encode()).digest() bits.append(np.frombuffer(h, dtype=np.uint8)) all_bytes = np.concatenate(bits)[:N_BITS // 8] return all_bytes.view(np.uint64).copy() def _encode_composite(self, concepts: List[str]) -> np.ndarray: """Encode a composite query by bundling encoded concepts.""" vectors = [self._encode_string(c) for c in concepts] if len(vectors) == 1: return vectors[0] return BinaryBinding.bundle(*vectors) def _encode_structure(self, structure: Dict[str, str]) -> np.ndarray: """Encode a structured query (role:filler pairs). Example: {"subject": "cat", "relation": "is_a"} → bind(encode("subject"), encode("cat")) ⊕ bind(encode("relation"), encode("is_a")) """ pairs = [] for role, filler in structure.items(): role_vec = self._encode_string(role) filler_vec = self._encode_string(filler) pairs.append(BinaryBinding.bind(role_vec, filler_vec)) return BinaryBinding.bundle(*pairs) if len(pairs) > 1 else pairs[0] class ResponseDecoder: """ Decode final state vector into interpretable response. Strategies: 1. Nearest neighbor: find closest named concept(s) in codebook 2. Unbinding: recover specific fillers for given roles 3. Activation readout: return most-activated memory entries """ def __init__(self, memory: SparseAddressTable, binding_engine: BindingEngine): self.memory = memory self.binding = binding_engine def decode(self, state: np.ndarray, top_k: int = 5, roles: Optional[List[str]] = None) -> Dict[str, Any]: """Decode state vector into response. Returns dict with: - nearest_concepts: closest named concepts with similarities - nearest_memories: closest memory entries - role_fillers: decoded role-filler pairs (if roles provided) - confidence: overall decoding confidence """ result = {} # 1. Nearest concepts in codebook concept_scores = [] for name, vec in self.binding._codebook.items(): sim = hamming_similarity(state, vec) concept_scores.append((name, float(sim))) concept_scores.sort(key=lambda x: x[1], reverse=True) result['nearest_concepts'] = concept_scores[:top_k] # 2. Nearest memory entries memory_results = self.memory.query_nearest(state, k=top_k, use_lsh=True) nearest_memories = [] for idx, dist in memory_results: meta = self.memory._metadata[idx] name = meta.get('name', f'entry_{idx}') if meta else f'entry_{idx}' sim = 1.0 - dist / N_BITS nearest_memories.append({ 'index': idx, 'name': name, 'distance': dist, 'similarity': sim, 'metadata': meta, }) result['nearest_memories'] = nearest_memories # 3. Role-filler decoding if roles: role_fillers = {} for role_name in roles: role_vec = self.binding.get_concept(role_name) if role_vec is None: role_vec = QueryEncoder(self.binding)._hash_to_vector(role_name) filler = BinaryBinding.unbind(state, role_vec) # Find nearest concept to recovered filler filler_scores = [] for name, vec in self.binding._codebook.items(): sim = hamming_similarity(filler, vec) filler_scores.append((name, float(sim))) filler_scores.sort(key=lambda x: x[1], reverse=True) role_fillers[role_name] = filler_scores[:3] result['role_fillers'] = role_fillers # 4. Confidence (based on how close the best match is) if concept_scores: result['confidence'] = concept_scores[0][1] else: result['confidence'] = 0.0 return result class ReasoningStep: """Record of a single reasoning step in the inference chain.""" __slots__ = ['step_num', 'state', 'energy', 'energy_components', 'routing_result', 'decoded', 'timestamp'] def __init__(self, step_num: int): self.step_num = step_num self.state: Optional[np.ndarray] = None self.energy: float = float('inf') self.energy_components: Dict[str, float] = {} self.routing_result: Optional[RoutingResult] = None self.decoded: Optional[Dict] = None self.timestamp: float = 0.0 class ReasoningEngine: """ The MLE Reasoning Engine — core inference loop. Architecture: ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Query │────▶│ Router │────▶│ Binder │────▶│ Energy │ │ Encoder │ │ (Beam) │ │ (Compose)│ │ (Relax) │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ ┌──────────┐ ┌──────────┐ │ └───────────│ Response │◀────│ Decode │◀──────────┘ │ │ │ │ └──────────┘ └──────────┘ Reasoning loop: 1. Encode query → binary vector 2. Route → find relevant memories (beam search) 3. Activate memories, construct bindings 4. Minimize energy (Hopfield + binary relaxation) 5. Decode final state → response 6. [Optional] Use result as new query → iterate """ def __init__( self, memory: Optional[SparseAddressTable] = None, beam_width: int = 500, max_routing_depth: int = 3, max_reasoning_steps: int = 5, energy_mode: str = 'hybrid', hopfield_beta: float = 8.0, relaxation_iterations: int = 50, ): # Memory self.memory = memory or SparseAddressTable() # Binding engine self.binding = BindingEngine(use_binary=True) # Router self.router = RecursiveJITRouter( memory=self.memory, beam_width=beam_width, max_depth=max_routing_depth, ) # Energy model energy_fn = EnergyFunction() self.energy_model = EnergyModel( mode=energy_mode, energy_fn=energy_fn, binary_dynamics=RelaxationDynamics( energy_fn, max_iterations=relaxation_iterations, n_candidates=32, ), hopfield_dynamics=HopfieldDynamics( beta=hopfield_beta, max_iterations=20, ), ) # Encoder/Decoder self.encoder = QueryEncoder(self.binding) self.decoder = ResponseDecoder(self.memory, self.binding) # Config self.max_reasoning_steps = max_reasoning_steps # ── Knowledge Management ────────────────────────────────────────────── def add_concept(self, name: str, metadata: Optional[Dict] = None) -> np.ndarray: """Add a named concept to both memory and binding codebook.""" vec = self.binding.register_concept(name) self.memory.store_concept(name, content=vec, metadata=metadata) return vec def add_relation(self, subject: str, relation: str, obj: str, metadata: Optional[Dict] = None) -> int: """Store a semantic relation as a bound vector in memory.""" # Ensure all concepts exist for name in [subject, relation, obj]: if self.binding.get_concept(name) is None: self.add_concept(name) # Create bound representation s = self.binding.get_concept(subject) r = self.binding.get_concept(relation) o = self.binding.get_concept(obj) bound = BinaryBinding.encode_triple(s, r, o) # Store in memory meta = metadata or {} meta.update({ 'type': 'relation', 'subject': subject, 'relation': relation, 'object': obj, 'name': f"{subject}_{relation}_{obj}", }) # Address = blend of subject and relation (for routing) address = BinaryBinding.bundle(s, r, o) return self.memory.store(address, bound, metadata=meta, name=f"{subject}_{relation}_{obj}") # ── Core Reasoning ──────────────────────────────────────────────────── def reason(self, query: Any, max_steps: Optional[int] = None, roles: Optional[List[str]] = None, verbose: bool = False) -> Dict[str, Any]: """ Execute full reasoning pipeline. Args: query: input query (string, vector, list of strings, or dict) max_steps: override max reasoning iterations roles: roles to decode in response verbose: print step-by-step info Returns: Dict with: - response: decoded response dict - reasoning_chain: list of ReasoningStep objects - total_energy_reduction: cumulative energy decrease - latency_ms: total time """ t0 = time.perf_counter() steps = max_steps or self.max_reasoning_steps chain: List[ReasoningStep] = [] # ── Step 0: Encode query ────────────────────────────────────── query_vec = self.encoder.encode(query) current_state = query_vec.copy() if verbose: logger.info(f"Query encoded. Starting reasoning with max {steps} steps.") for step_num in range(steps): step = ReasoningStep(step_num) step.timestamp = time.perf_counter() - t0 # ── Step 1: Route — find relevant memories ──────────────── routing = self.router.route_and_activate(current_state) step.routing_result = routing if verbose: logger.info( f"Step {step_num}: Routed to {len(routing.indices)} candidates " f"(depth={routing.depth}, latency={routing.latency_ms:.1f}ms)" ) if len(routing.indices) == 0: step.state = current_state chain.append(step) break # ── Step 2: Gather context from activated memories ──────── active_indices = routing.indices[:min(100, len(routing.indices))] context_vectors = np.ascontiguousarray( self.memory._contents[active_indices] ) act_strengths = routing.activations[:len(active_indices)] # ── Step 3: Construct bindings for coherence checking ───── bindings = self._extract_bindings(active_indices) # ── Step 4: Energy minimization ─────────────────────────── energy_result = self.energy_model.minimize( current_state, context_vectors, act_strengths, bindings=bindings, ) new_state = energy_result['final_state'] step.state = new_state step.energy = energy_result['final_energy'] if verbose: logger.info( f"Step {step_num}: Energy {step.energy:.4f} " f"(iterations={energy_result.get('total_iterations', '?')})" ) chain.append(step) # ── Check convergence ───────────────────────────────────── if step_num > 0: state_change = hamming_distance(current_state, new_state) / N_BITS energy_change = abs(chain[-2].energy - step.energy) if state_change < 0.01 and energy_change < 1e-4: if verbose: logger.info(f"Reasoning converged at step {step_num}") break current_state = new_state # ── Decode final state ──────────────────────────────────────── final_state = chain[-1].state if chain else query_vec response = self.decoder.decode(final_state, top_k=10, roles=roles) # Add decoding to last step if chain: chain[-1].decoded = response total_time = (time.perf_counter() - t0) * 1000 # Energy trajectory energies = [s.energy for s in chain if s.energy != float('inf')] energy_reduction = (energies[0] - energies[-1]) if len(energies) >= 2 else 0.0 return { 'response': response, 'reasoning_chain': chain, 'total_energy_reduction': energy_reduction, 'latency_ms': total_time, 'num_steps': len(chain), 'final_state': final_state, } def _extract_bindings(self, indices: np.ndarray) -> List[Tuple]: """Extract binding triples from activated memory entries for coherence checking.""" bindings = [] for idx in indices: meta = self.memory._metadata[idx] if meta and meta.get('type') == 'relation': subj = meta.get('subject') rel = meta.get('relation') obj = meta.get('object') if subj and rel and obj: s_vec = self.binding.get_concept(subj) r_vec = self.binding.get_concept(rel) o_vec = self.binding.get_concept(obj) if s_vec is not None and r_vec is not None and o_vec is not None: bound = self.memory._contents[idx] # Check: unbind(bound, bind(s,r)) should ≈ o role_composed = BinaryBinding.bind(s_vec, r_vec) bindings.append((bound, role_composed, o_vec)) return bindings[:20] # Limit for performance # ── Analogy Solving ─────────────────────────────────────────────── def solve_analogy(self, a: str, b: str, c: str, candidates: Optional[List[str]] = None) -> Dict[str, Any]: """ Solve analogy: a is to b as c is to ? Uses binding-based approach: 1. Compute relation R = bind(a, b) [XOR extracts difference] 2. Apply: query = bind(c, R) 3. Route query through memory 4. Decode nearest match """ t0 = time.perf_counter() # Get or create concept vectors for name in [a, b, c]: if self.binding.get_concept(name) is None: self.add_concept(name) va = self.binding.get_concept(a) vb = self.binding.get_concept(b) vc = self.binding.get_concept(c) # Compute analogy query relation = BinaryBinding.bind(va, vb) query = BinaryBinding.bind(vc, relation) # Route and find matches result = self.reason(query, max_steps=2) # Also do direct codebook comparison codebook_results = self.binding.solve_analogy(a, b, c, candidates) return { 'query': f"{a} : {b} :: {c} : ?", 'codebook_ranking': codebook_results[:10], 'memory_response': result['response'], 'latency_ms': (time.perf_counter() - t0) * 1000, } # ── Association ─────────────────────────────────────────────────── def associate(self, concept: str, top_k: int = 5) -> List[Tuple[str, float]]: """Find concepts most associated with the given concept.""" vec = self.binding.get_concept(concept) if vec is None: vec = self.encoder.encode(concept) # Route through memory routing = self.router.route(vec, beam_width=100) if len(routing.indices) == 0: return [] # Collect associated concepts associations = [] seen = set() for idx in routing.indices: meta = self.memory._metadata[idx] if meta: name = meta.get('name', '') if name and name != concept and name not in seen: sim = 1.0 - routing.distances[len(associations)] / N_BITS \ if len(associations) < len(routing.distances) else 0.0 associations.append((name, float(sim))) seen.add(name) if len(associations) >= top_k: break return associations # ── Composition ─────────────────────────────────────────────────── def compose(self, *concepts: str) -> Dict[str, Any]: """Compose multiple concepts and find what the composition represents.""" vectors = [] for c in concepts: vec = self.binding.get_concept(c) if vec is None: self.add_concept(c) vec = self.binding.get_concept(c) vectors.append(vec) # Bundle all concepts if len(vectors) == 1: composed = vectors[0] else: composed = BinaryBinding.bundle(*vectors) # Decode what this composition means result = self.reason(composed, max_steps=2) return { 'input_concepts': concepts, 'response': result['response'], 'latency_ms': result['latency_ms'], } # ── Stats ───────────────────────────────────────────────────────── def stats(self) -> Dict[str, Any]: """Return engine statistics.""" return { 'memory': self.memory.stats(), 'codebook_size': len(self.binding._codebook), 'beam_width': self.router.beam_width, 'energy_mode': self.energy_model.mode, }