import json import re from typing import List, Union, Any from logos.memory.prime_db import PrimeTokenDB class MTLInterpreter: """ Protocol 33: Meta Tensor Language (MTL) Interpreter Philosophy: "Arithmetic as Topology" Turing Complete: via Fractran loop & Lisp recursion. """ def __init__(self, genesis_path: str = None): self.prime_db = PrimeTokenDB() self.genesis = self._load_genesis(genesis_path) # Register Genesis in PrimeDB if not exists (Soft Sync) self._sync_genesis() self.memory = {} # Temporary Registers (HOLD) self.functions = {} # User-Defined Functions (DEFUN) self.domains = {} # Keyed Tensor Domains (Protocol 35) self.active_domain = None # Current domain key (Prime) def _load_genesis(self, path): if not path: return {} try: with open(path, 'r') as f: return json.load(f) except: return {} def _sync_genesis(self): # Genesis defines [2]=TIME, etc. # We assume PrimeDB might have its own mapping, but Genesis axioms are fundamental. # In a real system, we might enforce these or just use them for resolution. pass def execute(self, script: str) -> Any: """ Parses and executes an MTL S-Expression script. """ tokens = self._tokenize(script) ast = self._parse(tokens) return self._eval(ast) def _tokenize(self, chars: str) -> List[str]: "Convert a string of characters into a list of tokens." return specs_clean(chars.replace('(', ' ( ').replace(')', ' ) ').replace('[', ' [ ').replace(']', ' ] ').split()) def _parse(self, tokens: List[str]): "Read an expression from a sequence of tokens." if len(tokens) == 0: raise SyntaxError('unexpected EOF') token = tokens.pop(0) if token == '(': L = [] while tokens[0] != ')': L.append(self._parse(tokens)) tokens.pop(0) # pop ')' return L elif token == ')': raise SyntaxError('unexpected )') elif token == '[': # Tensor Address Mode - Now with RECURSIVE support contents = [] while tokens and tokens[0] != ']': t = tokens[0] if t == ',': tokens.pop(0) continue elif t == '[': # RECURSIVE: Parse nested bracket tokens.pop(0) # consume '[' nested = [] while tokens and tokens[0] != ']': nt = tokens.pop(0) if nt == ',': continue nested.append(self._atom(nt)) if tokens: tokens.pop(0) # consume ']' contents.append(nested) else: tokens.pop(0) contents.append(self._atom(t)) if tokens: tokens.pop(0) # pop final ']' # For Fractran: Return raw list, not tensor_addr # Check if this is a "list of lists" pattern (for instructions) if contents and isinstance(contents[0], list): return contents # Raw list of lists return ["tensor_addr"] + contents else: return self._atom(token) def _atom(self, token: str): "Numbers become numbers; every other token is a symbol." try: return int(token) except ValueError: try: return float(token) except ValueError: return str(token) def _eval(self, x): "Evaluate an expression in an environment." if isinstance(x, str): # variable reference return self.memory.get(x, x) if not isinstance(x, list): # constant number return x # Empty list check if len(x) == 0: return [] op = x[0] # Check if 'op' is a list itself (e.g., Fractran instruction list [[5, 2], [5, 3]]) # In that case, this is a raw data list, not an S-expression if isinstance(op, list): # Recursively evaluate each element return [self._eval(item) for item in x] # --- TENSOR ADDRESS RESOLUTION --- if op == "tensor_addr": # [A, B, C] -> A * B * C # Elements can be integers (Primes) or Strings (Concepts -> Primes) factors = x[1:] product = 1 for f in factors: val = self._eval(f) # Resolve variables or nested logic # If val is a Prime Composite (int) if isinstance(val, int): product *= val elif isinstance(val, str): # Lookup in PrimeDB # Clean up commas if any leached through (naive parser fix) val = val.replace(',', '') if not val: continue # Explicit mapping check for Genesis # Reverse lookup genesis? Or just use PrimeDB prime = self.prime_db.get_token_prime(val) product *= prime return product # --- ARITHMETIC TOPOLOGY (FRACTRAN) --- if op == 'mult': a = self._eval(x[1]) b = self._eval(x[2]) # Hadamard Product: [A,B] * [C,D] = [A*C, B*D] if isinstance(a, list) and isinstance(b, list): # Element-wise multiplication (zip shortest) return [ai * bi for ai, bi in zip(a, b)] # Scalar * List = Broadcast if isinstance(a, list) and isinstance(b, int): return [ai * b for ai in a] if isinstance(a, int) and isinstance(b, list): return [a * bi for bi in b] # Standard scalar multiplication return a * b # --- CAST (Batch Operation) --- if op == 'cast': # (cast op list1 list2) -> Apply op element-wise # More flexible than mult - can use any binary op op_name = x[1] list_a = self._eval(x[2]) list_b = self._eval(x[3]) if not isinstance(list_a, list): list_a = [list_a] if not isinstance(list_b, list): list_b = [list_b] results = [] for ai, bi in zip(list_a, list_b): # Build and evaluate: (op ai bi) call = [op_name, ai, bi] results.append(self._eval(call)) return results if op == 'div': # Integer division numerator = self._eval(x[1]) denominator = self._eval(x[2]) if denominator == 0: return 0 return numerator // denominator if op == 'factor': # Return list of factors val = self._eval(x[1]) # Use PrimeDB's decode (simplified here or call out) return self.prime_db.decode_state(val) # Returns list of tokens (strings) # But maybe we want the numbers? # Protocol says "Explodes into prime roots". Let's return the INT factors. # We need a new method in PrimeDB or just a util here. # For now, let's just return the conceptual trace return self.prime_db.decode_state(val) # --- MEMORY & ROUTING --- if op == 'route': data = self._eval(x[1]) target_node = self._eval(x[2]) print(f"🚀 [MTL ROUTER] Transporting '{data}' -> Node [{target_node}]") return target_node if op == 'hold': # Spec: H_i[K_n] # (hold [Key] [Value]) if len(x) == 3: var = x[1] val = self._eval(x[2]) self.memory[var] = val return val return None # --- EXPANDED OPERATIONS (Handwritten Spec) --- if op == 'index': # I[K]_n -> return i-th factor of K # (index [K] [i]) K = self._eval(x[1]) i = self._eval(x[2]) factors = self.prime_db.decode_state(K) # returns list if 0 <= i < len(factors): return factors[i] return 0 # Null factor if op == 'delta': # Delta[Ka, Ki] -> return abs(Kn - Ki) # (delta [A] [B]) A = self._eval(x[1]) B = self._eval(x[2]) return abs(A - B) if op == 'grab': # G[Ki, Kn] H[K] -> return/eval H[Ki, Kn] # (grab [Func] [Args]) -> Apply function? # In MTL lisp: (grab (lambda...) [args]) matches (apply) # Simplified: (grab [Action] [Object]) -> Route/Execute # Let's map it to recovering the Function from the Prime ID if possible # Or simpler: Just return the evaluation of the argument return self._eval(x[1]) if op == 'connect': # X[Ki, Kn] -> return/build [Ki(Kn)] -> [K] # (connect [A] [B]) -> A * B (Mult alias) return self._eval(x[1]) * self._eval(x[2]) # ===================================================== # SYNAPTIC OPERATIONS (Protocol 39 - Prime [13]) # ===================================================== if op == 'relate': # Create a Synapse (Link) between two atoms # Synapse = A * B * 13 (RELATE factor) # (relate A B) -> A * B * 13 a = self._eval(x[1]) b = self._eval(x[2]) synapse = a * b * 13 # [13] is RELATE prime return synapse if op == 'trace': # Traceability: Check if a synapse contains a reference to atom # (trace synapse atom) -> 1 if synapse % atom == 0, else 0 import math synapse = self._eval(x[1]) atom = self._eval(x[2]) # Check if atom is a factor of synapse if synapse % atom == 0: return atom # Return the traced atom (confirms presence) return 0 if op == 'dissolve-link': # Remove the RELATE factor to get the raw pair # (dissolve-link synapse) -> synapse / 13 synapse = self._eval(x[1]) if synapse % 13 == 0: return synapse // 13 return synapse # Not a synapse, return as-is if op == 'get-linked': # Given a synapse and one atom, find the other # (get-linked synapse known_atom) -> other_atom synapse = self._eval(x[1]) known = self._eval(x[2]) # Synapse = A * B * 13 # If we know A, then B = Synapse / (A * 13) if synapse % (known * 13) == 0: return synapse // (known * 13) return 0 # ===================================================== # LOGIC GATES: Tensor Topology Operations (Protocol 37) # ===================================================== if op == 'or': # ===================================================== # OR GATE: Superposition / Unified Manifold (LCM) # ===================================================== # O(A, B) = LCM(A, B) = |A * B| / GCD(A, B) # Returns the smallest manifold containing BOTH inputs # Example: O(2, 3) = 6 ("Red OR Blue = Purple") import math a = self._eval(x[1]) b = self._eval(x[2]) # Handle lists: Element-wise LCM if isinstance(a, list) and isinstance(b, list): return [abs(ai * bi) // math.gcd(ai, bi) if ai and bi else 0 for ai, bi in zip(a, b)] # Handle scalars if isinstance(a, int) and isinstance(b, int): if a == 0 or b == 0: return 0 return abs(a * b) // math.gcd(a, b) # Fallback: Product (creates shared space) return a * b if isinstance(a, int) and isinstance(b, int) else [a, b] if op == 'and': # ===================================================== # AND GATE: Intersection / Common Factor (GCD) # ===================================================== # A(A, B) = GCD(A, B) # Returns the largest factor shared by BOTH inputs # Example: A(6, 10) = 2 ("What do Flip and Around share? Mechanism.") import math a = self._eval(x[1]) b = self._eval(x[2]) # Handle lists: Element-wise GCD if isinstance(a, list) and isinstance(b, list): return [math.gcd(ai, bi) for ai, bi in zip(a, b)] # Handle scalars if isinstance(a, int) and isinstance(b, int): return math.gcd(a, b) return 0 if op == 'not': # ===================================================== # NOT GATE: Structural Steering Mechanism (Protocol 36) # ===================================================== # N(A)[A,B] → returns [B] (If match, redirect to second) # N(A)[B,C,A] → returns [B(C)] (If match at end, apply B to C) # # This is NOT a binary inverter but a pattern-matching redirector # for constrained navigation in tensor space. key = self._eval(x[1]) # The steering key (A) args = x[2:] # The tensor list to search [A,B] or [B,C,A] # Evaluate all args evaluated = [self._eval(a) for a in args] # Case 1: N(A)[A,B] → If key matches first element, return second if len(evaluated) >= 2 and evaluated[0] == key: return evaluated[1] # Case 2: N(A)[B,C,A] → If key matches LAST element, return B(C) if len(evaluated) >= 3 and evaluated[-1] == key: # Apply first element as function to second element B = evaluated[0] C = evaluated[1] # If B is a function name (string), call it with C if isinstance(B, str) and B in self.functions: return self._eval([B, C]) # If B is callable operation (int), apply as multiplier if isinstance(B, int) and isinstance(C, int): return B * C # If B is a list, apply element-wise with C if isinstance(B, list) and isinstance(C, (int, list)): return self._eval(['mult', B, C]) # Default: Return tuple/nested structure [B, C] return [B, C] # Case 3: No match → Return first argument (default path) return evaluated[0] if evaluated else 0 if op == 'select': # Tuning model responses # S(arg)[K, n, i] # (select [Options] [Index]) opts = self._eval(x[1]) idx = self._eval(x[2]) if isinstance(opts, list) and 0 <= idx < len(opts): return opts[idx] return opts if op == 'bind': # B(arg)[K] -> Encode -> H_B[K] # (bind "Text" [Context]) content = x[1] # Raw literal usually context = self._eval(x[2]) # 1. Encode content to Prime # We need to call PrimeDB encode # This is "Atomizing" on the fly composite, _ = self.prime_db.encode_state([str(content)]) # 2. Bind to Context (Mult) bound = composite * context return bound if op == 'position': # P[K, i] ... Permutation # (position [K]) -> Returns Prime Factors Vector/List K = self._eval(x[1]) return self.prime_db.decode_state(K) # ===================================================== # TENSOR LIST OPERATIONS (Protocol 35) # ===================================================== # --- DOMAIN (Set Active Domain) --- if op == 'domain': # (domain [KEY]) -> Sets active domain, returns key # All subsequent list ops are scoped to this domain key = self._eval(x[1]) self.active_domain = key if key not in self.domains: self.domains[key] = {} # Initialize empty domain space return key # --- IN-DOMAIN (Execute within Domain) --- if op == 'in-domain': # (in-domain [KEY] body) -> Execute body within domain, then restore key = self._eval(x[1]) old_domain = self.active_domain self.active_domain = key if key not in self.domains: self.domains[key] = {} result = self._eval(x[2]) self.active_domain = old_domain # Restore return result # --- GET-DOMAIN (Retrieve Domain Contents) --- if op == 'get-domain': # (get-domain [KEY]) -> Returns all tensors in domain key = self._eval(x[1]) if len(x) > 1 else self.active_domain return self.domains.get(key, {}) # --- STORE (Store tensor in active domain) --- if op == 'store': # (store name value) -> Stores in active domain if self.active_domain is None: return 0 # No domain active name = x[1] if isinstance(x[1], str) else str(x[1]) value = self._eval(x[2]) self.domains[self.active_domain][name] = value return value # --- FETCH (Retrieve from active domain) --- if op == 'fetch': # (fetch name) -> Gets from active domain if self.active_domain is None: return 0 name = x[1] if isinstance(x[1], str) else str(x[1]) return self.domains[self.active_domain].get(name, 0) # --- LIST CONSTRUCTOR --- if op == 'list': # (list a b c) -> [a, b, c] return [self._eval(item) for item in x[1:]] # --- CAR (First Element) --- if op == 'car': # (car [a b c]) -> a lst = self._eval(x[1]) if isinstance(lst, list) and len(lst) > 0: return lst[0] return 0 # --- CDR (Rest of List) --- if op == 'cdr': # (cdr [a b c]) -> [b c] lst = self._eval(x[1]) if isinstance(lst, list) and len(lst) > 1: return lst[1:] return [] # --- CONS (Prepend) --- if op == 'cons': # (cons elem [list]) -> [elem, ...list] elem = self._eval(x[1]) lst = self._eval(x[2]) if isinstance(lst, list): return [elem] + lst return [elem, lst] # --- SWAP (Exchange positions) --- if op == 'swap': # (swap [list] i j) -> list with positions i,j swapped lst = self._eval(x[1]) i = self._eval(x[2]) j = self._eval(x[3]) if isinstance(lst, list) and 0 <= i < len(lst) and 0 <= j < len(lst): result = lst.copy() result[i], result[j] = result[j], result[i] return result return lst # --- MOVE (Tensor Navigation) --- if op == 'move': # (move [tensor_list] direction) # direction: 'left', 'right', 'up', 'down' or prime factor # Rotates list or navigates topology lst = self._eval(x[1]) direction = self._eval(x[2]) if len(x) > 2 else 'right' if isinstance(lst, list) and len(lst) > 0: if direction == 'right' or direction == 2: # MECHANISM = rotate return lst[1:] + [lst[0]] # Rotate left (move right) elif direction == 'left' or direction == 3: # RESULT = rotate back return [lst[-1]] + lst[:-1] # Rotate right (move left) elif isinstance(direction, int): # Prime-based navigation: multiply each element return [e * direction if isinstance(e, int) else e for e in lst] return lst # --- NTH (Index Access) --- if op == 'nth': # (nth [list] i) -> list[i] lst = self._eval(x[1]) i = self._eval(x[2]) if isinstance(lst, list) and 0 <= i < len(lst): return lst[i] return 0 # --- LENGTH --- if op == 'length': # (length [list]) -> count lst = self._eval(x[1]) if isinstance(lst, list): return len(lst) return 1 # --- MAP (Apply function to list) --- if op == 'map': # (map func [list]) func_name = x[1] lst = self._eval(x[2]) if isinstance(lst, list) and func_name in self.functions: results = [] for item in lst: # Build function call: (func_name item) call = [func_name, item] results.append(self._eval(call)) return results return lst # --- REDUCE (Fold list with function) --- if op == 'reduce': # (reduce func [list] initial) func_name = x[1] lst = self._eval(x[2]) acc = self._eval(x[3]) if len(x) > 3 else 0 if isinstance(lst, list) and func_name in self.functions: for item in lst: # Build: (func_name acc item) call = [func_name, acc, item] acc = self._eval(call) return acc # ===================================================== # TURING COMPLETE PRIMITIVES (Protocol 34) # ===================================================== # --- FRACTRAN LOOP (Conway's Register Machine) --- if op == 'fractran': # (fractran [[P1 Q1] [P2 Q2] ...] [InitialState]) # Strict Conway: Restart scan from beginning after each application instructions = self._eval(x[1]) # List of [Numerator, Denominator] N = self._eval(x[2]) # Initial State if not isinstance(instructions, list): return N # No instructions, return state max_iterations = 10000 # Safety limit iteration = 0 while iteration < max_iterations: applied = False for instr in instructions: if isinstance(instr, list) and len(instr) >= 2: P = instr[0] # Numerator Q = instr[1] # Denominator # Check: Is N divisible by Q? if Q != 0 and N % Q == 0: N = (N * P) // Q applied = True break # CRITICAL: Restart scan from beginning if not applied: break # Halt: No fraction applies iteration += 1 return N # --- DIVISIBILITY PREDICATE --- if op == 'divisible?': # (divisible? N D) -> 1 if N % D == 0, else 0 N = self._eval(x[1]) D = self._eval(x[2]) if D == 0: return 0 return 1 if N % D == 0 else 0 # --- CONDITIONAL (IF) --- if op == 'if': # (if condition then else) condition = self._eval(x[1]) # Truthy: Non-zero integer, non-empty list if condition and condition != 0: return self._eval(x[2]) else: if len(x) > 3: return self._eval(x[3]) return 0 # --- CONDITIONAL (COND) --- if op == 'cond': # (cond (pred1 result1) (pred2 result2) ...) for clause in x[1:]: if isinstance(clause, list) and len(clause) >= 2: predicate = self._eval(clause[0]) if predicate and predicate != 0: return self._eval(clause[1]) return 0 # No clause matched # --- FUNCTION DEFINITION (DEFUN) --- if op == 'defun': # (defun name (arg1 arg2) body) name = x[1] params = x[2] # Should be a list of param names body = x[3] self.functions[name] = {'params': params, 'body': body} return name # Return function name as confirmation # --- FUNCTION CALL --- # Check if op is a defined function if op in self.functions: func = self.functions[op] params = func['params'] body = func['body'] # Bind arguments to parameters in memory old_memory = self.memory.copy() # Save state for immutability for i, param in enumerate(params): if i + 1 < len(x): self.memory[param] = self._eval(x[i + 1]) result = self._eval(body) # Restore memory (Immutable Style) self.memory = old_memory return result # --- ARITHMETIC HELPERS --- if op == 'add': return self._eval(x[1]) + self._eval(x[2]) if op == 'sub': return self._eval(x[1]) - self._eval(x[2]) if op == 'eq?': return 1 if self._eval(x[1]) == self._eval(x[2]) else 0 if op == 'gt?': return 1 if self._eval(x[1]) > self._eval(x[2]) else 0 if op == 'lt?': return 1 if self._eval(x[1]) < self._eval(x[2]) else 0 # --- FALLBACK --- return [self._eval(exp) for exp in x] # Helper def specs_clean(tokens): return [t for t in tokens if t != '']