testing_space / app3.py
everydaytok's picture
Rename app2.py to app3.py
0f1a57f verified
#!/usr/bin/env python3
"""
PRACTICALITY SYSTEM 21.0 — MPR ORDERING FIX + BRANCH WIDTH 12
═══════════════════════════════════════════════════════════════════
CHANGES FROM 20.0:
1. BRANCH_WIDTH: 4 → 12 (more exploration per earned ray)
2. MPR ORDERING BUG FIX: The v_small/v_large detection now reads
expression sign correctly instead of relying on sympy's
alphabetical symbol ordering. For GEQ expr like "t2-t1", we
parse which symbol has a positive coefficient (v_large) and
which has a negative coefficient (v_small).
3. SYSTEMIC BRANCH BOOST: When tension_class=="systemic" and
ce>0.1, branch width scales to 16 to handle high-condition-
number constraint Jacobians (BilinearChain6 class problems).
4. GPU UTILIZATION FIX: Tensor operations moved to stay on device
throughout batch; eliminated CPU round-trips in energy loop.
"""
import time, random, math, threading, warnings
from functools import reduce
from typing import Optional, Callable, List, Dict, Tuple, Any, Set
from dataclasses import dataclass, field
from collections import defaultdict, deque
import numpy as np
import sympy as sp
from sympy.parsing.sympy_parser import parse_expr
import torch
import gradio as gr
# Suppress all Python warnings for the whole process.
warnings.filterwarnings("ignore")
# Select the compute device once at import time; tensors created below use DEVICE.
USE_GPU = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_GPU else "cpu")
# Startup banner reporting the selected device type.
print(f"[SYSTEM 21.0] Compute: {DEVICE.type.upper()} | MPR Fix + Branch-12")
# ══════════════════════════════════════════════════════════════════════
# SECTION 1: CONSTANTS
# ══════════════════════════════════════════════════════════════════════
# Energy below which every row of a batch counts as solved (early exit in the Adam deduction loop).
SOLVE_THRESHOLD = 0.05
# NOTE(review): presumably the threshold of the "G1" verification gate — not used in this part of the file; confirm at the use site.
VERIFY_G1_THRESHOLD = 0.08
# Default tolerance of an AXLInvariant.
VERIFY_G2_TOLERANCE = 1e-3
# Number of Adam iterations per batched deduction pass.
DEDUCE_ADAM_STEPS = 25
# NOTE(review): presumably the sample count passed to _mprt_sample — confirm at the call site.
N_MPRT_EXPLORE = 1000
# Process-wide memo of symbolic solve() projections, keyed by the raw expression string.
PROJECTION_CACHE: Dict = {}
# NOTE(review): the ray budget / batching / earning constants below are not referenced in this part of the file; meanings inferred from names only.
MAX_TOTAL_RAYS = 1_500_000
RAY_BATCH_SIZE = 12288
CE_EARN_RATIO = 0.90
SCOUT_CE_THRESHOLD = 0.5
BRANCH_WIDTH = 12 # 20.0 was 4
BRANCH_WIDTH_SYSTEMIC = 16 # extra width for systemic tension
# NOTE(review): baton registry limits — consumers are outside this part of the file.
BATON_REGISTRY_THRESHOLD = 2.0
BATON_REGISTRY_MAX = 5000
# ══════════════════════════════════════════════════════════════════════
# SECTION 2: INTERVAL ARITHMETIC (HC4)
# ══════════════════════════════════════════════════════════════════════
class IV:
    """Closed real interval ``[lo, hi]`` with outer (conservative) arithmetic.

    Operands may be another ``IV`` or a plain ``int``/``float``. Operations
    whose result cannot be bounded (division by an interval containing zero,
    negative powers of such an interval) return the sentinel
    ``IV(-1e18, 1e18)`` instead of raising.
    """
    __slots__ = ("lo", "hi")

    def __init__(self, lo, hi):
        self.lo = float(lo)
        self.hi = float(hi)

    def __add__(self, o):
        if isinstance(o, (int, float)):
            return IV(self.lo + o, self.hi + o)
        return IV(self.lo + o.lo, self.hi + o.hi)

    __radd__ = __add__

    def __sub__(self, o):
        if isinstance(o, (int, float)):
            return IV(self.lo - o, self.hi - o)
        return IV(self.lo - o.hi, self.hi - o.lo)

    def __rsub__(self, o):
        if isinstance(o, (int, float)):
            return IV(o - self.hi, o - self.lo)
        return o.__sub__(self)

    def __mul__(self, o):
        if isinstance(o, (int, float)):
            a, b = self.lo * o, self.hi * o
            return IV(min(a, b), max(a, b))
        p = (self.lo * o.lo, self.lo * o.hi, self.hi * o.lo, self.hi * o.hi)
        return IV(min(p), max(p))

    __rmul__ = __mul__

    def __truediv__(self, o):
        if isinstance(o, (int, float)):
            if abs(o) < 1e-15:
                return IV(-1e18, 1e18)
            a, b = self.lo / o, self.hi / o
            return IV(min(a, b), max(a, b))
        if o.lo <= 0 <= o.hi:
            return IV(-1e18, 1e18)
        return self * IV(1.0 / o.hi, 1.0 / o.lo)

    def __rtruediv__(self, o):
        # scalar / interval
        if self.lo <= 0 <= self.hi:
            return IV(-1e18, 1e18)
        return IV(o / self.hi, o / self.lo) if o >= 0 else IV(o / self.lo, o / self.hi)

    def __neg__(self):
        return IV(-self.hi, -self.lo)

    def __pow__(self, n):
        """Raise the interval to a scalar power ``n``.

        Integral floats (``2.0``, ``3.0``) are treated as integers, because
        exponents compiled from sympy arrive as floats; without this, odd
        powers of negative intervals were bounded below by 0. Negative
        exponents are evaluated as ``1 / self**(-n)``.
        """
        if isinstance(n, float) and n.is_integer():
            n = int(n)
        if isinstance(n, int):
            if n == 0:
                return IV(1.0, 1.0)
            if n < 0:
                if self.contains_zero():
                    return IV(-1e18, 1e18)
                return 1.0 / (self ** (-n))
            if n % 2 == 0:
                if self.lo >= 0:
                    return IV(self.lo ** n, self.hi ** n)
                if self.hi <= 0:
                    return IV(self.hi ** n, self.lo ** n)
                return IV(0.0, max(abs(self.lo) ** n, abs(self.hi) ** n))
            # Odd power: monotone increasing, sign preserved.
            return IV(self.lo ** n if self.lo >= 0 else -((-self.lo) ** n),
                      self.hi ** n if self.hi >= 0 else -((-self.hi) ** n))
        # Non-integer exponent.
        if n < 0:
            # x**n is decreasing on x > 0 and unbounded as x -> 0+.
            if self.lo <= 0:
                return IV(-1e18, 1e18)
            return IV(self.hi ** n, self.lo ** n)
        if self.lo < 0:
            # Keep the original magnitude-based enclosure, but never raise a
            # negative float to a fractional power (that yields a complex).
            return IV(0.0, max(abs(self.lo) ** n, max(self.hi, 0.0) ** n))
        return IV(self.lo ** n, self.hi ** n)

    def contains_zero(self):
        return self.lo <= 0.0 <= self.hi

    def width(self):
        return max(0.0, self.hi - self.lo)

    def mid(self):
        return (self.lo + self.hi) * 0.5
def compile_iv(expr, variables):
    """Compile a sympy expression into a callable ``box -> IV``.

    ``box`` maps variable names to ``IV`` intervals; names missing from the
    box evaluate to the unbounded interval. Numbers, symbols, sums, products
    and powers are supported; any other node evaluates to the unbounded
    interval. ``variables`` is accepted for signature compatibility and is
    not consulted.
    """
    def _build(node):
        if node.is_Number:
            const = float(node)
            return lambda box, _v=const: IV(_v, _v)
        if node.is_Symbol:
            key = str(node)
            return lambda box, _n=key: box.get(_n, IV(-1e18, 1e18))
        if node.is_Add:
            terms = [_build(arg) for arg in node.args]

            def _sum(box, _fs=terms):
                acc = _fs[0](box)
                for fn in _fs[1:]:
                    acc = acc + fn(box)
                return acc
            return _sum
        if node.is_Mul:
            factors = [_build(arg) for arg in node.args]

            def _product(box, _fs=factors):
                acc = _fs[0](box)
                for fn in _fs[1:]:
                    acc = acc * fn(box)
                return acc
            return _product
        if node.is_Pow:
            base_fn = _build(node.args[0])
            exponent = node.args[1]
            if exponent.is_Number:
                return lambda box, _bc=base_fn, _ex=float(exponent): _bc(box) ** _ex
            # Symbolic exponent: approximate it by the midpoint of its interval.
            exponent_fn = _build(exponent)
            return lambda box, _bc=base_fn, _exc=exponent_fn: _bc(box) ** _exc(box).mid()
        # Unsupported node type: no information.
        return lambda box: IV(-1e18, 1e18)

    return _build(expr)
def _hc4(box,constraints):
cur=dict(box)
for mc in constraints:
if getattr(mc,'weight',1.0)==0.0: continue
if mc.kind=="or_eq":
valid=False
for bmc in mc.branches:
if bmc.fast_iv is None: valid=True; break
try:
if bmc.fast_iv(cur).contains_zero(): valid=True; break
except: valid=True; break
if not valid: return None
else:
if mc.fast_iv is None: continue
try:
riv=mc.fast_iv(cur)
if ((mc.kind=="equality" and not riv.contains_zero()) or
(mc.kind=="inequality" and ((mc.direction=="geq" and riv.hi<-1e-10) or
(mc.direction=="leq" and riv.lo>1e-10)))): return None
except: pass
return cur
def _global_hc4_tighten_bounds(problem):
    """Run ``_hc4`` over the problem's full bounds box and return bounds.

    Returns a ``{var: (lo, hi)}`` dict in which each declared bound is
    intersected with the interval reported by ``_hc4``. When ``_hc4`` refutes
    the box, a plain copy of the declared bounds is returned instead.
    """
    declared = problem.bounds
    start_box = {name: IV(lo, hi) for name, (lo, hi) in declared.items()}
    contracted = _hc4(start_box, problem.compiled_constraints)
    if contracted is None:
        return dict(declared)
    tightened = {}
    for name in declared:
        if name not in contracted:
            continue
        interval = contracted[name]
        tightened[name] = (max(declared[name][0], interval.lo),
                           min(declared[name][1], interval.hi))
    return tightened
# ══════════════════════════════════════════════════════════════════════
# SECTION 3: PSL PARSER & PRE-COMPILATION
# ══════════════════════════════════════════════════════════════════════
@dataclass
class PSLVar:
    """A PSL ``VAR`` declaration: variable name plus lower and upper bound."""
    name: str
    lo: float
    hi: float
@dataclass
class PSLConstraint:
    """One parsed PSL constraint.

    ``kind`` is the lower-cased keyword (``eq``/``geq``/``leq``) or
    ``or_eq`` for a BRANCH, whose alternatives are listed in ``branches``.
    """
    kind: str
    expr: str
    scope: str = "root"
    weight: float = 1.0
    branches: List[str] = field(default_factory=list)
@dataclass
class PSLScope:
    """A named PSL ``SCOPE`` block with its own variables, links and constraints."""
    name: str
    order: int = 0
    depends: List[str] = field(default_factory=list)
    vars: List[PSLVar] = field(default_factory=list)
    links: List[str] = field(default_factory=list)
    constraints: List[PSLConstraint] = field(default_factory=list)
@dataclass
class PSLProgram:
    """A whole parsed PSL program: root variables/constraints plus scopes."""
    name: str
    vars: List[PSLVar]
    constraints: List[PSLConstraint]
    scopes: List[PSLScope]
    scope_order: List[str]
    annotations: Dict[str, str] = field(default_factory=dict)
def parse_psl(text: str) -> PSLProgram:
    """Parse PSL source text into a ``PSLProgram``.

    Blank lines and lines starting with ``#`` are ignored. Recognised
    statements (keywords are case-insensitive):

    * ``SPACE <name>``
    * ``VAR <name> <lo> <hi>``
    * ``EQ|GEQ|LEQ <expr> [WEIGHT <w>]``
    * ``CONSERVE <expr>`` (stored as an ``eq`` constraint with weight 10)
    * ``BRANCH ...`` / ``OPTION <expr>`` ... / ``END ...``
    * ``SCOPE <name> [ORDER <n>] [DEPENDS <a> <b> ...]`` ... ``END``;
      inside a scope ``LINK <names...>`` is also accepted.

    Unknown statements are skipped. Scopes are returned sorted by ``ORDER``.
    A BRANCH that is not closed before end of input is accepted as-is, and
    an ``OPTION`` line without an expression is ignored.
    """
    lines = [ln.strip() for ln in text.strip().split('\n')
             if ln.strip() and not ln.strip().startswith('#')]
    prog = PSLProgram("unnamed", [], [], [], [])
    i = 0

    def advance():
        nonlocal i
        ln = lines[i]
        i += 1
        return ln

    def peek():
        return lines[i] if i < len(lines) else ""

    def split_keyword(ln):
        """Split a line into (upper-cased keyword, stripped remainder)."""
        tk = ln.split(None, 1)
        kw = tk[0].upper() if tk else ""
        rest = tk[1].strip() if len(tk) > 1 else ""
        return kw, rest

    def pvar(rest):
        p = rest.split()
        return PSLVar(p[0], float(p[1]), float(p[2])) if len(p) >= 3 else None

    def pw(rest):
        """Split ``<expr> WEIGHT <w>`` into (expr, weight); weight defaults to 1."""
        if 'WEIGHT' not in rest:
            return rest.strip(), 1.0
        pieces = rest.split('WEIGHT')
        return pieces[0].strip(), float(pieces[1].split()[0])

    def read_branch_options():
        """Consume OPTION lines up to and including the closing END line."""
        opts = []
        while i < len(lines) and not peek().upper().startswith("END"):
            bl = advance()
            if bl.upper().startswith("OPTION"):
                parts = bl.split(None, 1)
                if len(parts) > 1:  # a bare OPTION carries no expression
                    opts.append(parts[1].strip())
        if i < len(lines):  # tolerate a BRANCH left open at end of input
            advance()
        return opts

    def add_constraint(kw, rest, out, scope):
        """Append the constraint described by (kw, rest) to ``out``, if any."""
        if kw in ("EQ", "GEQ", "LEQ"):
            expr, wt = pw(rest)
            out.append(PSLConstraint(kw.lower(), expr, scope=scope, weight=wt))
        elif kw == "CONSERVE":
            expr, _ = pw(rest)
            out.append(PSLConstraint("eq", expr, scope=scope, weight=10.0))
        elif kw == "BRANCH":
            out.append(PSLConstraint("or_eq", "", scope=scope, weight=1.0,
                                     branches=read_branch_options()))

    def parse_scope_header(rest):
        """Return (name, order, depends) from the text after ``SCOPE``."""
        parts = rest.split()
        sname = parts[0] if parts else f"s{len(prog.scopes)}"
        order, depends = 0, []
        # Match ORDER/DEPENDS as whole tokens after the name, so a scope
        # called e.g. "reorder" cannot be mistaken for the ORDER keyword.
        upper = [t.upper() for t in parts]
        if "ORDER" in upper[1:]:
            k = upper.index("ORDER", 1)
            try:
                order = int(parts[k + 1])
            except (IndexError, ValueError):
                pass
        if "DEPENDS" in upper[1:]:
            k = upper.index("DEPENDS", 1)
            for tok in parts[k + 1:]:
                if tok.upper() == "ORDER":
                    break
                depends.append(tok)
        return sname, order, depends

    def parse_scope_body(sname, order=0, depends=None):
        svars, scons, links = [], [], []
        while i < len(lines):
            kw, rest = split_keyword(peek())
            advance()
            if kw == "END":
                break
            if kw == "VAR":
                v = pvar(rest)
                if v:
                    svars.append(v)
            elif kw == "LINK":
                links.extend(rest.split())
            else:
                add_constraint(kw, rest, scons, sname)
        return PSLScope(sname, order, depends or [], svars, links, scons)

    while i < len(lines):
        kw, rest = split_keyword(advance())
        if kw == "SPACE":
            prog.name = rest.split()[0] if rest else "unnamed"
        elif kw == "VAR":
            v = pvar(rest)
            if v:
                prog.vars.append(v)
        elif kw == "SCOPE":
            sname, order, depends = parse_scope_header(rest)
            prog.scopes.append(parse_scope_body(sname, order, depends))
        else:
            add_constraint(kw, rest, prog.constraints, "root")

    prog.scopes.sort(key=lambda s: s.order)
    prog.scope_order = [s.name for s in prog.scopes]
    return prog
