# LOGOS-SPCW-Matroska / logos/mtl/interpreter.py
# GitHub Copilot
# LOGOS v1.0: MTL Turing Complete, Genesis Kernel, SPCW Transceiver, Harmonizer
# 6d3aa82
import json
import math
import re
from typing import Any, List, Optional, Union

from logos.memory.prime_db import PrimeTokenDB
class MTLInterpreter:
    """
    Protocol 33: Meta Tensor Language (MTL) Interpreter
    Philosophy: "Arithmetic as Topology"
    Turing Complete: via Fractran loop & Lisp recursion.

    A script is a single S-expression:
      * ``(op arg ...)`` applies a built-in operation or a DEFUN'd function.
      * ``[a, b, c]`` is a tensor address; it evaluates to the product of
        its factors (integers are used as-is, symbols are looked up in the
        prime database).
      * ``[[p q] [r s]]`` is a raw list of lists (FRACTRAN instruction table).
      * A bare symbol evaluates to its HOLD register value, or to itself
        when no register of that name exists.
    """

    # Protocol 39: prime factor that marks a composite as a synapse (link).
    RELATE_PRIME = 13
    # Safety limit on FRACTRAN steps; the state reached so far is returned
    # when the limit is hit.
    FRACTRAN_MAX_ITERATIONS = 10000

    def __init__(self, genesis_path: Optional[str] = None):
        """
        Create an interpreter with empty registers, functions and domains.

        genesis_path: optional path to a JSON file of Genesis axioms. A
        missing or unreadable file yields an empty genesis table.
        """
        self.prime_db = PrimeTokenDB()
        self.genesis = self._load_genesis(genesis_path)
        # Register Genesis in PrimeDB if not exists (Soft Sync)
        self._sync_genesis()
        self.memory = {}  # Temporary Registers (HOLD)
        self.functions = {}  # User-Defined Functions (DEFUN)
        self.domains = {}  # Keyed Tensor Domains (Protocol 35)
        self.active_domain = None  # Current domain key (Prime)

    def _load_genesis(self, path):
        """
        Load the Genesis JSON document at *path*.

        Best effort: returns ``{}`` when *path* is falsy, the file cannot be
        opened, or its content is not valid UTF-8 JSON.
        """
        if not path:
            return {}
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return {}

    def _sync_genesis(self):
        """Placeholder for syncing Genesis axioms into PrimeDB (no-op)."""
        # Genesis defines [2]=TIME, etc.
        # We assume PrimeDB might have its own mapping, but Genesis axioms
        # are fundamental. In a real system, we might enforce these or just
        # use them for resolution.
        pass

    def execute(self, script: str) -> Any:
        """
        Parse and evaluate an MTL S-expression script.

        Only the first complete expression in *script* is parsed and
        evaluated; any tokens after it are ignored.

        Raises SyntaxError on empty input, a stray ``)`` or a missing ``)``.
        """
        tokens = self._tokenize(script)
        ast = self._parse(tokens)
        return self._eval(ast)

    def _tokenize(self, chars: str) -> List[str]:
        """
        Convert a string of characters into a list of tokens.

        Parentheses and brackets are tokens of their own. Commas are pure
        separators and are treated like whitespace, so ``[2, 3]`` and
        ``[2 3]`` tokenize identically.
        """
        for delimiter in '()[]':
            chars = chars.replace(delimiter, f' {delimiter} ')
        # str.split() with no argument never yields empty strings, so no
        # further filtering is required.
        return chars.replace(',', ' ').split()

    def _parse(self, tokens: List[str]):
        """
        Read one expression from *tokens*, consuming the tokens it uses.

        Returns an atom, a list (S-expression), a ``["tensor_addr", ...]``
        list for a flat bracket, or a raw list of lists for nested brackets.

        Raises SyntaxError on empty input, a stray ``)`` or a missing ``)``.
        """
        if not tokens:
            raise SyntaxError('unexpected EOF')
        token = tokens.pop(0)
        if token == '(':
            expr = []
            while tokens and tokens[0] != ')':
                expr.append(self._parse(tokens))
            if not tokens:
                # Ran out of input before the matching ')'.
                raise SyntaxError('unexpected EOF')
            tokens.pop(0)  # pop ')'
            return expr
        if token == ')':
            raise SyntaxError('unexpected )')
        if token == '[':
            return self._parse_bracket(tokens)
        return self._atom(token)

    def _parse_bracket(self, tokens: List[str]):
        """
        Parse the remainder of a ``[ ... ]`` form (opening '[' already consumed).

        One level of nested brackets is supported; their members are atoms.
        A missing closing ']' is tolerated. Parentheses inside brackets are
        not parsed as sub-expressions; they become plain symbols.
        """
        contents = []
        while tokens and tokens[0] != ']':
            token = tokens.pop(0)
            if token == ',':
                continue
            if token == '[':
                nested = []
                while tokens and tokens[0] != ']':
                    inner = tokens.pop(0)
                    if inner != ',':
                        nested.append(self._atom(inner))
                if tokens:
                    tokens.pop(0)  # consume nested ']'
                contents.append(nested)
            else:
                contents.append(self._atom(token))
        if tokens:
            tokens.pop(0)  # pop final ']'
        # For Fractran: a "list of lists" is returned raw, not as an address.
        if contents and isinstance(contents[0], list):
            return contents
        return ["tensor_addr"] + contents

    def _atom(self, token: str):
        """Numbers become numbers (int, then float); every other token is a symbol."""
        for convert in (int, float):
            try:
                return convert(token)
            except ValueError:
                continue
        return str(token)

    @staticmethod
    def _lcm(a, b):
        """Return the least common multiple of a and b; 0 when either is falsy."""
        if not a or not b:
            return 0
        return abs(a * b) // math.gcd(a, b)

    def _resolve_tensor_addr(self, factors):
        """
        Multiply the factors of a tensor address: [A, B, C] -> A * B * C.

        Integer factors are multiplied directly; string factors are mapped
        to a prime through ``self.prime_db.get_token_prime``. Factors of any
        other type are ignored.
        """
        product = 1
        for factor in factors:
            val = self._eval(factor)  # Resolve variables or nested logic
            if isinstance(val, int):
                product *= val
            elif isinstance(val, str):
                # Defensive: drop commas that reached here via direct
                # _eval calls with hand-built expressions.
                val = val.replace(',', '')
                if not val:
                    continue
                product *= self.prime_db.get_token_prime(val)
        return product

    def _steer(self, key, evaluated):
        """
        NOT gate (Protocol 36): pattern-matching redirector, not an inverter.

        N(A)[A,B]   -> B        (key matches first element)
        N(A)[B,C,A] -> B(C)     (key matches last element: apply B to C)
        otherwise   -> first element, or 0 when there are no elements.
        """
        if len(evaluated) >= 2 and evaluated[0] == key:
            return evaluated[1]
        if len(evaluated) >= 3 and evaluated[-1] == key:
            B = evaluated[0]
            C = evaluated[1]
            if isinstance(B, str) and B in self.functions:
                return self._eval([B, C])
            if isinstance(B, int) and isinstance(C, int):
                return B * C
            if isinstance(B, list) and isinstance(C, (int, list)):
                return self._eval(['mult', B, C])
            return [B, C]
        return evaluated[0] if evaluated else 0

    def _run_fractran(self, instructions, state):
        """
        Run a FRACTRAN program (strict Conway semantics).

        On every step the instruction list is scanned from the beginning
        and the first [numerator, denominator] whose denominator divides
        the state is applied. Halts when no instruction applies or after
        FRACTRAN_MAX_ITERATIONS steps.
        """
        for _ in range(self.FRACTRAN_MAX_ITERATIONS):
            for instr in instructions:
                if isinstance(instr, list) and len(instr) >= 2:
                    numerator, denominator = instr[0], instr[1]
                    if denominator != 0 and state % denominator == 0:
                        state = (state * numerator) // denominator
                        break  # CRITICAL: restart scan from beginning
            else:
                return state  # Halt: no fraction applies
        return state

    def _call_function(self, name, arg_exprs):
        """
        Call the DEFUN'd function *name* with unevaluated *arg_exprs*.

        All arguments are evaluated in the caller's environment before any
        parameter is bound, so a call such as ``(f b a)`` swaps correctly.
        Registers are restored to their pre-call snapshot afterwards, even
        when the body raises; HOLDs made inside the call do not persist.
        Parameters without a matching argument stay unbound; surplus
        arguments are not evaluated.
        """
        func = self.functions[name]
        params = func['params']
        snapshot = self.memory.copy()
        try:
            values = [self._eval(arg) for arg in arg_exprs[:len(params)]]
            for param, value in zip(params, values):
                self.memory[param] = value
            return self._eval(func['body'])
        finally:
            # Restore memory (Immutable Style)
            self.memory = snapshot

    def _eval(self, x):
        """
        Evaluate a parsed expression.

        Strings resolve through the HOLD registers (falling back to the
        string itself), other non-list values are constants, and a list is
        dispatched on its first element. Built-in operations listed before
        the user-function check cannot be overridden by DEFUN; the
        arithmetic helpers (add, sub, eq?, gt?, lt?) can. A list whose head
        is not a known operation evaluates to the list of its evaluated
        members.
        """
        if isinstance(x, str):  # variable reference
            return self.memory.get(x, x)
        if not isinstance(x, list):  # constant number
            return x
        if not x:
            return []

        op = x[0]

        # A list whose head is itself a list is raw data (e.g. Fractran
        # instructions [[5, 2], [5, 3]]), not an S-expression.
        if isinstance(op, list):
            return [self._eval(item) for item in x]

        # --- TENSOR ADDRESS RESOLUTION ---
        if op == "tensor_addr":
            return self._resolve_tensor_addr(x[1:])

        # --- ARITHMETIC TOPOLOGY (FRACTRAN) ---
        if op == 'mult':
            a = self._eval(x[1])
            b = self._eval(x[2])
            # Hadamard Product: [A,B] * [C,D] = [A*C, B*D] (zip shortest)
            if isinstance(a, list) and isinstance(b, list):
                return [ai * bi for ai, bi in zip(a, b)]
            # Scalar * List = Broadcast
            if isinstance(a, list) and isinstance(b, int):
                return [ai * b for ai in a]
            if isinstance(a, int) and isinstance(b, list):
                return [a * bi for bi in b]
            return a * b

        # --- CAST (Batch Operation) ---
        if op == 'cast':
            # (cast op list1 list2) -> apply any binary op element-wise
            op_name = x[1]
            list_a = self._eval(x[2])
            list_b = self._eval(x[3])
            if not isinstance(list_a, list):
                list_a = [list_a]
            if not isinstance(list_b, list):
                list_b = [list_b]
            return [self._eval([op_name, ai, bi])
                    for ai, bi in zip(list_a, list_b)]

        if op == 'div':
            # Integer division; division by zero yields 0
            numerator = self._eval(x[1])
            denominator = self._eval(x[2])
            if denominator == 0:
                return 0
            return numerator // denominator

        if op == 'factor':
            # Returns whatever PrimeDB.decode_state yields for the value
            return self.prime_db.decode_state(self._eval(x[1]))

        # --- MEMORY & ROUTING ---
        if op == 'route':
            data = self._eval(x[1])
            target_node = self._eval(x[2])
            print(f"🚀 [MTL ROUTER] Transporting '{data}' -> Node [{target_node}]")
            return target_node

        if op == 'hold':
            # (hold name value) -> store value in register `name`
            if len(x) == 3:
                val = self._eval(x[2])
                self.memory[x[1]] = val
                return val
            return None

        # --- EXPANDED OPERATIONS (Handwritten Spec) ---
        if op == 'index':
            # (index K i) -> i-th decoded factor of K, or 0 (null factor)
            K = self._eval(x[1])
            i = self._eval(x[2])
            factors = self.prime_db.decode_state(K)
            if 0 <= i < len(factors):
                return factors[i]
            return 0

        if op == 'delta':
            # (delta A B) -> |A - B|
            return abs(self._eval(x[1]) - self._eval(x[2]))

        if op == 'grab':
            # Simplified: evaluates and returns its first argument
            return self._eval(x[1])

        if op == 'connect':
            # (connect A B) -> A * B (mult alias, scalars)
            return self._eval(x[1]) * self._eval(x[2])

        # =====================================================
        # SYNAPTIC OPERATIONS (Protocol 39 - Prime [13])
        # =====================================================
        if op == 'relate':
            # (relate A B) -> A * B * 13
            return self._eval(x[1]) * self._eval(x[2]) * self.RELATE_PRIME

        if op == 'trace':
            # (trace synapse atom) -> atom if it divides synapse, else 0
            synapse = self._eval(x[1])
            atom = self._eval(x[2])
            # A zero atom can never be a factor; avoid modulo by zero.
            if atom != 0 and synapse % atom == 0:
                return atom
            return 0

        if op == 'dissolve-link':
            # (dissolve-link synapse) -> synapse / 13, or unchanged
            synapse = self._eval(x[1])
            if synapse % self.RELATE_PRIME == 0:
                return synapse // self.RELATE_PRIME
            return synapse

        if op == 'get-linked':
            # (get-linked synapse known) -> synapse / (known * 13), else 0
            synapse = self._eval(x[1])
            divisor = self._eval(x[2]) * self.RELATE_PRIME
            if divisor != 0 and synapse % divisor == 0:
                return synapse // divisor
            return 0

        # =====================================================
        # LOGIC GATES: Tensor Topology Operations (Protocol 37)
        # =====================================================
        if op == 'or':
            # OR GATE: LCM, e.g. O(2, 3) = 6. Element-wise on two lists.
            a = self._eval(x[1])
            b = self._eval(x[2])
            if isinstance(a, list) and isinstance(b, list):
                return [self._lcm(ai, bi) for ai, bi in zip(a, b)]
            if isinstance(a, int) and isinstance(b, int):
                return self._lcm(a, b)
            # Mixed or non-integer operands: return the pair unchanged
            return [a, b]

        if op == 'and':
            # AND GATE: GCD, e.g. A(6, 10) = 2. Element-wise on two lists.
            a = self._eval(x[1])
            b = self._eval(x[2])
            if isinstance(a, list) and isinstance(b, list):
                return [math.gcd(ai, bi) for ai, bi in zip(a, b)]
            if isinstance(a, int) and isinstance(b, int):
                return math.gcd(a, b)
            return 0

        if op == 'not':
            # (not key a b ...) -> see _steer
            key = self._eval(x[1])
            return self._steer(key, [self._eval(arg) for arg in x[2:]])

        if op == 'select':
            # (select options index) -> options[index], or options unchanged
            opts = self._eval(x[1])
            idx = self._eval(x[2])
            if isinstance(opts, list) and 0 <= idx < len(opts):
                return opts[idx]
            return opts

        if op == 'bind':
            # (bind content context): encode the raw (unevaluated) content
            # through PrimeDB, then multiply by the evaluated context.
            content = x[1]
            context = self._eval(x[2])
            composite, _ = self.prime_db.encode_state([str(content)])
            return composite * context

        if op == 'position':
            # (position K) -> PrimeDB.decode_state(K)
            return self.prime_db.decode_state(self._eval(x[1]))

        # =====================================================
        # TENSOR LIST OPERATIONS (Protocol 35)
        # =====================================================
        if op == 'domain':
            # (domain KEY) -> make KEY the active domain, creating it if new
            key = self._eval(x[1])
            self.active_domain = key
            self.domains.setdefault(key, {})
            return key

        if op == 'in-domain':
            # (in-domain KEY body) -> evaluate body with KEY active, then
            # restore the previous domain (also when body raises).
            key = self._eval(x[1])
            old_domain = self.active_domain
            self.active_domain = key
            try:
                self.domains.setdefault(key, {})
                return self._eval(x[2])
            finally:
                self.active_domain = old_domain

        if op == 'get-domain':
            # (get-domain [KEY]) -> dict of tensors; defaults to active domain
            key = self._eval(x[1]) if len(x) > 1 else self.active_domain
            return self.domains.get(key, {})

        if op == 'store':
            # (store name value) -> store in active domain; 0 if none active
            if self.active_domain is None:
                return 0
            value = self._eval(x[2])
            self.domains.setdefault(self.active_domain, {})[str(x[1])] = value
            return value

        if op == 'fetch':
            # (fetch name) -> value from active domain, 0 when absent
            if self.active_domain is None:
                return 0
            return self.domains.get(self.active_domain, {}).get(str(x[1]), 0)

        if op == 'list':
            # (list a b c) -> [a, b, c]
            return [self._eval(item) for item in x[1:]]

        if op == 'car':
            # First element, or 0 for an empty list / non-list
            lst = self._eval(x[1])
            if isinstance(lst, list) and lst:
                return lst[0]
            return 0

        if op == 'cdr':
            # Everything after the first element
            lst = self._eval(x[1])
            if isinstance(lst, list) and len(lst) > 1:
                return lst[1:]
            return []

        if op == 'cons':
            # (cons elem list) -> [elem, ...list]; pairs up with a non-list
            elem = self._eval(x[1])
            lst = self._eval(x[2])
            if isinstance(lst, list):
                return [elem] + lst
            return [elem, lst]

        if op == 'swap':
            # (swap list i j) -> copy with positions i and j exchanged
            lst = self._eval(x[1])
            i = self._eval(x[2])
            j = self._eval(x[3])
            if isinstance(lst, list) and 0 <= i < len(lst) and 0 <= j < len(lst):
                result = lst.copy()
                result[i], result[j] = result[j], result[i]
                return result
            return lst

        if op == 'move':
            # (move list direction): 'right'/2 rotates left by one,
            # 'left'/3 rotates right by one, any other int scales the
            # integer elements. Default direction is 'right'.
            lst = self._eval(x[1])
            direction = self._eval(x[2]) if len(x) > 2 else 'right'
            if isinstance(lst, list) and lst:
                if direction == 'right' or direction == 2:
                    return lst[1:] + [lst[0]]
                if direction == 'left' or direction == 3:
                    return [lst[-1]] + lst[:-1]
                if isinstance(direction, int):
                    return [e * direction if isinstance(e, int) else e
                            for e in lst]
            return lst

        if op == 'nth':
            # (nth list i) -> list[i], or 0 when out of range
            lst = self._eval(x[1])
            i = self._eval(x[2])
            if isinstance(lst, list) and 0 <= i < len(lst):
                return lst[i]
            return 0

        if op == 'length':
            # Element count; a non-list counts as 1
            lst = self._eval(x[1])
            return len(lst) if isinstance(lst, list) else 1

        if op == 'map':
            # (map func list): only DEFUN'd functions are applied; otherwise
            # the list is returned unchanged.
            func_name = x[1]
            lst = self._eval(x[2])
            if isinstance(lst, list) and func_name in self.functions:
                return [self._eval([func_name, item]) for item in lst]
            return lst

        if op == 'reduce':
            # (reduce func list [initial]) -> fold with (func acc item)
            func_name = x[1]
            lst = self._eval(x[2])
            acc = self._eval(x[3]) if len(x) > 3 else 0
            if isinstance(lst, list) and func_name in self.functions:
                for item in lst:
                    acc = self._eval([func_name, acc, item])
            return acc

        # =====================================================
        # TURING COMPLETE PRIMITIVES (Protocol 34)
        # =====================================================
        if op == 'fractran':
            # (fractran [[P1 Q1] [P2 Q2] ...] InitialState)
            instructions = self._eval(x[1])
            state = self._eval(x[2])
            if not isinstance(instructions, list):
                return state  # No instructions, return state
            return self._run_fractran(instructions, state)

        if op == 'divisible?':
            # (divisible? N D) -> 1 if D divides N, else 0 (0 when D == 0)
            N = self._eval(x[1])
            D = self._eval(x[2])
            if D == 0:
                return 0
            return 1 if N % D == 0 else 0

        if op == 'if':
            # (if condition then [else]); truthy = non-zero / non-empty
            if self._eval(x[1]):
                return self._eval(x[2])
            return self._eval(x[3]) if len(x) > 3 else 0

        if op == 'cond':
            # (cond (pred1 result1) (pred2 result2) ...) -> first match, or 0
            for clause in x[1:]:
                if isinstance(clause, list) and len(clause) >= 2:
                    if self._eval(clause[0]):
                        return self._eval(clause[1])
            return 0

        if op == 'defun':
            # (defun name (arg1 arg2) body) -> registers and returns name
            name = x[1]
            self.functions[name] = {'params': x[2], 'body': x[3]}
            return name

        # --- FUNCTION CALL ---
        if op in self.functions:
            return self._call_function(op, x[1:])

        # --- ARITHMETIC HELPERS ---
        if op == 'add':
            return self._eval(x[1]) + self._eval(x[2])
        if op == 'sub':
            return self._eval(x[1]) - self._eval(x[2])
        if op == 'eq?':
            return 1 if self._eval(x[1]) == self._eval(x[2]) else 0
        if op == 'gt?':
            return 1 if self._eval(x[1]) > self._eval(x[2]) else 0
        if op == 'lt?':
            return 1 if self._eval(x[1]) < self._eval(x[2]) else 0

        # --- FALLBACK ---
        return [self._eval(exp) for exp in x]
# Helper
def specs_clean(tokens):
    """Return a new list containing every token except empty strings."""
    return [t for t in tokens if t != '']