"""
engine.py — Propagation Logic Inference Engine
===============================================

    P / G → Q

The inference procedure itself, not a description of it.

Given a carrier V and gradient family Γ, this engine:
    1. Checks closure
    2. Finds fixed points
    3. Detects involutions and cycle structure
    4. Derives forced boundary conditions

The claim: classical logic, fuzzy logic, arithmetic, grammar —
all fall out of different (V, Γ) choices.
The structure is DERIVED, not assumed.

A standard LLM pattern-matches to training examples.
This engine applies the procedure to carriers it has never seen.
"""

from __future__ import annotations

import os
import sys
import textwrap
from typing import Any, Dict, List, Optional, Set, Tuple

# Put this file's directory first on sys.path before importing pl.core.
sys.path.insert(0, os.path.dirname(__file__))

from pl.core import (
    Pattern, Gradient, Context, PropagationChain,
    seed, G_neg, G_id, G_custom,
    G_fuzzy_neg, G_succ, G_pred, G_halve, G_sqrt,
    G_mod, KNOWN_SYSTEMS,
)


class PropagationEngine:
    """
    The mechanism as inference procedure.

    Holds a carrier ``V`` and a list of gradients, runs the structural
    analyses on them (closure, fixed points, cycles, involution) and turns
    the results into a list of tagged "forced conditions".

    The engine's own loops visit carrier elements in the order of
    ``self.V`` (sorted by ``str``), so the ordering of the results it
    builds does not depend on set iteration order.

    Usage:
        engine = PropagationEngine(
            carrier=

 {0, 1},
            gradients=[G_neg(), G_id()],
            theta=1.0,
            name="classical_logic",
        )
        print(engine.report())
    """

    # Marker stored in involution counterexamples when a transform raises.
    _CARRIER_EXIT = "CARRIER_EXIT"
    # Load values sampled by compute_propagation_rates().
    _SAMPLE_LOADS = (0.0, 1.0, 2.0, 5.0)
    # Horizontal rule used by report().
    _RULE = "=" * 62
    # Widest visible wrapped line in report(), indent included.
    _WRAP_WIDTH = 59

    def __init__(
        self,
        carrier: Set,
        gradients: "List[Gradient]",
        theta: float = 1.0,
        name: str = "engine",
    ):
        """
        Store the carrier, the gradients and a Context built from them.

        Args:
            carrier: Elements of V. Must be hashable.
            gradients: Gradient objects to analyse on the carrier.
            theta: Threshold handed to ``Context`` together with the gradients.
            name: Label used in reports and training text.
        """
        # Materialise the carrier once so a one-shot iterable is not
        # exhausted by the sort before the set is built.
        self.V_set = set(carrier)
        try:
            # Sorting by str gives a stable order even for mixed-type carriers.
            self.V = sorted(self.V_set, key=str)
        except TypeError:
            self.V = list(self.V_set)
        self.gradients = gradients
        self.theta = theta
        self.context = Context(gradients, theta)
        self.name = name

    # -- Core operation ----------------------------------------------------

    def propagate(self, p: "Pattern", g: "Gradient") -> "Pattern":
        """P / G → Q: delegate to ``g.propagate(p)``."""
        return g.propagate(p)

    # -- Structural analyses -----------------------------------------------

    def check_closure(self) -> Dict:
        """
        Ask every gradient whether it is closed on the carrier.

        ``g.is_closed_on`` is expected to return ``(is_closed, violations)``
        where each violation unpacks as ``(input, output)``.

        Returns:
            ``{gradient name: {"closed", "violations", "extension_required"}}``;
            ``extension_required`` is the set of violation outputs.
        """
        results = {}
        for g in self.gradients:
            is_closed, violations = g.is_closed_on(self.V_set)
            results[g.name] = {
                "closed": is_closed,
                "violations": violations,
                "extension_required": {out for _, out in violations},
            }
        return results

    def find_fixed_points(self) -> Dict:
        """Return ``{gradient name: g.fixed_points(V)}`` for every gradient."""
        return {g.name: g.fixed_points(self.V_set) for g in self.gradients}

    def find_cycles(self) -> Dict:
        """
        Return ``{gradient name: {element: g.cycle_length(element)}}``.

        Elements are inserted in ``self.V`` order so the inner dict (which is
        printed verbatim for mixed cycle structures) is reproducible.
        """
        results = {}
        for g in self.gradients:
            results[g.name] = {v: g.cycle_length(v) for v in self.V}
        return results

    def check_involution(self) -> Dict:
        """
        Test ``g(g(v)) == v`` for every carrier element and gradient.

        A ``ValueError`` or ``KeyError`` raised by ``g.transform`` is recorded
        as a counterexample with the ``"CARRIER_EXIT"`` marker instead of
        propagating.

        Returns:
            ``{gradient name: {"is_involution": bool, "counterexamples": list}}``
            where each counterexample is ``(v, g(v), g(g(v)))`` and the list
            follows ``self.V`` order.
        """
        results = {}
        for g in self.gradients:
            counterexamples = []
            for v in self.V:
                try:
                    once = g.transform(v)
                except (ValueError, KeyError):
                    counterexamples.append(
                        (v, self._CARRIER_EXIT, self._CARRIER_EXIT)
                    )
                    continue
                try:
                    twice = g.transform(once)
                except (ValueError, KeyError):
                    # First step worked, second one failed.
                    counterexamples.append((v, once, self._CARRIER_EXIT))
                    continue
                if twice != v:
                    counterexamples.append((v, once, twice))
            results[g.name] = {
                "is_involution": not counterexamples,
                "counterexamples": counterexamples,
            }
        return results

    def compute_propagation_rates(self) -> Dict:
        """
        Theorem 2.1: Among incoherent patterns, rate ∝ 1/L_P.
        Lower load = higher propagation rate.

        For every carrier element and each sampled load a ``Pattern`` is built
        and ``self.context.demand`` is evaluated. The rate is infinite when
        the demand is zero or the load is not positive, otherwise ``1 / load``.

        Returns:
            ``{gradient name: {"v=<repr>,L=<load>": {"demand", "rate",
            "coherent"}}}`` with ``demand`` rounded to 4 places.
        """
        # NOTE(review): the gradient is only used for its name here; the
        # demand comes from the shared context, so every gradient appears to
        # get the same table. Confirm whether that is intended.
        infinite = float("inf")
        rates = {}
        for g in self.gradients:
            g_rates = {}
            for v in self.V:
                for load in self._SAMPLE_LOADS:
                    demand = self.context.demand(Pattern(v=v, L=load))
                    coherent = demand == 0.0
                    if coherent:
                        rate = infinite
                    else:
                        rate = 1.0 / load if load > 0 else infinite
                    g_rates[f"v={v!r},L={load}"] = {
                        "demand": round(demand, 4),
                        "rate": rate,
                        "coherent": coherent,
                    }
            rates[g.name] = g_rates
        return rates

    # -- Boundary-condition derivation -------------------------------------

    def derive_forced_conditions(self) -> Dict:
        """
        The boundary condition extrapolation procedure.
        Run all analyses. Derive what this (V, Γ) forces.

        Returns:
            A dict with the engine's ``name``, ``carrier``, ``gradients``
            (names) and ``theta``, the raw analysis results (``closure``,
            ``fixed_points``, ``cycles``, ``involutions``) and
            ``forced_conditions``: a list of ``(tag, message)`` tuples.
        """
        closure = self.check_closure()
        fps = self.find_fixed_points()
        cycles = self.find_cycles()
        involutions = self.check_involution()

        forced: List[Tuple[str, str]] = []
        forced += self._closure_conditions(closure)
        forced += self._fixed_point_conditions(fps)
        forced += self._involution_conditions(involutions)
        forced += self._cycle_conditions(cycles)
        forced += self._logic_conditions(fps, involutions)
        # Always reported, independent of the analyses above.
        forced.append((
            "THEOREM_2.1",
            "Propagation rate theorem: among incoherent patterns (demand > 0), "
            "rate ∝ 1/L. Simpler patterns propagate faster. "
            "This holds on this carrier as on all carriers. "
            "Zipf's law, natural selection, the exponential fixed point — one theorem."
        ))

        return {
            "name": self.name,
            "carrier": list(self.V),
            "gradients": [g.name for g in self.gradients],
            "theta": self.theta,
            "forced_conditions": forced,
            "closure": closure,
            "fixed_points": fps,
            "cycles": cycles,
            "involutions": involutions,
        }

    def _closure_conditions(self, closure: Dict) -> List[Tuple[str, str]]:
        """One CLOSURE entry if every gradient is closed, else one violation each."""
        if all(r["closed"] for r in closure.values()):
            return [(
                "CLOSURE",
                f"The carrier V={set(self.V)} is closed under all "
                f"{len(self.gradients)} gradient(s). "
                f"Propagation stays within V. The carrier is self-consistent."
            )]
        return [
            (
                "CLOSURE_VIOLATION",
                f"G[{g_name}] is NOT closed on V={set(self.V)}. "
                f"Violations: {r['violations'][:3]}. "
                f"The carrier MUST extend to include {r['extension_required']}. "
                f"This extension is forced, not chosen."
            )
            for g_name, r in closure.items()
            if not r["closed"]
        ]

    def _fixed_point_conditions(self, fps: Dict) -> List[Tuple[str, str]]:
        """One FIXED_POINTS / NO_FIXED_POINTS entry per gradient."""
        found = []
        for g_name, fp_list in fps.items():
            if fp_list:
                found.append((
                    "FIXED_POINTS",
                    f"G[{g_name}] fixes {fp_list}. "
                    f"These are stable attractors — propagation leaves them unchanged. "
                    f"They are the invariants of this gradient family."
                ))
            else:
                found.append((
                    "NO_FIXED_POINTS",
                    f"G[{g_name}] has no fixed points on V={set(self.V)}. "
                    f"Nothing survives this gradient unchanged. "
                    f"All elements are in motion under G[{g_name}]."
                ))
        return found

    @staticmethod
    def _involution_conditions(involutions: Dict) -> List[Tuple[str, str]]:
        """One INVOLUTION / NOT_INVOLUTION entry per gradient."""
        found = []
        for g_name, inv in involutions.items():
            if inv["is_involution"]:
                found.append((
                    "INVOLUTION",
                    f"G[{g_name}] is an involution: applying it twice returns to origin. "
                    f"Double-application law holds. This is DERIVED from the carrier, "
                    f"not assumed as an axiom."
                ))
            elif inv["counterexamples"]:
                found.append((
                    "NOT_INVOLUTION",
                    f"G[{g_name}] is NOT an involution. "
                    f"Double-application does not return to origin: "
                    f"{inv['counterexamples'][:2]}."
                ))
        return found

    @staticmethod
    def _cycle_conditions(cycles: Dict) -> List[Tuple[str, str]]:
        """Classify each gradient's orbit lengths as identity, uniform or mixed."""
        found = []
        for g_name, cyc in cycles.items():
            # Elements whose cycle length is None are ignored here.
            lengths = {n for n in cyc.values() if n is not None}
            if not lengths:
                continue
            if lengths == {1}:
                found.append((
                    "IDENTITY_GRADIENT",
                    f"G[{g_name}] is the identity on V: every element is a fixed point. "
                    f"This gradient changes nothing — zero cost, zero transformation."
                ))
            elif len(lengths) == 1:
                k = next(iter(lengths))
                found.append((
                    f"UNIFORM_{k}_CYCLE",
                    f"G[{g_name}] is a uniform {k}-cycle on V. "
                    f"Every element has orbit length {k}. "
                    f"Applying G[{g_name}] exactly {k} times returns to origin. "
                    f"The carrier has uniform periodic structure."
                ))
            else:
                found.append((
                    "MIXED_CYCLE_STRUCTURE",
                    f"G[{g_name}] has mixed cycle structure: {dict(cyc)}. "
                    f"Different elements have different orbit lengths. "
                    f"The carrier has internal asymmetry."
                ))
        return found

    def _logic_conditions(
        self, fps: Dict, involutions: Dict
    ) -> List[Tuple[str, str]]:
        """Recognise the classical and many-valued logic patterns."""
        found = []
        V_set = set(self.V)

        if V_set == {0, 1}:
            # "neg" takes precedence over "fuzzy_neg" when both are present.
            neg_name = next(
                (n for n in ("neg", "fuzzy_neg") if n in involutions), None
            )
            if neg_name is not None and involutions[neg_name]["is_involution"]:
                neg_fps = fps.get(neg_name)
                # Emptiness test rather than `== []` so that a tuple or set
                # of fixed points is handled like a list.
                if neg_fps is not None and len(neg_fps) == 0:
                    found.append((
                        "CLASSICAL_LOGIC_FORCED",
                        "V={0,1} + involutory negation with no fixed points = "
                        "CLASSICAL LOGIC. "
                        "Excluded middle holds: every element is 0 or 1, no middle. "
                        "This is not an axiom system. It is the forced boundary condition "
                        "of the two-element carrier with this gradient family."
                    ))

        if len(V_set) >= 3 and all(isinstance(v, (int, float)) for v in V_set):
            # NOTE(review): this also fires for a gradient that fixes every
            # element (e.g. an identity gradient on {0, 1, 2, 3}); confirm
            # whether such gradients should be excluded.
            for g_name, fp_list in fps.items():
                middle = [v for v in fp_list if v not in (0, 1)]
                if middle:
                    found.append((
                        "MANY_VALUED_LOGIC_FORCED",
                        f"V={V_set} + G[{g_name}] fixing middle values {middle}: "
                        f"MANY-VALUED LOGIC is forced. "
                        f"Excluded middle fails at {middle}. "
                        f"These values are neither fully designated nor undesignated. "
                        f"Their existence in V is what forces the failure."
                    ))
        return found

    # -- Rendering -----------------------------------------------------------

    @classmethod
    def _wrap(cls, text: str, indent: str = " ") -> List[str]:
        """Greedy word-wrap of *text* into indented lines without trailing spaces."""
        wrapped = textwrap.wrap(
            " ".join(text.split()),  # collapse whitespace runs first
            width=cls._WRAP_WIDTH,
            initial_indent=indent,
            subsequent_indent=indent,
            break_long_words=False,
            break_on_hyphens=False,
        )
        # An empty message still occupies one (empty) line.
        return wrapped or [""]

    def report(self) -> str:
        """Full derivation as human-readable report."""
        fc = self.derive_forced_conditions()
        rule = self._RULE

        lines = [
            "",
            rule,
            f" PROPAGATION ENGINE: {self.name.upper()}",
            rule,
            f" Carrier V : {set(fc['carrier'])}",
            f" Gradients Γ : {fc['gradients']}",
            f" Threshold θ : {fc['theta']}",
            rule,
            "",
            " DERIVED BOUNDARY CONDITIONS",
            " (not assumed — forced by V and Γ)",
            "",
        ]

        for i, (tag, condition) in enumerate(fc["forced_conditions"], 1):
            lines.append(f" [{i}] {tag}")
            lines.extend(self._wrap(condition))
            lines.append("")

        lines += [
            rule,
            " These conditions were not assumed.",
            " They were derived by running P / G → Q.",
            rule,
            "",
        ]
        return "\n".join(lines)

    def as_training_text(self) -> str:
        """
        Render derivation as training data.
        This is what the LM learns — the procedure, not descriptions of it.

        The text lists the domain header, per-gradient fixed points, closure,
        involution and cycle lines, then the forced-condition tags (without
        their messages), and ends with ``END``.
        """
        fc = self.derive_forced_conditions()
        lines = [
            f"DOMAIN: {self.name}",
            f"CARRIER: {sorted(set(fc['carrier']), key=str)}",
            f"GRADIENTS: {fc['gradients']}",
            f"THETA: {fc['theta']}",
            "---",
        ]

        for g_name, fp_list in fc["fixed_points"].items():
            lines.append(f"FIXED_POINTS[{g_name}]: {fp_list}")

        for g_name, r in fc["closure"].items():
            if r["closed"]:
                lines.append(f"CLOSURE[{g_name}]: HOLDS")
            else:
                lines.append(
                    f"CLOSURE[{g_name}]: VIOLATED — extend to include "
                    f"{r['extension_required']}"
                )

        for g_name, inv in fc["involutions"].items():
            tag = "YES" if inv["is_involution"] else "NO"
            lines.append(f"INVOLUTION[{g_name}]: {tag}")

        for g_name, cyc in fc["cycles"].items():
            lengths = {n for n in cyc.values() if n is not None}
            if lengths:
                k = next(iter(lengths)) if len(lengths) == 1 else "mixed"
                lines.append(f"CYCLE[{g_name}]: length={k}")

        lines.append("FORCED:")
        lines.extend(f" {tag}" for tag, _ in fc["forced_conditions"])

        lines.append("END")
        return "\n".join(lines)