@dataclass
class ExpandedProblem:
    """Flattened form of a PSL program as produced by ``expand_psl``."""
    variables: List[str]
    bounds: Dict[str, Tuple[float, float]]
    constraints: List['Constraint']
    # scope name -> indices into ``constraints``
    scope_groups: Dict[str, List[int]]
    # scope name -> variable names associated with that scope
    scope_vars: Dict[str, List[str]]
    scope_order: List[str]
def expand_psl(prog: PSLProgram) -> ExpandedProblem:
    """Flatten a ``PSLProgram`` into variables, bounds and solver constraints.

    Root declarations are processed first, then each scope in list order.
    A variable keeps the bounds of its first declaration. Every constraint
    is appended to ``scope_groups[<its scope>]`` by index, and any already
    declared variable that occurs in a scoped constraint is added to that
    scope's variable list.
    """
    variables = []
    bounds = {}
    constraints = []
    scope_groups = defaultdict(list)
    scope_vars = defaultdict(list)

    def solver_kind(psl_kind):
        if psl_kind == "or_eq":
            return "or_eq"
        return "equality" if psl_kind == "eq" else "inequality"

    def add_var(name, lo, hi, scope="root"):
        if name not in bounds:
            variables.append(name)
            bounds[name] = (lo, hi)
        if scope != "root" and name not in scope_vars[scope]:
            scope_vars[scope].append(name)

    def add_con(kind, expr, direction, scope="root", weight=1.0, branches=None):
        constraints.append(Constraint(kind, expr, direction, weight, scope, branches or []))
        scope_groups[scope].append(len(constraints) - 1)
        # Symbol discovery is best effort: an unparsable expression must not
        # abort expansion (the constraint itself is already recorded).
        try:
            syms = {v: sp.Symbol(v) for v in variables}
            parsed = parse_expr(expr, local_dict=syms) if kind != "or_eq" else None
            if parsed:
                for v in variables:
                    if syms[v] in parsed.free_symbols and scope != "root" and v not in scope_vars[scope]:
                        scope_vars[scope].append(v)
        except Exception:
            pass

    for v in prog.vars:
        add_var(v.name, v.lo, v.hi)
    for c in prog.constraints:
        add_con(solver_kind(c.kind), c.expr, c.kind, "root", c.weight, c.branches)
    for sc in prog.scopes:
        for v in sc.vars:
            add_var(v.name, v.lo, v.hi, sc.name)
        for c in sc.constraints:
            add_con(solver_kind(c.kind), c.expr, c.kind, sc.name, c.weight, c.branches)
    return ExpandedProblem(variables, bounds, constraints,
                           dict(scope_groups), dict(scope_vars), prog.scope_order)
@dataclass
class AXLInvariant:
    """A named invariant expression with a tolerance and comparison mode.

    ``compile`` must be called before ``compiled_func`` is usable; it takes
    its positional arguments in the order recorded in ``syms_used``.
    """
    name: str
    expr: str
    tolerance: float = VERIFY_G2_TOLERANCE
    mode: str = "eq"
    compiled_func: Optional[Callable] = field(default=None, repr=False)
    syms_used: List[str] = field(default_factory=list)

    def compile(self, variables: List[str]):
        """Parse ``expr`` over ``variables`` and build a ``math``-backed callable."""
        symbol_table = {v: sp.Symbol(v) for v in variables}
        expression = parse_expr(self.expr, local_dict=symbol_table)
        self.syms_used = [str(s) for s in expression.free_symbols if str(s) in variables]
        argument_symbols = [sp.Symbol(n) for n in self.syms_used]
        self.compiled_func = sp.lambdify(argument_symbols, expression, modules="math")
@dataclass
class AXLProblemDef:
    """Declarative problem description consumed by ``AXLtoPSLCompiler``."""
    name: str
    description: str
    axioms: Set[str]
    # each entry is a dict with at least "name", "lo", "hi"
    variables: List[Dict]
    constraints: List[Dict]
    scopes: List[Dict]
    anchors: List[AXLInvariant]
    # variable name -> observed value (pins the variable when compiled)
    observations: Dict[str, float] = field(default_factory=dict)
class AXLtoPSLCompiler:
    """Render an ``AXLProblemDef``-shaped object as PSL source text."""

    @staticmethod
    def _emit_constraint(c: Dict, indent: str = "") -> List[str]:
        """Return the PSL line(s) for one constraint dict.

        ``CONSERVE`` emits only its expression, ``BRANCH`` emits a full
        ``BRANCH`` / ``OPTION`` ... / ``END BRANCH`` group, and any other
        kind emits ``<kind> <expr> WEIGHT <w>``.
        """
        kind = c["kind"]
        if kind == "CONSERVE":
            return [f"{indent}CONSERVE {c['expr']}"]
        if kind == "BRANCH":
            out = [f"{indent}BRANCH {c.get('name','br')}"]
            out.extend(f"{indent} OPTION {opt}" for opt in c.get("options", []))
            out.append(f"{indent}END BRANCH")
            return out
        return [f"{indent}{kind} {c['expr']} WEIGHT {c.get('weight',1.0)}"]

    def compile(self, prob: 'AXLProblemDef') -> str:
        """Return PSL text for ``prob``.

        Observed variables are pinned by emitting the observation as both
        bounds. Scope constraints go through the same emitter as root
        constraints, so a BRANCH inside a scope is written as a complete
        group instead of a single malformed line.
        """
        lines = [f"SPACE base_{prob.name}"]
        for v in prob.variables:
            lo, hi = v["lo"], v["hi"]
            if v["name"] in prob.observations:
                lo = hi = prob.observations[v["name"]]
            lines.append(f"VAR {v['name']} {lo} {hi}")
        for c in prob.constraints:
            lines.extend(self._emit_constraint(c))
        for sc in prob.scopes:
            lines.append(f"SCOPE {sc['name']} ORDER {sc.get('order',0)}")
            for c in sc.get("constraints", []):
                lines.extend(self._emit_constraint(c, indent=" "))
            lines.append("END")
        return "\n".join(lines)
# Shared module-level compiler instance.
AXL_COMPILER=AXLtoPSLCompiler()
# ══════════════════════════════════════════════════════════════════════
# SECTION 4: PROBLEM COMPILATION
# ══════════════════════════════════════════════════════════════════════
@dataclass
class Constraint:
    """Uncompiled solver constraint.

    ``kind`` is ``equality``, ``inequality`` or ``or_eq``; ``direction``
    carries the PSL keyword (``eq``/``geq``/``leq``/``or_eq``).
    """
    kind: str
    expr: str
    direction: str
    weight: float = 1.0
    scope: str = "root"
    branches: List[str] = field(default_factory=list)
@dataclass
class MathConstraint:
    """A constraint together with its compiled evaluators (see ``compile_mc``)."""
    kind: str
    expr_str: str
    direction: str
    weight: float = 1.0
    # box (name -> IV) -> IV
    fast_iv: Optional[Callable] = field(default=None, repr=False)
    # scalar evaluator; positional args follow ``syms_used``
    fast_pt: Optional[Callable] = field(default=None, repr=False)
    # tensor evaluator; positional args follow ``syms_used``
    torch_func: Optional[Callable] = field(default=None, repr=False)
    syms_used: List[str] = field(default_factory=list)
    parsed: Optional[sp.Expr] = field(default=None, repr=False)
    scope: str = "root"
    # compiled alternatives of an ``or_eq`` constraint
    branches: List['MathConstraint'] = field(default_factory=list)
    # variable name -> list of {"syms": [...], "func": callable} closed-form solutions
    projections: Dict[str, List[Dict]] = field(default_factory=dict)
def compile_mc(kind, expr_str, direction, variables, weight=1.0, scope="root", branches=None):
    """Compile one constraint into a ``MathConstraint``.

    For ``or_eq`` with branches, each branch is compiled as an equality and
    the interval evaluator returns the hull of the branch intervals.

    Otherwise ``expr_str`` is parsed with sympy and three evaluators are
    built: an interval one (``fast_iv``), a scalar ``math`` one (``fast_pt``)
    and a torch one (``torch_func``). For equalities, closed-form solutions
    for each free symbol are computed once per expression string and cached
    in ``PROJECTION_CACHE``.

    Compilation is best effort: if parsing or any later step fails, the
    constraint is returned with whatever was built up to that point.
    """
    mc = MathConstraint(kind=kind, expr_str=expr_str, direction=direction,
                        weight=weight, scope=scope)
    if kind == "or_eq" and branches:
        for b_str in branches:
            b_mc = compile_mc("equality", b_str, "eq", variables, weight, scope)
            mc.branches.append(b_mc)
            mc.syms_used.extend(b_mc.syms_used)
        mc.syms_used = list(dict.fromkeys(mc.syms_used))

        def _or_iv(box, _mcs=mc.branches):
            rivs = []
            for b in _mcs:
                if b.fast_iv:
                    try:
                        rivs.append(b.fast_iv(box))
                    except Exception:
                        pass
            if not rivs:
                return IV(-1e18, 1e18)
            return IV(min(r.lo for r in rivs), max(r.hi for r in rivs))

        mc.fast_iv = _or_iv
        return mc

    syms = {v: sp.Symbol(v) for v in variables}
    try:
        parsed = parse_expr(expr_str, local_dict=syms)
        mc.parsed = parsed
        mc.syms_used = [v for v in variables if sp.Symbol(v) in parsed.free_symbols]
        mc.fast_iv = compile_iv(parsed, variables)
        arg_syms = [sp.Symbol(v) for v in mc.syms_used]
        mc.fast_pt = sp.lambdify(arg_syms, parsed, modules="math")
        # Names listed here are bound to torch; anything else falls back to
        # ``math`` and would fail on batched tensors, so the common
        # elementary functions are all covered.
        pt_map = {'sin': torch.sin, 'cos': torch.cos, 'tan': torch.tan,
                  'asin': torch.asin, 'acos': torch.acos, 'atan': torch.atan,
                  'sinh': torch.sinh, 'cosh': torch.cosh, 'tanh': torch.tanh,
                  'exp': torch.exp, 'log': torch.log,
                  'sqrt': torch.sqrt, 'Abs': torch.abs}
        t_func_raw = sp.lambdify(arg_syms, parsed, modules=[pt_map, "math"])

        def _t_wrapper(*args):
            val = t_func_raw(*args)
            # Constant expressions evaluate to a Python number; lift them.
            if isinstance(val, (int, float)):
                return torch.tensor(val, device=DEVICE, dtype=torch.float32)
            return val

        mc.torch_func = _t_wrapper
        if kind == "equality":
            if expr_str not in PROJECTION_CACHE:
                pm = {}
                for sym in parsed.free_symbols:
                    v_str = str(sym)
                    try:
                        sols = sp.solve(parsed, sym)
                        pm[v_str] = []
                        for sol in sols:
                            fs = list(sol.free_symbols)
                            pm[v_str].append({"syms": [str(s) for s in fs],
                                              "func": sp.lambdify(fs, sol, modules="math")})
                    except Exception:
                        # No closed form for this symbol: skip it.
                        pass
                PROJECTION_CACHE[expr_str] = pm
            mc.projections = PROJECTION_CACHE.get(expr_str, {})
    except Exception:
        # Leave the partially compiled constraint; callers check for None evaluators.
        pass
    return mc
def _extract_ordering_pair(mc: MathConstraint, variables: List[str]) -> Optional[Tuple[str,str]]:
    """Extract ``(v_small, v_large)`` from a two-variable GEQ constraint.

    The pair is read from coefficient signs, not from symbol ordering:
    for ``t2 - t1 >= 0`` the positive-coefficient symbol ``t2`` is the large
    one and the negative-coefficient symbol ``t1`` the small one.

    Returns ``None`` unless the constraint is a ``geq`` inequality that
    involves exactly two known variables, is linear in both of them, and has
    one positive and one negative numeric coefficient.
    """
    if mc.kind != "inequality" or mc.direction != "geq":
        return None
    if mc.parsed is None:
        return None
    parsed = mc.parsed
    free = [str(s) for s in parsed.free_symbols if str(s) in variables]
    if len(free) != 2:
        return None
    v0, v1 = free
    s0, s1 = sp.Symbol(v0), sp.Symbol(v1)
    try:
        expanded = sp.expand(parsed)
        k0 = expanded.coeff(s0)
        k1 = expanded.coeff(s1)
        c0 = float(k0)
        c1 = float(k1)
        # Whatever remains after removing the linear terms must not depend on
        # either variable; otherwise this is not a plain difference constraint.
        residual = sp.expand(expanded - k0 * s0 - k1 * s1)
    except Exception:
        return None
    if residual.free_symbols & {s0, s1}:
        return None
    if c0 > 0 and c1 < 0:
        return (v1, v0)  # v1 small (negative coeff), v0 large (positive coeff)
    if c1 > 0 and c0 < 0:
        return (v0, v1)  # v0 small (negative coeff), v1 large (positive coeff)
    return None
