ApplePiesFromScratch's picture
Upload folder using huggingface_hub
b5d4048 verified
"""
engine.py β€” Propagation Logic Inference Engine
===============================================
P / G β†’ Q
The inference procedure itself, not a description of it.
Given a carrier V and gradient family Ξ“, this engine:
1. Checks closure
2. Finds fixed points
3. Detects involutions and cycle structure
4. Derives forced boundary conditions
The claim: classical logic, fuzzy logic, arithmetic, grammar β€”
all fall out of different (V, Ξ“) choices.
The structure is DERIVED, not assumed.
A standard LLM pattern-matches to training examples.
This engine applies the procedure to carriers it has never seen.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Set, Tuple
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from pl.core import (
Pattern, Gradient, Context, PropagationChain,
seed, G_neg, G_id, G_custom,
G_fuzzy_neg, G_succ, G_pred, G_halve, G_sqrt,
G_mod, KNOWN_SYSTEMS,
)
# =============================================================================
# ENGINE
# =============================================================================
class PropagationEngine:
"""
The mechanism as inference procedure.
Usage:
engine = PropagationEngine(
carrier = {0, 1},
gradients = [G_neg(), G_id()],
theta = 1.0,
name = "classical_logic",
)
print(engine.report())
"""
def __init__(
self,
carrier: Set,
gradients: List[Gradient],
theta: float = 1.0,
name: str = "engine",
):
try:
self.V = sorted(carrier, key=str)
except TypeError:
self.V = list(carrier)
self.V_set = set(carrier)
self.gradients = gradients
self.theta = theta
self.context = Context(gradients, theta)
self.name = name
# ── Core ──────────────────────────────────────────────────────────────────
def propagate(self, p: Pattern, g: Gradient) -> Pattern:
"""P / G β†’ Q"""
return g.propagate(p)
# ── Analysis ──────────────────────────────────────────────────────────────
def check_closure(self) -> Dict:
results = {}
for g in self.gradients:
is_closed, violations = g.is_closed_on(self.V_set)
extension = {out for _, out in violations}
results[g.name] = {
"closed": is_closed,
"violations": violations,
"extension_required": extension,
}
return results
def find_fixed_points(self) -> Dict:
return {g.name: g.fixed_points(self.V_set) for g in self.gradients}
def find_cycles(self) -> Dict:
results = {}
for g in self.gradients:
g_cycles = {}
for v in self.V_set:
g_cycles[v] = g.cycle_length(v)
results[g.name] = g_cycles
return results
def check_involution(self) -> Dict:
results = {}
for g in self.gradients:
counterexamples = []
for v in self.V_set:
try:
v1 = g.transform(v)
except (ValueError, KeyError):
counterexamples.append((v, "CARRIER_EXIT", "CARRIER_EXIT"))
continue
try:
v2 = g.transform(v1)
except (ValueError, KeyError):
# v1 is outside V β€” orbit escaped, not an involution
counterexamples.append((v, v1, "CARRIER_EXIT"))
continue
if v2 != v:
counterexamples.append((v, v1, v2))
results[g.name] = {
"is_involution": len(counterexamples) == 0,
"counterexamples": counterexamples,
}
return results
def compute_propagation_rates(self) -> Dict:
"""
Theorem 2.1: Among incoherent patterns, rate ∝ 1/L_P.
Lower load = higher propagation rate.
"""
rates = {}
for g in self.gradients:
g_rates = {}
for v in self.V_set:
for L in [0.0, 1.0, 2.0, 5.0]:
p = Pattern(v=v, L=L)
demand = self.context.demand(p)
rate = float("inf") if demand == 0.0 else (1.0 / L if L > 0 else float("inf"))
g_rates[f"v={v!r},L={L}"] = {
"demand": round(demand, 4),
"rate": rate,
"coherent": demand == 0.0,
}
rates[g.name] = g_rates
return rates
def derive_forced_conditions(self) -> Dict:
"""
The boundary condition extrapolation procedure.
Run all analyses. Derive what this (V, Ξ“) forces.
"""
closure = self.check_closure()
fps = self.find_fixed_points()
cycles = self.find_cycles()
involutions = self.check_involution()
forced = []
# ── Closure ───────────────────────────────────────────────────────────
all_closed = all(r["closed"] for r in closure.values())
if all_closed:
forced.append((
"CLOSURE",
f"The carrier V={set(self.V)} is closed under all "
f"{len(self.gradients)} gradient(s). "
f"Propagation stays within V. The carrier is self-consistent."
))
else:
for g_name, r in closure.items():
if not r["closed"]:
forced.append((
"CLOSURE_VIOLATION",
f"G[{g_name}] is NOT closed on V={set(self.V)}. "
f"Violations: {r['violations'][:3]}. "
f"The carrier MUST extend to include {r['extension_required']}. "
f"This extension is forced, not chosen."
))
# ── Fixed points ──────────────────────────────────────────────────────
for g_name, fp_list in fps.items():
if not fp_list:
forced.append((
"NO_FIXED_POINTS",
f"G[{g_name}] has no fixed points on V={set(self.V)}. "
f"Nothing survives this gradient unchanged. "
f"All elements are in motion under G[{g_name}]."
))
else:
forced.append((
"FIXED_POINTS",
f"G[{g_name}] fixes {fp_list}. "
f"These are stable attractors β€” propagation leaves them unchanged. "
f"They are the invariants of this gradient family."
))
# ── Involutions ───────────────────────────────────────────────────────
for g_name, inv in involutions.items():
if inv["is_involution"]:
forced.append((
"INVOLUTION",
f"G[{g_name}] is an involution: applying it twice returns to origin. "
f"Double-application law holds. This is DERIVED from the carrier, "
f"not assumed as an axiom."
))
elif inv["counterexamples"]:
forced.append((
"NOT_INVOLUTION",
f"G[{g_name}] is NOT an involution. "
f"Double-application does not return to origin: "
f"{inv['counterexamples'][:2]}."
))
# ── Cycle structure ───────────────────────────────────────────────────
for g_name, cyc in cycles.items():
lengths = set(v for v in cyc.values() if v is not None)
if not lengths:
pass
elif lengths == {1}:
forced.append((
"IDENTITY_GRADIENT",
f"G[{g_name}] is the identity on V: every element is a fixed point. "
f"This gradient changes nothing β€” zero cost, zero transformation."
))
elif len(lengths) == 1:
k = next(iter(lengths))
forced.append((
f"UNIFORM_{k}_CYCLE",
f"G[{g_name}] is a uniform {k}-cycle on V. "
f"Every element has orbit length {k}. "
f"Applying G[{g_name}] exactly {k} times returns to origin. "
f"The carrier has uniform periodic structure."
))
else:
forced.append((
"MIXED_CYCLE_STRUCTURE",
f"G[{g_name}] has mixed cycle structure: {dict(cyc)}. "
f"Different elements have different orbit lengths. "
f"The carrier has internal asymmetry."
))
# ── Logic identification ───────────────────────────────────────────────
V_set = set(self.V)
if V_set == {0, 1}:
neg_invol = involutions.get("neg", involutions.get("fuzzy_neg", {
"is_involution": False
}))["is_involution"]
neg_fps = fps.get("neg", fps.get("fuzzy_neg", [None]))
if neg_invol and neg_fps == []:
forced.append((
"CLASSICAL_LOGIC_FORCED",
"V={0,1} + involutory negation with no fixed points = "
"CLASSICAL LOGIC. "
"Excluded middle holds: every element is 0 or 1, no middle. "
"This is not an axiom system. It is the forced boundary condition "
"of the two-element carrier with this gradient family."
))
numeric_V = all(isinstance(v, (int, float)) for v in V_set)
if numeric_V and len(V_set) >= 3:
for g_name, fp_list in fps.items():
middle = [v for v in fp_list
if v not in {0, 0.0, 1, 1.0}]
if middle:
forced.append((
"MANY_VALUED_LOGIC_FORCED",
f"V={V_set} + G[{g_name}] fixing middle values {middle}: "
f"MANY-VALUED LOGIC is forced. "
f"Excluded middle fails at {middle}. "
f"These values are neither fully designated nor undesignated. "
f"Their existence in V is what forces the failure."
))
# ── Theorem 2.1 ───────────────────────────────────────────────────────
forced.append((
"THEOREM_2.1",
"Propagation rate theorem: among incoherent patterns (demand > 0), "
"rate ∝ 1/L. Simpler patterns propagate faster. "
"This holds on this carrier as on all carriers. "
"Zipf's law, natural selection, the exponential fixed point β€” one theorem."
))
return {
"name": self.name,
"carrier": list(self.V),
"gradients": [g.name for g in self.gradients],
"theta": self.theta,
"forced_conditions": forced,
"closure": closure,
"fixed_points": fps,
"cycles": cycles,
"involutions": involutions,
}
def report(self) -> str:
"""Full derivation as human-readable report."""
fc = self.derive_forced_conditions()
lines = [
"",
"=" * 62,
f" PROPAGATION ENGINE: {self.name.upper()}",
"=" * 62,
f" Carrier V : {set(fc['carrier'])}",
f" Gradients Ξ“ : {fc['gradients']}",
f" Threshold ΞΈ : {fc['theta']}",
"=" * 62,
"",
" DERIVED BOUNDARY CONDITIONS",
" (not assumed β€” forced by V and Ξ“)",
"",
]
for i, (tag, condition) in enumerate(fc["forced_conditions"], 1):
lines.append(f" [{i}] {tag}")
words = condition.split()
line = " "
for word in words:
if len(line) + len(word) + 1 > 60:
lines.append(line)
line = " " + word + " "
else:
line += word + " "
lines.append(line.rstrip())
lines.append("")
lines += [
"=" * 62,
" These conditions were not assumed.",
" They were derived by running P / G β†’ Q.",
"=" * 62,
"",
]
return "\n".join(lines)
def as_training_text(self) -> str:
"""
Render derivation as training data.
This is what the LM learns β€” the procedure, not descriptions of it.
"""
fc = self.derive_forced_conditions()
lines = [
f"DOMAIN: {self.name}",
f"CARRIER: {sorted(set(fc['carrier']), key=str)}",
f"GRADIENTS: {fc['gradients']}",
f"THETA: {fc['theta']}",
"---",
]
# Fixed points
for g_name, fp_list in fc["fixed_points"].items():
lines.append(f"FIXED_POINTS[{g_name}]: {fp_list}")
# Closure
for g_name, r in fc["closure"].items():
if r["closed"]:
lines.append(f"CLOSURE[{g_name}]: HOLDS")
else:
lines.append(f"CLOSURE[{g_name}]: VIOLATED β†’ extend to include {r['extension_required']}")
# Involutions
for g_name, inv in fc["involutions"].items():
tag = "YES" if inv["is_involution"] else "NO"
lines.append(f"INVOLUTION[{g_name}]: {tag}")
# Cycles
for g_name, cyc in fc["cycles"].items():
lengths = set(v for v in cyc.values() if v is not None)
if lengths:
k = next(iter(lengths)) if len(lengths) == 1 else "mixed"
lines.append(f"CYCLE[{g_name}]: length={k}")
# Forced conditions
lines.append("FORCED:")
for tag, _ in fc["forced_conditions"]:
lines.append(f" {tag}")
lines.append("END")
return "\n".join(lines)
# =============================================================================
# FACTORY β€” build engines from KNOWN_SYSTEMS
# =============================================================================
def engine_from_system(system_name: str) -> PropagationEngine:
"""Build an engine from a registered system in KNOWN_SYSTEMS."""
if system_name not in KNOWN_SYSTEMS:
available = list(KNOWN_SYSTEMS.keys())
raise ValueError(f"Unknown system {system_name!r}. Available: {available}")
sys_def = KNOWN_SYSTEMS[system_name]
return PropagationEngine(
carrier=sys_def["carrier"],
gradients=sys_def["gradients"](),
name=system_name,
)
# =============================================================================
# DEMONSTRATION
# =============================================================================
if __name__ == "__main__":
print("\n" + "="*62)
print(" PROPAGATION LOGIC INFERENCE ENGINE")
print(" P / G β†’ Q")
print("="*62)
# ── 1. Classical logic ─────────────────────────────────────────────────
e1 = engine_from_system("classical_logic")
print(e1.report())
# ── 2. Three-valued logic ──────────────────────────────────────────────
e2 = engine_from_system("three_valued_logic")
print(e2.report())
# ── 3. Novel carrier: colors β€” never seen in training ──────────────────
print("="*62)
print(" NOVEL CARRIER: {red, green, blue}")
print(" This carrier was NEVER in any training data.")
print(" The engine derives its boundary conditions from scratch.")
print("="*62)
e3 = PropagationEngine(
carrier={"red", "green", "blue"},
gradients=[
G_custom("complement", {"red": "green", "green": "blue", "blue": "red"}),
G_id(),
],
theta=1.0,
name="color_carrier",
)
print(e3.report())
# ── 4. Forced extension: β„• β†’ β„€ ────────────────────────────────────────
print("="*62)
print(" FORCED EXTENSION: V={0,1,2,3} + predecessor")
print(" Demonstrates how β„• β†’ β„€ is forced by closure violation.")
print("="*62)
from pl.core import G_pred
e4 = PropagationEngine(
carrier={0, 1, 2, 3},
gradients=[G_pred(), G_id()],
theta=1.0,
name="N_closure_violation",
)
print(e4.report())
# ── 5. Modular arithmetic ──────────────────────────────────────────────
e5 = engine_from_system("Z4")
print(e5.report())
print("\n Training text format (for the LM):")
print("-" * 40)
print(e1.as_training_text())