def engine_from_system(system_name: str) -> "PropagationEngine":
    """
    Build an engine from a registered system in KNOWN_SYSTEMS.

    The registry entry supplies ``"carrier"`` and a ``"gradients"`` factory
    that is called to obtain the gradient list; the engine is named after
    ``system_name`` and uses the engine's default theta.

    Raises:
        ValueError: If ``system_name`` is not registered. The message lists
            the available system names.
    """
    if system_name in KNOWN_SYSTEMS:
        spec = KNOWN_SYSTEMS[system_name]
        return PropagationEngine(
            carrier=spec["carrier"],
            gradients=spec["gradients"](),
            name=system_name,
        )

    # Unknown name: tell the caller what they could have asked for.
    available = list(KNOWN_SYSTEMS.keys())
    raise ValueError(f"Unknown system {system_name!r}. Available: {available}")


def main() -> None:
    """Run the demo derivations and print each report to stdout."""
    rule = "=" * 62

    print("\n" + rule)
    print(" PROPAGATION LOGIC INFERENCE ENGINE")
    print(" P / G → Q")
    print(rule)

    # Registered system: classical logic.
    e1 = engine_from_system("classical_logic")
    print(e1.report())

    # Registered system: three-valued logic.
    e2 = engine_from_system("three_valued_logic")
    print(e2.report())

    # Ad-hoc carrier built directly rather than taken from the registry.
    print(rule)
    print(" NOVEL CARRIER: {red, green, blue}")
    print(" This carrier was NEVER in any training data.")
    print(" The engine derives its boundary conditions from scratch.")
    print(rule)
    e3 = PropagationEngine(
        carrier={"red", "green", "blue"},
        gradients=[
            G_custom("complement", {"red": "green", "green": "blue", "blue": "red"}),
            G_id(),
        ],
        theta=1.0,
        name="color_carrier",
    )
    print(e3.report())

    # Predecessor on {0, 1, 2, 3}: the demo for a closure violation.
    # G_pred is already imported at the top of the file.
    print(rule)
    print(" FORCED EXTENSION: V={0,1,2,3} + predecessor")
    print(" Demonstrates how ℕ → ℤ is forced by closure violation.")
    print(rule)
    e4 = PropagationEngine(
        carrier={0, 1, 2, 3},
        gradients=[G_pred(), G_id()],
        theta=1.0,
        name="N_closure_violation",
    )
    print(e4.report())

    # Registered system: Z4.
    e5 = engine_from_system("Z4")
    print(e5.report())

    print("\n Training text format (for the LM):")
    print("-" * 40)
    print(e1.as_training_text())


if __name__ == "__main__":
    main()