@dataclass
class Problem:
    """A compiled constraint problem with tensor and scalar energy functions.

    After construction the instance also carries ``compiled_constraints``
    (one ``MathConstraint`` per entry of ``constraints``) and ``var_idx``
    (variable name -> column index), and the three derived lists
    ``ordering_pairs``, ``bilinear_pairs`` and ``monotone_targets`` are
    extended from the compiled constraints.
    """
    pid: str
    variables: List[str]
    constraints: List[Constraint]
    bounds: Dict[str, Tuple[float, float]]
    scope_groups: Dict[str, List[int]] = field(default_factory=dict)
    scope_vars: Dict[str, List[str]] = field(default_factory=dict)
    scope_order: List[str] = field(default_factory=list)
    bilinear_pairs: List[Tuple[str, str]] = field(default_factory=list)
    monotone_targets: List[Tuple[str, str, float]] = field(default_factory=list)
    ordering_pairs: List[Tuple[str, str]] = field(default_factory=list)  # NEW 21.0

    def __post_init__(self):
        self.compiled_constraints = [
            compile_mc(c.kind, c.expr, c.direction, self.variables, c.weight, c.scope, c.branches)
            for c in self.constraints]
        self.var_idx = {v: i for i, v in enumerate(self.variables)}

        # SYSTEM 21.0: ordering pairs come from coefficient signs.
        for mc in self.compiled_constraints:
            pair = _extract_ordering_pair(mc, self.variables)
            if pair and pair not in self.ordering_pairs:
                self.ordering_pairs.append(pair)

        # Bilinear pairs: product terms of a sum that mention exactly two variables.
        for mc in self.compiled_constraints:
            # "is not None" instead of truthiness: bool() of a sympy
            # relational raises, and a zero expression has no terms anyway.
            if mc.parsed is not None and mc.parsed.is_Add:
                for term in mc.parsed.args:
                    if not term.is_Mul:
                        continue
                    syms_in = [str(s) for s in term.free_symbols if str(s) in self.variables]
                    if len(syms_in) == 2:
                        va, vb = syms_in
                        if (va, vb) not in self.bilinear_pairs and (vb, va) not in self.bilinear_pairs:
                            self.bilinear_pairs.append((va, vb))

        # Monotone targets: equalities of the form va*vb - k = 0 with k > 0
        # whose two variables are also an ordering pair.
        for mc in self.compiled_constraints:
            if mc.kind != "equality" or mc.parsed is None:
                continue
            for va, vb in self.bilinear_pairs:
                sa, sb = sp.Symbol(va), sp.Symbol(vb)
                if sa in mc.parsed.free_symbols and sb in mc.parsed.free_symbols:
                    try:
                        k_expr = -(mc.parsed - sa * sb)
                        k = float(k_expr.evalf())  # fails unless the remainder is a constant
                        if k > 0:
                            for vs, vl in self.ordering_pairs:
                                if {va, vb} == {vs, vl}:
                                    target = (vs, vl, k)
                                    if target not in self.monotone_targets:
                                        self.monotone_targets.append(target)
                    except Exception:
                        pass

    def tensor_energy(self, X: torch.Tensor) -> torch.Tensor:
        """Constraint energy of ``X`` as a tensor.

        ``X`` is either a single point (1-D, one entry per variable) or a
        batch (2-D, one row per point). A batch returns a 1-D tensor with one
        energy per row, including for a batch of one row; a single point
        returns a 0-d tensor. Energy is the weighted sum of ``|expr|`` for
        equalities, the violated amount for inequalities, the smallest branch
        residual for ``or_eq``, plus the distance outside each variable's bounds.
        """
        is_batched = (X.dim() == 2)
        total = torch.zeros(X.shape[0] if is_batched else 1, device=DEVICE, dtype=torch.float32)

        def _args(names):
            return [X[:, self.var_idx[v]] if is_batched else X[self.var_idx[v]] for v in names]

        for mc in self.compiled_constraints:
            if getattr(mc, 'weight', 1.0) == 0.0:
                continue
            if mc.kind == "or_eq":
                b_vals = [torch.abs(bmc.torch_func(*_args(bmc.syms_used)))
                          for bmc in mc.branches if bmc.torch_func]
                if b_vals:
                    total += torch.stack(b_vals, dim=0).min(dim=0)[0] * mc.weight
            else:
                if mc.torch_func is None:
                    continue
                val = mc.torch_func(*_args(mc.syms_used))
                if mc.kind == "equality":
                    total += torch.abs(val) * mc.weight
                elif mc.direction == "geq":
                    total += torch.relu(-val) * mc.weight
                else:
                    total += torch.relu(val) * mc.weight
        for i, v in enumerate(self.variables):
            lo, hi = self.bounds[v]
            col = X[:, i] if is_batched else X[i]
            total += torch.relu(lo - col) + torch.relu(col - hi)
        # Only collapse the dummy dimension of the single-point case; squeezing
        # a batch of one would hand callers a 0-d tensor they cannot index.
        return total if is_batched else total.squeeze()

    def scalar_energy(self, b: Dict[str, float]) -> float:
        """Constraint energy of a single binding, computed with ``math``.

        Variables missing from ``b`` are read as 0.0. A constraint that has
        no scalar evaluator, or whose evaluation fails, contributes 1.0.
        """
        total = 0.0
        for mc in self.compiled_constraints:
            if getattr(mc, 'weight', 1.0) == 0.0:
                continue
            if mc.kind == "or_eq":
                vals = []
                for bmc in mc.branches:
                    if bmc.fast_pt:
                        try:
                            vals.append(abs(float(bmc.fast_pt(*[b.get(v, 0.0) for v in bmc.syms_used]))))
                        except Exception:
                            pass
                total += (min(vals) * mc.weight) if vals else 1.0
            else:
                if mc.fast_pt is None:
                    total += 1.0
                    continue
                try:
                    val = float(mc.fast_pt(*[b.get(v, 0.0) for v in mc.syms_used]))
                    if mc.kind == "equality":
                        total += abs(val) * mc.weight
                    elif mc.direction == "geq":
                        total += max(0.0, -val) * mc.weight
                    else:
                        total += max(0.0, val) * mc.weight
                except Exception:
                    total += 1.0
        for v, (lo, hi) in self.bounds.items():
            bv = b.get(v, 0.0)
            total += max(0.0, lo - bv) + max(0.0, bv - hi)
        return total
# ══════════════════════════════════════════════════════════════════════
# SECTION 5: BATCHED MASKED DEDUCTION
# ══════════════════════════════════════════════════════════════════════
@dataclass
class L9Certificate:
    """Residual energy and the variables/expressions dominating it."""
    residual_ce: float
    dominant_vars: List[str]
    dominant_exprs: List[str]
    tension_class: str = "unknown"
def _batched_deduce_and_evaluate(
        problem: Problem,
        hyps: List['Hypothesis']) -> List[Tuple[Dict, float, List[str], str]]:
    """Refine a batch of hypotheses with masked Adam and score them.

    Each hypothesis becomes one row: pinned variables are held at their
    pinned value, free variables start from the hypothesis binding (or the
    midpoint of the bounds) and are optimised for up to
    ``DEDUCE_ADAM_STEPS`` steps, stopping early once every row is below
    ``SOLVE_THRESHOLD``. All rows are clamped to the variable bounds after
    each step.

    Returns one ``(binding, energy, dominant_vars, tension_class)`` tuple per
    hypothesis. ``dominant_vars`` are up to three variables with the largest
    absolute energy gradient above ``1e-4``; ``tension_class`` is
    ``isolated`` (<=1), ``relational`` (2) or ``systemic`` (3).
    """
    if not hyps:
        return []
    B = len(hyps)
    V = len(problem.variables)
    x_data, mask_data, target_data = [], [], []
    for hyp in hyps:
        xr, mr, tr = [], [], []
        for v in problem.variables:
            if v in hyp.pinned_vars:
                val = hyp.pinned_vars[v]
                xr.append(val); mr.append(0.0); tr.append(val)
            else:
                val = hyp.binding.get(v, (problem.bounds[v][0] + problem.bounds[v][1]) / 2)
                xr.append(val); mr.append(1.0); tr.append(0.0)
        x_data.append(xr); mask_data.append(mr); target_data.append(tr)

    X = torch.tensor(x_data, device=DEVICE, dtype=torch.float32, requires_grad=True)
    mask = torch.tensor(mask_data, device=DEVICE, dtype=torch.float32)
    target = torch.tensor(target_data, device=DEVICE, dtype=torch.float32)
    pinned = (mask == 0.0)
    # Per-variable bounds as tensors so the clamp is one device-side op per step.
    lo_t = torch.tensor([problem.bounds[v][0] for v in problem.variables],
                        device=DEVICE, dtype=torch.float32)
    hi_t = torch.tensor([problem.bounds[v][1] for v in problem.variables],
                        device=DEVICE, dtype=torch.float32)

    optimizer = torch.optim.Adam([X], lr=0.05)
    for _ in range(DEDUCE_ADAM_STEPS):
        optimizer.zero_grad()
        ce = problem.tensor_energy(X)
        if isinstance(ce, torch.Tensor) and (ce < SOLVE_THRESHOLD).all():
            break
        loss = ce.sum()
        loss.backward()
        with torch.no_grad():
            X.grad *= mask  # pinned entries receive no update
            optimizer.step()
            X.data = torch.where(pinned, target, X.data)
            X.data = torch.clamp(X.data, min=lo_t, max=hi_t)

    # One more forward/backward pass to get the final energy and its gradient.
    optimizer.zero_grad()
    final_ce = problem.tensor_energy(X)
    final_ce.sum().backward()
    # reshape(-1): keep one energy per row even if the energy came back 0-d (B == 1).
    ce_vals = final_ce.detach().reshape(-1).cpu().numpy()
    X_vals = X.detach().cpu().numpy()
    grads_abs = X.grad.abs()
    _, topk_idx = torch.topk(grads_abs, k=min(3, V), dim=1)
    topk_idx_np = topk_idx.cpu().numpy()
    grads_np = grads_abs.cpu().numpy()

    results = []
    for i in range(B):
        final_b = {problem.variables[j]: float(X_vals[i, j]) for j in range(V)}
        dom_vars = [problem.variables[idx] for idx in topk_idx_np[i]
                    if float(grads_np[i, idx]) > 1e-4]
        n_dom = len(dom_vars)
        if n_dom <= 1:
            tension_class = "isolated"
        elif n_dom == 2:
            tension_class = "relational"
        else:
            tension_class = "systemic"
        results.append((final_b, float(ce_vals[i]), dom_vars, tension_class))
    return results
def _mprt_sample(problem, work_box, N):
    """Draw ``N`` uniform samples in a box and return the lowest-energy one.

    The sampling range of each variable is its declared bounds (default
    ``(-10, 10)``) intersected with its interval in ``work_box`` when
    present. An empty or degenerate range is replaced by a ``±1e-6`` window
    around its midpoint.

    Returns ``(binding, energy)`` for the best sample.
    """
    var_list = problem.variables
    V = len(var_list)
    # Resolve the ranges in Python first: comparing tensor elements one by
    # one would force a device synchronisation per variable.
    lows, highs = [], []
    for v in var_list:
        b_lo, b_hi = problem.bounds.get(v, (-10.0, 10.0))
        if v in work_box:
            lo = max(b_lo, work_box[v].lo)
            hi = min(b_hi, work_box[v].hi)
        else:
            lo, hi = b_lo, b_hi
        if lo >= hi:
            m = (lo + hi) / 2
            lo, hi = m - 1e-6, m + 1e-6
        lows.append(lo)
        highs.append(hi)
    lo_t = torch.tensor(lows, device=DEVICE, dtype=torch.float32)
    hi_t = torch.tensor(highs, device=DEVICE, dtype=torch.float32)
    X = lo_t.unsqueeze(0) + (hi_t - lo_t).unsqueeze(0) * torch.rand((N, V), device=DEVICE)
    # reshape(-1): keep one energy per sample even if the energy came back 0-d (N == 1).
    ce_batch = problem.tensor_energy(X).reshape(-1)
    best_idx = torch.argmin(ce_batch).item()
    best = {v: float(X[best_idx, i].item()) for i, v in enumerate(var_list)}
    return best, ce_batch[best_idx].item()
# ══════════════════════════════════════════════════════════════════════
# SECTION 6: AXIOMS
# ══════════════════════════════════════════════════════════════════════
class Axiom:
    """Namespace of axiom identifiers; every attribute's value equals its name.

    Module code enumerates the non-dunder attributes of this class with
    ``dir()``, so only axiom constants may be defined here.
    """
    CONTINUOUS = "CONTINUOUS"
    DISCRETE = "DISCRETE"
    QUADRATIC = "QUADRATIC"
    BILINEAR = "BILINEAR"
    METRIC = "METRIC"
    SYMMETRIC = "SYMMETRIC"
    ORDERED = "ORDERED"
    MONOTONE = "MONOTONE"
    CONVEX = "CONVEX"
    MONOTONE_PRODUCT = "MONOTONE_PRODUCT"
    CONSERVED = "CONSERVED"
    MUTABLE = "MUTABLE"
    NETWORK = "NETWORK"
    EQUILIBRIUM = "EQUILIBRIUM"
    SUPERPOSITION = "SUPERPOSITION"
    LOCALITY = "LOCALITY"
    EXTREMAL = "EXTREMAL"
    ENTROPY = "ENTROPY"
    INJECTIVE = "INJECTIVE"
    SURJECTIVE = "SURJECTIVE"
    TRANSITIVE = "TRANSITIVE"
    ATOMIC = "ATOMIC"
    IMPLICATION = "IMPLICATION"
    NEGATION = "NEGATION"
    COMPOSITE = "COMPOSITE"
    HOLISM = "HOLISM"
    PARSIMONY = "PARSIMONY"
    DUALITY = "DUALITY"
# Every axiom value declared on Axiom, in dir() order; dunder attributes are excluded.
ALL_AXIOMS=[getattr(Axiom,a) for a in dir(Axiom) if not a.startswith("__")]
# Display abbreviation per axiom: the first three letters, except MONOTONE_PRODUCT -> "MPR".
# NOTE(review): several three-letter prefixes still coincide (e.g. CONTINUOUS/CONVEX/CONSERVED -> "CON").
AXIOM_ABBR={a:a[:3] for a in ALL_AXIOMS}; AXIOM_ABBR[Axiom.MONOTONE_PRODUCT]="MPR"
# Axiom pairs that may not be adjacent in a ray sequence (checked by sequence_valid).
ADJACENT_CONTRADICTIONS:List[Tuple[str,str]]=[
(Axiom.DISCRETE,Axiom.CONTINUOUS),(Axiom.INJECTIVE,Axiom.SURJECTIVE),
(Axiom.ORDERED,Axiom.SYMMETRIC),(Axiom.ATOMIC,Axiom.COMPOSITE),
(Axiom.CONSERVED,Axiom.MUTABLE),(Axiom.PARSIMONY,Axiom.ENTROPY),
(Axiom.HOLISM,Axiom.LOCALITY),(Axiom.EXTREMAL,Axiom.ENTROPY),
(Axiom.NEGATION,Axiom.IMPLICATION),(Axiom.MONOTONE_PRODUCT,Axiom.SYMMETRIC),
]
# Same pairs as frozensets, so membership does not depend on pair order.
_CONTRA_SET={frozenset(p) for p in ADJACENT_CONTRADICTIONS}
def sequence_valid(seq: List[str]) -> bool:
    """Return True when no two neighbouring axioms are equal or contradictory.

    Contradictory pairs are those listed in ``_CONTRA_SET``; sequences of
    length 0 or 1 are always valid.
    """
    for left, right in zip(seq, seq[1:]):
        if left == right:
            return False
        if frozenset([left, right]) in _CONTRA_SET:
            return False
    return True
# ══════════════════════════════════════════════════════════════════════
# SECTION 7: SHARED DATACLASSES
# ══════════════════════════════════════════════════════════════════════
# Known ray type labels; "seed" is the default ray_type of an AxiomRay.
RAY_TYPES=["seed","branch","tension","scout","recombine","invert","target"]
@dataclass
class Hypothesis:
    """A candidate binding proposed by an axiom, with pinned and free variables."""
    hid: str
    binding: Dict[str, float]
    h_type: str
    claim: str
    derivation: List[str]
    # variable -> value held fixed during deduction
    pinned_vars: Dict[str, float]
    free_vars: List[str]
    confidence: float
    ce: float = float('inf')
@dataclass
class Baton:
    """A binding with its energy, handed from one ray to the next."""
    binding: Dict[str, float]
    ce: float
    ray_id: str = ""
    depth: int = 0
    l9: Optional[L9Certificate] = None
@dataclass
class AxiomRay:
    """An ordered sequence of axioms with lineage and an optional baton."""
    sequence: List[str]
    ray_id: str
    ray_type: str = "seed"
    depth: int = 0
    parent_id: Optional[str] = None
    baton: Optional[Baton] = None
    ce_prior: float = float('inf')

    @property
    def name(self) -> str:
        """Arrow-joined abbreviations of the axioms in this ray."""
        labels = [AXIOM_ABBR.get(axiom, axiom[:3]) for axiom in self.sequence]
        return "→".join(labels)

    def extend(self, axiom: str, rtype: str, new_prior: float = float('inf')) -> Optional['AxiomRay']:
        """Return a child ray with ``axiom`` appended, or None if that is invalid.

        The child is one level deeper, records this ray as its parent and
        carries no baton.
        """
        candidate = self.sequence + [axiom]
        if not sequence_valid(candidate):
            return None
        child_id = f"{self.ray_id}+{AXIOM_ABBR.get(axiom,'?')}"
        return AxiomRay(sequence=candidate,
                        ray_id=child_id,
                        ray_type=rtype,
                        depth=self.depth + 1,
                        parent_id=self.ray_id,
                        ce_prior=new_prior)
