Spaces:
Sleeping
Sleeping
| """ | |
| practicality_axioms.py | |
| ====================== | |
| The complete, unabridged Axiom Deduction Engine. | |
| Houses all 28 structural axioms, constructors, the UCB1 Bandit, | |
| and the full decoupled SequenceRayTracer. | |
| """ | |
| import math | |
| import random | |
| import torch | |
| import itertools | |
| from typing import Dict, List, Tuple, Set, Optional, Any, Callable | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict, deque | |
| import practicality_core as core | |
| class Hypothesis: | |
| hid: str | |
| binding: Dict[str, float] | |
| h_type: str | |
| claim: str | |
| derivation: List[str] | |
| pinned_vars: Dict[str, float] | |
| free_vars: List[str] | |
| confidence: float | |
| ce: float = float('inf') | |
| is_fully_determined: bool = False | |
| class L9Certificate: | |
| residual_ce: float; dominant_vars: List[str]; dominant_exprs: List[str]; tension_class: str = "unknown" | |
| class Baton: | |
| binding: Dict[str, float]; ce: float; ray_id: str = ""; depth: int = 0; l9: Optional[L9Certificate] = None | |
| class AxiomRay: | |
| sequence: List[str]; ray_id: str; ray_type: str = "seed" | |
| depth: int = 0; parent_id: Optional[str] = None | |
| baton: Optional[Baton] = None; ce_prior: float = float('inf') | |
| def name(self) -> str: return "β".join(a[:3] for a in self.sequence) | |
| def extend(self, axiom, rtype, new_prior=float('inf')): | |
| return AxiomRay(sequence=self.sequence+[axiom], ray_id=f"{self.ray_id}+{axiom[:3]}", | |
| ray_type=rtype, depth=self.depth+1, parent_id=self.ray_id, ce_prior=new_prior) | |
| class HypothesisTrace: | |
| hid: str; round_idx: int; ray_name: str; ray_type: str | |
| ce_before: float; ce_after: float; survived: bool; elapsed_ms: float | |
| g1_pass: bool = False; g2_pass: bool = False | |
| binding: Dict[str, float] = field(default_factory=dict) | |
| invariant_results: Dict[str, Tuple[float, bool]] = field(default_factory=dict) | |
| tension_class: str = "unknown"; depth: int = 0 | |
| class StructuralModel: | |
| best_binding: Dict[str, float]; best_ce: float; best_ray: str = "β" | |
| failure_patterns: Set[str] = field(default_factory=set) | |
| resonant_pairs: List[Tuple[str, str]] = field(default_factory=list) | |
| class Axiom: | |
| CONTINUOUS = "CONTINUOUS"; DISCRETE = "DISCRETE"; QUADRATIC = "QUADRATIC" | |
| BILINEAR = "BILINEAR"; METRIC = "METRIC"; SYMMETRIC = "SYMMETRIC" | |
| ORDERED = "ORDERED"; MONOTONE = "MONOTONE"; CONVEX = "CONVEX" | |
| MONOTONE_PRODUCT = "MONOTONE_PRODUCT"; CONSERVED = "CONSERVED"; MUTABLE = "MUTABLE" | |
| NETWORK = "NETWORK"; EQUILIBRIUM = "EQUILIBRIUM"; SUPERPOSITION = "SUPERPOSITION" | |
| LOCALITY = "LOCALITY"; EXTREMAL = "EXTREMAL"; ENTROPY = "ENTROPY" | |
| INJECTIVE = "INJECTIVE"; SURJECTIVE = "SURJECTIVE"; TRANSITIVE = "TRANSITIVE" | |
| ATOMIC = "ATOMIC"; IMPLICATION = "IMPLICATION"; NEGATION = "NEGATION" | |
| COMPOSITE = "COMPOSITE"; HOLISM = "HOLISM"; PARSIMONY = "PARSIMONY"; DUALITY = "DUALITY" | |
| ALL_AXIOMS = [getattr(Axiom, a) for a in dir(Axiom) if not a.startswith("__")] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # AXIOM HYPOTHESIS CONSTRUCTORS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _make_hyp(hid, binding, h_type, claim, derivation, pinned, free, conf, is_fd=False): | |
| return Hypothesis(hid=hid, binding=binding, h_type=h_type, claim=claim, | |
| derivation=derivation, pinned_vars=pinned, free_vars=free, | |
| confidence=conf, is_fully_determined=is_fd) | |
| def _hyp_continuous(p, bt, a, m): return [_make_hyp("cnt", bt.binding, "continuous", "Manifold", [], {}, p.variables, 0.45)] | |
| def _hyp_discrete(p, bt, a, m): | |
| b = dict(bt.binding); pinned = {} | |
| box = {v: core.IV(b.get(v, 0)-max(0.05*(p.bounds[v][1]-p.bounds[v][0]), 1e-4), | |
| b.get(v, 0)+max(0.05*(p.bounds[v][1]-p.bounds[v][0]), 1e-4)) for v in p.variables if v in p.bounds} | |
| cont = core._hc4(box, p.compiled_constraints) | |
| if cont: | |
| for v, iv in cont.items(): | |
| b[v] = iv.mid() | |
| if iv.width() < 1e-2: pinned[v] = b[v] | |
| return [_make_hyp("dis", b, "discrete", "TopoScope", [], pinned, p.variables, 0.80)] | |
| def _hyp_quadratic(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| if bt.l9 and bt.l9.dominant_vars: | |
| for v in bt.l9.dominant_vars[:2]: hyps.append(_make_hyp(f"qua_{v}", b, "quadratic", f"Root({v})", [], {v: b[v]}, p.variables, 0.70)) | |
| return hyps if hyps else [_make_hyp("qua", b, "quadratic", "Root", [], {}, p.variables, 0.55)] | |
| def _hyp_bilinear(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| for vi, vj in p.bilinear_pairs: | |
| lo_i, hi_i = p.bounds.get(vi, (0, 10)); lo_j, hi_j = p.bounds.get(vj, (0, 10)) | |
| product = abs(b.get(vi, 1)*b.get(vj, 1)) | |
| if product <= 0: continue | |
| gm = math.sqrt(product) | |
| if lo_i <= gm <= hi_i and lo_j <= gm <= hi_j: | |
| nb = dict(b); nb[vi] = nb[vj] = gm | |
| hyps.append(_make_hyp(f"bil_eq_{vi}_{vj}", nb, "bilinear", "GeoMean", ["bilinear"], {vi: gm, vj: gm}, [v for v in p.variables if v not in [vi, vj]], 0.70)) | |
| return hyps | |
| def _hyp_monotone_product(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| if not p.monotone_targets: return hyps | |
| try: tb = core._global_hc4_tighten_bounds(p) | |
| except: tb = dict(p.bounds) | |
| for v_small, v_large, k in p.monotone_targets: | |
| lo_s, hi_s = p.bounds.get(v_small, (-1e9, 1e9)); lo_l, hi_l = p.bounds.get(v_large, (-1e9, 1e9)) | |
| for ratio in [0.25, 0.5, 0.707, 0.9]: | |
| vs_sq = k*ratio | |
| if vs_sq <= 0: continue | |
| vs = math.sqrt(vs_sq); vl = k/vs | |
| if not(lo_s <= vs <= hi_s and lo_l <= vl <= hi_l and vs <= vl): continue | |
| nb = dict(b); nb[v_small] = vs; nb[v_large] = vl | |
| box = {u: core.IV(*tb.get(u, p.bounds.get(u, (-10, 10)))) for u in p.variables} | |
| box[v_small] = core.IV(vs-1e-6, vs+1e-6); box[v_large] = core.IV(vl-1e-6, vl+1e-6) | |
| cont = core._hc4(box, p.compiled_constraints); pinned = {v_small: vs, v_large: vl} | |
| if cont: | |
| for u, iv in cont.items(): | |
| if u in p.bounds: nb[u] = iv.mid() | |
| pinned = {u: nb[u] for u in p.variables if cont[u].width() < 1e-2} | |
| hyps.append(_make_hyp(f"mpr_{v_small}_{v_large}_r{int(ratio*100)}", nb, "monotone_product", | |
| "MonoProd", ["ordered", "hc4"], pinned, [u for u in p.variables if u not in pinned], 0.88)) | |
| return hyps | |
| def _hyp_metric(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| for mc in p.compiled_constraints: | |
| if mc.kind != "equality" or not mc.parsed or not mc.parsed.is_Add: continue | |
| sq_terms = [t for t in mc.parsed.args if t.is_Pow and t.args[1] == 2] | |
| if len(sq_terms) < 1: continue | |
| vars_in = [str(s) for s in mc.parsed.free_symbols if str(s) in p.variables] | |
| if not vars_in: continue | |
| const_terms = [float(t) for t in mc.parsed.args if t.is_Number] | |
| if not const_terms: continue | |
| target = -const_terms[0] | |
| if target <= 0: continue | |
| curr_val = sum(b.get(v, 0)**2 for v in vars_in) | |
| if curr_val < 1e-8: continue | |
| scale = math.sqrt(target/curr_val); nb = dict(b) | |
| for v in vars_in: | |
| lo, hi = p.bounds.get(v, (-10, 10)); nb[v] = max(lo, min(hi, b.get(v, 0)*scale)) | |
| hyps.append(_make_hyp(f"met_{hash(mc.expr_str)%9999}", nb, "metric", "RadialProject", ["metric", mc.expr_str], {}, list(p.variables), 0.75)) | |
| return hyps | |
| def _hyp_symmetric(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| vl = p.variables | |
| for i in range(min(len(vl), 8)): | |
| for j in range(i+1, min(len(vl), 8)): | |
| vi, vj = vl[i], vl[j] | |
| lo_i, hi_i = p.bounds.get(vi, (-1e9, 1e9)); lo_j, hi_j = p.bounds.get(vj, (-1e9, 1e9)) | |
| bvi, bvj = b.get(vi, 0), b.get(vj, 0) | |
| if lo_i <= bvj <= hi_i and lo_j <= bvi <= hi_j: | |
| nb = dict(b); nb[vi], nb[vj] = bvj, bvi | |
| hyps.append(_make_hyp(f"sym_{vi}_{vj}", nb, "symmetric", "Swap", ["symmetric"], {vi: nb[vi], vj: nb[vj]}, [v for v in p.variables if v not in [vi, vj]], 0.60)) | |
| return hyps[:5] | |
| def _hyp_ordered(p, bt, a, m): return [_make_hyp("ord", bt.binding, "ordered", "OrderBal", [], {}, p.variables, 0.67)] | |
| def _hyp_monotone(p, bt, a, m): return [_make_hyp("mon", bt.binding, "monotone", "MonoPath", [], {}, p.variables, 0.67)] | |
| def _hyp_convex(p, bt, a, m): return [_make_hyp("cvx", bt.binding, "convex", "ConvMix", [], {}, p.variables, 0.63)] | |
| def _hyp_conserved(p, bt, a, m): return [_make_hyp("csv", bt.binding, "conserved", "Conserve", [], {}, p.variables, 0.82)] | |
| def _hyp_mutable(p, bt, a, m): | |
| b = dict(bt.binding); pinned = {} | |
| if bt.l9 and bt.l9.dominant_vars: | |
| v = bt.l9.dominant_vars[0]; lo, hi = p.bounds.get(v, (-10, 10)) | |
| b[v] = max(lo, min(hi, b.get(v, 0)+random.gauss(0, (hi-lo)*0.05))); pinned = {v: b[v]} | |
| return [_make_hyp("mut", b, "mutable", "Perturb", [], pinned, p.variables, 0.40)] | |
| def _hyp_network(p, bt, a, m): return [_make_hyp("net", bt.binding, "network", "HubProp", [], {}, p.variables, 0.65)] | |
| def _hyp_equilibrium(p, bt, a, m): return [_make_hyp("eql", bt.binding, "equilibrium", "GradBal", [], {}, p.variables, 0.72)] | |
| def _hyp_superposition(p, bt, a, m): | |
| if a and len(a)>0: | |
| other=random.choice(a); lam=0.5 | |
| nb={v:lam*bt.binding.get(v,0)+(1-lam)*other.binding.get(v,0) for v in p.variables} | |
| return [_make_hyp("sup",nb,"superposition","Superpose",[],{},p.variables,0.58)] | |
| return [] | |
| def _hyp_locality(p, bt, a, m): return [_make_hyp("loc", bt.binding, "locality", "LocalFirst", [], {}, p.variables, 0.72)] | |
| def _hyp_extremal(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| if bt.l9 and bt.l9.dominant_vars: | |
| v = bt.l9.dominant_vars[0]; lo, hi = p.bounds.get(v, (0, 1)) | |
| for val in [lo, hi]: nb = dict(b); nb[v] = val; hyps.append(_make_hyp(f"ext_{v}_{val:.3f}", nb, "extremal", "PinBound", [], {v: val}, p.variables, 0.65)) | |
| return hyps | |
| def _hyp_entropy(p, bt, a, m): return [_make_hyp("ent", {v: random.uniform(*p.bounds[v]) for v in p.variables}, "entropy", "MaxEnt", [], {}, p.variables, 0.48)] | |
| def _hyp_injective(p, bt, a, m): return [_make_hyp("inj", bt.binding, "injective", "Distinct", [], {}, p.variables, 0.50)] | |
| def _hyp_surjective(p, bt, a, m): return [_make_hyp("sur", bt.binding, "surjective", "Sweep", [], {}, p.variables, 0.45)] | |
| def _hyp_transitive(p, bt, a, m): return [_make_hyp("tra", bt.binding, "transitive", "TransSnap", [], {}, p.variables, 0.77)] | |
| def _hyp_atomic(p, bt, a, m): | |
| hyps = []; b = dict(bt.binding) | |
| for mc in p.compiled_constraints: | |
| if not mc.projections: continue | |
| for v, projs in mc.projections.items(): | |
| for proj in projs: | |
| try: | |
| val = float(proj["func"](*[b.get(s, 0.0) for s in proj["syms"]])); lo, hi = p.bounds.get(v, (-1e9, 1e9)) | |
| if math.isfinite(val) and lo <= val <= hi: | |
| nb = dict(b); nb[v] = val; hyps.append(_make_hyp(f"ato_{v}", nb, "atomic", f"Solve({v})", [], {v: val}, p.variables, 0.70)); break | |
| except: pass | |
| if hyps: break | |
| return hyps | |
| def _hyp_implication(p, bt, a, m): return [_make_hyp("imp", bt.binding, "implication", "Imply", [], {}, p.variables, 0.78)] | |
| def _hyp_negation(p, bt, a, m): | |
| nb={v:max(p.bounds[v][0],min(p.bounds[v][1],(p.bounds[v][0]+p.bounds[v][1])-bt.binding.get(v,(p.bounds[v][0]+p.bounds[v][1])/2))) for v in p.variables} | |
| return [_make_hyp("neg",nb,"negation","Mirror",[],{},p.variables,0.42)] | |
| def _hyp_composite(p, bt, a, m): return [_make_hyp("cmp", bt.binding, "composite", "Enz", [], {}, p.variables, 0.90)] | |
| def _hyp_holism(p, bt, a, m): return [_make_hyp("hol", bt.binding, "holism", "Global", [], {}, p.variables, 0.73)] | |
| def _hyp_parsimony(p, bt, a, m): | |
| b = dict(bt.binding); nb = {} | |
| max_val = max((abs(v) for v in b.values() if math.isfinite(v)), default=1.0) | |
| for v in p.variables: | |
| val = b[v]; lo, hi = p.bounds[v] | |
| if not math.isfinite(val): nb[v] = 0.0; continue | |
| if abs(val) < 0.1 * max_val and lo <= 0.0 <= hi: nb[v] = 0.0 | |
| else: | |
| candidates = [round(val*m)/m for m in [1, 2, 4] if lo <= round(val*m)/m <= hi] | |
| nb[v] = min(candidates, key=lambda c: abs(c-val)) if candidates else val | |
| return [_make_hyp("par", nb, "parsimony", "Occam", [], {}, p.variables, 0.58)] | |
| def _hyp_duality(p, bt, a, m): return [_make_hyp("dua", bt.binding, "duality", "Tight", [], {}, p.variables, 0.68)] | |
| AXIOM_CONSTRUCTORS = { | |
| Axiom.CONTINUOUS: _hyp_continuous, Axiom.DISCRETE: _hyp_discrete, | |
| Axiom.QUADRATIC: _hyp_quadratic, Axiom.BILINEAR: _hyp_bilinear, | |
| Axiom.METRIC: _hyp_metric, Axiom.SYMMETRIC: _hyp_symmetric, | |
| Axiom.ORDERED: _hyp_ordered, Axiom.MONOTONE: _hyp_monotone, | |
| Axiom.CONVEX: _hyp_convex, Axiom.CONSERVED: _hyp_conserved, | |
| Axiom.MUTABLE: _hyp_mutable, Axiom.NETWORK: _hyp_network, | |
| Axiom.EQUILIBRIUM: _hyp_equilibrium, Axiom.SUPERPOSITION: _hyp_superposition, | |
| Axiom.LOCALITY: _hyp_locality, Axiom.EXTREMAL: _hyp_extremal, | |
| Axiom.ENTROPY: _hyp_entropy, Axiom.INJECTIVE: _hyp_injective, | |
| Axiom.SURJECTIVE: _hyp_surjective, Axiom.TRANSITIVE: _hyp_transitive, | |
| Axiom.ATOMIC: _hyp_atomic, Axiom.IMPLICATION: _hyp_implication, | |
| Axiom.NEGATION: _hyp_negation, Axiom.COMPOSITE: _hyp_composite, | |
| Axiom.HOLISM: _hyp_holism, Axiom.PARSIMONY: _hyp_parsimony, | |
| Axiom.DUALITY: _hyp_duality, Axiom.MONOTONE_PRODUCT: _hyp_monotone_product | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SEED & TEMPLATE SYSTEM | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_sketch(problem: core.Problem) -> Dict[str, Any]: | |
| notes = [] | |
| affinities = {a: 1.0 for a in ALL_AXIOMS} | |
| if problem.bilinear_pairs: | |
| affinities[Axiom.BILINEAR] += 4.0 | |
| notes.append(f"Found bilinear pairs: {problem.bilinear_pairs}") | |
| if problem.monotone_targets: | |
| affinities[Axiom.MONOTONE_PRODUCT] += 5.0 | |
| notes.append("Boosted Monotone Product based on algebraic layout.") | |
| if any(mc.kind == "equality" and mc.parsed is not None and mc.parsed.is_Add for mc in problem.compiled_constraints): | |
| affinities[Axiom.METRIC] += 3.0 | |
| notes.append("System contains sum-of-squares style constraints.") | |
| return {"notes": notes, "affinities": affinities} | |
| def generate_templates(problem: core.Problem, sketch: Dict[str, Any]) -> List[Hypothesis]: | |
| templates = [] | |
| # 1. INT Grid Sweep & Algebraic Propagation | |
| tight_int_vars = [v for v in problem.variables if v in problem.int_vars | |
| and math.isfinite(problem.bounds[v][0]) | |
| and problem.bounds[v][1] - problem.bounds[v][0] <= 100] | |
| if tight_int_vars: | |
| grids = [] | |
| for v in tight_int_vars: | |
| lo = int(math.ceil(problem.bounds[v][0])) | |
| hi = int(math.floor(problem.bounds[v][1])) | |
| grids.append([(v, float(val)) for val in range(lo, hi+1)]) | |
| combos = list(itertools.product(*grids))[:1000] # Cap search width | |
| for combo in combos: | |
| pinned = dict(combo) | |
| resolved, log = core.algebraic_propagate_pinned(problem, pinned) | |
| templates.append( | |
| Hypothesis( | |
| hid=f"grid_{hash(str(combo))%100000}", binding=resolved, | |
| h_type="int_grid", claim="Integer Grid Sweep", derivation=log, | |
| pinned_vars=pinned, free_vars=[v for v in problem.variables if v not in resolved], | |
| confidence=0.95, is_fully_determined=(len(resolved) == len(problem.variables)) | |
| ) | |
| ) | |
| # 2. Transcendental Space Sweep (Fallback) | |
| trans_vars = [v for v in problem.variables if v not in tight_int_vars and math.isfinite(problem.bounds[v][0])] | |
| if trans_vars and len(trans_vars) <= 2: | |
| for v in trans_vars: | |
| lo, hi = problem.bounds[v] | |
| samples = [lo + (hi-lo)*r for r in [0.15, 0.5, 0.85]] | |
| for s in samples: | |
| pinned = {v: s} | |
| resolved, log = core.algebraic_propagate_pinned(problem, pinned) | |
| templates.append( | |
| Hypothesis( | |
| hid=f"trans_{v}_{int(s*100)%1000}", binding=resolved, | |
| h_type="grid_sweep", claim="Transcendental Sweep", derivation=log, | |
| pinned_vars=pinned, free_vars=[u for u in problem.variables if u not in resolved], | |
| confidence=0.75 | |
| ) | |
| ) | |
| return templates | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MATH ENGINE METHODS (Explicitly implemented inside local scope) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _batched_deduce_and_evaluate(problem: core.Problem, hyps: List[Hypothesis], steps: int=80) -> List[Tuple[Dict, float, List[str], str]]: | |
| if not hyps: return [] | |
| skip_indices = [i for i, h in enumerate(hyps) if getattr(h, 'is_fully_determined', False) or len(h.free_vars) == 0] | |
| solve_indices = [i for i, h in enumerate(hyps) if i not in skip_indices] | |
| results = [None] * len(hyps) | |
| for i in skip_indices: | |
| hyp = hyps[i] | |
| try: | |
| ce = problem.scalar_energy(hyp.binding) | |
| dom_vars = list(hyp.pinned_vars.keys())[:3] | |
| results[i] = (hyp.binding, ce, dom_vars, "algebraic") | |
| except: results[i] = (hyp.binding, float('inf'), [], "algebraic_error") | |
| if not solve_indices: return [r for r in results if r is not None] | |
| adam_hyps = [hyps[i] for i in solve_indices] | |
| V = len(problem.variables) | |
| log_mask = [] | |
| log_lo, log_hi = [], [] | |
| for v in problem.variables: | |
| lo, hi = core._c15(problem.bounds[v][0]), core._c15(problem.bounds[v][1]) | |
| if v in problem.log_space_vars and lo > 0: | |
| log_mask.append(True) | |
| log_lo.append(math.log10(max(lo, 1e-30))) | |
| log_hi.append(math.log10(max(hi, 1e-30))) | |
| else: | |
| log_mask.append(False) | |
| log_lo.append(lo) | |
| log_hi.append(hi) | |
| log_mask_t = torch.tensor(log_mask, device=core.DEVICE, dtype=torch.bool) | |
| lo_param_t = torch.tensor(log_lo, device=core.DEVICE, dtype=torch.float32) | |
| hi_param_t = torch.tensor(log_hi, device=core.DEVICE, dtype=torch.float32) | |
| def _param_to_orig(P): | |
| orig = P.clone() | |
| if log_mask_t.any(): orig[:, log_mask_t] = torch.pow(10.0, P[:, log_mask_t]) | |
| return orig | |
| def _orig_to_param(x_val, j): | |
| if log_mask[j] and x_val > 0: return math.log10(max(x_val, 1e-30)) | |
| return x_val | |
| x_data_p, mask_data, target_data_p = [], [], [] | |
| for hyp in adam_hyps: | |
| xr, mr, tr = [], [], [] | |
| active_vars = problem.get_markov_blanket(set(hyp.pinned_vars.keys()), depth=2) | |
| for j, v in enumerate(problem.variables): | |
| lo, hi = core._c15(problem.bounds[v][0]), core._c15(problem.bounds[v][1]) | |
| if v in hyp.pinned_vars: | |
| p_val = _orig_to_param(core._c15(hyp.pinned_vars[v]), j) | |
| xr.append(p_val); mr.append(0.0); tr.append(p_val) | |
| else: | |
| p_val = _orig_to_param(core._c15(hyp.binding.get(v, (lo+hi)/2)), j) | |
| is_active = (v in active_vars) or (len(hyp.pinned_vars) == 0) | |
| xr.append(p_val); mr.append(1.0 if is_active else 0.0); tr.append(0.0) | |
| x_data_p.append(xr); mask_data.append(mr); target_data_p.append(tr) | |
| P = torch.tensor(x_data_p, device=core.DEVICE, dtype=torch.float32, requires_grad=True) | |
| mask = torch.tensor(mask_data, device=core.DEVICE, dtype=torch.float32) | |
| target = torch.tensor(target_data_p, device=core.DEVICE, dtype=torch.float32) | |
| optimizer = torch.optim.Adam([P], lr=0.01) | |
| for step in range(steps): | |
| optimizer.zero_grad() | |
| step_ratio = min(1.0, step / (steps * 0.8)) | |
| X_orig = _param_to_orig(P) | |
| ce = problem.tensor_energy(X_orig, step_ratio, is_optimizing=True) | |
| if isinstance(ce, torch.Tensor) and (ce < core.SOLVE_THRESHOLD).all() and step_ratio == 1.0: break | |
| ce.sum().backward() | |
| with torch.no_grad(): | |
| P.grad.clamp_(-10.0, 10.0) | |
| P.grad *= mask | |
| optimizer.step() | |
| P.data = torch.where(mask == 0.0, target, P.data) | |
| margin = 0.1 * (1.0 - step_ratio) | |
| lo_m = lo_param_t - (hi_param_t - lo_param_t) * margin | |
| hi_m = hi_param_t + (hi_param_t - lo_param_t) * margin | |
| P.data = torch.clamp(P.data, lo_m.unsqueeze(0), hi_m.unsqueeze(0)) | |
| X_orig_final = _param_to_orig(P) | |
| final_ce = problem.tensor_energy(X_orig_final, 1.0, is_optimizing=False).view(-1) | |
| ce_vals = final_ce.detach().cpu().numpy() | |
| X_vals = X_orig_final.detach().cpu().numpy() | |
| for b_idx, orig_idx in enumerate(solve_indices): | |
| final_b = {problem.variables[j]: float(X_vals[b_idx, j]) for j in range(V)} | |
| results[orig_idx] = (final_b, float(ce_vals[b_idx]), [], "systemic") | |
| return [r for r in results if r is not None] | |
| def _mprt_sample(problem: core.Problem, N: int): | |
| var_list = problem.variables; V = len(var_list) | |
| lo_t = torch.tensor([core._c15(problem.bounds.get(v, (-10.0, 10.0))[0]) for v in var_list], device=core.DEVICE, dtype=torch.float32) | |
| hi_t = torch.tensor([core._c15(problem.bounds.get(v, (-10.0, 10.0))[1]) for v in var_list], device=core.DEVICE, dtype=torch.float32) | |
| for i in range(V): | |
| if lo_t[i] >= hi_t[i]: m = (lo_t[i]+hi_t[i])/2; lo_t[i] = m - 1e-6; hi_t[i] = m + 1e-6 | |
| rand_base = torch.rand((N, V), device=core.DEVICE) | |
| lsv_indices = [problem.var_idx[v] for v in problem.log_space_vars if v in problem.var_idx] | |
| for idx in lsv_indices: | |
| lo_v, hi_v = lo_t[idx].item(), hi_t[idx].item() | |
| if lo_v > 0 and hi_v > lo_v: | |
| log_lo, log_hi = math.log10(max(lo_v, 1e-30)), math.log10(max(hi_v, 1e-30)) | |
| rand_base[:, idx] = torch.pow(10.0, torch.rand(N, device=core.DEVICE)*(log_hi-log_lo)+log_lo) / hi_v | |
| X = lo_t.unsqueeze(0) + (hi_t - lo_t).unsqueeze(0) * rand_base | |
| ce_batch = problem.tensor_energy(X, 1.0, is_optimizing=False).view(-1) | |
| best_idx = torch.argmin(ce_batch).item() | |
| return {v: float(X[best_idx, i].item()) for i, v in enumerate(var_list)}, ce_batch[best_idx].item() | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # THE DECOUPLED SEQUENCE RAY TRACER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class SequenceRayTracer: | |
| def __init__(self, log_callback: Callable, update_state_callback: Callable): | |
| self.log = log_callback | |
| self.update_state = update_state_callback | |
| self.bandit = UCB1BanditSeeder() | |
| self.baton_registry = {} | |
| def trace(self, problem: core.Problem, init_b: Dict[str, float], init_ce: float, templates: List[Hypothesis], max_rays: int = 1500) -> Tuple[Dict[str, float], float, List[HypothesisTrace], str]: | |
| self.log(f"Initializing Axiom Ray Tracer on {problem.pid}...") | |
| sketch = compute_sketch(problem) | |
| for note in sketch["notes"]: | |
| self.log(f" [Sketch] {note}") | |
| best_ce = init_ce | |
| best_binding = dict(init_b) | |
| best_ray_name = "Zero-Shot" | |
| traces = [] | |
| solved = False | |
| # Build initial queue from verified templates | |
| queues = {"core": []} | |
| for idx, t in enumerate(templates): | |
| binding = dict(init_b); binding.update(t.pinned_vars) | |
| results = _batched_deduce_and_evaluate(problem, [t], steps=60) | |
| if results: | |
| final_b, final_ce, dom_vars, tc = results[0] | |
| if final_ce < best_ce: | |
| best_ce = final_ce | |
| best_binding = dict(final_b) | |
| best_ray_name = f"template:{t.h_type}" | |
| traces.append(HypothesisTrace( | |
| hid=t.hid, round_idx=idx, ray_name=f"template:{t.h_type}", ray_type="template", | |
| ce_before=init_ce, ce_after=final_ce, survived=True, elapsed_ms=0.0, | |
| g1_pass=(final_ce < core.SOLVE_THRESHOLD), binding=final_b, tension_class=tc | |
| )) | |
| # Register batons for structural branch mapping | |
| self.baton_registry[t.hid] = Baton( | |
| binding=final_b, ce=final_ce, ray_id=t.hid, | |
| l9=L9Certificate(final_ce, dom_vars, [], tc) | |
| ) | |
| # Build seed rays based on UCB1 Bandit weights | |
| affinities = sketch["affinities"] | |
| seeds_scored = sorted([(affinities.get(a, 1.0), idx, a) for idx, a in enumerate(ALL_AXIOMS)], key=lambda x: -x[0]) | |
| seed_rays = [AxiomRay([a], f"S{idx}", "seed", ce_prior=best_ce) for _, idx, a in seeds_scored[:150]] | |
| queues["core"].extend(seed_rays) | |
| model = StructuralModel(best_binding, best_ce) | |
| total_fired = 0 | |
| seed_baton = Baton(binding=best_binding, ce=best_ce, ray_id="SEED") | |
| while total_fired < max_rays and queues["core"]: | |
| batch_rays = queues["core"][:64] | |
| queues["core"] = queues["core"][64:] | |
| total_fired += len(batch_rays) | |
| batch_hyps = [] | |
| for idx, ray in enumerate(batch_rays): | |
| p_baton = ray.baton or seed_baton | |
| constructor = AXIOM_CONSTRUCTORS.get(ray.sequence[-1]) | |
| constructed_hyps = constructor(problem, p_baton, list(self.baton_registry.values()), model) if constructor else [] | |
| if not constructed_hyps: | |
| constructed_hyps = [_make_hyp("fb", p_baton.binding, "fallback", "Pass", [], {}, problem.variables, 0.1)] | |
| batch_hyps.append((idx, constructed_hyps[0])) | |
| eval_results = _batched_deduce_and_evaluate(problem, [h for _, h in batch_hyps], steps=80) | |
| for (ray_idx, hyp), (final_b, final_ce, dom_vars, tension_class) in zip(batch_hyps, eval_results): | |
| ray = batch_rays[ray_idx] | |
| parent_ce = (ray.baton or seed_baton).ce | |
| self.bandit.record_reward(ray.sequence[-1], parent_ce, final_ce) | |
| improved = final_ce < best_ce | |
| passed_pruning = final_ce < parent_ce * 0.98 | |
| if improved: | |
| best_ce = final_ce | |
| best_binding = dict(final_b) | |
| best_ray_name = ray.name | |
| model.best_binding = dict(final_b) | |
| model.best_ce = final_ce | |
| model.best_ray = ray.name | |
| self.update_state(best_ce=best_ce, total_fired=total_fired, best_ray=best_ray_name) | |
| if final_ce < core.SOLVE_THRESHOLD: | |
| solved = True | |
| out_baton = Baton( | |
| binding=final_b, ce=final_ce, ray_id=ray.ray_id, | |
| l9=L9Certificate(final_ce, dom_vars, [], tension_class) | |
| ) | |
| if final_ce < max(2.0, best_ce * 1.2): | |
| self.baton_registry[ray.ray_id] = out_baton | |
| if (passed_pruning or improved or ray.depth < 1) and ray.depth < 8: | |
| remaining_axioms = [a for a in ALL_AXIOMS if a not in ray.sequence] | |
| queues["core"].extend(self.bandit.intelligent_branch(ray, out_baton, remaining_axioms, branch_width=6)) | |
| if solved: | |
| self.log("Exact structural resonance found!") | |
| break | |
| return best_binding, best_ce, traces, best_ray_name | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UCB1 BANDIT SEEDER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class UCB1BanditSeeder: | |
| def __init__(self): | |
| self.stats = {a: {"tries": 0, "reward": 0.0} for a in ALL_AXIOMS} | |
| self.total_tries = 0 | |
| def record_reward(self, axiom, ce_before, ce_after): | |
| reward = max(0.0, ce_before - ce_after) | |
| self.stats[axiom]["tries"] += 1 | |
| self.stats[axiom]["reward"] += reward | |
| self.total_tries += 1 | |
| def intelligent_branch(self, ray, out_baton, remaining_axioms, branch_width): | |
| scored = [] | |
| for ax in remaining_axioms: | |
| tries = self.stats[ax]["tries"] | |
| if tries == 0: ucb = 999.0 | |
| else: | |
| avg_reward = self.stats[ax]["reward"] / tries | |
| exploration = math.sqrt(math.log(self.total_tries + 1) / tries) | |
| ucb = avg_reward + 0.5 * exploration | |
| scored.append((ucb, ax)) | |
| scored.sort(key=lambda x: x[0] * random.random(), reverse=True) | |
| children = [] | |
| for _, axiom in scored[:branch_width]: | |
| child = ray.extend(axiom, "branch", new_prior=out_baton.ce) | |
| if child: | |
| child.baton = out_baton | |
| children.append(child) | |
| return children |