""" practicality_axioms.py ====================== The complete, unabridged Axiom Deduction Engine. Houses all 28 structural axioms, constructors, the UCB1 Bandit, and the full decoupled SequenceRayTracer. """ import math import random import torch import itertools from typing import Dict, List, Tuple, Set, Optional, Any, Callable from dataclasses import dataclass, field from collections import defaultdict, deque import practicality_core as core @dataclass class Hypothesis: hid: str binding: Dict[str, float] h_type: str claim: str derivation: List[str] pinned_vars: Dict[str, float] free_vars: List[str] confidence: float ce: float = float('inf') is_fully_determined: bool = False @dataclass class L9Certificate: residual_ce: float; dominant_vars: List[str]; dominant_exprs: List[str]; tension_class: str = "unknown" @dataclass class Baton: binding: Dict[str, float]; ce: float; ray_id: str = ""; depth: int = 0; l9: Optional[L9Certificate] = None @dataclass class AxiomRay: sequence: List[str]; ray_id: str; ray_type: str = "seed" depth: int = 0; parent_id: Optional[str] = None baton: Optional[Baton] = None; ce_prior: float = float('inf') @property def name(self) -> str: return "→".join(a[:3] for a in self.sequence) def extend(self, axiom, rtype, new_prior=float('inf')): return AxiomRay(sequence=self.sequence+[axiom], ray_id=f"{self.ray_id}+{axiom[:3]}", ray_type=rtype, depth=self.depth+1, parent_id=self.ray_id, ce_prior=new_prior) @dataclass class HypothesisTrace: hid: str; round_idx: int; ray_name: str; ray_type: str ce_before: float; ce_after: float; survived: bool; elapsed_ms: float g1_pass: bool = False; g2_pass: bool = False binding: Dict[str, float] = field(default_factory=dict) invariant_results: Dict[str, Tuple[float, bool]] = field(default_factory=dict) tension_class: str = "unknown"; depth: int = 0 @dataclass class StructuralModel: best_binding: Dict[str, float]; best_ce: float; best_ray: str = "—" failure_patterns: Set[str] = field(default_factory=set) resonant_pairs: List[Tuple[str, str]] = field(default_factory=list) class Axiom: CONTINUOUS = "CONTINUOUS"; DISCRETE = "DISCRETE"; QUADRATIC = "QUADRATIC" BILINEAR = "BILINEAR"; METRIC = "METRIC"; SYMMETRIC = "SYMMETRIC" ORDERED = "ORDERED"; MONOTONE = "MONOTONE"; CONVEX = "CONVEX" MONOTONE_PRODUCT = "MONOTONE_PRODUCT"; CONSERVED = "CONSERVED"; MUTABLE = "MUTABLE" NETWORK = "NETWORK"; EQUILIBRIUM = "EQUILIBRIUM"; SUPERPOSITION = "SUPERPOSITION" LOCALITY = "LOCALITY"; EXTREMAL = "EXTREMAL"; ENTROPY = "ENTROPY" INJECTIVE = "INJECTIVE"; SURJECTIVE = "SURJECTIVE"; TRANSITIVE = "TRANSITIVE" ATOMIC = "ATOMIC"; IMPLICATION = "IMPLICATION"; NEGATION = "NEGATION" COMPOSITE = "COMPOSITE"; HOLISM = "HOLISM"; PARSIMONY = "PARSIMONY"; DUALITY = "DUALITY" ALL_AXIOMS = [getattr(Axiom, a) for a in dir(Axiom) if not a.startswith("__")] # ══════════════════════════════════════════════════════════════════════ # AXIOM HYPOTHESIS CONSTRUCTORS # ══════════════════════════════════════════════════════════════════════ def _make_hyp(hid, binding, h_type, claim, derivation, pinned, free, conf, is_fd=False): return Hypothesis(hid=hid, binding=binding, h_type=h_type, claim=claim, derivation=derivation, pinned_vars=pinned, free_vars=free, confidence=conf, is_fully_determined=is_fd) def _hyp_continuous(p, bt, a, m): return [_make_hyp("cnt", bt.binding, "continuous", "Manifold", [], {}, p.variables, 0.45)] def _hyp_discrete(p, bt, a, m): b = dict(bt.binding); pinned = {} box = {v: core.IV(b.get(v, 0)-max(0.05*(p.bounds[v][1]-p.bounds[v][0]), 1e-4), b.get(v, 0)+max(0.05*(p.bounds[v][1]-p.bounds[v][0]), 1e-4)) for v in p.variables if v in p.bounds} cont = core._hc4(box, p.compiled_constraints) if cont: for v, iv in cont.items(): b[v] = iv.mid() if iv.width() < 1e-2: pinned[v] = b[v] return [_make_hyp("dis", b, "discrete", "TopoScope", [], pinned, p.variables, 0.80)] def _hyp_quadratic(p, bt, a, m): hyps = []; b = dict(bt.binding) if bt.l9 and bt.l9.dominant_vars: for v in bt.l9.dominant_vars[:2]: hyps.append(_make_hyp(f"qua_{v}", b, "quadratic", f"Root({v})", [], {v: b[v]}, p.variables, 0.70)) return hyps if hyps else [_make_hyp("qua", b, "quadratic", "Root", [], {}, p.variables, 0.55)] def _hyp_bilinear(p, bt, a, m): hyps = []; b = dict(bt.binding) for vi, vj in p.bilinear_pairs: lo_i, hi_i = p.bounds.get(vi, (0, 10)); lo_j, hi_j = p.bounds.get(vj, (0, 10)) product = abs(b.get(vi, 1)*b.get(vj, 1)) if product <= 0: continue gm = math.sqrt(product) if lo_i <= gm <= hi_i and lo_j <= gm <= hi_j: nb = dict(b); nb[vi] = nb[vj] = gm hyps.append(_make_hyp(f"bil_eq_{vi}_{vj}", nb, "bilinear", "GeoMean", ["bilinear"], {vi: gm, vj: gm}, [v for v in p.variables if v not in [vi, vj]], 0.70)) return hyps def _hyp_monotone_product(p, bt, a, m): hyps = []; b = dict(bt.binding) if not p.monotone_targets: return hyps try: tb = core._global_hc4_tighten_bounds(p) except: tb = dict(p.bounds) for v_small, v_large, k in p.monotone_targets: lo_s, hi_s = p.bounds.get(v_small, (-1e9, 1e9)); lo_l, hi_l = p.bounds.get(v_large, (-1e9, 1e9)) for ratio in [0.25, 0.5, 0.707, 0.9]: vs_sq = k*ratio if vs_sq <= 0: continue vs = math.sqrt(vs_sq); vl = k/vs if not(lo_s <= vs <= hi_s and lo_l <= vl <= hi_l and vs <= vl): continue nb = dict(b); nb[v_small] = vs; nb[v_large] = vl box = {u: core.IV(*tb.get(u, p.bounds.get(u, (-10, 10)))) for u in p.variables} box[v_small] = core.IV(vs-1e-6, vs+1e-6); box[v_large] = core.IV(vl-1e-6, vl+1e-6) cont = core._hc4(box, p.compiled_constraints); pinned = {v_small: vs, v_large: vl} if cont: for u, iv in cont.items(): if u in p.bounds: nb[u] = iv.mid() pinned = {u: nb[u] for u in p.variables if cont[u].width() < 1e-2} hyps.append(_make_hyp(f"mpr_{v_small}_{v_large}_r{int(ratio*100)}", nb, "monotone_product", "MonoProd", ["ordered", "hc4"], pinned, [u for u in p.variables if u not in pinned], 0.88)) return hyps def _hyp_metric(p, bt, a, m): hyps = []; b = dict(bt.binding) for mc in p.compiled_constraints: if mc.kind != "equality" or not mc.parsed or not mc.parsed.is_Add: continue sq_terms = [t for t in mc.parsed.args if t.is_Pow and t.args[1] == 2] if len(sq_terms) < 1: continue vars_in = [str(s) for s in mc.parsed.free_symbols if str(s) in p.variables] if not vars_in: continue const_terms = [float(t) for t in mc.parsed.args if t.is_Number] if not const_terms: continue target = -const_terms[0] if target <= 0: continue curr_val = sum(b.get(v, 0)**2 for v in vars_in) if curr_val < 1e-8: continue scale = math.sqrt(target/curr_val); nb = dict(b) for v in vars_in: lo, hi = p.bounds.get(v, (-10, 10)); nb[v] = max(lo, min(hi, b.get(v, 0)*scale)) hyps.append(_make_hyp(f"met_{hash(mc.expr_str)%9999}", nb, "metric", "RadialProject", ["metric", mc.expr_str], {}, list(p.variables), 0.75)) return hyps def _hyp_symmetric(p, bt, a, m): hyps = []; b = dict(bt.binding) vl = p.variables for i in range(min(len(vl), 8)): for j in range(i+1, min(len(vl), 8)): vi, vj = vl[i], vl[j] lo_i, hi_i = p.bounds.get(vi, (-1e9, 1e9)); lo_j, hi_j = p.bounds.get(vj, (-1e9, 1e9)) bvi, bvj = b.get(vi, 0), b.get(vj, 0) if lo_i <= bvj <= hi_i and lo_j <= bvi <= hi_j: nb = dict(b); nb[vi], nb[vj] = bvj, bvi hyps.append(_make_hyp(f"sym_{vi}_{vj}", nb, "symmetric", "Swap", ["symmetric"], {vi: nb[vi], vj: nb[vj]}, [v for v in p.variables if v not in [vi, vj]], 0.60)) return hyps[:5] def _hyp_ordered(p, bt, a, m): return [_make_hyp("ord", bt.binding, "ordered", "OrderBal", [], {}, p.variables, 0.67)] def _hyp_monotone(p, bt, a, m): return [_make_hyp("mon", bt.binding, "monotone", "MonoPath", [], {}, p.variables, 0.67)] def _hyp_convex(p, bt, a, m): return [_make_hyp("cvx", bt.binding, "convex", "ConvMix", [], {}, p.variables, 0.63)] def _hyp_conserved(p, bt, a, m): return [_make_hyp("csv", bt.binding, "conserved", "Conserve", [], {}, p.variables, 0.82)] def _hyp_mutable(p, bt, a, m): b = dict(bt.binding); pinned = {} if bt.l9 and bt.l9.dominant_vars: v = bt.l9.dominant_vars[0]; lo, hi = p.bounds.get(v, (-10, 10)) b[v] = max(lo, min(hi, b.get(v, 0)+random.gauss(0, (hi-lo)*0.05))); pinned = {v: b[v]} return [_make_hyp("mut", b, "mutable", "Perturb", [], pinned, p.variables, 0.40)] def _hyp_network(p, bt, a, m): return [_make_hyp("net", bt.binding, "network", "HubProp", [], {}, p.variables, 0.65)] def _hyp_equilibrium(p, bt, a, m): return [_make_hyp("eql", bt.binding, "equilibrium", "GradBal", [], {}, p.variables, 0.72)] def _hyp_superposition(p, bt, a, m): if a and len(a)>0: other=random.choice(a); lam=0.5 nb={v:lam*bt.binding.get(v,0)+(1-lam)*other.binding.get(v,0) for v in p.variables} return [_make_hyp("sup",nb,"superposition","Superpose",[],{},p.variables,0.58)] return [] def _hyp_locality(p, bt, a, m): return [_make_hyp("loc", bt.binding, "locality", "LocalFirst", [], {}, p.variables, 0.72)] def _hyp_extremal(p, bt, a, m): hyps = []; b = dict(bt.binding) if bt.l9 and bt.l9.dominant_vars: v = bt.l9.dominant_vars[0]; lo, hi = p.bounds.get(v, (0, 1)) for val in [lo, hi]: nb = dict(b); nb[v] = val; hyps.append(_make_hyp(f"ext_{v}_{val:.3f}", nb, "extremal", "PinBound", [], {v: val}, p.variables, 0.65)) return hyps def _hyp_entropy(p, bt, a, m): return [_make_hyp("ent", {v: random.uniform(*p.bounds[v]) for v in p.variables}, "entropy", "MaxEnt", [], {}, p.variables, 0.48)] def _hyp_injective(p, bt, a, m): return [_make_hyp("inj", bt.binding, "injective", "Distinct", [], {}, p.variables, 0.50)] def _hyp_surjective(p, bt, a, m): return [_make_hyp("sur", bt.binding, "surjective", "Sweep", [], {}, p.variables, 0.45)] def _hyp_transitive(p, bt, a, m): return [_make_hyp("tra", bt.binding, "transitive", "TransSnap", [], {}, p.variables, 0.77)] def _hyp_atomic(p, bt, a, m): hyps = []; b = dict(bt.binding) for mc in p.compiled_constraints: if not mc.projections: continue for v, projs in mc.projections.items(): for proj in projs: try: val = float(proj["func"](*[b.get(s, 0.0) for s in proj["syms"]])); lo, hi = p.bounds.get(v, (-1e9, 1e9)) if math.isfinite(val) and lo <= val <= hi: nb = dict(b); nb[v] = val; hyps.append(_make_hyp(f"ato_{v}", nb, "atomic", f"Solve({v})", [], {v: val}, p.variables, 0.70)); break except: pass if hyps: break return hyps def _hyp_implication(p, bt, a, m): return [_make_hyp("imp", bt.binding, "implication", "Imply", [], {}, p.variables, 0.78)] def _hyp_negation(p, bt, a, m): nb={v:max(p.bounds[v][0],min(p.bounds[v][1],(p.bounds[v][0]+p.bounds[v][1])-bt.binding.get(v,(p.bounds[v][0]+p.bounds[v][1])/2))) for v in p.variables} return [_make_hyp("neg",nb,"negation","Mirror",[],{},p.variables,0.42)] def _hyp_composite(p, bt, a, m): return [_make_hyp("cmp", bt.binding, "composite", "Enz", [], {}, p.variables, 0.90)] def _hyp_holism(p, bt, a, m): return [_make_hyp("hol", bt.binding, "holism", "Global", [], {}, p.variables, 0.73)] def _hyp_parsimony(p, bt, a, m): b = dict(bt.binding); nb = {} max_val = max((abs(v) for v in b.values() if math.isfinite(v)), default=1.0) for v in p.variables: val = b[v]; lo, hi = p.bounds[v] if not math.isfinite(val): nb[v] = 0.0; continue if abs(val) < 0.1 * max_val and lo <= 0.0 <= hi: nb[v] = 0.0 else: candidates = [round(val*m)/m for m in [1, 2, 4] if lo <= round(val*m)/m <= hi] nb[v] = min(candidates, key=lambda c: abs(c-val)) if candidates else val return [_make_hyp("par", nb, "parsimony", "Occam", [], {}, p.variables, 0.58)] def _hyp_duality(p, bt, a, m): return [_make_hyp("dua", bt.binding, "duality", "Tight", [], {}, p.variables, 0.68)] AXIOM_CONSTRUCTORS = { Axiom.CONTINUOUS: _hyp_continuous, Axiom.DISCRETE: _hyp_discrete, Axiom.QUADRATIC: _hyp_quadratic, Axiom.BILINEAR: _hyp_bilinear, Axiom.METRIC: _hyp_metric, Axiom.SYMMETRIC: _hyp_symmetric, Axiom.ORDERED: _hyp_ordered, Axiom.MONOTONE: _hyp_monotone, Axiom.CONVEX: _hyp_convex, Axiom.CONSERVED: _hyp_conserved, Axiom.MUTABLE: _hyp_mutable, Axiom.NETWORK: _hyp_network, Axiom.EQUILIBRIUM: _hyp_equilibrium, Axiom.SUPERPOSITION: _hyp_superposition, Axiom.LOCALITY: _hyp_locality, Axiom.EXTREMAL: _hyp_extremal, Axiom.ENTROPY: _hyp_entropy, Axiom.INJECTIVE: _hyp_injective, Axiom.SURJECTIVE: _hyp_surjective, Axiom.TRANSITIVE: _hyp_transitive, Axiom.ATOMIC: _hyp_atomic, Axiom.IMPLICATION: _hyp_implication, Axiom.NEGATION: _hyp_negation, Axiom.COMPOSITE: _hyp_composite, Axiom.HOLISM: _hyp_holism, Axiom.PARSIMONY: _hyp_parsimony, Axiom.DUALITY: _hyp_duality, Axiom.MONOTONE_PRODUCT: _hyp_monotone_product } # ══════════════════════════════════════════════════════════════════════ # SEED & TEMPLATE SYSTEM # ══════════════════════════════════════════════════════════════════════ def compute_sketch(problem: core.Problem) -> Dict[str, Any]: notes = [] affinities = {a: 1.0 for a in ALL_AXIOMS} if problem.bilinear_pairs: affinities[Axiom.BILINEAR] += 4.0 notes.append(f"Found bilinear pairs: {problem.bilinear_pairs}") if problem.monotone_targets: affinities[Axiom.MONOTONE_PRODUCT] += 5.0 notes.append("Boosted Monotone Product based on algebraic layout.") if any(mc.kind == "equality" and mc.parsed is not None and mc.parsed.is_Add for mc in problem.compiled_constraints): affinities[Axiom.METRIC] += 3.0 notes.append("System contains sum-of-squares style constraints.") return {"notes": notes, "affinities": affinities} def generate_templates(problem: core.Problem, sketch: Dict[str, Any]) -> List[Hypothesis]: templates = [] # 1. INT Grid Sweep & Algebraic Propagation tight_int_vars = [v for v in problem.variables if v in problem.int_vars and math.isfinite(problem.bounds[v][0]) and problem.bounds[v][1] - problem.bounds[v][0] <= 100] if tight_int_vars: grids = [] for v in tight_int_vars: lo = int(math.ceil(problem.bounds[v][0])) hi = int(math.floor(problem.bounds[v][1])) grids.append([(v, float(val)) for val in range(lo, hi+1)]) combos = list(itertools.product(*grids))[:1000] # Cap search width for combo in combos: pinned = dict(combo) resolved, log = core.algebraic_propagate_pinned(problem, pinned) templates.append( Hypothesis( hid=f"grid_{hash(str(combo))%100000}", binding=resolved, h_type="int_grid", claim="Integer Grid Sweep", derivation=log, pinned_vars=pinned, free_vars=[v for v in problem.variables if v not in resolved], confidence=0.95, is_fully_determined=(len(resolved) == len(problem.variables)) ) ) # 2. Transcendental Space Sweep (Fallback) trans_vars = [v for v in problem.variables if v not in tight_int_vars and math.isfinite(problem.bounds[v][0])] if trans_vars and len(trans_vars) <= 2: for v in trans_vars: lo, hi = problem.bounds[v] samples = [lo + (hi-lo)*r for r in [0.15, 0.5, 0.85]] for s in samples: pinned = {v: s} resolved, log = core.algebraic_propagate_pinned(problem, pinned) templates.append( Hypothesis( hid=f"trans_{v}_{int(s*100)%1000}", binding=resolved, h_type="grid_sweep", claim="Transcendental Sweep", derivation=log, pinned_vars=pinned, free_vars=[u for u in problem.variables if u not in resolved], confidence=0.75 ) ) return templates # ══════════════════════════════════════════════════════════════════════ # MATH ENGINE METHODS (Explicitly implemented inside local scope) # ══════════════════════════════════════════════════════════════════════ def _batched_deduce_and_evaluate(problem: core.Problem, hyps: List[Hypothesis], steps: int=80) -> List[Tuple[Dict, float, List[str], str]]: if not hyps: return [] skip_indices = [i for i, h in enumerate(hyps) if getattr(h, 'is_fully_determined', False) or len(h.free_vars) == 0] solve_indices = [i for i, h in enumerate(hyps) if i not in skip_indices] results = [None] * len(hyps) for i in skip_indices: hyp = hyps[i] try: ce = problem.scalar_energy(hyp.binding) dom_vars = list(hyp.pinned_vars.keys())[:3] results[i] = (hyp.binding, ce, dom_vars, "algebraic") except: results[i] = (hyp.binding, float('inf'), [], "algebraic_error") if not solve_indices: return [r for r in results if r is not None] adam_hyps = [hyps[i] for i in solve_indices] V = len(problem.variables) log_mask = [] log_lo, log_hi = [], [] for v in problem.variables: lo, hi = core._c15(problem.bounds[v][0]), core._c15(problem.bounds[v][1]) if v in problem.log_space_vars and lo > 0: log_mask.append(True) log_lo.append(math.log10(max(lo, 1e-30))) log_hi.append(math.log10(max(hi, 1e-30))) else: log_mask.append(False) log_lo.append(lo) log_hi.append(hi) log_mask_t = torch.tensor(log_mask, device=core.DEVICE, dtype=torch.bool) lo_param_t = torch.tensor(log_lo, device=core.DEVICE, dtype=torch.float32) hi_param_t = torch.tensor(log_hi, device=core.DEVICE, dtype=torch.float32) def _param_to_orig(P): orig = P.clone() if log_mask_t.any(): orig[:, log_mask_t] = torch.pow(10.0, P[:, log_mask_t]) return orig def _orig_to_param(x_val, j): if log_mask[j] and x_val > 0: return math.log10(max(x_val, 1e-30)) return x_val x_data_p, mask_data, target_data_p = [], [], [] for hyp in adam_hyps: xr, mr, tr = [], [], [] active_vars = problem.get_markov_blanket(set(hyp.pinned_vars.keys()), depth=2) for j, v in enumerate(problem.variables): lo, hi = core._c15(problem.bounds[v][0]), core._c15(problem.bounds[v][1]) if v in hyp.pinned_vars: p_val = _orig_to_param(core._c15(hyp.pinned_vars[v]), j) xr.append(p_val); mr.append(0.0); tr.append(p_val) else: p_val = _orig_to_param(core._c15(hyp.binding.get(v, (lo+hi)/2)), j) is_active = (v in active_vars) or (len(hyp.pinned_vars) == 0) xr.append(p_val); mr.append(1.0 if is_active else 0.0); tr.append(0.0) x_data_p.append(xr); mask_data.append(mr); target_data_p.append(tr) P = torch.tensor(x_data_p, device=core.DEVICE, dtype=torch.float32, requires_grad=True) mask = torch.tensor(mask_data, device=core.DEVICE, dtype=torch.float32) target = torch.tensor(target_data_p, device=core.DEVICE, dtype=torch.float32) optimizer = torch.optim.Adam([P], lr=0.01) for step in range(steps): optimizer.zero_grad() step_ratio = min(1.0, step / (steps * 0.8)) X_orig = _param_to_orig(P) ce = problem.tensor_energy(X_orig, step_ratio, is_optimizing=True) if isinstance(ce, torch.Tensor) and (ce < core.SOLVE_THRESHOLD).all() and step_ratio == 1.0: break ce.sum().backward() with torch.no_grad(): P.grad.clamp_(-10.0, 10.0) P.grad *= mask optimizer.step() P.data = torch.where(mask == 0.0, target, P.data) margin = 0.1 * (1.0 - step_ratio) lo_m = lo_param_t - (hi_param_t - lo_param_t) * margin hi_m = hi_param_t + (hi_param_t - lo_param_t) * margin P.data = torch.clamp(P.data, lo_m.unsqueeze(0), hi_m.unsqueeze(0)) X_orig_final = _param_to_orig(P) final_ce = problem.tensor_energy(X_orig_final, 1.0, is_optimizing=False).view(-1) ce_vals = final_ce.detach().cpu().numpy() X_vals = X_orig_final.detach().cpu().numpy() for b_idx, orig_idx in enumerate(solve_indices): final_b = {problem.variables[j]: float(X_vals[b_idx, j]) for j in range(V)} results[orig_idx] = (final_b, float(ce_vals[b_idx]), [], "systemic") return [r for r in results if r is not None] def _mprt_sample(problem: core.Problem, N: int): var_list = problem.variables; V = len(var_list) lo_t = torch.tensor([core._c15(problem.bounds.get(v, (-10.0, 10.0))[0]) for v in var_list], device=core.DEVICE, dtype=torch.float32) hi_t = torch.tensor([core._c15(problem.bounds.get(v, (-10.0, 10.0))[1]) for v in var_list], device=core.DEVICE, dtype=torch.float32) for i in range(V): if lo_t[i] >= hi_t[i]: m = (lo_t[i]+hi_t[i])/2; lo_t[i] = m - 1e-6; hi_t[i] = m + 1e-6 rand_base = torch.rand((N, V), device=core.DEVICE) lsv_indices = [problem.var_idx[v] for v in problem.log_space_vars if v in problem.var_idx] for idx in lsv_indices: lo_v, hi_v = lo_t[idx].item(), hi_t[idx].item() if lo_v > 0 and hi_v > lo_v: log_lo, log_hi = math.log10(max(lo_v, 1e-30)), math.log10(max(hi_v, 1e-30)) rand_base[:, idx] = torch.pow(10.0, torch.rand(N, device=core.DEVICE)*(log_hi-log_lo)+log_lo) / hi_v X = lo_t.unsqueeze(0) + (hi_t - lo_t).unsqueeze(0) * rand_base ce_batch = problem.tensor_energy(X, 1.0, is_optimizing=False).view(-1) best_idx = torch.argmin(ce_batch).item() return {v: float(X[best_idx, i].item()) for i, v in enumerate(var_list)}, ce_batch[best_idx].item() # ══════════════════════════════════════════════════════════════════════ # THE DECOUPLED SEQUENCE RAY TRACER # ══════════════════════════════════════════════════════════════════════ class SequenceRayTracer: def __init__(self, log_callback: Callable, update_state_callback: Callable): self.log = log_callback self.update_state = update_state_callback self.bandit = UCB1BanditSeeder() self.baton_registry = {} def trace(self, problem: core.Problem, init_b: Dict[str, float], init_ce: float, templates: List[Hypothesis], max_rays: int = 1500) -> Tuple[Dict[str, float], float, List[HypothesisTrace], str]: self.log(f"Initializing Axiom Ray Tracer on {problem.pid}...") sketch = compute_sketch(problem) for note in sketch["notes"]: self.log(f" [Sketch] {note}") best_ce = init_ce best_binding = dict(init_b) best_ray_name = "Zero-Shot" traces = [] solved = False # Build initial queue from verified templates queues = {"core": []} for idx, t in enumerate(templates): binding = dict(init_b); binding.update(t.pinned_vars) results = _batched_deduce_and_evaluate(problem, [t], steps=60) if results: final_b, final_ce, dom_vars, tc = results[0] if final_ce < best_ce: best_ce = final_ce best_binding = dict(final_b) best_ray_name = f"template:{t.h_type}" traces.append(HypothesisTrace( hid=t.hid, round_idx=idx, ray_name=f"template:{t.h_type}", ray_type="template", ce_before=init_ce, ce_after=final_ce, survived=True, elapsed_ms=0.0, g1_pass=(final_ce < core.SOLVE_THRESHOLD), binding=final_b, tension_class=tc )) # Register batons for structural branch mapping self.baton_registry[t.hid] = Baton( binding=final_b, ce=final_ce, ray_id=t.hid, l9=L9Certificate(final_ce, dom_vars, [], tc) ) # Build seed rays based on UCB1 Bandit weights affinities = sketch["affinities"] seeds_scored = sorted([(affinities.get(a, 1.0), idx, a) for idx, a in enumerate(ALL_AXIOMS)], key=lambda x: -x[0]) seed_rays = [AxiomRay([a], f"S{idx}", "seed", ce_prior=best_ce) for _, idx, a in seeds_scored[:150]] queues["core"].extend(seed_rays) model = StructuralModel(best_binding, best_ce) total_fired = 0 seed_baton = Baton(binding=best_binding, ce=best_ce, ray_id="SEED") while total_fired < max_rays and queues["core"]: batch_rays = queues["core"][:64] queues["core"] = queues["core"][64:] total_fired += len(batch_rays) batch_hyps = [] for idx, ray in enumerate(batch_rays): p_baton = ray.baton or seed_baton constructor = AXIOM_CONSTRUCTORS.get(ray.sequence[-1]) constructed_hyps = constructor(problem, p_baton, list(self.baton_registry.values()), model) if constructor else [] if not constructed_hyps: constructed_hyps = [_make_hyp("fb", p_baton.binding, "fallback", "Pass", [], {}, problem.variables, 0.1)] batch_hyps.append((idx, constructed_hyps[0])) eval_results = _batched_deduce_and_evaluate(problem, [h for _, h in batch_hyps], steps=80) for (ray_idx, hyp), (final_b, final_ce, dom_vars, tension_class) in zip(batch_hyps, eval_results): ray = batch_rays[ray_idx] parent_ce = (ray.baton or seed_baton).ce self.bandit.record_reward(ray.sequence[-1], parent_ce, final_ce) improved = final_ce < best_ce passed_pruning = final_ce < parent_ce * 0.98 if improved: best_ce = final_ce best_binding = dict(final_b) best_ray_name = ray.name model.best_binding = dict(final_b) model.best_ce = final_ce model.best_ray = ray.name self.update_state(best_ce=best_ce, total_fired=total_fired, best_ray=best_ray_name) if final_ce < core.SOLVE_THRESHOLD: solved = True out_baton = Baton( binding=final_b, ce=final_ce, ray_id=ray.ray_id, l9=L9Certificate(final_ce, dom_vars, [], tension_class) ) if final_ce < max(2.0, best_ce * 1.2): self.baton_registry[ray.ray_id] = out_baton if (passed_pruning or improved or ray.depth < 1) and ray.depth < 8: remaining_axioms = [a for a in ALL_AXIOMS if a not in ray.sequence] queues["core"].extend(self.bandit.intelligent_branch(ray, out_baton, remaining_axioms, branch_width=6)) if solved: self.log("Exact structural resonance found!") break return best_binding, best_ce, traces, best_ray_name # ══════════════════════════════════════════════════════════════════════ # UCB1 BANDIT SEEDER # ══════════════════════════════════════════════════════════════════════ class UCB1BanditSeeder: def __init__(self): self.stats = {a: {"tries": 0, "reward": 0.0} for a in ALL_AXIOMS} self.total_tries = 0 def record_reward(self, axiom, ce_before, ce_after): reward = max(0.0, ce_before - ce_after) self.stats[axiom]["tries"] += 1 self.stats[axiom]["reward"] += reward self.total_tries += 1 def intelligent_branch(self, ray, out_baton, remaining_axioms, branch_width): scored = [] for ax in remaining_axioms: tries = self.stats[ax]["tries"] if tries == 0: ucb = 999.0 else: avg_reward = self.stats[ax]["reward"] / tries exploration = math.sqrt(math.log(self.total_tries + 1) / tries) ucb = avg_reward + 0.5 * exploration scored.append((ucb, ax)) scored.sort(key=lambda x: x[0] * random.random(), reverse=True) children = [] for _, axiom in scored[:branch_width]: child = ray.extend(axiom, "branch", new_prior=out_baton.ce) if child: child.baton = out_baton children.append(child) return children