@dataclass
class HypothesisTrace:
    """Log record of one evaluated hypothesis."""
    hid: str
    round_idx: int
    ray_name: str
    ray_type: str
    ce_before: float
    ce_after: float
    survived: bool
    elapsed_ms: float
    g1_pass: bool = False
    g2_pass: bool = False
    binding: Dict[str, float] = field(default_factory=dict)
    # invariant name -> (float, bool)
    invariant_results: Dict[str, Tuple[float, bool]] = field(default_factory=dict)
    tension_class: str = "unknown"
    depth: int = 0
@dataclass
class StructuralModel:
    """Best binding found so far plus accumulated structural observations."""
    best_binding: Dict[str, float]
    best_ce: float
    failure_patterns: Set[str] = field(default_factory=set)
    resonant_pairs: List[Tuple[str, str]] = field(default_factory=list)
# ══════════════════════════════════════════════════════════════════════
# SECTION 8: HYPOTHESIS CONSTRUCTORS
# ══════════════════════════════════════════════════════════════════════
def _make_hyp(hid, binding, h_type, claim, derivation, pinned, free, conf):
    """Build a ``Hypothesis`` from positional pieces (energy left at its default)."""
    return Hypothesis(
        hid=hid,
        binding=binding,
        h_type=h_type,
        claim=claim,
        derivation=derivation,
        pinned_vars=pinned,
        free_vars=free,
        confidence=conf,
    )
def _hyp_continuous(p, bt, a, m):
    """CONTINUOUS: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("cnt", bt.binding, "continuous", "Manifold", [], {}, p.variables, 0.45)
    return [hyp]
def _hyp_discrete(p, bt, a, m):
    """DISCRETE: run ``_hc4`` scope by scope on a ±1 box around the baton.

    For every scope that has both variables and constraints, each variable is
    moved to the midpoint of its returned interval and pinned when that
    interval is narrower than ``1e-2``. Returns a single hypothesis.
    """
    binding = dict(bt.binding)
    pinned = {}
    for scope_name in p.scope_order:
        scope_variables = p.scope_vars.get(scope_name, [])
        scope_constraints = [p.compiled_constraints[k] for k in p.scope_groups.get(scope_name, [])]
        if not (scope_variables and scope_constraints):
            continue
        box = {}
        for v in scope_variables:
            if v in p.bounds:
                box[v] = IV(binding.get(v, 0) - 1, binding.get(v, 0) + 1)
        contracted = _hc4(box, scope_constraints)
        if not contracted:
            continue
        for v, interval in contracted.items():
            binding[v] = interval.mid()
            if interval.width() < 1e-2:
                pinned[v] = binding[v]
    return [_make_hyp("dis", binding, "discrete", "TopoScope", [], pinned, p.variables, 0.80)]
def _hyp_quadratic(p, bt, a, m):
    """QUADRATIC: pin each of the first two dominant variables at its current value.

    Produces nothing when the baton carries no certificate or no dominant
    variables.
    """
    binding = dict(bt.binding)
    if not (bt.l9 and bt.l9.dominant_vars):
        return []
    return [
        _make_hyp(f"qua_{v}", binding, "quadratic", f"Root({v})", [], {v: binding[v]}, p.variables, 0.70)
        for v in bt.l9.dominant_vars[:2]
    ]
def _hyp_bilinear(p, bt, a, m):
    """BILINEAR: propose factor splits that keep each pair's current product.

    For every bilinear pair with a non-zero product, proposes the geometric
    mean for both variables (when within bounds) and, for three fixed ratios,
    an ascending and a descending split. The pair is pinned in every
    hypothesis and the remaining variables are free.
    """
    hyps = []
    base = dict(bt.binding)
    for vi, vj in p.bilinear_pairs:
        lo_i, hi_i = p.bounds.get(vi, (0, 10))
        lo_j, hi_j = p.bounds.get(vj, (0, 10))
        product = abs(base.get(vi, 1) * base.get(vj, 1))
        if product <= 0:
            continue
        gm = math.sqrt(product)
        others = [v for v in p.variables if v not in [vi, vj]]

        if lo_i <= gm <= hi_i and lo_j <= gm <= hi_j:
            equal_split = dict(base)
            equal_split[vi] = equal_split[vj] = gm
            hyps.append(_make_hyp(
                f"bil_eq_{vi}_{vj}", equal_split, "bilinear",
                f"GeoMean({vi}={vj}={gm:.3f})", ["bilinear"], {vi: gm, vj: gm},
                list(others), 0.70))

        for ratio in [0.4, 0.7071, 0.9]:
            vs = gm * ratio
            vl = product / (vs + 1e-9)
            tag = int(ratio * 100)
            if lo_i <= vs <= hi_i and lo_j <= vl <= hi_j and vs < vl:
                ascending = dict(base)
                ascending[vi] = vs
                ascending[vj] = vl
                hyps.append(_make_hyp(
                    f"bil_asc_{tag}_{vi}_{vj}", ascending, "bilinear",
                    f"BilAsc({vi}={vs:.3f}<{vj}={vl:.3f})", ["bilinear"], {vi: vs, vj: vl},
                    list(others), 0.75))
            if lo_i <= vl <= hi_i and lo_j <= vs <= hi_j and vl > vs:
                descending = dict(base)
                descending[vi] = vl
                descending[vj] = vs
                hyps.append(_make_hyp(
                    f"bil_desc_{tag}_{vi}_{vj}", descending, "bilinear",
                    f"BilDesc({vi}={vl:.3f}>{vj}={vs:.3f})", ["bilinear"], {vi: vl, vj: vs},
                    list(others), 0.75))
    return hyps
def _hyp_monotone_product(p, bt, a, m):
    """MONOTONE_PRODUCT: split ``v_small * v_large = k`` along fixed ratios.

    Works from ``p.monotone_targets``, whose ``(v_small, v_large, k)``
    entries are built from ``p.ordering_pairs`` (sign-based ordering, not
    symbol order). For each ratio, ``v_small = sqrt(k * ratio)`` and
    ``v_large = k / v_small``; splits outside the declared bounds or with
    ``v_small > v_large`` are dropped. The pair is fixed to a ``±1e-6`` box,
    ``_hc4`` is run over all constraints, and on success every variable
    moves to the midpoint of its interval, with those narrower than ``1e-2``
    pinned.
    """
    hyps = []
    base = dict(bt.binding)
    if not p.monotone_targets:
        return hyps
    tightened = _global_hc4_tighten_bounds(p)
    for v_small, v_large, k in p.monotone_targets:
        lo_s, hi_s = p.bounds.get(v_small, (-1e18, 1e18))
        lo_l, hi_l = p.bounds.get(v_large, (-1e18, 1e18))
        for ratio in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.707, 0.8, 0.9, 0.95]:
            vs_sq = k * ratio
            if vs_sq <= 0:
                continue
            vs = math.sqrt(vs_sq)
            vl = k / vs
            in_range = lo_s <= vs <= hi_s and lo_l <= vl <= hi_l and vs <= vl
            if not in_range:
                continue
            candidate = dict(base)
            candidate[v_small] = vs
            candidate[v_large] = vl
            box = {u: IV(*tightened.get(u, p.bounds.get(u, (-10, 10)))) for u in p.variables}
            box[v_small] = IV(vs - 1e-6, vs + 1e-6)
            box[v_large] = IV(vl - 1e-6, vl + 1e-6)
            contracted = _hc4(box, p.compiled_constraints)
            pinned = {v_small: vs, v_large: vl}
            if contracted:
                for u, interval in contracted.items():
                    if u in p.bounds:
                        candidate[u] = interval.mid()
                pinned = {u: candidate[u] for u in p.variables if contracted[u].width() < 1e-2}
            free = [u for u in p.variables if u not in pinned]
            hyps.append(_make_hyp(
                f"mpr_{v_small}_{v_large}_r{int(ratio*100)}",
                candidate, "monotone_product",
                f"MonoProd({v_small}={vs:.4f}{v_large}={vl:.4f})",
                ["ordered", "hc4"], pinned, free, 0.88))
    return hyps
def _hyp_metric(p, bt, a, m):
    """METRIC: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("met", bt.binding, "metric", "Radial", [], {}, p.variables, 0.75)
    return [hyp]
def _hyp_symmetric(p, bt, a, m):
    """SYMMETRIC: pin the first two variables to the average of their values.

    Returns nothing when the problem has fewer than two variables.
    """
    names = p.variables
    if len(names) < 2:
        return []
    current = dict(bt.binding)
    first, second = names[0], names[1]
    mid = (current.get(first, 0) + current.get(second, 0)) / 2
    averaged = dict(current)
    averaged[first] = averaged[second] = mid
    return [_make_hyp("sym", averaged, "symmetric", "AveragePin", [],
                      {first: mid, second: mid}, p.variables, 0.60)]
def _hyp_ordered(p, bt, a, m):
    """ORDERED: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("ord", bt.binding, "ordered", "OrderBal", [], {}, p.variables, 0.67)
    return [hyp]
def _hyp_monotone(p, bt, a, m):
    """MONOTONE: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("mon", bt.binding, "monotone", "MonoPath", [], {}, p.variables, 0.67)
    return [hyp]
def _hyp_convex(p, bt, a, m):
    """CONVEX: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("cvx", bt.binding, "convex", "ConvMix", [], {}, p.variables, 0.63)
    return [hyp]
def _hyp_conserved(p, bt, a, m):
    """CONSERVED: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("csv", bt.binding, "conserved", "Conserve", [], {}, p.variables, 0.82)
    return [hyp]
def _hyp_mutable(p, bt, a, m):
    """MUTABLE: jitter the most dominant variable with Gaussian noise.

    The noise has standard deviation 5% of the variable's range and the
    result is clamped to its bounds. Without a certificate or dominant
    variables the binding is returned unperturbed.
    """
    perturbed = dict(bt.binding)
    if bt.l9 and bt.l9.dominant_vars:
        target = bt.l9.dominant_vars[0]
        lo, hi = p.bounds.get(target, (-10, 10))
        moved = perturbed.get(target, 0) + random.gauss(0, (hi - lo) * 0.05)
        perturbed[target] = max(lo, min(hi, moved))
    return [_make_hyp("mut", perturbed, "mutable", "Perturb", [], {}, p.variables, 0.40)]
def _hyp_network(p, bt, a, m):
    """NETWORK: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("net", bt.binding, "network", "HubProp", [], {}, p.variables, 0.65)
    return [hyp]
def _hyp_equilibrium(p, bt, a, m):
    """EQUILIBRIUM: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("eql", bt.binding, "equilibrium", "GradBal", [], {}, p.variables, 0.72)
    return [hyp]
def _hyp_superposition(p, bt, a, m):
    """SUPERPOSITION: average the baton with one randomly chosen entry of ``a``.

    Variables missing from either binding count as 0. Returns nothing when
    ``a`` is empty.
    """
    if not (a and len(a) > 0):
        return []
    partner = random.choice(a)
    lam = 0.5
    blended = {}
    for v in p.variables:
        blended[v] = lam * bt.binding.get(v, 0) + (1 - lam) * partner.binding.get(v, 0)
    return [_make_hyp("sup", blended, "superposition", "Superpose", [], {}, p.variables, 0.58)]
def _hyp_locality(p, bt, a, m):
    """LOCALITY: one hypothesis reusing the baton binding, nothing pinned."""
    hyp = _make_hyp("loc", bt.binding, "locality", "LocalFirst", [], {}, p.variables, 0.72)
    return [hyp]
def _hyp_extremal(p, bt, a, m):
    """EXTREMAL: pin the most dominant variable at lower bound, upper bound and midpoint.

    Returns nothing without a certificate or dominant variables.
    """
    if not (bt.l9 and bt.l9.dominant_vars):
        return []
    base = dict(bt.binding)
    target = bt.l9.dominant_vars[0]
    lo, hi = p.bounds.get(target, (0, 1))
    hyps = []
    for val in (lo, hi, (lo + hi) / 2):
        pinned_binding = dict(base)
        pinned_binding[target] = val
        hyps.append(_make_hyp(f"ext_{target}_{val:.3f}", pinned_binding, "extremal",
                              f"PinBound({target}={val:.3f})", [], {target: val}, p.variables, 0.65))
    return hyps
def _hyp_entropy(p, bt, a, m):
    """ENTROPY axiom: draw every variable uniformly at random inside its bounds.

    The parent baton is ignored entirely.  Tagged "ent" / "entropy" /
    "MaxEnt", trailing score 0.48.  ``bt``, ``a`` and ``m`` are unused.
    """
    sample = {}
    for name in p.variables:
        lo, hi = p.bounds[name]
        sample[name] = random.uniform(lo, hi)
    return [_make_hyp("ent", sample, "entropy", "MaxEnt", [], {}, p.variables, 0.48)]
def _hyp_injective(p, bt, a, m):
    """INJECTIVE axiom: pass the parent binding through as a single hypothesis.

    Tagged "inj" / "injective" / "Distinct", trailing score 0.50.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("inj", bt.binding, "injective", "Distinct", [], {}, p.variables, 0.50)
    return [hypothesis]
def _hyp_surjective(p, bt, a, m):
    """SURJECTIVE axiom: pass the parent binding through as a single hypothesis.

    Tagged "sur" / "surjective" / "Sweep", trailing score 0.45.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("sur", bt.binding, "surjective", "Sweep", [], {}, p.variables, 0.45)
    return [hypothesis]
def _hyp_transitive(p, bt, a, m):
    """TRANSITIVE axiom: pass the parent binding through as a single hypothesis.

    Tagged "tra" / "transitive" / "TransSnap", trailing score 0.77.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("tra", bt.binding, "transitive", "TransSnap", [], {}, p.variables, 0.77)
    return [hypothesis]
def _hyp_atomic(p, bt, a, m):
    """ATOMIC axiom: solve single variables from one constraint's projections.

    Walks the compiled constraints in order.  For each variable that has
    projection functions, the first projection yielding a finite value inside
    the variable's bounds (default ±1e18) produces one hypothesis with that
    variable pinned.  The search stops after the first constraint that
    produced at least one hypothesis.  ``a`` and ``m`` are unused.

    A projection that raises (division by zero, complex result, missing key,
    ...) is skipped.  Only ``Exception`` is caught, so KeyboardInterrupt and
    SystemExit still propagate instead of being silently swallowed.

    Returns:
        A possibly empty list of hypotheses.
    """
    base = dict(bt.binding)
    hyps = []
    for mc in p.compiled_constraints:
        if not mc.projections:
            continue
        for var, projs in mc.projections.items():
            for proj in projs:
                try:
                    args = [base.get(sym, 0.0) for sym in proj["syms"]]
                    val = float(proj["func"](*args))
                    lo, hi = p.bounds.get(var, (-1e18, 1e18))
                    if not (math.isfinite(val) and lo <= val <= hi):
                        continue
                    pinned = dict(base)
                    pinned[var] = val
                    hyps.append(_make_hyp(f"ato_{var}", pinned, "atomic", f"Solve({var})",
                                          [], {var: val}, p.variables, 0.70))
                    break  # first usable projection wins for this variable
                except Exception:
                    # Best-effort: a failing projection just means "try the next one".
                    continue
        if hyps:
            break
    return hyps
def _hyp_implication(p, bt, a, m):
    """IMPLICATION axiom: pass the parent binding through as a single hypothesis.

    Tagged "imp" / "implication" / "Imply", trailing score 0.78.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("imp", bt.binding, "implication", "Imply", [], {}, p.variables, 0.78)
    return [hypothesis]
def _hyp_negation(p, bt, a, m):
    """NEGATION axiom: mirror every variable about the midpoint of its bounds.

    Each value x becomes (lo + hi) - x, clamped to [lo, hi].  A variable
    missing from the binding is treated as sitting at the midpoint, so it
    stays there.  ``a`` and ``m`` are unused.
    """
    mirrored = {}
    for name in p.variables:
        lo = p.bounds[name][0]
        hi = p.bounds[name][1]
        total = lo + hi
        reflected = total - bt.binding.get(name, total / 2)
        mirrored[name] = max(lo, min(hi, reflected))
    return [_make_hyp("neg", mirrored, "negation", "Mirror", [], {}, p.variables, 0.42)]
