""" engine.py — Propagation Logic Inference Engine =============================================== P / G → Q The inference procedure itself, not a description of it. Given a carrier V and gradient family Γ, this engine: 1. Checks closure 2. Finds fixed points 3. Detects involutions and cycle structure 4. Derives forced boundary conditions The claim: classical logic, fuzzy logic, arithmetic, grammar — all fall out of different (V, Γ) choices. The structure is DERIVED, not assumed. A standard LLM pattern-matches to training examples. This engine applies the procedure to carriers it has never seen. """ from __future__ import annotations from typing import Any, Dict, List, Optional, Set, Tuple import sys import os sys.path.insert(0, os.path.dirname(__file__)) from pl.core import ( Pattern, Gradient, Context, PropagationChain, seed, G_neg, G_id, G_custom, G_fuzzy_neg, G_succ, G_pred, G_halve, G_sqrt, G_mod, KNOWN_SYSTEMS, ) # ============================================================================= # ENGINE # ============================================================================= class PropagationEngine: """ The mechanism as inference procedure. Usage: engine = PropagationEngine( carrier = {0, 1}, gradients = [G_neg(), G_id()], theta = 1.0, name = "classical_logic", ) print(engine.report()) """ def __init__( self, carrier: Set, gradients: List[Gradient], theta: float = 1.0, name: str = "engine", ): try: self.V = sorted(carrier, key=str) except TypeError: self.V = list(carrier) self.V_set = set(carrier) self.gradients = gradients self.theta = theta self.context = Context(gradients, theta) self.name = name # ── Core ────────────────────────────────────────────────────────────────── def propagate(self, p: Pattern, g: Gradient) -> Pattern: """P / G → Q""" return g.propagate(p) # ── Analysis ────────────────────────────────────────────────────────────── def check_closure(self) -> Dict: results = {} for g in self.gradients: is_closed, violations = g.is_closed_on(self.V_set) extension = {out for _, out in violations} results[g.name] = { "closed": is_closed, "violations": violations, "extension_required": extension, } return results def find_fixed_points(self) -> Dict: return {g.name: g.fixed_points(self.V_set) for g in self.gradients} def find_cycles(self) -> Dict: results = {} for g in self.gradients: g_cycles = {} for v in self.V_set: g_cycles[v] = g.cycle_length(v) results[g.name] = g_cycles return results def check_involution(self) -> Dict: results = {} for g in self.gradients: counterexamples = [] for v in self.V_set: try: v1 = g.transform(v) except (ValueError, KeyError): counterexamples.append((v, "CARRIER_EXIT", "CARRIER_EXIT")) continue try: v2 = g.transform(v1) except (ValueError, KeyError): # v1 is outside V — orbit escaped, not an involution counterexamples.append((v, v1, "CARRIER_EXIT")) continue if v2 != v: counterexamples.append((v, v1, v2)) results[g.name] = { "is_involution": len(counterexamples) == 0, "counterexamples": counterexamples, } return results def compute_propagation_rates(self) -> Dict: """ Theorem 2.1: Among incoherent patterns, rate ∝ 1/L_P. Lower load = higher propagation rate. """ rates = {} for g in self.gradients: g_rates = {} for v in self.V_set: for L in [0.0, 1.0, 2.0, 5.0]: p = Pattern(v=v, L=L) demand = self.context.demand(p) rate = float("inf") if demand == 0.0 else (1.0 / L if L > 0 else float("inf")) g_rates[f"v={v!r},L={L}"] = { "demand": round(demand, 4), "rate": rate, "coherent": demand == 0.0, } rates[g.name] = g_rates return rates def derive_forced_conditions(self) -> Dict: """ The boundary condition extrapolation procedure. Run all analyses. Derive what this (V, Γ) forces. """ closure = self.check_closure() fps = self.find_fixed_points() cycles = self.find_cycles() involutions = self.check_involution() forced = [] # ── Closure ─────────────────────────────────────────────────────────── all_closed = all(r["closed"] for r in closure.values()) if all_closed: forced.append(( "CLOSURE", f"The carrier V={set(self.V)} is closed under all " f"{len(self.gradients)} gradient(s). " f"Propagation stays within V. The carrier is self-consistent." )) else: for g_name, r in closure.items(): if not r["closed"]: forced.append(( "CLOSURE_VIOLATION", f"G[{g_name}] is NOT closed on V={set(self.V)}. " f"Violations: {r['violations'][:3]}. " f"The carrier MUST extend to include {r['extension_required']}. " f"This extension is forced, not chosen." )) # ── Fixed points ────────────────────────────────────────────────────── for g_name, fp_list in fps.items(): if not fp_list: forced.append(( "NO_FIXED_POINTS", f"G[{g_name}] has no fixed points on V={set(self.V)}. " f"Nothing survives this gradient unchanged. " f"All elements are in motion under G[{g_name}]." )) else: forced.append(( "FIXED_POINTS", f"G[{g_name}] fixes {fp_list}. " f"These are stable attractors — propagation leaves them unchanged. " f"They are the invariants of this gradient family." )) # ── Involutions ─────────────────────────────────────────────────────── for g_name, inv in involutions.items(): if inv["is_involution"]: forced.append(( "INVOLUTION", f"G[{g_name}] is an involution: applying it twice returns to origin. " f"Double-application law holds. This is DERIVED from the carrier, " f"not assumed as an axiom." )) elif inv["counterexamples"]: forced.append(( "NOT_INVOLUTION", f"G[{g_name}] is NOT an involution. " f"Double-application does not return to origin: " f"{inv['counterexamples'][:2]}." )) # ── Cycle structure ─────────────────────────────────────────────────── for g_name, cyc in cycles.items(): lengths = set(v for v in cyc.values() if v is not None) if not lengths: pass elif lengths == {1}: forced.append(( "IDENTITY_GRADIENT", f"G[{g_name}] is the identity on V: every element is a fixed point. " f"This gradient changes nothing — zero cost, zero transformation." )) elif len(lengths) == 1: k = next(iter(lengths)) forced.append(( f"UNIFORM_{k}_CYCLE", f"G[{g_name}] is a uniform {k}-cycle on V. " f"Every element has orbit length {k}. " f"Applying G[{g_name}] exactly {k} times returns to origin. " f"The carrier has uniform periodic structure." )) else: forced.append(( "MIXED_CYCLE_STRUCTURE", f"G[{g_name}] has mixed cycle structure: {dict(cyc)}. " f"Different elements have different orbit lengths. " f"The carrier has internal asymmetry." )) # ── Logic identification ─────────────────────────────────────────────── V_set = set(self.V) if V_set == {0, 1}: neg_invol = involutions.get("neg", involutions.get("fuzzy_neg", { "is_involution": False }))["is_involution"] neg_fps = fps.get("neg", fps.get("fuzzy_neg", [None])) if neg_invol and neg_fps == []: forced.append(( "CLASSICAL_LOGIC_FORCED", "V={0,1} + involutory negation with no fixed points = " "CLASSICAL LOGIC. " "Excluded middle holds: every element is 0 or 1, no middle. " "This is not an axiom system. It is the forced boundary condition " "of the two-element carrier with this gradient family." )) numeric_V = all(isinstance(v, (int, float)) for v in V_set) if numeric_V and len(V_set) >= 3: for g_name, fp_list in fps.items(): middle = [v for v in fp_list if v not in {0, 0.0, 1, 1.0}] if middle: forced.append(( "MANY_VALUED_LOGIC_FORCED", f"V={V_set} + G[{g_name}] fixing middle values {middle}: " f"MANY-VALUED LOGIC is forced. " f"Excluded middle fails at {middle}. " f"These values are neither fully designated nor undesignated. " f"Their existence in V is what forces the failure." )) # ── Theorem 2.1 ─────────────────────────────────────────────────────── forced.append(( "THEOREM_2.1", "Propagation rate theorem: among incoherent patterns (demand > 0), " "rate ∝ 1/L. Simpler patterns propagate faster. " "This holds on this carrier as on all carriers. " "Zipf's law, natural selection, the exponential fixed point — one theorem." )) return { "name": self.name, "carrier": list(self.V), "gradients": [g.name for g in self.gradients], "theta": self.theta, "forced_conditions": forced, "closure": closure, "fixed_points": fps, "cycles": cycles, "involutions": involutions, } def report(self) -> str: """Full derivation as human-readable report.""" fc = self.derive_forced_conditions() lines = [ "", "=" * 62, f" PROPAGATION ENGINE: {self.name.upper()}", "=" * 62, f" Carrier V : {set(fc['carrier'])}", f" Gradients Γ : {fc['gradients']}", f" Threshold θ : {fc['theta']}", "=" * 62, "", " DERIVED BOUNDARY CONDITIONS", " (not assumed — forced by V and Γ)", "", ] for i, (tag, condition) in enumerate(fc["forced_conditions"], 1): lines.append(f" [{i}] {tag}") words = condition.split() line = " " for word in words: if len(line) + len(word) + 1 > 60: lines.append(line) line = " " + word + " " else: line += word + " " lines.append(line.rstrip()) lines.append("") lines += [ "=" * 62, " These conditions were not assumed.", " They were derived by running P / G → Q.", "=" * 62, "", ] return "\n".join(lines) def as_training_text(self) -> str: """ Render derivation as training data. This is what the LM learns — the procedure, not descriptions of it. """ fc = self.derive_forced_conditions() lines = [ f"DOMAIN: {self.name}", f"CARRIER: {sorted(set(fc['carrier']), key=str)}", f"GRADIENTS: {fc['gradients']}", f"THETA: {fc['theta']}", "---", ] # Fixed points for g_name, fp_list in fc["fixed_points"].items(): lines.append(f"FIXED_POINTS[{g_name}]: {fp_list}") # Closure for g_name, r in fc["closure"].items(): if r["closed"]: lines.append(f"CLOSURE[{g_name}]: HOLDS") else: lines.append(f"CLOSURE[{g_name}]: VIOLATED → extend to include {r['extension_required']}") # Involutions for g_name, inv in fc["involutions"].items(): tag = "YES" if inv["is_involution"] else "NO" lines.append(f"INVOLUTION[{g_name}]: {tag}") # Cycles for g_name, cyc in fc["cycles"].items(): lengths = set(v for v in cyc.values() if v is not None) if lengths: k = next(iter(lengths)) if len(lengths) == 1 else "mixed" lines.append(f"CYCLE[{g_name}]: length={k}") # Forced conditions lines.append("FORCED:") for tag, _ in fc["forced_conditions"]: lines.append(f" {tag}") lines.append("END") return "\n".join(lines) # ============================================================================= # FACTORY — build engines from KNOWN_SYSTEMS # ============================================================================= def engine_from_system(system_name: str) -> PropagationEngine: """Build an engine from a registered system in KNOWN_SYSTEMS.""" if system_name not in KNOWN_SYSTEMS: available = list(KNOWN_SYSTEMS.keys()) raise ValueError(f"Unknown system {system_name!r}. Available: {available}") sys_def = KNOWN_SYSTEMS[system_name] return PropagationEngine( carrier=sys_def["carrier"], gradients=sys_def["gradients"](), name=system_name, ) # ============================================================================= # DEMONSTRATION # ============================================================================= if __name__ == "__main__": print("\n" + "="*62) print(" PROPAGATION LOGIC INFERENCE ENGINE") print(" P / G → Q") print("="*62) # ── 1. Classical logic ───────────────────────────────────────────────── e1 = engine_from_system("classical_logic") print(e1.report()) # ── 2. Three-valued logic ────────────────────────────────────────────── e2 = engine_from_system("three_valued_logic") print(e2.report()) # ── 3. Novel carrier: colors — never seen in training ────────────────── print("="*62) print(" NOVEL CARRIER: {red, green, blue}") print(" This carrier was NEVER in any training data.") print(" The engine derives its boundary conditions from scratch.") print("="*62) e3 = PropagationEngine( carrier={"red", "green", "blue"}, gradients=[ G_custom("complement", {"red": "green", "green": "blue", "blue": "red"}), G_id(), ], theta=1.0, name="color_carrier", ) print(e3.report()) # ── 4. Forced extension: ℕ → ℤ ──────────────────────────────────────── print("="*62) print(" FORCED EXTENSION: V={0,1,2,3} + predecessor") print(" Demonstrates how ℕ → ℤ is forced by closure violation.") print("="*62) from pl.core import G_pred e4 = PropagationEngine( carrier={0, 1, 2, 3}, gradients=[G_pred(), G_id()], theta=1.0, name="N_closure_violation", ) print(e4.report()) # ── 5. Modular arithmetic ────────────────────────────────────────────── e5 = engine_from_system("Z4") print(e5.report()) print("\n Training text format (for the LM):") print("-" * 40) print(e1.as_training_text())