def _hyp_composite(p, bt, a, m):
    """COMPOSITE axiom: pass the parent binding through as a single hypothesis.

    Tagged "cmp" / "composite" / "Enz", trailing score 0.90.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("cmp", bt.binding, "composite", "Enz", [], {}, p.variables, 0.90)
    return [hypothesis]
def _hyp_holism(p, bt, a, m):
    """HOLISM axiom: pass the parent binding through as a single hypothesis.

    Tagged "hol" / "holism" / "Global", trailing score 0.73.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("hol", bt.binding, "holism", "Global", [], {}, p.variables, 0.73)
    return [hypothesis]
def _hyp_parsimony(p, bt, a, m):
    """PARSIMONY axiom: snap every variable to the nearest "simple" value.

    For each variable:
      * if its magnitude is under 10% of the largest magnitude in the binding
        and 0 lies inside its bounds, it becomes exactly 0.0;
      * otherwise it is rounded to the nearest multiple of 1, 1/2 or 1/4 that
        stays inside the bounds, choosing the candidate closest to the current
        value; with no in-bounds candidate the value is kept.

    A variable absent from the parent binding is treated as 0.0, matching the
    ``.get(v, 0)`` convention of the other constructors, instead of raising
    KeyError.  ``a`` and ``m`` are unused.
    """
    binding = dict(bt.binding)
    scale = max((abs(x) for x in binding.values()), default=1.0)
    simplified = {}
    for name in p.variables:
        value = binding.get(name, 0.0)
        lo, hi = p.bounds[name]
        if abs(value) < 0.1 * scale and lo <= 0.0 <= hi:
            simplified[name] = 0.0
            continue
        # Denominators 1, 2, 4 give integer, half and quarter grids.
        snapped = [round(value * denom) / denom for denom in (1, 2, 4)]
        in_bounds = [cand for cand in snapped if lo <= cand <= hi]
        if in_bounds:
            simplified[name] = min(in_bounds, key=lambda cand: abs(cand - value))
        else:
            simplified[name] = value
    return [_make_hyp("par", simplified, "parsimony", "Occam", [], {}, p.variables, 0.58)]
def _hyp_duality(p, bt, a, m):
    """DUALITY axiom: pass the parent binding through as a single hypothesis.

    Tagged "dua" / "duality" / "Tight", trailing score 0.68.
    ``a`` and ``m`` are unused.
    """
    hypothesis = _make_hyp("dua", bt.binding, "duality", "Tight", [], {}, p.variables, 0.68)
    return [hypothesis]
# Dispatch table: axiom -> hypothesis constructor.  Every constructor is called
# as constructor(problem, parent_baton, historical_batons, model) and returns a
# (possibly empty) list of hypotheses.
AXIOM_CONSTRUCTORS = {
    Axiom.CONTINUOUS: _hyp_continuous,
    Axiom.DISCRETE: _hyp_discrete,
    Axiom.QUADRATIC: _hyp_quadratic,
    Axiom.BILINEAR: _hyp_bilinear,
    Axiom.METRIC: _hyp_metric,
    Axiom.SYMMETRIC: _hyp_symmetric,
    Axiom.ORDERED: _hyp_ordered,
    Axiom.MONOTONE: _hyp_monotone,
    Axiom.CONVEX: _hyp_convex,
    Axiom.MONOTONE_PRODUCT: _hyp_monotone_product,
    Axiom.CONSERVED: _hyp_conserved,
    Axiom.MUTABLE: _hyp_mutable,
    Axiom.NETWORK: _hyp_network,
    Axiom.EQUILIBRIUM: _hyp_equilibrium,
    Axiom.SUPERPOSITION: _hyp_superposition,
    Axiom.LOCALITY: _hyp_locality,
    Axiom.EXTREMAL: _hyp_extremal,
    Axiom.ENTROPY: _hyp_entropy,
    Axiom.INJECTIVE: _hyp_injective,
    Axiom.SURJECTIVE: _hyp_surjective,
    Axiom.TRANSITIVE: _hyp_transitive,
    Axiom.ATOMIC: _hyp_atomic,
    Axiom.IMPLICATION: _hyp_implication,
    Axiom.NEGATION: _hyp_negation,
    Axiom.COMPOSITE: _hyp_composite,
    Axiom.HOLISM: _hyp_holism,
    Axiom.PARSIMONY: _hyp_parsimony,
    Axiom.DUALITY: _hyp_duality,
}
# ══════════════════════════════════════════════════════════════════════
# SECTION 9: CREATIVE SEEDER
# ══════════════════════════════════════════════════════════════════════
class CreativeSeeder:
    """Factory for new AxiomRay candidates: seeds, branches, crossovers, inversions."""

    def target_seeds(self, anchors: List[AXLInvariant]) -> List[AxiomRay]:
        """Derive single-axiom "target" rays from the text of each anchor expression.

        A plain product ("*" without "**") yields BILINEAR and MONOTONE_PRODUCT
        seeds, a "**2" term yields QUADRATIC and METRIC seeds, and otherwise a
        "+" yields one CONSERVED seed.  Anchors matching none of these add nothing.
        """
        seeds: List[AxiomRay] = []
        for anchor in anchors:
            if "*" in anchor.expr and "**" not in anchor.expr:
                picks = ((Axiom.BILINEAR, "BIL"), (Axiom.MONOTONE_PRODUCT, "MPR"))
            elif "**2" in anchor.expr:
                picks = ((Axiom.QUADRATIC, "QUA"), (Axiom.METRIC, "MET"))
            elif "+" in anchor.expr:
                picks = ((Axiom.CONSERVED, "CSV"),)
            else:
                picks = ()
            for axiom, tag in picks:
                seeds.append(AxiomRay([axiom], f"TGT_{tag}_{anchor.name}", "target"))
        return seeds

    def intelligent_branch(self, ray, out_baton, remaining_axioms, branch_width) -> List[AxiomRay]:
        """Extend ``ray`` with up to ``branch_width`` axioms, biased by the baton's L9 data.

        Axioms matching the number of dominant variables (1 -> isolated set,
        2 -> relational set, 3+ -> systemic set) get weight 6 instead of 1; the
        ranking multiplies each weight by a fresh random number, so it is
        stochastic.  With "systemic" tension and ce > 0.1 the width becomes
        BRANCH_WIDTH_SYSTEMIC.  Each child inherits ``out_baton``.
        """
        certificate = out_baton.l9
        dom_vars = certificate.dominant_vars if certificate else []
        tension_class = certificate.tension_class if certificate else "unknown"
        n_dom = len(dom_vars)

        # SYSTEM 21.0: Scale branch width for systemic high-CE problems
        if tension_class == "systemic" and out_baton.ce > 0.1:
            effective_width = BRANCH_WIDTH_SYSTEMIC
        else:
            effective_width = branch_width

        isolated = {Axiom.ATOMIC, Axiom.EXTREMAL, Axiom.MUTABLE, Axiom.DUALITY, Axiom.LOCALITY}
        relational = {Axiom.SYMMETRIC, Axiom.BILINEAR, Axiom.MONOTONE_PRODUCT,
                      Axiom.ORDERED, Axiom.METRIC, Axiom.INJECTIVE}
        systemic = {Axiom.CONSERVED, Axiom.NETWORK, Axiom.HOLISM, Axiom.COMPOSITE, Axiom.ENTROPY}
        if n_dom == 1:
            favoured = isolated
        elif n_dom == 2:
            favoured = relational
        elif n_dom >= 3:
            favoured = systemic
        else:
            favoured = set()

        scored = [(6.0 if axiom in favoured else 1.0, axiom) for axiom in remaining_axioms]
        scored.sort(key=lambda entry: entry[0] * random.random(), reverse=True)

        children = []
        for _, axiom in scored[:effective_width]:
            child = ray.extend(axiom, "branch", new_prior=out_baton.ce)
            if child:
                child.baton = out_baton
                children.append(child)
        return children

    def tension_seeds(self, max_n: int = 3000) -> List[AxiomRay]:
        """Build length-5 rays [a, b1, mid, b2, c] bridging each contradictory pair (a, c).

        The five axioms are distinct and no adjacent pair inside the chain is
        in _CONTRA_SET (the end points a and c are not adjacent).  All chains
        are generated, shuffled, and the first ``max_n`` returned.
        """
        def clashes(x, y):
            return frozenset([x, y]) in _CONTRA_SET

        chains = []
        for (head, tail) in ADJACENT_CONTRADICTIONS:
            label = f"T5_{AXIOM_ABBR.get(head,'?')}_{AXIOM_ABBR.get(tail,'?')}"
            for left in ALL_AXIOMS:
                if left in (head, tail) or clashes(head, left):
                    continue
                for mid in ALL_AXIOMS:
                    if mid in (head, left, tail) or clashes(left, mid):
                        continue
                    for right in ALL_AXIOMS:
                        if right in (head, left, mid, tail):
                            continue
                        if clashes(mid, right) or clashes(right, tail):
                            continue
                        chains.append(AxiomRay([head, left, mid, right, tail], label, "tension"))
        random.shuffle(chains)
        return chains[:max_n]

    def random_scouts(self, n: int = 600) -> List[AxiomRay]:
        """Generate up to ``n`` random "scout" rays of 3-12 non-contradicting axioms.

        Each attempt shuffles ALL_AXIOMS and greedily appends axioms that do
        not contradict the previous one; sequences shorter than 3 are
        discarded.  Gives up after ``30 * n`` attempts.
        """
        scouts = []
        attempts = 0
        while len(scouts) < n and attempts < n * 30:
            attempts += 1
            length = random.randint(3, 12)
            pool = list(ALL_AXIOMS)
            random.shuffle(pool)
            seq = [pool[0]]
            for candidate in pool[1:]:
                if len(seq) >= length:
                    break
                previous = seq[-1]
                if candidate != previous and frozenset([previous, candidate]) not in _CONTRA_SET:
                    seq.append(candidate)
            if len(seq) < 3:
                continue
            tag = ''.join(AXIOM_ABBR.get(a, a[:2]) for a in seq)
            scouts.append(AxiomRay(seq, f"SC_{tag}", "scout"))
        return scouts

    def recombine(self, ray_a, ray_b, ba, bb) -> List[AxiomRay]:
        """Cross two rays: spliced axiom sequence plus an interpolated baton.

        For each mixing weight 0.25 / 0.5 / 0.75 the bindings and CE of ``ba``
        and ``bb`` are linearly blended (missing values count as 0).  The
        sequence is the first half of ``ray_a`` (half of the shorter length)
        followed by the tail of ``ray_b`` without axioms already taken.  A ray
        is emitted only if that sequence passes ``sequence_valid``.
        """
        offspring = []
        cut = min(len(ray_a.sequence), len(ray_b.sequence)) // 2
        for lam in (0.25, 0.5, 0.75):
            names = set([*ba.binding.keys(), *bb.binding.keys()])
            blended = {name: lam * ba.binding.get(name, 0) + (1 - lam) * bb.binding.get(name, 0)
                       for name in names}
            blended_ce = lam * ba.ce + (1 - lam) * bb.ce
            blended_baton = Baton(binding=blended, ce=blended_ce, ray_id="INTERP")
            if cut <= 0:
                continue
            head = ray_a.sequence[:cut]
            seq = head + [axiom for axiom in ray_b.sequence[cut:] if axiom not in head]
            if sequence_valid(seq):
                offspring.append(AxiomRay(seq, f"RC_l{int(lam*100)}", "recombine",
                                          baton=blended_baton, ce_prior=blended_ce))
        return offspring

    def invert(self, ray, parent_ce) -> Optional[AxiomRay]:
        """Return ``ray`` with its sequence reversed, or None if invalid or palindromic."""
        flipped = list(reversed(ray.sequence))
        if not sequence_valid(flipped) or flipped == ray.sequence:
            return None
        return AxiomRay(flipped, f"INV_{ray.ray_id}", "invert",
                        parent_id=ray.ray_id, ce_prior=parent_ce)

    def resonance_extensions(self, ray, parent_ce, resonant_pairs) -> List[AxiomRay]:
        """Extend ``ray`` with every axiom that followed its last axiom in a resonant pair.

        Candidates already in the sequence, equal to the last axiom, or
        contradicting it are skipped.  Children are created through
        ``ray.extend`` and their ids are prefixed with "RES_".
        """
        if not ray.sequence or not resonant_pairs:
            return []
        last = ray.sequence[-1]
        extensions = []
        for first, follower in resonant_pairs:
            if first != last or follower == last:
                continue
            if frozenset([first, follower]) in _CONTRA_SET or follower in ray.sequence:
                continue
            child = ray.extend(follower, "branch", new_prior=parent_ce)
            if child:
                child.ray_id = f"RES_{child.ray_id}"
                extensions.append(child)
        return extensions
# Module-level CreativeSeeder instance used by SequenceRayTracer.trace.
CREATIVE_SEEDER=CreativeSeeder()
# ══════════════════════════════════════════════════════════════════════
# SECTION 10: VERIFY LAYER
# ══════════════════════════════════════════════════════════════════════
def _verify(binding:Dict[str,float],base_problem:Problem,
            anchors:List[AXLInvariant]) -> Tuple[bool,bool,Dict[str,Tuple[float,bool]],float]:
    """Run the two verification gates on a candidate binding.

    Gate 1 compares the problem's scalar energy against VERIFY_G1_THRESHOLD.
    Gate 2 evaluates every anchor invariant:
      * mode "eq" (and any unknown mode): passes when |value| < tolerance;
      * mode "geq": passes when value >= -tolerance;
      * mode "leq": passes when value <=  tolerance.

    An invariant that has no compiled function, or whose evaluation raises,
    is recorded as ``(999.0, False)``.  (Previously an uncompiled "geq"
    invariant passed, because the 999.0 sentinel satisfies ``val >= -tol``.)

    Args:
        binding: variable name -> value; missing variables evaluate as 0.0.
        base_problem: object providing ``scalar_energy(binding)``.
        anchors: invariants to check.

    Returns:
        ``(g1_pass, g2_pass, per_invariant, g1_ce)`` where ``per_invariant``
        maps invariant name to ``(reported_value, passed)``.  The reported
        value is the signed value for geq/leq and the absolute error
        otherwise, rounded to 6 places.  ``g2_pass`` is True when there are
        no anchors.
    """
    g1_ce = base_problem.scalar_energy(binding)
    g1_pass = g1_ce < VERIFY_G1_THRESHOLD
    g2 = {}
    for inv in anchors:
        try:
            if not inv.compiled_func:
                # Nothing to evaluate: never let the sentinel satisfy an inequality.
                g2[inv.name] = (999.0, False)
                continue
            args = [binding.get(v, 0.0) for v in inv.syms_used]
            val = float(inv.compiled_func(*args))
            err = abs(val)
            if inv.mode == "geq":
                passed = val >= -inv.tolerance
            elif inv.mode == "leq":
                passed = val <= inv.tolerance
            else:
                passed = err < inv.tolerance
            reported = val if inv.mode in ("geq", "leq") else err
            g2[inv.name] = (round(reported, 6), passed)
        except Exception:
            # Best-effort: an invariant that cannot be evaluated counts as failed.
            g2[inv.name] = (999.0, False)
    g2_pass = all(result[1] for result in g2.values()) if g2 else True
    return g1_pass, g2_pass, g2, round(g1_ce, 6)
# ══════════════════════════════════════════════════════════════════════
# SECTION 11: SEQUENCE RAY TRACER
# ══════════════════════════════════════════════════════════════════════
class SequenceRayTracer:
    """Batched search over axiom sequences ("rays") for a low-energy binding."""
    def trace(self,axl_def:AXLProblemDef,base_problem:Problem):
        """Fire rays in batches until a binding passes verification or the ray budget is spent.

        Returns ``(best_binding, best_ce, hero_traces, total_fired, allosteric_counts)``.
        ``hero_traces`` holds only results that improved the best CE, passed
        verification, or were kept by a 0.2% random sample.
        """
        hero_traces:List[HypothesisTrace]=[]
        baton_registry:Dict[str,Baton]={}
        # Insertion order of registry keys, used for FIFO eviction below.
        baton_registry_keys:deque=deque()
        successful_rays:List[AxiomRay]=[]
        resonant_pairs:List[Tuple[str,str]]=[]
        # Make sure every anchor invariant is compiled before _verify is called.
        for inv in axl_def.anchors:
            if not inv.compiled_func:
                inv.compile(base_problem.variables)
        # Starting point: sample over the full variable bounds.
        init_b,init_ce=_mprt_sample(base_problem,
            {v:IV(*base_problem.bounds[v]) for v in base_problem.variables},N_MPRT_EXPLORE)
        # Default parent baton for rays that carry none of their own.
        seed_baton=Baton(binding=init_b,ce=init_ce,ray_id="SEED",
            l9=L9Certificate(init_ce,base_problem.variables[:3],[],"systemic"))
        queues={
            "core":[AxiomRay([a],f"S{i}","seed",ce_prior=init_ce) for i,a in enumerate(ALL_AXIOMS)],
            "explore":CREATIVE_SEEDER.tension_seeds(3000)+CREATIVE_SEEDER.random_scouts(600),
            "genetic":CREATIVE_SEEDER.target_seeds(axl_def.anchors),
        }
        for q in queues.values(): random.shuffle(q)
        queue_order=["core","explore","genetic"]; q_idx=0
        best_ce=init_ce; best_binding=dict(init_b); total_fired=0
        model=StructuralModel(best_binding,best_ce)
        allosteric_counts={"isolated":0,"relational":0,"systemic":0}
        while total_fired<MAX_TOTAL_RAYS:
            # --- Assemble a batch, taking rays round-robin from the three queues. ---
            batch_rays:List[AxiomRay]=[]
            for _ in range(RAY_BATCH_SIZE):
                start_idx=q_idx
                # Skip empty queues; stop after one full cycle.
                while not queues[queue_order[q_idx]]:
                    q_idx=(q_idx+1)%3
                    if q_idx==start_idx: break
                if not any(queues[k] for k in queue_order): break
                ray=queues[queue_order[q_idx]].pop(0); q_idx=(q_idx+1)%3
                batch_rays.append(ray); total_fired+=1
            if not batch_rays: break
            t0=time.time()
            historical_batons=list(baton_registry.values())
            # --- One hypothesis per ray, built by the constructor of the ray's LAST axiom. ---
            batch_hyps:List[Tuple[int,Hypothesis]]=[]
            for i,ray in enumerate(batch_rays):
                p_baton=ray.baton or seed_baton
                constructor=AXIOM_CONSTRUCTORS.get(ray.sequence[-1])
                hyps=constructor(base_problem,p_baton,historical_batons,model) if constructor else []
                # No hypothesis produced: fall back to the parent binding unchanged.
                if not hyps: hyps=[_make_hyp("fb",p_baton.binding,"fallback","Pass",[],{},base_problem.variables,0.1)]
                # Only the first hypothesis of each constructor is evaluated.
                batch_hyps.append((i,hyps[0]))
            eval_results=_batched_deduce_and_evaluate(base_problem,[h for _,h in batch_hyps])
            # Average wall time per ray in this batch (hypothesis build + evaluation).
            elapsed_ms=(time.time()-t0)*1000/max(1,len(batch_rays))
            solved=False
            # --- Score each result, update bests/registry, and spawn children. ---
            for (ray_idx,hyp),(final_b,final_ce,dom_vars,tension_class) in zip(batch_hyps,eval_results):
                ray=batch_rays[ray_idx]
                parent_ce=(ray.baton or seed_baton).ce
                allosteric_counts[tension_class]=allosteric_counts.get(tension_class,0)+1
                g1_pass,g2_pass,inv_results,g1_ce=_verify(final_b,base_problem,axl_def.anchors)
                overall_pass=g1_pass and g2_pass
                improved=final_ce<best_ce
                if improved or overall_pass or random.random()<0.002:
                    hero_traces.append(HypothesisTrace(
                        hid=ray.ray_id,round_idx=total_fired-len(batch_rays)+ray_idx,
                        ray_name=ray.name,ray_type=ray.ray_type,
                        ce_before=parent_ce,ce_after=final_ce,
                        survived=overall_pass,elapsed_ms=elapsed_ms,
                        g1_pass=g1_pass,g2_pass=g2_pass,
                        binding={k:round(v,5) for k,v in final_b.items()},
                        invariant_results=inv_results,
                        tension_class=tension_class,depth=ray.depth))
                out_baton=Baton(binding=final_b,ce=final_ce,ray_id=ray.ray_id,
                    l9=L9Certificate(final_ce,dom_vars,[],tension_class))
                # Registry keeps batons below the CE threshold, evicting the oldest key when full.
                if final_ce<BATON_REGISTRY_THRESHOLD:
                    if ray.ray_id in baton_registry:
                        baton_registry[ray.ray_id]=out_baton
                    else:
                        if len(baton_registry)>=BATON_REGISTRY_MAX:
                            old_key=baton_registry_keys.popleft()
                            baton_registry.pop(old_key,None)
                        baton_registry[ray.ray_id]=out_baton
                        baton_registry_keys.append(ray.ray_id)
                if improved:
                    best_ce=final_ce; best_binding=dict(final_b)
                    successful_rays.append(ray)
                    model.best_binding=best_binding; model.best_ce=best_ce
                # NOTE(review): a passing result ends the search after this batch even when
                # it did not improve best_ce, in which case best_binding is a different
                # binding from the one that passed — confirm this is intended.
                if overall_pass: solved=True
                # Remember the last two axioms of an improving ray as a "resonant" pair.
                if improved and len(ray.sequence)>=2:
                    pair=(ray.sequence[-2],ray.sequence[-1])
                    if pair not in resonant_pairs: resonant_pairs.append(pair)
                    model.resonant_pairs=resonant_pairs
                # A ray earns children by cutting CE below CE_EARN_RATIO of its parent's,
                # by having depth < 1, or (scouts only) by reaching SCOUT_CE_THRESHOLD.
                earned=(final_ce<parent_ce*CE_EARN_RATIO or ray.depth<1)
                scout_earned=(ray.ray_type=="scout" and final_ce<SCOUT_CE_THRESHOLD)
                if earned or scout_earned:
                    # Axioms not yet in the sequence and not contradicting its last axiom.
                    remaining=[a for a in ALL_AXIOMS
                        if a!=ray.sequence[-1]
                        and frozenset([ray.sequence[-1],a]) not in _CONTRA_SET
                        and a not in ray.sequence]
                    new_children:List[AxiomRay]=[]
                    new_children.extend(
                        CREATIVE_SEEDER.intelligent_branch(ray,out_baton,remaining,BRANCH_WIDTH))
                    inv=CREATIVE_SEEDER.invert(ray,final_ce)
                    if inv: new_children.append(inv)
                    # Crossover with a random earlier improving ray, if its baton is still registered.
                    if len(successful_rays)>=2:
                        candidates=[r for r in successful_rays if r.ray_id!=ray.ray_id]
                        if candidates:
                            partner=random.choice(candidates)
                            ba=baton_registry.get(ray.ray_id,out_baton)
                            bb=baton_registry.get(partner.ray_id)
                            if bb is not None:
                                new_children.extend(CREATIVE_SEEDER.recombine(ray,partner,ba,bb)[:3])
                    new_children.extend(
                        CREATIVE_SEEDER.resonance_extensions(ray,final_ce,resonant_pairs))
                    # Route each child to a queue by ray type; other types are dropped.
                    for child in new_children:
                        if child.ray_type in ("seed","branch","target"): queues["core"].append(child)
                        elif child.ray_type in ("scout","tension"): queues["explore"].append(child)
                        elif child.ray_type in ("recombine","invert"): queues["genetic"].append(child)
            # Lowest CE prior first; depth is a tiny tie-breaker. "explore" stays unsorted.
            queues["core"].sort(key=lambda r:r.ce_prior+(r.depth*0.0001))
            queues["genetic"].sort(key=lambda r:r.ce_prior+(r.depth*0.0001))
            if solved: break
        return best_binding,best_ce,hero_traces,total_fired,allosteric_counts
# ══════════════════════════════════════════════════════════════════════
# SECTION 12: AXL PROBLEMS (14 domains)
# ══════════════════════════════════════════════════════════════════════
# Display metadata per problem name: application domain, dashboard icon and
# difficulty label.  Built from one (name, domain, icon, difficulty) row each.
PROBLEM_META = {
    name: {"domain": domain, "icon": icon, "difficulty": difficulty}
    for name, domain, icon, difficulty in (
        ("EllipticCurvePoint", "Cryptography", "🔐", "Medium"),
        ("MarkowitzPortfolio", "Finance", "📈", "Hard"),
        ("PowerFlowDC3Bus", "Energy", "⚡", "Medium"),
        ("ChemEquilibrium", "Chemistry", "⚗", "Medium"),
        ("LorenzFixedPoint", "Chaos Theory", "🌀", "Easy"),
        ("NashBattleSexes", "Game Theory", "🎲", "Easy"),
        ("KeplerPeriapsis", "Astronomy", "🪐", "Medium"),
        ("VibratingString3", "Structural Eng", "🏗", "Easy"),
        ("SignalRecovery4", "Signal Proc", "📡", "Medium"),
        ("RobotArm2D", "Robotics", "🦾", "Hard"),
        ("NormManifold4D", "Mathematics", "📐", "Medium"),
        ("BilinearChain6", "Mathematics", "📐", "Hard"),
        ("PhaseCollider", "Physics", "⚛", "Medium"),
        ("MetricPacking3D", "Combinatorics", "🎯", "Hard"),
    )
}
# Benchmark problem definitions, cycled through by background_worker.
# Positional layout as used below: name, description, axiom set, variable
# specs (name/lo/hi), constraint specs (kind/expr[/weight]), a list of named
# constraint groups (presumably scopes — confirm against AXLProblemDef), the
# anchor invariants checked by _verify, and a trailing variable->value dict
# (presumably pinned values — confirm against AXLProblemDef).
# AXLInvariant(name, expr, ...): the optional 3rd/4th arguments look like the
# tolerance and the mode ("geq") read by _verify — confirm against AXLInvariant.
AXL_PROBLEMS=[
    # Point on y^2 = x^3 - x + 1.
    AXLProblemDef("EllipticCurvePoint","E: y²=x³-x+1, x=0",
        {Axiom.CONTINUOUS,Axiom.QUADRATIC,Axiom.METRIC,Axiom.SYMMETRIC},
        [{"name":"ex","lo":-2.0,"hi":2.0},{"name":"ey","lo":-2.0,"hi":2.0}],
        [{"kind":"EQ","expr":"ey**2 - ex**3 + ex - 1","weight":5.0}],[],
        [AXLInvariant("on_curve","ey**2 - ex**3 + ex - 1",1e-3)],{"ex":0.0}),
    # Weights sum to 1, are non-negative, and give an expected return of 0.11.
    AXLProblemDef("MarkowitzPortfolio","4-asset portfolio, return=11%",
        {Axiom.CONSERVED,Axiom.ORDERED,Axiom.METRIC,Axiom.HOLISM},
        [{"name":f"w{i}","lo":0.0,"hi":1.0} for i in range(1,5)],
        [{"kind":"CONSERVE","expr":"w1+w2+w3+w4-1.0"},
        {"kind":"EQ","expr":"0.10*w1+0.12*w2+0.08*w3+0.15*w4-0.11","weight":3.0}]+
        [{"kind":"GEQ","expr":f"w{i}"} for i in range(1,5)],[],
        [AXLInvariant("budget","w1+w2+w3+w4-1.0"),
        AXLInvariant("ret","0.10*w1+0.12*w2+0.08*w3+0.15*w4-0.11",5e-3)],{}),
    # Two linear equations in two unknowns.
    AXLProblemDef("PowerFlowDC3Bus","DC 3-bus power flow",
        {Axiom.CONSERVED,Axiom.NETWORK,Axiom.EQUILIBRIUM},
        [{"name":"pt2","lo":-1.0,"hi":0.5},{"name":"pt3","lo":-0.5,"hi":0.5}],
        [{"kind":"EQ","expr":"-2*pt2-3*pt3-0.5","weight":5.0},
        {"kind":"EQ","expr":"3*pt2-pt3+1.0","weight":5.0}],[],
        [AXLInvariant("bus1","-2*pt2-3*pt3-0.5"),AXLInvariant("bus2","3*pt2-pt3+1.0")],{}),
    # Two balances plus the bilinear relation cc = 4*ca*cb.
    AXLProblemDef("ChemEquilibrium","A+B↔C, Kc=4",
        {Axiom.CONSERVED,Axiom.MONOTONE_PRODUCT,Axiom.BILINEAR},
        [{"name":"ca","lo":0.0,"hi":1.0},{"name":"cb","lo":0.0,"hi":1.0},
        {"name":"cc","lo":0.0,"hi":1.0}],
        [{"kind":"CONSERVE","expr":"ca+cc-1.0"},{"kind":"CONSERVE","expr":"cb+cc-1.0"},
        {"kind":"EQ","expr":"4*ca*cb-cc","weight":5.0}],[],
        [AXLInvariant("bal_A","ca+cc-1.0"),AXLInvariant("bal_B","cb+cc-1.0"),
        AXLInvariant("eq","4*ca*cb-cc")],{}),
    # lx = ly, lz = 27, lx^2 = 72.
    AXLProblemDef("LorenzFixedPoint","Lorenz σ=10 ρ=28 β=8/3",
        {Axiom.CONTINUOUS,Axiom.QUADRATIC,Axiom.SYMMETRIC},
        [{"name":"lx","lo":-12.0,"hi":12.0},{"name":"ly","lo":-12.0,"hi":12.0},
        {"name":"lz","lo":20.0,"hi":35.0}],
        [{"kind":"EQ","expr":"lx-ly","weight":5.0},{"kind":"EQ","expr":"lz-27.0","weight":5.0},
        {"kind":"EQ","expr":"lx**2-72.0","weight":5.0}],[],
        [AXLInvariant("x_eq_y","lx-ly"),AXLInvariant("z_27","lz-27.0"),
        AXLInvariant("x2_72","lx**2-72.0",0.1)],{}),
    # Two independent linear equations: nq = 1/3, np = 2/3.
    AXLProblemDef("NashBattleSexes","Mixed Nash p=2/3 q=1/3",
        {Axiom.CONTINUOUS,Axiom.SYMMETRIC},
        [{"name":"np","lo":0.0,"hi":1.0},{"name":"nq","lo":0.0,"hi":1.0}],
        [{"kind":"EQ","expr":"3*nq-1.0","weight":5.0},
        {"kind":"EQ","expr":"3*np-2.0","weight":5.0}],[],
        [AXLInvariant("p1","3*nq-1.0"),AXLInvariant("p2","3*np-2.0")],{}),
    # krp = ka*(1-ke), kvp^2*krp = 1+ke, kh = krp*kvp.
    AXLProblemDef("KeplerPeriapsis","Orbital: a=1 e=0.5",
        {Axiom.CONTINUOUS,Axiom.QUADRATIC,Axiom.CONSERVED,Axiom.METRIC},
        [{"name":"ka","lo":0.5,"hi":2.0},{"name":"ke","lo":0.0,"hi":0.99},
        {"name":"krp","lo":0.1,"hi":2.0},{"name":"kvp","lo":0.5,"hi":5.0},
        {"name":"kh","lo":0.1,"hi":3.0}],
        [{"kind":"EQ","expr":"krp-ka*(1-ke)","weight":5.0},
        {"kind":"EQ","expr":"kvp**2*krp-(1+ke)","weight":5.0},
        {"kind":"EQ","expr":"kh-krp*kvp","weight":5.0}],[],
        [AXLInvariant("peri","krp-ka*(1-ke)"),AXLInvariant("visviva","kvp**2*krp-(1+ke)"),
        AXLInvariant("angmom","kh-krp*kvp")],{"ka":1.0,"ke":0.5}),
    # Tridiagonal linear system; s1 = s3 = 1.5, s2 = 2.0 satisfies all three equations.
    AXLProblemDef("VibratingString3","3-node string s1=s3=1.5 s2=2.0",
        {Axiom.ORDERED,Axiom.SYMMETRIC,Axiom.METRIC},
        [{"name":"s1","lo":0.0,"hi":5.0},{"name":"s2","lo":0.0,"hi":5.0},
        {"name":"s3","lo":0.0,"hi":5.0}],
        [{"kind":"EQ","expr":"-2*s1+s2+1.0","weight":5.0},
        {"kind":"EQ","expr":"s1-2*s2+s3+1.0","weight":5.0},
        {"kind":"EQ","expr":"s2-2*s3+1.0","weight":5.0}],[],
        [AXLInvariant("n1","-2*s1+s2+1.0"),AXLInvariant("n2","s1-2*s2+s3+1.0"),
        AXLInvariant("n3","s2-2*s3+1.0"),AXLInvariant("sym","s1-s3")],{}),
    # Three linear measurements of four unknowns (underdetermined).
    AXLProblemDef("SignalRecovery4","Compressed sensing x=[1,2,0,2]",
        {Axiom.CONSERVED,Axiom.PARSIMONY,Axiom.INJECTIVE},
        [{"name":f"r{i}","lo":0.0,"hi":5.0} for i in range(1,5)],
        [{"kind":"EQ","expr":"r1+r2-3.0","weight":5.0},
        {"kind":"EQ","expr":"r1+r3-1.0","weight":5.0},
        {"kind":"EQ","expr":"r2+r4-4.0","weight":5.0}],[],
        [AXLInvariant("m1","r1+r2-3.0"),AXLInvariant("m2","r1+r3-1.0"),
        AXLInvariant("m3","r2+r4-4.0")],{}),
    # Unit-length two-link arm; |target|^2 = 2.08 = 2 + 2*cos(t2) gives cos(t2) = 0.04.
    AXLProblemDef("RobotArm2D","2-link arm target=(1.2,0.8)",
        {Axiom.CONTINUOUS,Axiom.METRIC,Axiom.QUADRATIC},
        [{"name":"t1","lo":-3.15,"hi":3.15},{"name":"t2","lo":-3.15,"hi":3.15}],
        [{"kind":"EQ","expr":"cos(t1)+cos(t1+t2)-1.2","weight":5.0},
        {"kind":"EQ","expr":"sin(t1)+sin(t1+t2)-0.8","weight":5.0}],[],
        [AXLInvariant("rx","cos(t1)+cos(t1+t2)-1.2",1e-2),
        AXLInvariant("ry","sin(t1)+sin(t1+t2)-0.8",1e-2),
        AXLInvariant("elbow","cos(t2)-0.04",0.05)],{}),
    # Unit sphere intersected with the zero-sum hyperplane; invariant x1pin checks x1 = 0.5.
    AXLProblemDef("NormManifold4D","Unit sphere zero-sum x1=0.5",
        {Axiom.CONTINUOUS,Axiom.QUADRATIC,Axiom.CONSERVED,Axiom.METRIC,Axiom.SYMMETRIC},
        [{"name":f"x{i}","lo":-1.0,"hi":1.0} for i in range(1,5)],
        [{"kind":"CONSERVE","expr":"x1**2+x2**2+x3**2+x4**2-1.0"},
        {"kind":"EQ","expr":"x1+x2+x3+x4","weight":2.0}],[],
        [AXLInvariant("sphere","x1**2+x2**2+x3**2+x4**2-1.0"),
        AXLInvariant("zsum","x1+x2+x3+x4"),
        AXLInvariant("x1pin","x1-0.5")],{"x1":0.5}),
    # Sum = 9, t1 <= t2 <= ... <= t6 (GEQ on consecutive differences), three pair products.
    AXLProblemDef("BilinearChain6","6-var ordered chain products 0.5/2.0/4.5",
        {Axiom.DISCRETE,Axiom.ORDERED,Axiom.BILINEAR,Axiom.MONOTONE_PRODUCT,Axiom.CONSERVED},
        [{"name":f"t{i}","lo":0.0,"hi":3.0} for i in range(1,7)],
        [{"kind":"CONSERVE","expr":"t1+t2+t3+t4+t5+t6-9.0"}]+
        [{"kind":"GEQ","expr":f"t{i+1}-t{i}"} for i in range(1,6)]+
        [{"kind":"EQ","expr":"t1*t2-0.5","weight":2.0},
        {"kind":"EQ","expr":"t3*t4-2.0","weight":2.0},
        {"kind":"EQ","expr":"t5*t6-4.5","weight":2.0}],[],
        [AXLInvariant("con","t1+t2+t3+t4+t5+t6-9.0"),
        AXLInvariant("bil_lo","t1*t2-0.5",0.05),
        AXLInvariant("bil_mid","t3*t4-2.0",0.05),
        AXLInvariant("bil_hi","t5*t6-4.5",0.05),
        AXLInvariant("ord","t2-t1",0.5,"geq")],{}),
    # Zero total in x and y; the "phase" invariant vanishes when p1x^2+p1y^2 is 0.25 or 1.0.
    AXLProblemDef("PhaseCollider","2-particle collision phase branch",
        {Axiom.DISCRETE,Axiom.MUTABLE,Axiom.SYMMETRIC,Axiom.CONSERVED},
        [{"name":"p1x","lo":-2.0,"hi":2.0},{"name":"p2x","lo":-2.0,"hi":2.0},
        {"name":"p1y","lo":-2.0,"hi":2.0},{"name":"p2y","lo":-2.0,"hi":2.0}],
        [{"kind":"CONSERVE","expr":"p1x+p2x"},{"kind":"CONSERVE","expr":"p1y+p2y"}],
        [{"name":"phase","order":0,"constraints":[
        {"kind":"EQ","expr":"p1x+p2x","weight":3.0},
        {"kind":"EQ","expr":"p1y+p2y","weight":3.0}]}],
        [AXLInvariant("mx","p1x+p2x"),AXLInvariant("my","p1y+p2y"),
        AXLInvariant("phase","(p1x**2+p1y**2-0.25)*(p1x**2+p1y**2-1.0)",0.05)],{}),
    # a+b+c = 0, a^2+b^2+c^2 = 6, with minimum squared separations between the variables.
    AXLProblemDef("MetricPacking3D","3 vars centred norm²=6 min sep",
        {Axiom.CONTINUOUS,Axiom.METRIC,Axiom.INJECTIVE,Axiom.CONSERVED},
        [{"name":"a","lo":-3.0,"hi":3.0},{"name":"b","lo":-3.0,"hi":3.0},
        {"name":"c","lo":-3.0,"hi":3.0}],
        [{"kind":"CONSERVE","expr":"a+b+c"},
        {"kind":"GEQ","expr":"(a-b)**2-1.0","weight":2.0},
        {"kind":"GEQ","expr":"(b-c)**2-1.0","weight":2.0},
        {"kind":"GEQ","expr":"(a-c)**2-4.0","weight":2.0}],
        [{"name":"packing","order":0,"constraints":[
        {"kind":"EQ","expr":"a**2+b**2+c**2-6.0","weight":2.0}]}],
        [AXLInvariant("cen","a+b+c"),AXLInvariant("norm6","a**2+b**2+c**2-6.0",0.05),
        AXLInvariant("sab","(a-b)**2-1.0",0.0,"geq"),
        AXLInvariant("sbc","(b-c)**2-1.0",0.0,"geq")],{}),
]
def build_base_problem(axl_def:AXLProblemDef) -> Problem:
    """Compile an AXL definition to PSL text, parse and expand it, and wrap it in a Problem.

    The resulting problem id is ``"base_<name>"``; variables, constraints,
    bounds and the three scope attributes are copied from the expanded program.
    """
    expanded = expand_psl(parse_psl(AXL_COMPILER.compile(axl_def)))
    return Problem(
        pid=f"base_{axl_def.name}",
        variables=expanded.variables,
        constraints=expanded.constraints,
        bounds=expanded.bounds,
        scope_groups=expanded.scope_groups,
        scope_vars=expanded.scope_vars,
        scope_order=expanded.scope_order,
    )
# ══════════════════════════════════════════════════════════════════════
# SECTION 13: GLOBAL STATE
# ══════════════════════════════════════════════════════════════════════
# Held by background_worker while it updates RECENT_RUNS / PROBLEM_STATS.
STATE_LOCK = threading.Lock()
# Flipped by toggle_engine; background_worker loops while it is True.
IS_RUNNING = False
# Monotonic counter selecting the next problem (taken modulo len(AXL_PROBLEMS)).
_PROB_IDX = 0
# Newest-first log of run records, capped at MAX_RECENT entries.
MAX_RECENT = 30
RECENT_RUNS: List[Dict] = []
# Cumulative statistics per problem name.
PROBLEM_STATS: Dict[str, Dict] = {
    problem.name: {
        "runs": 0,
        "solved": 0,
        "total_ce": 0.0,
        "best_ce": math.inf,
        "best_binding": {},
        "best_ray": "—",
        "type_stats": {ray_type: {"tried": 0, "survived": 0} for ray_type in RAY_TYPES},
        "allosteric": {"isolated": 0, "relational": 0, "systemic": 0},
    }
    for problem in AXL_PROBLEMS
}
def background_worker():
    """Worker loop: solve the benchmark problems round-robin while IS_RUNNING is set.

    Each iteration builds the next problem, runs one full trace, derives a
    run record, and publishes it under STATE_LOCK (RECENT_RUNS and the
    cumulative PROBLEM_STATS).  IS_RUNNING is only checked between problems.
    """
    global IS_RUNNING, _PROB_IDX
    tracer = SequenceRayTracer()
    while IS_RUNNING:
        axl_def = AXL_PROBLEMS[_PROB_IDX % len(AXL_PROBLEMS)]
        _PROB_IDX += 1
        base_prob = build_base_problem(axl_def)

        started = time.time()
        binding, ce, traces, total_fired, allosteric = tracer.trace(axl_def, base_prob)
        ms = round((time.time() - started) * 1000)

        survived = [t for t in traces if t.survived]
        # Solved = at least one verified trace AND the best CE is below threshold.
        solved = len(survived) > 0 and ce < SOLVE_THRESHOLD
        best_t = min(survived, key=lambda t: t.ce_after, default=None)

        # Per-ray-type tallies over the recorded traces only.
        type_counts = {ray_type: {"tried": 0, "survived": 0} for ray_type in RAY_TYPES}
        for t in traces:
            tally = type_counts.get(t.ray_type)
            if tally is None:
                continue
            tally["tried"] += 1
            if t.survived:
                tally["survived"] += 1

        meta = PROBLEM_META.get(axl_def.name, {})
        best_ray_name = best_t.ray_name if best_t else "—"
        trace_rows = []
        for t in traces[:40]:  # only the first 40 traces are kept for display
            trace_rows.append({
                "ray": t.ray_name, "ray_type": t.ray_type, "depth": t.depth,
                "ce_before": round(t.ce_before, 5), "ce": round(t.ce_after, 5),
                "g1": t.g1_pass, "g2": t.g2_pass, "survived": t.survived,
                "tension": t.tension_class,
                "binding": {k: round(v, 5) for k, v in (t.binding or {}).items()},
                "invariants": t.invariant_results,
            })
        record = {
            "problem": axl_def.name,
            "domain": meta.get("domain", ""),
            "icon": meta.get("icon", "·"),
            "difficulty": meta.get("difficulty", "?"),
            "ce": round(ce, 6), "solved": solved, "ms": ms,
            "total_fired": total_fired,
            "survived_count": len(survived),
            "binding": {k: round(v, 5) for k, v in binding.items()},
            "best_ray": best_ray_name,
            "best_ray_type": best_t.ray_type if best_t else "—",
            "type_counts": type_counts,
            "allosteric": allosteric,
            "traces": trace_rows,
        }

        with STATE_LOCK:
            RECENT_RUNS.insert(0, record)
            if len(RECENT_RUNS) > MAX_RECENT:
                RECENT_RUNS.pop()
            ps = PROBLEM_STATS[axl_def.name]
            ps["runs"] += 1
            ps["total_ce"] += ce
            if solved:
                ps["solved"] += 1
            if ce < ps["best_ce"]:
                ps["best_ce"] = ce
                ps["best_binding"] = {k: round(v, 5) for k, v in binding.items()}
                ps["best_ray"] = best_ray_name
            for ray_type, counts in type_counts.items():
                cumulative = ps["type_stats"][ray_type]
                cumulative["tried"] += counts["tried"]
                cumulative["survived"] += counts["survived"]
            for tension, count in allosteric.items():
                ps["allosteric"][tension] = ps["allosteric"].get(tension, 0) + count
def toggle_engine():
    """Flip the engine on/off and return the label for the toggle button.

    Turning the engine on starts a daemon thread running ``background_worker``
    unless a previously started worker is still alive.  The worker only checks
    IS_RUNNING between problems, so after STOP it may keep running for the
    rest of its current trace.  Starting a second thread in that window would
    leave two workers running, since the old one sees IS_RUNNING True again
    and never exits.  The surviving worker simply carries on instead.

    Returns:
        "⏹ STOP 21.0" when the engine is now running, else "▶ START 21.0".
    """
    global IS_RUNNING
    IS_RUNNING = not IS_RUNNING
    if IS_RUNNING:
        # The worker handle lives on the function object to avoid a new global.
        worker = getattr(toggle_engine, "_worker", None)
        # NOTE: a worker that has already seen IS_RUNNING False but has not yet
        # terminated still reports is_alive(); that window is a few bytecodes wide.
        if worker is None or not worker.is_alive():
            worker = threading.Thread(target=background_worker, daemon=True)
            toggle_engine._worker = worker
            worker.start()
    return "⏹ STOP 21.0" if IS_RUNNING else "▶ START 21.0"
# ══════════════════════════════════════════════════════════════════════
# SECTION 14: GRADIO DASHBOARD
# ══════════════════════════════════════════════════════════════════════
# Dashboard icon for each ray type.
RAY_ICONS = dict(
    seed="🌱",
    branch="🌿",
    tension="⚡",
    scout="🔭",
    recombine="🧬",
    invert="🔄",
    target="🎯",
)
def _render_perf_table() -> str:
    """Render one HTML ``<tr>`` per benchmark problem from PROBLEM_STATS.

    Problems with no runs yet get a "warming up…" placeholder row.  Otherwise
    the row shows run count, solve rate, average and best CE, a survival
    percentage for six ray types, and the most frequent tension class.
    """
    shown_types = ("seed", "tension", "scout", "recombine", "invert", "branch")
    parts = []
    for prob in AXL_PROBLEMS:
        ps = PROBLEM_STATS[prob.name]
        meta = PROBLEM_META.get(prob.name, {})
        icon = meta.get('icon', '·')
        runs = ps["runs"]
        if runs == 0:
            parts.append(
                f"<tr><td>{icon}</td>"
                f"<td style='color:#444'>{prob.name}</td>"
                f"<td colspan='8' style='color:#222'>warming up…</td></tr>")
            continue

        rate = round(ps["solved"] / runs * 100)
        avg_ce = round(ps["total_ce"] / runs, 5)

        rc = ps["type_stats"]
        cells = []
        for kind in shown_types:
            won = rc[kind]['survived']
            tried = rc[kind]['tried']
            cell_colour = '#4CAF50' if won > 0 else '#333'
            pct = round(won / tried * 100) if tried else 0
            cells.append(
                f"<td style='color:{cell_colour};font-size:0.75em'>"
                f"{RAY_ICONS.get(kind,'·')}{pct}%</td>")
        type_cells = "".join(cells)

        if rate >= 50:
            rate_col = "#4CAF50"
        elif rate >= 20:
            rate_col = "#FF9800"
        else:
            rate_col = "#EF5350"

        al = ps["allosteric"]
        dom_class = max(al, key=al.get)
        parts.append(
            f"<tr>"
            f"<td style='color:#aaa'>{icon}</td>"
            f"<td style='color:#FF9800;font-size:0.85em'>{prob.name}</td>"
            f"<td style='color:#5C6BC0;font-size:0.8em'>{meta.get('domain','')}</td>"
            f"<td style='color:#555;font-size:0.75em'>{meta.get('difficulty','')}</td>"
            f"<td style='color:#aaa'>{runs}</td>"
            f"<td style='color:{rate_col};font-weight:700'>{rate}%</td>"
            f"<td style='color:#26C6DA'>{avg_ce}</td>"
            f"<td style='color:#26C6DA;font-size:0.8em'>{ps['best_ce']:.5f}</td>"
            f"{type_cells}"
            f"<td style='color:#888;font-size:0.75em'>{dom_class}</td>"
            f"</tr>")
    return "".join(parts)
def _render_type_table() -> str:
    """Render one HTML ``<tr>`` per ray type with tried/survived totals over all problems."""
    descriptions = {
        "seed":"Greedy depth-first from single axiom",
        "branch":"Allosteric feedback-guided child — width 12 (16 for systemic)",
        "tension":"Mediated contradiction [A→B1→M→B2→C] length 5",
        "scout":"Random length 3-12 cartographer",
        "recombine":"Genetic crossover + interpolated baton geometry",
        "invert":"Reversed successful sequence",
        "target":"Bidirectional: spawned from goal state invariants",
    }

    # Sum the per-problem counters into one tally per ray type.
    totals = {ray_type: {"tried": 0, "survived": 0} for ray_type in RAY_TYPES}
    for stats in PROBLEM_STATS.values():
        for ray_type, counts in stats["type_stats"].items():
            totals[ray_type]["tried"] += counts["tried"]
            totals[ray_type]["survived"] += counts["survived"]

    parts = []
    for ray_type, tally in totals.items():
        tried = tally['tried']
        won = tally['survived']
        rate = round(won / tried * 100) if tried else 0
        rate_colour = "#4CAF50" if rate > 0 else "#333"
        parts.append(
            f"<tr><td>{RAY_ICONS.get(ray_type,'·')}</td>"
            f"<td style='color:#FF9800'>{ray_type}</td>"
            f"<td style='color:#aaa'>{tried}</td>"
            f"<td style='color:#4CAF50'>{won}</td>"
            f"<td style='color:#EF5350'>{tried-won}</td>"
            f"<td style='color:{rate_colour};font-weight:700'>{rate}%</td>"
            f"<td style='color:#555;font-size:0.8em'>{descriptions.get(ray_type,'')}</td></tr>")
    return "".join(parts)
def _render_allosteric_table() -> str:
    """Render one HTML ``<tr>`` per tension class with its count and share of all results."""
    descriptions = {
        "isolated":"1 dominant var → ATOMIC/EXTREMAL/DUALITY",
        "relational":"2 dominant vars → BILINEAR/MPR/ORDERED/METRIC",
        "systemic":"3+ dominant vars → CONSERVED/NETWORK/HOLISM (branch×16)",
    }

    totals = {"isolated": 0, "relational": 0, "systemic": 0}
    for stats in PROBLEM_STATS.values():
        for tension, count in stats["allosteric"].items():
            totals[tension] = totals.get(tension, 0) + count
    grand = sum(totals.values()) or 1  # avoid dividing by zero before the first run

    parts = []
    for tension, count in totals.items():
        pct = round(count / grand * 100)
        parts.append(
            f"<tr><td style='color:#FF9800'>{tension}</td>"
            f"<td style='color:#26C6DA'>{count:,}</td>"
            f"<td style='color:#26C6DA'>{pct}%</td>"
            f"<td style='color:#555;font-size:0.8em'>{descriptions.get(tension,'')}</td></tr>")
    return "".join(parts)
def _render_recent() -> str:
    """Render an HTML card for each of the 8 most recent runs.

    A card shows the status header, up to six binding values, the invariant
    results of the first surviving trace (or of the first trace when none
    survived), per-type survival badges, and a trail of up to 15 traces.
    Returns a placeholder ``<div>`` when no run has been recorded yet.
    """
    with STATE_LOCK:
        log = list(RECENT_RUNS[:8])
    if not log:
        return "<div style='color:#222;padding:12px'>No runs yet.</div>"

    badge_types = ("seed", "tension", "scout", "recombine", "invert", "branch", "target")
    cards = []
    for r in log:
        ce = r["ce"]
        solved = r["solved"]
        if solved:
            ce_color, status = "#4CAF50", "✓ SOLVED"
        elif ce < 0.5:
            ce_color, status = "#FF9800", "~ PARTIAL"
        else:
            ce_color, status = "#EF5350", "✗ UNSOLVED"

        # --- binding column: first six variables ---
        binding_cells = []
        for name, value in list(r.get("binding", {}).items())[:6]:
            binding_cells.append(
                f"<tr><td style='color:#888;font-size:0.8em'>{name}</td>"
                f"<td style='color:#26C6DA;font-weight:700;font-size:0.8em'>{value:+.5f}</td></tr>")
        binding_rows = "".join(binding_cells)

        # --- invariants column: first survivor, else first trace ---
        fallback = r["traces"][0] if r.get("traces") else None
        shown = next((t for t in r.get("traces", []) if t["survived"]), fallback)
        inv_cells = []
        if shown and shown.get("invariants"):
            for inv_name, (inv_val, ok) in shown["invariants"].items():
                mark_colour = "#4CAF50" if ok else "#EF5350"
                inv_cells.append(
                    f"<tr><td style='color:{mark_colour};font-size:0.75em'>{'✓' if ok else '✗'}</td>"
                    f"<td style='color:#888;font-size:0.75em'>{inv_name}</td>"
                    f"<td style='color:{mark_colour};font-size:0.75em'>{inv_val:.5f}</td></tr>")
        inv_rows = "".join(inv_cells)

        # --- per-type survival badges ---
        tc = r.get("type_counts", {})
        badges = []
        for kind in badge_types:
            counts = tc.get(kind, {})
            won = counts.get('survived', 0)
            tried = counts.get('tried', 0)
            badge_colour = '#4CAF50' if won > 0 else '#333'
            pct = round(won / tried * 100) if tried else 0
            badges.append(
                f"<span style='font-size:0.72em;margin-right:6px'>"
                f"{RAY_ICONS.get(kind,'·')}<span style='color:#555'>{kind[:3]}</span>"
                f"<span style='color:{badge_colour}'>"
                f"{pct}%</span></span>")
        type_badges = "".join(badges)

        # --- ray trail: first fifteen traces ---
        trail = []
        for t in r.get("traces", [])[:15]:
            icon = RAY_ICONS.get(t.get("ray_type", "seed"), "·")
            line_colour = "#4CAF50" if t["survived"] else "#333"
            star = " ◀" if t["survived"] else ""
            trail.append(
                f"<div style='color:{line_colour};font-size:0.70em'>"
                f"{icon}[{t.get('ray','')}] CE={t['ce']:.4f} "
                f"G1={'✓' if t['g1'] else '✗'} G2={'✓' if t['g2'] else '✗'} "
                f"<span style='color:#888'>{t.get('tension','?')[:3]}</span>"
                f"<span style='color:#4CAF50;font-weight:700'>{star}</span></div>")
        ray_trail = "".join(trail)

        cards.append(
            f"<div style='border:1px solid #181818;border-radius:6px;margin-bottom:10px;overflow:hidden'>"
            f"<div style='background:#0d0d0d;padding:6px 12px;border-bottom:1px solid #1a1a1a;"
            f"display:flex;align-items:center;gap:12px;flex-wrap:wrap'>"
            f"<span style='color:{ce_color};font-weight:700'>{status}</span>"
            f"<span style='color:#555;font-size:0.8em'>{r.get('icon','·')} {r['problem']}</span>"
            f"<span style='color:#5C6BC0;font-size:0.75em'>{r.get('domain','')}</span>"
            f"<span style='color:#333;font-size:0.75em'>CE={ce:.6f}</span>"
            f"<span style='color:#26C6DA;font-size:0.70em'>{r['survived_count']} survived / {r['total_fired']:,} fired</span>"
            f"<span style='margin-left:auto'>{type_badges}</span></div>"
            f"<div style='display:flex'>"
            f"<div style='flex:0 0 140px;padding:8px 10px;border-right:1px solid #181818'>"
            f"<div style='color:#333;font-size:0.62em;margin-bottom:3px'>BINDING</div>"
            f"<table style='border-collapse:collapse;width:100%'>{binding_rows}</table></div>"
            f"<div style='flex:0 0 190px;padding:8px 10px;border-right:1px solid #181818'>"
            f"<div style='color:#333;font-size:0.62em;margin-bottom:3px'>INVARIANTS</div>"
            f"<table style='border-collapse:collapse;width:100%'>"
            f"{inv_rows or '<tr><td colspan=3 style=color:#222>—</td></tr>'}</table></div>"
            f"<div style='flex:1;padding:8px 10px;overflow-y:auto;max-height:180px'>"
            f"<div style='color:#333;font-size:0.62em;margin-bottom:3px'>"
            f"RAY TREE — best: <span style='color:#FF9800'>{r.get('best_ray','—')}</span></div>"
            f"{ray_trail}</div></div></div>")
    return "".join(cards)
def refresh_dashboard():
    """Render the whole dashboard page and return it as one HTML string.

    Run counts, solved counts and cumulative CE are summed over every entry
    of PROBLEM_STATS while STATE_LOCK is held. The four table fragments are
    then produced by the ``_render_*`` helpers and spliced into the page
    template together with the headline badges.

    Returns:
        str: HTML markup for the dashboard.
    """
    def _column_total(key):
        # Sum one numeric field across all per-problem stat records.
        return sum(entry[key] for entry in PROBLEM_STATS.values())

    # NOTE(review): only the three totals are computed under the lock; the
    # source's indentation was lost, so confirm that the render helpers were
    # not also meant to run while STATE_LOCK is held.
    with STATE_LOCK:
        runs = _column_total("runs")
        solved = _column_total("solved")
        total_ce = _column_total("total_ce")

    # Guard against division by zero before any run has been recorded.
    if runs:
        avg_ce = round(total_ce / runs, 5)
    else:
        avg_ce = 0

    # Fragment order is kept as perf -> types -> allosteric -> recent.
    perf = _render_perf_table()
    types = _render_type_table()
    allos = _render_allosteric_table()
    recnt = _render_recent()

    return f"""
<div style='background:#090909;color:#e0e0e0;font-family:monospace;padding:16px;max-width:1700px'>
<h2 style='color:#26C6DA;border-bottom:1px solid #1a1a1a;padding-bottom:5px;font-size:1.05em;margin:0 0 8px 0'>
⚗ Practicality 21.0 — MPR Ordering Fix + Branch Width 12
</h2>
<div style='color:#2a2a2a;font-size:0.70em;margin-bottom:10px'>
Fix: _extract_ordering_pair reads coefficient signs (not alphabetical sympy ordering) ·
Branch width 12 (16 for systemic tension) · Pre-compiled anchors · 12K batch
</div>
<div style='margin-bottom:12px'>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#aaa'>Runs: {runs}</span>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#4CAF50'>Solved: {solved}</span>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#26C6DA'>Avg CE: {avg_ce}</span>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#FF9800'>Problems: {len(AXL_PROBLEMS)}</span>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#5C6BC0'>Axioms: {len(ALL_AXIOMS)}</span>
<span style='display:inline-block;padding:2px 8px;border-radius:3px;background:#0e0e0e;margin:2px;font-size:0.72em;border:1px solid #1a1a1a;color:#888'>Device: {DEVICE.type.upper()}</span>
</div>
<h4 style='color:#252525;font-size:0.78em;letter-spacing:2px;text-transform:uppercase;margin:16px 0 4px 0'>Creative Ray Type Efficacy</h4>
<table style='width:100%;border-collapse:collapse;margin-bottom:10px'>
<tr style='color:#252525;font-size:0.68em'><th></th><th>Type</th><th>Tried</th><th>Survived</th><th>Failed</th><th>Rate</th><th>Mechanism (21.0)</th></tr>
{types}
</table>
<h4 style='color:#252525;font-size:0.78em;letter-spacing:2px;text-transform:uppercase;margin:16px 0 4px 0'>Allosteric Feedback Diagnostics</h4>
<table style='width:100%;border-collapse:collapse;margin-bottom:10px'>
<tr style='color:#252525;font-size:0.68em'><th>Tension Class</th><th>Fired</th><th>Share</th><th>Biases Toward</th></tr>
{allos}
</table>
<h4 style='color:#252525;font-size:0.78em;letter-spacing:2px;text-transform:uppercase;margin:16px 0 4px 0'>Multi-Domain Performance</h4>
<table style='width:100%;border-collapse:collapse;margin-bottom:10px'>
<tr style='color:#252525;font-size:0.68em'>
<th></th><th>Problem</th><th>Domain</th><th>Diff</th><th>Runs</th><th>Rate%</th>
<th>Avg CE</th><th>Best CE</th>
<th>🌱</th><th>⚡</th><th>🔭</th><th>🧬</th><th>🔄</th><th>🌿</th>
<th>Dom Tension</th>
</tr>
{perf}
</table>
<h4 style='color:#252525;font-size:0.78em;letter-spacing:2px;text-transform:uppercase;margin:16px 0 4px 0'>Recent Runs</h4>
{recnt}
</div>"""
# Build the Gradio UI. Components are created in the order the statements run
# inside the Blocks context, so statement order here fixes the page layout.
# NOTE(review): nesting was reconstructed from syntax — the button and the hint
# text are assumed to share the Row, with everything else directly under
# Blocks. Confirm against the original file.
with gr.Blocks(theme=gr.themes.Monochrome(text_size="sm"),title="Practicality 21.0") as demo:
    gr.Markdown("## ⚗ Practicality 21.0 — MPR Ordering Fix + Branch Width 12")
    gr.Markdown(
        f"**Compute:** `{DEVICE.type.upper()}` | "
        f"**Fix:** `_extract_ordering_pair` reads coefficient signs · "
        f"**Branch:** 12 default / 16 systemic")
    with gr.Row():
        btn=gr.Button("▶ START 21.0",variant="primary")
        gr.Markdown("*Engine cycles through 14 domains. Dashboard auto-refreshes every 2s.*")
    # The initial dashboard HTML is rendered once, at UI construction time.
    html_out=gr.HTML(refresh_dashboard())
    # toggle_engine is defined elsewhere in this file; whatever it returns is
    # routed back into the button itself (outputs=btn).
    btn.click(toggle_engine,inputs=None,outputs=btn)
    def auto_refresh():
        """Yield a freshly rendered dashboard every two seconds, without end."""
        while True:
            # Sleep first: the first yield arrives 2 s after the handler starts.
            time.sleep(2.0)
            yield refresh_dashboard()
    # Run the generator on page load; each yielded string is sent to html_out.
    # NOTE(review): generator handlers depend on Gradio's queue, and this one
    # never returns — confirm queue/concurrency settings for the installed
    # Gradio version so additional clients are not starved.
    demo.load(auto_refresh,inputs=None,outputs=html_out)
# Script entry point: announce the bind address, then serve the Blocks app
# on all interfaces at port 7860 without creating a public share link.
if __name__ == "__main__":
    print("[SYSTEM 21.0] Launching on 0.0.0.0:7860 | MPR Fix + Branch-12")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )