diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,72 +1,962 @@ #!/usr/bin/env python3 """ -PRACTICALITY SYSTEM 4.0 — WISDOM-GUIDED SOLVER +PRACTICALITY SYSTEM 10.0 — FULL NEURO-SYMBOLIC STACK ═══════════════════════════════════════════════════════════════════ -Architecture: Two-layer system. +THE ABSTRACTION IMPERATIVE & NUMERICAL RESOLUTION -LAYER 1 — WISDOM ENGINE (Structural): - Maintains a library of abstract mathematical domains defined by - Bourbaki axiom sets. An internal Proposer/Critic debate selects - the best domain isomorphism for a given problem. The winning - domain is compiled into PSL constraints. +This is the fully merged architecture. It bridges pure mathematical +reasoning (Wisdom) down to AST-aware algebraic roots (Hypothesis Engine). -LAYER 2 — NUMERICAL ENGINE (3.11): - Receives compiled PSL. Runs consensus solver. Returns a Universal - Witness containing the binding, CE, topology, and dominant - residual expressions. +LAYER 1 — WISDOM ENGINE (Structural Abstraction): + Maintains a library of mathematical spaces defined by Bourbaki axioms. + An internal Proposer/Critic debate selects the best isomorphism for + unstructured data. It compiles the winning domain into PSL 2.0 constraints. -FEEDBACK LOOP: - The Witness is parsed back into axiom evidence. Failing constraints - reveal which axioms were wrong or missing. The Wisdom Engine mutates - its domain hypothesis and re-compiles. This continues until CE - crosses the solve threshold or wisdom rounds are exhausted. +LAYER 2 — TRUE HYPOTHESIS ENGINE (Algebraic AST): + Receives the generated PSL. Parses it into ASTs. Extracts exact + algebraic roots, manifold tangents, and topological chains to + formulate numerical hypotheses. Tests them and polishes with Adam/Gradients. -The LLM provides the initial ProblemSpace(axioms). Everything else -is internal symbolic reasoning + numerical verification. +FEEDBACK LOOP: + The AST Engine returns a Universal Witness (Residuals + Topology). + The Wisdom Engine parses this back into Axiom Evidence. Failing + constraints reveal which abstract assumptions were flawed. The domain + mutates, and the cycle continues until the space is mathematically resolved. """ -import itertools, math, random, time, threading, asyncio, warnings +import time, random, math, threading, asyncio, warnings, json, itertools +from functools import reduce +from typing import Optional, Callable, List, Dict, Tuple, Any, Set from dataclasses import dataclass, field -from typing import Set, List, Dict, Tuple, Optional from collections import defaultdict, deque -from functools import reduce from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager import numpy as np import sympy as sp from sympy.parsing.sympy_parser import parse_expr +import torch from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, JSONResponse import uvicorn warnings.filterwarnings("ignore") +USE_GPU = torch.cuda.is_available() +DEVICE = torch.device("cuda" if USE_GPU else "cpu") +print(f"[SYSTEM] Compute Mode: {'GPU' if USE_GPU else 'CPU'} | System v10.0") + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 1: SYSTEM CONSTANTS & CONFIGURATION +# ══════════════════════════════════════════════════════════════════════════ +SOLVE_THRESHOLD = 0.05 +TOTAL_BUDGET = 600 +HC4_VOLUME_FLOOR = 0.20 +L9_CONTRIB_THRESHOLD = 1e-8 +HUB_RESIDUAL_MULT = 2.0 +HUB_DEGREE_RATIO = 0.6 + +HYPOTHESIS_MAX_ROUNDS = 25 +MAX_HYPS_PER_ROUND = 10 +BRANCH_DEPTH = 3 +MANIFOLD_STEPS = [0.05, 0.2, -0.05, -0.2, 0.5, -0.5] + +N_MPRT_EXPLORE = 50000 if USE_GPU else 5000 +POLISH_STEPS = 50 +ADAM_STEPS = 40 + +TIER_HARD="hard" + +PROJECTION_CACHE: Dict[str, Dict[str, List[Dict]]] = {} + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 2: INTERVAL ARITHMETIC (HC4 BASE) +# ══════════════════════════════════════════════════════════════════════════ +class IV: + __slots__ = ("lo", "hi") + def __init__(self, lo, hi): self.lo=float(lo); self.hi=float(hi) + def __add__(self,o): + if isinstance(o,(int,float)): return IV(self.lo+o,self.hi+o) + return IV(self.lo+o.lo,self.hi+o.hi) + __radd__=__add__ + def __sub__(self,o): + if isinstance(o,(int,float)): return IV(self.lo-o,self.hi-o) + return IV(self.lo-o.hi,self.hi-o.lo) + def __rsub__(self,o): + if isinstance(o,(int,float)): return IV(o-self.hi,o-self.lo) + return o.__sub__(self) + def __mul__(self,o): + if isinstance(o,(int,float)): + a,b=self.lo*o,self.hi*o; return IV(min(a,b),max(a,b)) + p=(self.lo*o.lo,self.lo*o.hi,self.hi*o.lo,self.hi*o.hi) + return IV(min(p),max(p)) + __rmul__=__mul__ + def __truediv__(self,o): + if isinstance(o,(int,float)): + if abs(o)<1e-15: return IV(-1e18,1e18) + a,b=self.lo/o,self.hi/o; return IV(min(a,b),max(a,b)) + if o.lo<=0<=o.hi: return IV(-1e18,1e18) + return self*IV(1.0/o.hi,1.0/o.lo) + def __rtruediv__(self,o): + if self.lo<=0<=self.hi: return IV(-1e18,1e18) + return IV(o/self.hi,o/self.lo) if o>=0 else IV(o/self.lo,o/self.hi) + def __neg__(self): return IV(-self.hi,-self.lo) + def __pow__(self,n): + if isinstance(n,int): + if n==0: return IV(1.0,1.0) + if n%2==0: + if self.lo>=0: return IV(self.lo**n,self.hi**n) + if self.hi<=0: return IV(self.hi**n,self.lo**n) + return IV(0.0,max(abs(self.lo)**n,abs(self.hi)**n)) + return IV(self.lo**n if self.lo>=0 else -((-self.lo)**n), + self.hi**n if self.hi>=0 else -((-self.hi)**n)) + if self.lo<0: return IV(0.0,max(abs(self.lo)**n,self.hi**n)) + return IV(self.lo**n,self.hi**n) + def contains_zero(self): return self.lo<=0.0<=self.hi + def width(self): return max(0.0,self.hi-self.lo) + def mid(self): return (self.lo+self.hi)*0.5 + +Box=Dict[str,IV] + +def compile_iv(expr,variables): + def _c(e): + if e.is_Number: + v=float(e); return lambda box,_v=v: IV(_v,_v) + if e.is_Symbol: + n=str(e); return lambda box,_n=n: box.get(_n,IV(-1e18,1e18)) + if e.is_Add: + fs=[_c(a) for a in e.args] + return lambda box,_fs=fs: reduce(lambda a,b:a+b,(_f(box) for _f in _fs)) + if e.is_Mul: + fs=[_c(a) for a in e.args] + return lambda box,_fs=fs: reduce(lambda a,b:a*b,(_f(box) for _f in _fs)) + if e.is_Pow: + bc=_c(e.args[0]); ex=e.args[1] + if ex.is_Number: return lambda box,_bc=bc,_ex=float(ex): _bc(box)**_ex + exc=_c(ex) + return lambda box,_bc=bc,_exc=exc: _bc(box)**_exc(box).mid() + return lambda box: IV(-1e18,1e18) + return _c(expr) + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 3: PSL 2.0 PARSER +# ══════════════════════════════════════════════════════════════════════════ +@dataclass +class PSLVar: name:str; lo:float; hi:float +@dataclass +class PSLConstraint: kind:str; expr:str; scope:str="root"; weight:float=1.0; branches:List[str]=field(default_factory=list) +@dataclass +class PSLScope: name:str; vars:List[PSLVar]; ref_vars:List[str]; constraints:List[PSLConstraint] +@dataclass +class PSLProgram: + name:str; level:int; vars:List[PSLVar]; constraints:List[PSLConstraint] + scopes:List[PSLScope]; scope_order:List[str] + +def parse_psl(text:str) -> PSLProgram: + lines=[l.strip() for l in text.strip().split('\n') if l.strip() and not l.strip().startswith('#')] + prog=PSLProgram("unnamed",0,[],[],[],[]) + i=0 + def advance(): nonlocal i; l=lines[i]; i+=1; return l + def peek(): return lines[i] if i=3 else None + + def parse_scope_body(sname): + svars,scons,ref_vars=[],[],[] + while i1 else "" + if kw=="END": advance(); break + advance() + if kw=="VAR": + v=pvar(rest) + if v: svars.append(v) + elif kw=="VARS": ref_vars.extend(rest.split()) + elif kw in ("EQ","GEQ","LEQ","OR_EQ"): + br=[b.strip() for b in rest.split('|')] if kw=="OR_EQ" else [] + scons.append(PSLConstraint(kw.lower(),rest,scope=sname,branches=br)) + return PSLScope(sname,svars,ref_vars,scons) + + while i1 else "" + if kw=="SPACE": prog.name=rest.split()[0] if rest else "unnamed" + elif kw=="VAR": + v=pvar(rest) + if v: prog.vars.append(v) + elif kw in ("EQ","GEQ","LEQ","OR_EQ"): + br=[b.strip() for b in rest.split('|')] if kw=="OR_EQ" else [] + prog.constraints.append(PSLConstraint(kw.lower(),rest,branches=br)) + elif kw=="SCOPE": + sname=rest.split()[0] if rest else f"s{len(prog.scopes)}" + prog.scopes.append(parse_scope_body(sname)); prog.scope_order.append(sname) + return prog + +@dataclass +class ExpandedProblem: + variables:List[str]; bounds:Dict[str,Tuple[float,float]] + constraints:List['Constraint'] + scope_groups:Dict[str,List[int]]; scope_vars:Dict[str,List[str]]; scope_order:List[str] + +def expand_psl(prog:PSLProgram) -> ExpandedProblem: + variables=[]; bounds={}; constraints=[] + scope_groups=defaultdict(list); scope_vars=defaultdict(list) + scope_order=list(prog.scope_order) + + def add_var(name,lo,hi,scope="root"): + if name not in bounds: variables.append(name); bounds[name]=(lo,hi) + if scope!="root" and name not in scope_vars[scope]: scope_vars[scope].append(name) + + def add_con(kind,expr,direction,scope="root",weight=1.0,branches=None): + idx=len(constraints) + constraints.append(Constraint(kind,expr,direction,weight,scope,branches or [])) + scope_groups[scope].append(idx) + try: + syms={v:sp.Symbol(v) for v in variables} + if kind=="or_eq" and branches: + for b_str in branches: + parsed=parse_expr(b_str,local_dict=syms) + for v in variables: + if sp.Symbol(v) in parsed.free_symbols and scope!="root" and v not in scope_vars[scope]: + scope_vars[scope].append(v) + else: + parsed=parse_expr(expr,local_dict=syms) + for v in variables: + if sp.Symbol(v) in parsed.free_symbols and scope!="root" and v not in scope_vars[scope]: + scope_vars[scope].append(v) + except: pass + + for v in prog.vars: add_var(v.name,v.lo,v.hi) + for c in prog.constraints: + if c.kind in ("eq","geq","leq","or_eq"): + kind="or_eq" if c.kind=="or_eq" else "equality" if c.kind=="eq" else "inequality" + add_con(kind,c.expr,c.kind,"root",c.weight,c.branches) + for sc in prog.scopes: + for v in sc.vars: add_var(v.name,v.lo,v.hi,sc.name) + for vname in sc.ref_vars: + if vname in bounds and vname not in scope_vars[sc.name]: + scope_vars[sc.name].append(vname) + for c in sc.constraints: + if c.kind in ("eq","geq","leq","or_eq"): + kind="or_eq" if c.kind=="or_eq" else "equality" if c.kind=="eq" else "inequality" + add_con(kind,c.expr,c.kind,sc.name,c.weight,c.branches) + return ExpandedProblem(variables,bounds,constraints, + dict(scope_groups),dict(scope_vars),scope_order) + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 4: PROBLEM & DUAL COMPILATION (AST) +# ══════════════════════════════════════════════════════════════════════════ +@dataclass +class Constraint: + kind:str; expr:str; direction:str; weight:float=1.0; scope:str="root" + branches:List[str]=field(default_factory=list) + +@dataclass +class MathConstraint: + kind:str; expr_str:str; direction:str; weight:float=1.0 + fast_iv:Optional[Callable]=field(default=None,repr=False) + fast_pt:Optional[Callable]=field(default=None,repr=False) + fast_np:Optional[Callable]=field(default=None,repr=False) + fast_torch:Optional[Callable]=field(default=None,repr=False) + syms_used:List[str]=field(default_factory=list) + parsed:Optional[sp.Expr]=field(default=None,repr=False) + degree:int=1; scope:str="root" + branches:List['MathConstraint']=field(default_factory=list) + projections:Dict[str,List[Dict]]=field(default_factory=dict) + fast_jac:Dict[str,Dict]=field(default_factory=dict) + +def compile_mc(kind,expr_str,direction,variables,weight=1.0,scope="root",branches=None): + mc=MathConstraint(kind=kind,expr_str=expr_str,direction=direction,weight=weight,scope=scope) + if kind=="or_eq" and branches: + for b_str in branches: + b_mc=compile_mc("equality",b_str,"eq",variables,weight,scope) + mc.branches.append(b_mc); mc.syms_used.extend(b_mc.syms_used) + mc.syms_used=list(dict.fromkeys(mc.syms_used)) + def _or_iv(box,_mcs=mc.branches): + rivs=[] + for b in _mcs: + if b.fast_iv: + try: rivs.append(b.fast_iv(box)) + except: pass + if not rivs: return IV(-1e18,1e18) + return IV(min(r.lo for r in rivs),max(r.hi for r in rivs)) + mc.fast_iv=_or_iv + return mc + + syms={v:sp.Symbol(v) for v in variables} + try: + parsed=parse_expr(expr_str,local_dict=syms); mc.parsed=parsed + mc.syms_used=[v for v in variables if sp.Symbol(v) in parsed.free_symbols] + mc.fast_iv=compile_iv(parsed,variables) + mc.fast_pt=sp.lambdify([sp.Symbol(v) for v in mc.syms_used],parsed,modules="math") + mc.fast_np=sp.lambdify([sp.Symbol(v) for v in mc.syms_used],parsed,modules="numpy") + + if USE_GPU: + _tops={"sin":torch.sin,"cos":torch.cos,"exp":torch.exp, + "sqrt":torch.sqrt,"log":torch.log,"Abs":torch.abs} + try: + mc.fast_torch=sp.lambdify([sp.Symbol(v) for v in mc.syms_used], + parsed,modules=[_tops,"math"]) + except: pass + + fast_jac={} + for v in mc.syms_used: + try: + de=sp.diff(parsed,sp.Symbol(v)) + fs=[str(s) for s in de.free_symbols] + func=sp.lambdify([sp.Symbol(s) for s in fs],de,modules="math") + fast_jac[v]={"syms":fs,"func":func} + except: pass + mc.fast_jac=fast_jac + + # AST Projection: Isolate variables algebraically + if kind=="equality": + if expr_str not in PROJECTION_CACHE: + pm={} + for sym in parsed.free_symbols: + v_str=str(sym) + try: + sols=sp.solve(parsed,sym) + if v_str not in pm: pm[v_str]=[] + for sol in sols: + fs=list(sol.free_symbols) + func=sp.lambdify(fs,sol,modules="math") + pm[v_str].append({"syms":[str(s) for s in fs],"func":func}) + except: pass + PROJECTION_CACHE[expr_str]=pm + mc.projections=PROJECTION_CACHE.get(expr_str,{}) + except: pass + return mc + +@dataclass +class Problem: + pid:str; variables:List[str] + constraints:List[Constraint]; bounds:Dict[str,Tuple[float,float]] + tier:str=TIER_HARD + scope_groups:Dict[str,List[int]]=field(default_factory=dict) + scope_vars:Dict[str,List[str]]=field(default_factory=dict) + scope_order:List[str]=field(default_factory=list) + + def __post_init__(self): + self.compiled_constraints=[compile_mc(c.kind,c.expr,c.direction, + self.variables,c.weight,c.scope,c.branches) for c in self.constraints] + + def constraint_energy(self,b:Dict[str,float]) -> float: + total=0.0 + for mc in self.compiled_constraints: + if getattr(mc,'weight',1.0)==0.0: continue + if mc.kind=="or_eq": + vals=[] + for bmc in mc.branches: + if bmc.fast_pt: + try: + args=[b.get(v,0.0) for v in bmc.syms_used] + vals.append(abs(float(bmc.fast_pt(*args)))) + except: pass + total+=(min(vals)*mc.weight) if vals else 1.0 + else: + if mc.fast_pt is None: total+=1.0; continue + try: + args=[b.get(v,0.0) for v in mc.syms_used] + val=float(mc.fast_pt(*args)) + if mc.kind=="equality": total+=abs(val)*mc.weight + elif mc.direction=="geq": total+=max(0.0,-val)*mc.weight + else: total+=max(0.0,val)*mc.weight + except: total+=1.0 + for v,(lo,hi) in self.bounds.items(): + bv=b.get(v,0.0) + total+=max(0.0,lo-bv)+max(0.0,bv-hi) + return total + + def evaluate_gradient(self,b:Dict[str,float]) -> Dict[str,float]: + grad={v:0.0 for v in self.variables} + for mc in self.compiled_constraints: + if getattr(mc,'weight',1.0)==0.0: continue + if mc.kind=="or_eq": + best,mv=None,float('inf') + for bmc in mc.branches: + if bmc.fast_pt: + try: + args=[b.get(v,0.0) for v in bmc.syms_used] + val=float(bmc.fast_pt(*args)) + if abs(val)=0 else -1.0 + for vn,jac in bmc.fast_jac.items(): + try: grad[vn]+=mc.weight*sign*float(jac["func"](*[b.get(s,0.0) for s in jac["syms"]])) + except: pass + else: + if mc.fast_pt is None: continue + try: + f_val=float(mc.fast_pt(*[b.get(v,0.0) for v in mc.syms_used])) + if mc.kind=="equality": mult=mc.weight*(1.0 if f_val>=0 else -1.0) + elif mc.direction=="geq": mult=-mc.weight if f_val<0 else 0.0 + else: mult=mc.weight if f_val>0 else 0.0 + if mult!=0.0: + for vn,jac in mc.fast_jac.items(): + try: grad[vn]+=mult*float(jac["func"](*[b.get(s,0.0) for s in jac["syms"]])) + except: pass + except: pass + for v in self.variables: + lo,hi=self.bounds[v]; val=b.get(v,0.0) + if valhi: grad[v]+=1.0 + return grad + + def constraint_energy_tensor(self,b_matrix:Any,var_order:List[str]) -> Any: + N=b_matrix.shape[0]; vti={v:i for i,v in enumerate(var_order)} + if USE_GPU: + total=torch.zeros(N,dtype=torch.float32,device=DEVICE) + for mc in self.compiled_constraints: + if getattr(mc,'weight',1.0)==0.0: continue + if mc.kind=="or_eq": + bv=[] + for bmc in mc.branches: + if bmc.fast_torch: + try: + args=[b_matrix[:,vti[v]] for v in bmc.syms_used] + val=bmc.fast_torch(*args) + if not isinstance(val,torch.Tensor): + val=torch.full((N,),float(val),device=DEVICE,dtype=torch.float32) + elif val.dim()==0: val=val.expand(N) + bv.append(torch.abs(val)) + except: pass + if bv: + mv,_=torch.stack(bv,0).min(0); total+=mv*mc.weight + else: total+=1.0 + else: + if mc.fast_torch is None: total+=1.0; continue + try: + args=[b_matrix[:,vti[v]] for v in mc.syms_used] + val=mc.fast_torch(*args) + if not isinstance(val,torch.Tensor): + val=torch.full((N,),float(val),device=DEVICE,dtype=torch.float32) + elif val.dim()==0: val=val.expand(N) + if mc.kind=="equality": total+=torch.abs(val)*mc.weight + elif mc.direction=="geq": total+=torch.relu(-val)*mc.weight + else: total+=torch.relu(val)*mc.weight + except: total+=1.0 + for i,v in enumerate(var_order): + lo,hi=self.bounds[v]; col=b_matrix[:,i] + total+=torch.relu(torch.tensor(lo,device=DEVICE)-col)+torch.relu(col-torch.tensor(hi,device=DEVICE)) + return total + else: + total=np.zeros(N,dtype=np.float32) + for mc in self.compiled_constraints: + if getattr(mc,'weight',1.0)==0.0: continue + if mc.kind=="or_eq": + bv=[] + for bmc in mc.branches: + if bmc.fast_np: + try: + args=[b_matrix[:,vti[v]] for v in bmc.syms_used] + val=bmc.fast_np(*args) + if not isinstance(val,np.ndarray): val=np.full(N,float(val),dtype=np.float32) + bv.append(np.abs(val)) + except: pass + if bv: total+=np.minimum.reduce(bv)*mc.weight + else: total+=1.0 + else: + if mc.fast_np is None: total+=1.0; continue + try: + args=[b_matrix[:,vti[v]] for v in mc.syms_used] + val=mc.fast_np(*args) + if not isinstance(val,np.ndarray): val=np.full(N,float(val),dtype=np.float32) + if mc.kind=="equality": total+=np.abs(val)*mc.weight + elif mc.direction=="geq": total+=np.maximum(0.0,-val)*mc.weight + else: total+=np.maximum(0.0,val)*mc.weight + except: total+=1.0 + for i,v in enumerate(var_order): + lo,hi=self.bounds[v]; col=b_matrix[:,i] + total+=np.maximum(0.0,lo-col)+np.maximum(0.0,col-hi) + return total + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 5: ORACLES +# ══════════════════════════════════════════════════════════════════════════ +@dataclass +class L7Certificate: topology_sig:str; hub_vars:List[str]; solve_order:List[str] +@dataclass +class L8Certificate: incoherent_vars:List[str]; tightened_bounds:Dict[str,Tuple[float,float]] +@dataclass +class L9Certificate: residual_ce:float; dominant_vars:List[str]; dominant_exprs:List[str] + +class TopologyOracle: + def evaluate(self,problem:Problem,active_scopes:List[str]) -> L7Certificate: + vd=defaultdict(int) + for mc in problem.compiled_constraints: + for v in mc.syms_used: vd[v]+=1 + ht=max(2,math.ceil(max(vd.values() or [0])*HUB_DEGREE_RATIO)) if vd else 2 + hub=[v for v,d in vd.items() if d>=ht] + if not active_scopes: return L7Certificate("none",hub,[]) + so=sorted(active_scopes,key=lambda sn:sum(vd.get(v,0) for v in problem.scope_vars.get(sn,[]))) + return L7Certificate("mesh" if len(hub)>2 else "hub" if hub else "chain",hub,so) + +class CoherenceOracle: + def evaluate(self,wd:Dict[str,Dict[str,Tuple[float,float]]]) -> L8Certificate: + inc=set(); tight={}; scopes=list(wd.keys()) + for i in range(len(scopes)): + for j in range(i+1,len(scopes)): + ba,bb=wd[scopes[i]],wd[scopes[j]] + for v in set(ba)&set(bb): + lo_a,hi_a=ba[v]; lo_b,hi_b=bb[v] + o_lo=max(lo_a,lo_b); o_hi=min(hi_a,hi_b) + if o_lo>o_hi+1e-8: inc.add(v); tight[v]=(min(lo_a,lo_b),max(hi_a,hi_b)) + elif o_hi>o_lo: tight[v]=(o_lo,o_hi) + return L8Certificate(list(inc),tight) + +class ResidualOracle: + def evaluate(self,binding:Dict[str,float],problem:Problem,hub_vars:List[str]) -> L9Certificate: + ce=problem.constraint_energy(binding) + if ceL9_CONTRIB_THRESHOLD: + de.append((bb.expr_str,contrib)) + for v in bb.syms_used: + dv[v]+=contrib*(HUB_RESIDUAL_MULT if v in hub_vars else 1.0) + else: + if mc.fast_pt is None: continue + try: + args=[binding.get(v,0.0) for v in mc.syms_used] + val=float(mc.fast_pt(*args)) + contrib=(abs(val)*mc.weight if mc.kind=="equality" + else max(0.0,(-val if mc.direction=="geq" else val))*mc.weight) + if contrib>L9_CONTRIB_THRESHOLD: + de.append((mc.expr_str,contrib)) + for v in mc.syms_used: + dv[v]+=contrib*(HUB_RESIDUAL_MULT if v in hub_vars else 1.0) + except: pass + rv=sorted(dv.keys(),key=lambda v:-dv[v])[:5] + re=[e for e,c in sorted(de,key=lambda x:-x[1])[:5]] + return L9Certificate(round(ce,6),rv,re) + +TOPOLOGY_ORACLE=TopologyOracle() +COHERENCE_ORACLE=CoherenceOracle() +RESIDUAL_ORACLE=ResidualOracle() + +# ══════════════════════════════════════════════════════════════════════════ +# SECTION 6: CORE SOLVERS (HC4, MPRT, ADAM) +# ══════════════════════════════════════════════════════════════════════════ +def _hc4(box:Box,constraints) -> Optional[Box]: + cur=dict(box) + for mc in constraints: + if getattr(mc,'weight',1.0)==0.0: continue + if mc.kind=="or_eq": + if not mc.branches: continue + valid=False + for bmc in mc.branches: + if bmc.fast_iv is None: valid=True; break + try: + if bmc.fast_iv(cur).contains_zero(): valid=True; break + except: valid=True; break + if not valid: return None + else: + if mc.fast_iv is None: continue + try: + riv=mc.fast_iv(cur) + if ((mc.kind=="equality" and not riv.contains_zero()) or + (mc.kind=="inequality" and ( + (mc.direction=="geq" and riv.hi<-1e-10) or + (mc.direction=="leq" and riv.lo>1e-10)))): + return None + except: pass + return cur + +def _global_hc4_tighten(problem:Problem) -> Dict[str,Tuple[float,float]]: + gb={v:IV(lo,hi) for v,(lo,hi) in problem.bounds.items()} + c=_hc4(gb,problem.compiled_constraints) + if c is None: return dict(problem.bounds) + ov=sum(hi-lo for lo,hi in problem.bounds.values()) + nv=sum(max(0.0,iv.hi-iv.lo) for iv in c.values()) + if ov>0 and nv-1e17 else 0.0 + tb[s]=tb.get(s,mid)*0.9+mid*0.1 + try: val=float(proj["func"](*[tb.get(s,0.0) for s in proj["syms"]])) + except: continue + if not math.isfinite(val) or abs(val)>1e6: continue + lo,hi=problem.bounds.get(v,(-1e18,1e18)) + if not(lo<=val<=hi): continue + tb[v]=val + nce=problem.constraint_energy(tb) + if nce Tuple[Dict[str,float],float]: + best_ce=problem.constraint_energy(binding); best_b=dict(binding) + active=[s for s in problem.scope_order if s!="root" and problem.scope_vars.get(s)] + l7=TOPOLOGY_ORACLE.evaluate(problem,active) + ov=[] + for sn in (l7.solve_order or []): + for v in problem.scope_vars.get(sn,[]): + if v not in ov: ov.append(v) + for v in problem.variables: + if v not in ov: ov.append(v) + for _ in range(20): + best_b,best_ce,imp=_single_snap_pass(problem,best_b,ov,best_ce,best_ce*1.15) + if not imp or best_ce Tuple[Dict[str,float],float]: + var_list=problem.variables; V=len(var_list) + lo_a=np.zeros(V,dtype=np.float32); hi_a=np.ones(V,dtype=np.float32) + for i,v in enumerate(var_list): + g_lo,g_hi=problem.bounds.get(v,(-10.0,10.0)) + lo_a[i]=max(g_lo,work_box[v].lo) if v in work_box else (g_lo if g_lo>-1e17 else -10.0) + hi_a[i]=min(g_hi,work_box[v].hi) if v in work_box else (g_hi if g_hi<1e17 else 10.0) + if lo_a[i]>=hi_a[i]: m=(lo_a[i]+hi_a[i])/2; lo_a[i]=m-1e-6; hi_a[i]=m+1e-6 + if USE_GPU: + lo_t=torch.tensor(lo_a,device=DEVICE); span_t=torch.tensor(hi_a-lo_a,device=DEVICE) + mat=lo_t.unsqueeze(0)+span_t.unsqueeze(0)*torch.rand(N,V,device=DEVICE,dtype=torch.float32) + cev=problem.constraint_energy_tensor(mat,var_list) + idx=torch.argmin(cev).item() + return {v:mat[idx,i].item() for i,v in enumerate(var_list)},cev[idx].item() + else: + span=hi_a-lo_a + mat=lo_a[np.newaxis,:]+span[np.newaxis,:]*np.random.random((N,V)).astype(np.float32) + cev=problem.constraint_energy_tensor(mat,var_list) + idx=np.argmin(cev) + return {v:float(mat[idx,i]) for i,v in enumerate(var_list)},float(cev[idx]) + +def _gradient_polish(problem:Problem,binding:Dict[str,float],steps:int=POLISH_STEPS) -> Tuple[Dict[str,float],float]: + b=dict(binding); best_ce=problem.constraint_energy(b); best_b=dict(b) + for _ in range(steps): + if best_ce=best_ce: break + best_ce=nce; b=nb; best_b=dict(b) + return best_b,best_ce + +def _adam_polish(problem:Problem,binding:Dict[str,float],steps:int=ADAM_STEPS) -> Tuple[Dict[str,float],float]: + b={v:binding.get(v,(problem.bounds[v][0]+problem.bounds[v][1])/2) for v in problem.variables} + best_ce=problem.constraint_energy(b); best_b=dict(b) + lr=0.01; b1=0.9; b2=0.999; eps=1e-8 + m={v:0.0 for v in problem.variables}; mv={v:0.0 for v in problem.variables} + for t in range(1,steps+1): + if best_ce List[Hypothesis]: + hyps = [] + hyps.extend(self._branch_hypotheses(problem, model, l9)) + hyps.extend(self._chain_hypotheses(problem, model, l7)) + hyps.extend(self._coherence_hypotheses(problem, model, l8)) + hyps.extend(self._residual_chain_hypotheses(problem, model, l9)) + if round_idx >= 3: + hyps.extend(self._manifold_tangent_hypotheses(problem, model, l9)) + hyps = [h for h in hyps if h.hid not in model.failure_patterns] + return sorted(hyps, key=lambda h: -h.confidence) + + def _branch_hypotheses(self, problem:Problem, model:StructuralModel, l9:L9Certificate) -> List[Hypothesis]: + hyps = []; ref = model.best_binding + for expr_str in l9.dominant_exprs[:4]: + mc = next((mc for mc in problem.compiled_constraints if mc.expr_str==expr_str), None) + if not mc or not mc.projections: continue + for v, proj_list in mc.projections.items(): + lo,hi = problem.bounds.get(v,(-1e18,1e18)) + for proj_idx,proj in enumerate(proj_list): + try: + args=[ref.get(s,0.0) for s in proj["syms"]] + val=float(proj["func"](*args)) + if not math.isfinite(val) or abs(val)>1e6: continue + if not(lo<=val<=hi): continue + b=dict(ref); b[v]=val + hid=f"branch_{hash(expr_str)%9999}_{v}_{proj_idx}" + hyps.append(Hypothesis(hid=hid, binding=b, h_type="branch", + claim=f"AlgBranch: '{expr_str[:25]}' → {v}={val:.4f}", + derivation=[expr_str], pinned_vars={v:val}, + free_vars=[u for u in problem.variables if u!=v], + confidence=0.85 if proj_idx==0 else 0.70)) + except: pass + return hyps + + def _chain_hypotheses(self, problem:Problem, model:StructuralModel, l7:L7Certificate) -> List[Hypothesis]: + if not l7.solve_order: return [] + hyps = []; tb = _global_hc4_tighten(problem) + b = dict(model.best_binding); chain_steps = []; scope_bindings = {} + for sn in l7.solve_order: + svars = problem.scope_vars.get(sn, []) + if not svars: continue + scope_mcs = [problem.compiled_constraints[i] for i in problem.scope_groups.get(sn, [])] + if not scope_mcs: continue + start_box = {v: IV(tb.get(v,problem.bounds[v])[0] if isinstance(tb.get(v,(0,1)),tuple) else problem.bounds[v][0], + tb.get(v,problem.bounds[v])[1] if isinstance(tb.get(v,(0,1)),tuple) else problem.bounds[v][1]) + for v in svars if v in problem.bounds} + cont = _hc4(start_box, scope_mcs) + if cont: + for v, iv in cont.items(): + b[v] = iv.mid(); span = problem.bounds[v][1]-problem.bounds[v][0] + tb[v] = (max(problem.bounds[v][0], iv.mid()-span*0.05), + min(problem.bounds[v][1], iv.mid()+span*0.05)) + chain_steps.append(sn) + if chain_steps: + hyps.append(Hypothesis(hid=f"chain_{'_'.join(c[:4] for c in chain_steps)}", + binding=b, h_type="chain", claim=f"TopologyChain({' → '.join(chain_steps)})", + derivation=chain_steps, pinned_vars={v: b[v] for sn in chain_steps for v in problem.scope_vars.get(sn, []) if v in b}, + free_vars=[v for v in problem.variables if not any(v in problem.scope_vars.get(sn,[]) for sn in chain_steps)], + confidence=0.80 if len(chain_steps)==len(l7.solve_order) else 0.55)) + return hyps + + def _coherence_hypotheses(self, problem:Problem, model:StructuralModel, l8:L8Certificate) -> List[Hypothesis]: + if not l8.incoherent_vars: return [] + b = dict(model.best_binding) + for v, (lo, hi) in l8.tightened_bounds.items(): + if v in problem.bounds: b[v] = (lo+hi)/2.0 + return [Hypothesis(hid=f"coherence_{hash(tuple(sorted(l8.incoherent_vars)))%9999}", + binding=b, h_type="coherence", claim=f"CoherenceMeet({', '.join(list(l8.incoherent_vars)[:3])})", + derivation=list(l8.incoherent_vars), pinned_vars={v: b[v] for v in l8.incoherent_vars if v in b}, + free_vars=[v for v in problem.variables if v not in l8.incoherent_vars], confidence=0.70)] + + def _residual_chain_hypotheses(self, problem:Problem, model:StructuralModel, l9:L9Certificate) -> List[Hypothesis]: + hyps = []; b = dict(model.best_binding); chain = []; depth = 0 + if not l9.dominant_vars: return hyps + for target_var in l9.dominant_vars[:BRANCH_DEPTH]: + if depth >= BRANCH_DEPTH: break + for mc in problem.compiled_constraints: + if target_var not in mc.syms_used or target_var not in mc.projections: continue + for proj in mc.projections[target_var]: + try: + val=float(proj["func"](*[b.get(s,0.0) for s in proj["syms"]])) + lo,hi=problem.bounds.get(target_var,(-1e18,1e18)) + if not(lo<=val<=hi) or not math.isfinite(val): continue + b[target_var]=val; chain.append(f"{target_var}={val:.3f}"); depth+=1; break + except: pass + if depth>len(chain)-1: break + if chain: + hyps.append(Hypothesis(hid=f"rchain_{hash(str(chain))%9999}", + binding=b, h_type="residual_chain", claim=f"ResidualChain({' → '.join(chain)})", + derivation=l9.dominant_exprs[:len(chain)], pinned_vars={v: b[v] for v in l9.dominant_vars[:len(chain)] if v in b}, + free_vars=[v for v in problem.variables if v not in l9.dominant_vars[:len(chain)]], confidence=0.65)) + return hyps + + def _manifold_tangent_hypotheses(self, problem:Problem, model:StructuralModel, l9:L9Certificate) -> List[Hypothesis]: + hyps = []; b = model.best_binding + eq_mcs=[mc for mc in problem.compiled_constraints if mc.kind=="equality" and mc.fast_jac] + if not eq_mcs: return hyps + nv=len(problem.variables); J=np.zeros((len(eq_mcs),nv)) + for i,mc in enumerate(eq_mcs): + for j,v in enumerate(problem.variables): + if v not in mc.fast_jac: continue + jac=mc.fast_jac[v] + try: J[i,j]=float(jac["func"](*[b.get(s,0.0) for s in jac["syms"]])) + except: pass + try: + _,sv,Vt=np.linalg.svd(J,full_matrices=True) + rank=np.sum(sv>1e-8) + if rank>=Vt.shape[0]: return hyps + null_dirs=Vt[rank:] + for di,null_dir in enumerate(null_dirs[:2]): + for step in MANIFOLD_STEPS: + nb={} + for j,v in enumerate(problem.variables): + lo,hi=problem.bounds[v] + nb[v]=max(lo,min(hi,b.get(v,0.0)+step*null_dir[j])) + hyps.append(Hypothesis(hid=f"manifold_d{di}_s{step}", + binding=nb, h_type="manifold", claim=f"ManifoldTangent(dir={di},step={step:.2f})", + derivation=[f"null_space_dir_{di}"], pinned_vars={}, free_vars=problem.variables, confidence=0.45)) + except: pass + return hyps + +def _test_hypothesis(problem:Problem, hyp:Hypothesis) -> Tuple[Dict[str,float],float]: + b = dict(hyp.binding) + for v,(lo,hi) in problem.bounds.items(): + if v not in b: b[v]=(lo+hi)/2.0 + start_box = {} + for v in problem.variables: + lo,hi = problem.bounds[v] + cv = b.get(v,(lo+hi)/2.0) + half = max(1e-6, (hi-lo)*0.001) if v in hyp.pinned_vars else (hi-lo)*0.05 + start_box[v] = IV(max(lo,cv-half), min(hi,cv+half)) + contracted = _hc4(start_box, problem.compiled_constraints) + if contracted: + for v,iv in contracted.items(): + if v in problem.bounds: b[v]=iv.mid() + snap_b, snap_ce = _algebraic_snap(problem, b) + if snap_ce < problem.constraint_energy(b): b=snap_b + best_ce = problem.constraint_energy(b); best_b = dict(b) + if best_ce < 2.0 and best_ce > SOLVE_THRESHOLD: + gb, gce = _gradient_polish(problem, b) + if gce < best_ce: best_ce=gce; best_b=gb; b=gb + if best_ce < 1.0 and best_ce > SOLVE_THRESHOLD: + ab, ace = _adam_polish(problem, b) + if ace < best_ce: best_ce=ace; best_b=ab + return best_b, best_ce + +def _explore_new_basin(problem:Problem, model:StructuralModel) -> Tuple[Dict[str,float],float]: + tb = _global_hc4_tighten(problem); work_box = {} + for v,(lo,hi) in problem.bounds.items(): + if v in model.confirmed_ranges: + cr_lo,cr_hi = model.confirmed_ranges[v] + work_box[v] = IV(tb.get(v,(lo,hi))[0], tb.get(v,(lo,hi))[1]) if (cr_hi-cr_lo < (hi-lo)*0.8) else IV(lo,hi) + else: + work_box[v] = IV(tb.get(v,(lo,hi))[0], tb.get(v,(lo,hi))[1]) + b, ce = _mprt_sample(problem, work_box, N_MPRT_EXPLORE) + snap_b, snap_ce = _algebraic_snap(problem, b) + return (snap_b, snap_ce) if snap_ce < ce else (b, ce) + +def solve_by_hypothesis(problem:Problem, budget:int=TOTAL_BUDGET) -> Dict: + t0 = time.time() + init_b = {v: max(lo,min(hi,(lo+hi)/2+((hi-lo)*0.0117))) for v,(lo,hi) in problem.bounds.items()} + tb = _global_hc4_tighten(problem) + full_box = {v:IV(tb[v][0],tb[v][1]) for v in problem.variables if v in tb} + mprt_b, mprt_ce = _mprt_sample(problem, full_box, N_MPRT_EXPLORE//5) + if mprt_ce < problem.constraint_energy(init_b): init_b = mprt_b + snap_b, snap_ce = _algebraic_snap(problem, init_b) + if snap_ce < problem.constraint_energy(init_b): init_b=snap_b + + model = StructuralModel(best_binding=init_b, best_ce=problem.constraint_energy(init_b)) + active_scopes=[s for s in problem.scope_order if s!="root" and problem.scope_vars.get(s)] + + if model.best_ce < SOLVE_THRESHOLD: + l7=TOPOLOGY_ORACLE.evaluate(problem,active_scopes) + l9=RESIDUAL_ORACLE.evaluate(model.best_binding,problem,l7.hub_vars) + return {"binding":model.best_binding,"ce":model.best_ce,"solved":True, + "l7":l7,"l9":l9,"time":round(time.time()-t0,3)} + + consecutive_no_improvement = 0 + explore_trigger = 5 + oracle = HypothesisOracle() + + for round_idx in range(HYPOTHESIS_MAX_ROUNDS): + if model.best_ce < SOLVE_THRESHOLD: break + l7 = TOPOLOGY_ORACLE.evaluate(problem, active_scopes) + l8 = COHERENCE_ORACLE.evaluate(model.scope_witnesses) + l9 = RESIDUAL_ORACLE.evaluate(model.best_binding, problem, l7.hub_vars) + + hypotheses = oracle.generate(problem, model, l7, l8, l9, round_idx) + round_improved = False + + for hyp in hypotheses[:MAX_HYPS_PER_ROUND]: + if model.best_ce < SOLVE_THRESHOLD: break + tested_b, tested_ce = _test_hypothesis(problem, hyp) + if tested_ce < model.best_ce: + model.best_ce = tested_ce; model.best_binding = dict(tested_b) + round_improved = True; consecutive_no_improvement = 0 + for sn in active_scopes: + sv = problem.scope_vars.get(sn,[]) + scope_b = {v:tested_b[v] for v in sv if v in tested_b} + if scope_b: + model.scope_witnesses[sn] = {v:(val-1e-3,val+1e-3) for v,val in scope_b.items()} + else: + model.failure_patterns.add(hyp.hid) + + if not round_improved: + consecutive_no_improvement += 1 + if consecutive_no_improvement >= explore_trigger or len(hypotheses)==0: + explore_b, explore_ce = _explore_new_basin(problem, model) + if explore_ce < model.best_ce: + model.best_ce = explore_ce; model.best_binding = dict(explore_b) + consecutive_no_improvement = 0 + + if SOLVE_THRESHOLD < model.best_ce < 2.0: + gb, gce = _gradient_polish(problem, model.best_binding) + if gce < model.best_ce: model.best_ce=gce; model.best_binding=gb + if SOLVE_THRESHOLD < model.best_ce < 1.0: + ab, ace = _adam_polish(problem, model.best_binding) + if ace < model.best_ce: model.best_ce=ace; model.best_binding=ab + + final_l7 = TOPOLOGY_ORACLE.evaluate(problem, active_scopes) + final_l9 = RESIDUAL_ORACLE.evaluate(model.best_binding, problem, final_l7.hub_vars) + + return {"binding":model.best_binding,"ce":model.best_ce,"solved":model.best_ce bool: for pair in AXIOM_CONTRADICTIONS: - if pair.issubset(self.axioms): - return False + if pair.issubset(self.axioms): return False return True def __str__(self): - return (f"[{self.name}|{self.derivation}|conf={self.confidence:.2f}]" - f" : {{{', '.join(sorted(self.axioms))}}}") + return f"[{self.name}|{self.derivation}|conf={self.confidence:.2f}] : {{{', '.join(sorted(self.axioms))}}}" -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 2: DOMAIN LIBRARY -# ══════════════════════════════════════════════════════════════════════════ BASE_LIBRARY = [ - AbstractDomain("GraphSpace", - {Axiom.DISCRETE, Axiom.NETWORK, Axiom.SYMMETRIC, Axiom.ATOMIC}), - AbstractDomain("MetricPacking", - {Axiom.DISCRETE, Axiom.METRIC, Axiom.INJECTIVE, Axiom.CONSERVED}), - AbstractDomain("FluidContinuum", - {Axiom.CONTINUOUS, Axiom.METRIC, Axiom.SURJECTIVE, - Axiom.CONSERVED, Axiom.MUTABLE}), - AbstractDomain("SequentialChain", - {Axiom.DISCRETE, Axiom.ORDERED, Axiom.TRANSITIVE, - Axiom.BILINEAR, Axiom.ATOMIC}), - AbstractDomain("HierarchicalTree", - {Axiom.DISCRETE, Axiom.NETWORK, Axiom.ORDERED, Axiom.COMPOSITE}), - AbstractDomain("NormManifold", - {Axiom.CONTINUOUS, Axiom.QUADRATIC, Axiom.CONSERVED, - Axiom.METRIC, Axiom.SYMMETRIC}), - AbstractDomain("BilinearCoupling", - {Axiom.CONTINUOUS, Axiom.BILINEAR, Axiom.NETWORK, - Axiom.CONSERVED, Axiom.MUTABLE}), - AbstractDomain("DiscreteSchedule", - {Axiom.DISCRETE, Axiom.ORDERED, Axiom.INJECTIVE, - Axiom.CONSERVED, Axiom.ATOMIC}), - AbstractDomain("PhaseJump", - {Axiom.DISCRETE, Axiom.MUTABLE, Axiom.SYMMETRIC, - Axiom.NETWORK, Axiom.ATOMIC}), + AbstractDomain("GraphSpace", {Axiom.DISCRETE, Axiom.NETWORK, Axiom.SYMMETRIC, Axiom.ATOMIC}), + AbstractDomain("MetricPacking", {Axiom.DISCRETE, Axiom.METRIC, Axiom.INJECTIVE, Axiom.CONSERVED}), + AbstractDomain("FluidContinuum", {Axiom.CONTINUOUS, Axiom.METRIC, Axiom.SURJECTIVE, Axiom.CONSERVED, Axiom.MUTABLE}), + AbstractDomain("SequentialChain", {Axiom.DISCRETE, Axiom.ORDERED, Axiom.TRANSITIVE, Axiom.BILINEAR, Axiom.ATOMIC}), + AbstractDomain("HierarchicalTree", {Axiom.DISCRETE, Axiom.NETWORK, Axiom.ORDERED, Axiom.COMPOSITE}), + AbstractDomain("NormManifold", {Axiom.CONTINUOUS, Axiom.QUADRATIC, Axiom.CONSERVED, Axiom.METRIC, Axiom.SYMMETRIC}), + AbstractDomain("BilinearCoupling", {Axiom.CONTINUOUS, Axiom.BILINEAR, Axiom.NETWORK, Axiom.CONSERVED, Axiom.MUTABLE}), + AbstractDomain("DiscreteSchedule", {Axiom.DISCRETE, Axiom.ORDERED, Axiom.INJECTIVE, Axiom.CONSERVED, Axiom.ATOMIC}), + AbstractDomain("PhaseJump", {Axiom.DISCRETE, Axiom.MUTABLE, Axiom.SYMMETRIC, Axiom.NETWORK, Axiom.ATOMIC}), ] -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 3: WISDOM OPERATIONS (Domain Algebra) -# ══════════════════════════════════════════════════════════════════════════ class WisdomOps: @staticmethod def union(d1: AbstractDomain, d2: AbstractDomain) -> AbstractDomain: merged = d1.axioms | d2.axioms - candidate = AbstractDomain( - f"{d1.name}∪{d2.name}", merged, - derivation="Union", - confidence=(d1.confidence + d2.confidence) / 2) - # Remove contradictions — keep the one with more axiom support + candidate = AbstractDomain(f"{d1.name}∪{d2.name}", merged, derivation="Union", confidence=(d1.confidence + d2.confidence) / 2) for pair in AXIOM_CONTRADICTIONS: if pair.issubset(candidate.axioms): - # Keep whichever axiom appears in more base domains - counts = {ax: sum(ax in d.axioms for d in BASE_LIBRARY) - for ax in pair} - remove = min(counts, key=counts.get) - candidate.axioms.discard(remove) + counts = {ax: sum(ax in d.axioms for d in BASE_LIBRARY) for ax in pair} + candidate.axioms.discard(min(counts, key=counts.get)) return candidate @staticmethod def relax(d: AbstractDomain, axioms: Set[str]) -> AbstractDomain: - return AbstractDomain( - f"{d.name}_Relaxed", d.axioms - axioms, - derivation=f"Relax({','.join(sorted(axioms))})", - confidence=d.confidence * 0.85) + return AbstractDomain(f"{d.name}_Relaxed", d.axioms - axioms, derivation=f"Relax({','.join(sorted(axioms))})", confidence=d.confidence * 0.85) @staticmethod def inject(d: AbstractDomain, axioms: Set[str]) -> AbstractDomain: merged = d.axioms | axioms - candidate = AbstractDomain( - f"{d.name}+{','.join(sorted(axioms))}", merged, - derivation=f"Inject({','.join(sorted(axioms))})", - confidence=d.confidence * 0.9) - # Resolve any new contradictions + candidate = AbstractDomain(f"{d.name}+{','.join(sorted(axioms))}", merged, derivation=f"Inject({','.join(sorted(axioms))})", confidence=d.confidence * 0.9) for pair in AXIOM_CONTRADICTIONS: if pair.issubset(candidate.axioms): - to_remove = (pair - d.axioms) # remove the newly added one - if to_remove: - candidate.axioms -= to_remove + to_remove = (pair - d.axioms) + if to_remove: candidate.axioms -= to_remove return candidate @staticmethod def dual(d: AbstractDomain) -> AbstractDomain: flipped = set() - flip_map = { - Axiom.DISCRETE: Axiom.CONTINUOUS, - Axiom.CONTINUOUS: Axiom.DISCRETE, - Axiom.INJECTIVE: Axiom.SURJECTIVE, - Axiom.SURJECTIVE: Axiom.INJECTIVE, - Axiom.ORDERED: Axiom.SYMMETRIC, - Axiom.SYMMETRIC: Axiom.ORDERED, - Axiom.CONSERVED: Axiom.MUTABLE, - Axiom.MUTABLE: Axiom.CONSERVED, - Axiom.ATOMIC: Axiom.COMPOSITE, - Axiom.COMPOSITE: Axiom.ATOMIC, - } - for ax in d.axioms: - flipped.add(flip_map.get(ax, ax)) - return AbstractDomain( - f"{d.name}*", flipped, - derivation="Dual", - confidence=d.confidence * 0.7) + flip_map = {Axiom.DISCRETE: Axiom.CONTINUOUS, Axiom.CONTINUOUS: Axiom.DISCRETE, + Axiom.INJECTIVE: Axiom.SURJECTIVE, Axiom.SURJECTIVE: Axiom.INJECTIVE, + Axiom.ORDERED: Axiom.SYMMETRIC, Axiom.SYMMETRIC: Axiom.ORDERED, + Axiom.CONSERVED: Axiom.MUTABLE, Axiom.MUTABLE: Axiom.CONSERVED, + Axiom.ATOMIC: Axiom.COMPOSITE, Axiom.COMPOSITE: Axiom.ATOMIC} + for ax in d.axioms: flipped.add(flip_map.get(ax, ax)) + return AbstractDomain(f"{d.name}*", flipped, derivation="Dual", confidence=d.confidence * 0.7) -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 4: PROBLEM SPACE & WITNESS -# ══════════════════════════════════════════════════════════════════════════ @dataclass class ProblemSpace: description: str axioms: Set[str] variables: List[str] - n_vars: int = 0 - scope_hints: List[str] = field(default_factory=list) - - def __post_init__(self): - if self.n_vars == 0: - self.n_vars = len(self.variables) @dataclass class WitnessReport: - """Parsed feedback from the numerical engine.""" ce: float solved: bool dominant_exprs: List[str] hub_vars: List[str] topology: str binding: Dict[str, float] - # Inferred axiom evidence from the witness failing_axioms: Set[str] = field(default_factory=set) confirmed_axioms: Set[str] = field(default_factory=set) - def parse_axiom_evidence(self): - """ - Read the dominant residual expressions and infer which axioms - were wrong or missing. This is the key feedback mechanism. - """ + def parse_axiom_evidence(self, prev_ce: float = None): for expr in self.dominant_exprs: + if expr.startswith("ERROR:"): continue expr_lower = expr.lower() for axiom, signatures in AXIOM_CONSTRAINT_SIGNATURES.items(): if any(sig in expr_lower for sig in signatures): self.failing_axioms.add(axiom) - # Topology evidence - if self.topology == "mesh": - self.confirmed_axioms.add(Axiom.NETWORK) - elif self.topology == "chain": - self.confirmed_axioms.add(Axiom.ORDERED) - elif self.topology == "hub": - self.confirmed_axioms.add(Axiom.NETWORK) - - # Binding evidence - vals = list(self.binding.values()) - if vals: - if all(abs(v) <= 1.1 for v in vals): - self.confirmed_axioms.add(Axiom.CONTINUOUS) - if len(set(round(v, 1) for v in vals)) == len(vals): - self.confirmed_axioms.add(Axiom.INJECTIVE) - total = sum(vals) - if abs(total - round(total)) < 0.1: - self.confirmed_axioms.add(Axiom.CONSERVED) + topo_map = {"mesh": Axiom.NETWORK, "chain": Axiom.ORDERED, "hub": Axiom.NETWORK} + if self.topology in topo_map: self.confirmed_axioms.add(topo_map[self.topology]) + + if self.ce < 900.0: + vals = list(self.binding.values()) + if vals: + if all(abs(v) <= 1.1 for v in vals): self.confirmed_axioms.add(Axiom.CONTINUOUS) + if abs(sum(vals) - round(sum(vals))) < 0.15: self.confirmed_axioms.add(Axiom.CONSERVED) + if any("**2" in e for e in self.dominant_exprs): self.confirmed_axioms.add(Axiom.QUADRATIC) + + if prev_ce is not None and self.ce < (prev_ce * 0.9): + self.confirmed_axioms.add(Axiom.BILINEAR) # ══════════════════════════════════════════════════════════════════════════ -# SECTION 5: PSL TRANSLATOR (Abstract → Concrete) +# SECTION 9: TRANSLATOR & DEBATE LOOP # ══════════════════════════════════════════════════════════════════════════ class PSLTranslator: - """ - Translates AbstractDomain + ProblemSpace into PSL text. - Templates are richer than 1.0 — each axiom combination - generates domain-appropriate constraint structures. - """ - - def compile(self, domain: AbstractDomain, - problem: ProblemSpace) -> str: - ax = domain.axioms - vars_ = problem.variables - n = len(vars_) - lines = [f"SPACE {domain.name.replace(' ','_')}"] - - # Variable bounds — derived from topology + def compile(self, domain: AbstractDomain, problem: ProblemSpace) -> str: + ax = domain.axioms; vars_ = problem.variables; n = len(vars_) + lines = [f"SPACE wisdom_{domain.name.replace(' ','_')}"] + if Axiom.CONTINUOUS in ax and Axiom.QUADRATIC in ax: - # Norm manifold: bounded in [-1,1] - for v in vars_: lines.append(f" VAR {v} -1 1") + for v in vars_: lines.append(f" VAR {v} -1.0 1.0") elif Axiom.CONTINUOUS in ax: - for v in vars_: lines.append(f" VAR {v} -10 10") - elif Axiom.DISCRETE in ax and Axiom.ORDERED in ax: - for v in vars_: lines.append(f" VAR {v} -5 5") + for v in vars_: lines.append(f" VAR {v} -10.0 10.0") + elif Axiom.ORDERED in ax: + for v in vars_: lines.append(f" VAR {v} -5.0 5.0") else: - for v in vars_: lines.append(f" VAR {v} 0 3") + for v in vars_: lines.append(f" VAR {v} 0.0 3.0") scope_idx = 0 - - # CONSERVED: global sum constraint if Axiom.CONSERVED in ax and n >= 2: - target = n # natural conservation target - lines.append(f" SCOPE conservation_{scope_idx}") + lines.append(f" SCOPE conserved_{scope_idx}") lines.append(f" VARS {' '.join(vars_)}") - lines.append(f" EQ {' + '.join(vars_)} - {target}") - lines.append(" END") - scope_idx += 1 + lines.append(f" EQ {' + '.join(vars_)} - {float(n):.1f}") + lines.append(" END"); scope_idx += 1 - # QUADRATIC + CONSERVED: norm sphere (quantum/physics) - if Axiom.QUADRATIC in ax and Axiom.CONSERVED in ax: + if Axiom.QUADRATIC in ax and Axiom.CONSERVED in ax and n >= 2: quadsum = " + ".join(f"{v}**2" for v in vars_) - lines.append(f" SCOPE norm_constraint_{scope_idx}") + lines.append(f" SCOPE norm_{scope_idx}") lines.append(f" VARS {' '.join(vars_)}") lines.append(f" EQ {quadsum} - {n * 0.2:.1f}") - lines.append(" END") - scope_idx += 1 - - # BILINEAR + NETWORK: pairwise coupling scopes - if Axiom.BILINEAR in ax and Axiom.NETWORK in ax: - chunk = max(2, n // 4) - for i in range(0, n - 1, chunk): - group = vars_[i:i + chunk + 1] - if len(group) < 2: continue - lines.append(f" SCOPE bilinear_group_{scope_idx}") - lines.append(f" VARS {' '.join(group)}") - for j in range(len(group) - 1): - lines.append( - f" EQ {group[j]}**2 + {group[j+1]}**2 - {chunk}") - lines.append( - f" EQ {group[j]}*{group[j+1]} - {group[j]} - 1") - lines.append(" END") - scope_idx += 1 - - # ORDERED: sequential chain with bilinear coupling - if Axiom.ORDERED in ax and Axiom.BILINEAR in ax: - chunk = 3 - for i in range(0, n - 1, chunk - 1): - group = vars_[i:i + chunk] + lines.append(" END"); scope_idx += 1 + + if Axiom.BILINEAR in ax and n >= 2: + window = min(3, n) + for i in range(0, n - 1, max(1, window - 1)): + group = vars_[i:i + window] if len(group) < 2: continue - lines.append(f" SCOPE chain_{scope_idx}") + lines.append(f" SCOPE bilinear_{scope_idx}") lines.append(f" VARS {' '.join(group)}") for j in range(len(group) - 1): - lines.append( - f" EQ {group[j]}*{group[j+1]} - {group[j]} - 1") - lines.append(" END") - scope_idx += 1 - # Global sum coupling - lines.append(f" EQ {' + '.join(vars_[::2])} - {n // 2}") - - # METRIC + INJECTIVE: pairwise separation (no collisions) - if Axiom.METRIC in ax and Axiom.INJECTIVE in ax: - lines.append(f" SCOPE separation_{scope_idx}") - lines.append(f" VARS {' '.join(vars_)}") - for i, j in itertools.combinations(range(min(n, 6)), 2): - lines.append( - f" GEQ ({vars_[i]} - {vars_[j]})**2 - 0.25") - lines.append(" END") - scope_idx += 1 - - # INJECTIVE + DISCRETE: exclusivity (scheduling) - if (Axiom.INJECTIVE in ax and Axiom.DISCRETE in ax - and Axiom.METRIC not in ax): - lines.append(f" SCOPE exclusivity_{scope_idx}") - lines.append(f" VARS {' '.join(vars_)}") - for i, j in itertools.combinations(range(min(n, 5)), 2): - lines.append(f" EQ {vars_[i]} * {vars_[j]}") - lines.append(" END") - scope_idx += 1 + lines.append(f" EQ {group[j]}**2 + {group[j+1]}**2 - {window:.1f}") + lines.append(f" EQ {group[j]}*{group[j+1]} - {group[j]} - 1.0") + lines.append(" END"); scope_idx += 1 - # ORDERED only: causal precedence if Axiom.ORDERED in ax and Axiom.BILINEAR not in ax: - lines.append(f" SCOPE ordering_{scope_idx}") + lines.append(f" SCOPE ordered_{scope_idx}") lines.append(f" VARS {' '.join(vars_)}") - for i in range(n - 1): - lines.append(f" GEQ {vars_[i+1]} - {vars_[i]}") - lines.append(" END") - scope_idx += 1 - - # SYMMETRIC + NETWORK: undirected coupling pairs - if Axiom.SYMMETRIC in ax and Axiom.NETWORK in ax: - lines.append(f" SCOPE symmetric_links_{scope_idx}") + for i in range(n - 1): lines.append(f" GEQ {vars_[i+1]} - {vars_[i]}") + lines.append(" END"); scope_idx += 1 + + if Axiom.METRIC in ax and Axiom.INJECTIVE in ax and n >= 2: + lines.append(f" SCOPE separation_{scope_idx}") + lines.append(f" VARS {' '.join(vars_[:min(6, n)])}") + for i, j in itertools.combinations(range(min(6, n)), 2): + lines.append(f" GEQ ({vars_[i]} - {vars_[j]})**2 - 0.25") + lines.append(" END"); scope_idx += 1 + + if Axiom.SYMMETRIC in ax and Axiom.NETWORK in ax and n >= 2: + lines.append(f" SCOPE symmetric_{scope_idx}") lines.append(f" VARS {' '.join(vars_)}") - for i in range(0, n - 1, 2): - lines.append( - f" EQ {vars_[i]} + {vars_[i+1]} - {2.0:.1f}") - lines.append(" END") - scope_idx += 1 + for i in range(0, n - 1, 2): lines.append(f" EQ {vars_[i]} + {vars_[i+1]} - 2.0") + lines.append(" END"); scope_idx += 1 + + if Axiom.ORDERED in ax and Axiom.BILINEAR in ax and n >= 4: + odds = vars_[::2] + lines.append(f" EQ {' + '.join(odds)} - {float(len(odds)):.1f}") lines.append("END") return "\n".join(lines) -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 6: DEBATE ENGINE (Proposer ↔ Critic, N rounds) -# ══════════════════════════════════════════════════════════════════════════ @dataclass class DebateRound: - round_idx: int - domain: AbstractDomain - psl_text: str - ce: float - solved: bool - witness: Optional[WitnessReport] - action: str # what the proposer did this round + round_idx: int; domain: AbstractDomain; psl_text: str; ce: float; solved: bool; witness: Optional[WitnessReport]; action: str class WisdomDebate: - """ - Runs the full Proposer/Critic/Translate/Solve/Feedback loop. - """ MAX_WISDOM_ROUNDS = 6 def __init__(self, library: List[AbstractDomain] = None): @@ -406,381 +1161,119 @@ class WisdomDebate: self.best_binding: Dict[str, float] = {} self.best_domain: Optional[AbstractDomain] = None - # ── Critic: pure set difference scoring ────────────────────────────── - def _critic_score(self, problem: ProblemSpace, - domain: AbstractDomain) -> Tuple[float, Set, Set]: - unmapped = problem.axioms - domain.axioms - false_asm = domain.axioms - problem.axioms - # Weight: unmapped is worse than false assumptions + def _critic_score(self, problem: ProblemSpace, domain: AbstractDomain) -> Tuple[float, Set, Set]: + unmapped = problem.axioms - domain.axioms; false_asm = domain.axioms - problem.axioms score = len(unmapped) * 2.0 + len(false_asm) * 1.0 - # Penalise internally inconsistent domains - if not domain.is_consistent(): - score += 10.0 - # Reward confidence + if not domain.is_consistent(): score += 10.0 score -= (domain.confidence - 1.0) * 0.5 return score, unmapped, false_asm - # ── Proposer: generate candidate hypotheses ─────────────────────────── - def _generate_candidates(self, - current: AbstractDomain, - witness: Optional[WitnessReport], - round_idx: int) -> List[Tuple[AbstractDomain, str]]: - candidates = [] - - # Always include all base library domains - for d in self.library: - candidates.append((d, "Library")) + def _generate_candidates(self, current: AbstractDomain, witness: Optional[WitnessReport], round_idx: int) -> List[Tuple[AbstractDomain, str]]: + candidates = [(d, "Library") for d in self.library] + for d1, d2 in itertools.combinations(self.library[:5], 2): candidates.append((WisdomOps.union(d1, d2), "Union")) + for d in self.library[:4]: candidates.append((WisdomOps.dual(d), "Dual")) - # Pairwise unions of base domains - for d1, d2 in itertools.combinations(self.library[:5], 2): - candidates.append((WisdomOps.union(d1, d2), "Union")) - - # Duals of base domains - for d in self.library[:4]: - candidates.append((WisdomOps.dual(d), "Dual")) - - # Witness-guided mutations of current domain if witness and not witness.solved: - witness.parse_axiom_evidence() - - # Relax axioms that are causing failures - if witness.failing_axioms: - relaxed = WisdomOps.relax(current, witness.failing_axioms) - candidates.append((relaxed, f"Relax({witness.failing_axioms})")) - - # Inject confirmed axioms that are missing + if witness.failing_axioms: candidates.append((WisdomOps.relax(current, witness.failing_axioms), f"Relax({witness.failing_axioms})")) missing = witness.confirmed_axioms - current.axioms - if missing: - injected = WisdomOps.inject(current, missing) - candidates.append((injected, f"Inject({missing})")) - - # If CE is very high, try dual - if witness.ce > 5.0: - candidates.append( - (WisdomOps.dual(current), "Dual(high-CE escape)")) - - # If CE is low but unsolved, inject QUADRATIC for manifold snap - if witness.ce < 1.0: - snap = WisdomOps.inject( - current, {Axiom.QUADRATIC, Axiom.METRIC}) - candidates.append((snap, "Inject(quadratic-snap)")) - - # Rotating random mutation for diversity - if round_idx > 2: - mutation_pool = [Axiom.BILINEAR, Axiom.QUADRATIC, - Axiom.CONSERVED, Axiom.METRIC, Axiom.ORDERED] - chosen = random.sample(mutation_pool, 2) - mutated = WisdomOps.inject(current, set(chosen)) - candidates.append((mutated, f"RandomInject({chosen})")) + if missing: candidates.append((WisdomOps.inject(current, missing), f"Inject({missing})")) + if witness.ce > 5.0: candidates.append((WisdomOps.dual(current), "Dual(high-CE escape)")) + if witness.ce < 1.0: candidates.append((WisdomOps.inject(current, {Axiom.QUADRATIC, Axiom.METRIC}), "Inject(quadratic-snap)")) + if round_idx > 2: + chosen = random.sample([Axiom.BILINEAR, Axiom.QUADRATIC, Axiom.CONSERVED, Axiom.METRIC, Axiom.ORDERED], 2) + candidates.append((WisdomOps.inject(current, set(chosen)), f"RandomInject({chosen})")) return candidates - # ── Main debate loop ────────────────────────────────────────────────── - def debate(self, problem: ProblemSpace, - solver_fn) -> Tuple[Dict[str, float], float, List[DebateRound]]: - """ - solver_fn: callable(psl_text: str) -> WitnessReport - Returns best binding, best CE, and full debate history. - """ - current_domain = self._initial_domain(problem) + def debate(self, problem: ProblemSpace, solver_fn) -> Tuple[Dict[str, float], float, List[DebateRound]]: + current_domain = min(self.library, key=lambda d: self._critic_score(problem, d)[0]) current_witness = None - self.history = [] - self.best_ce = float('inf') - self.best_binding = {} - self.best_domain = current_domain + self.history = []; self.best_ce = float('inf'); self.best_binding = {}; self.best_domain = current_domain - print(f"\n{'═'*60}") - print(f"WISDOM DEBATE: {problem.description}") - print(f"Problem axioms: {problem.axioms}") - print(f"{'═'*60}") + print(f"\n{'═'*60}\nWISDOM DEBATE: {problem.description}\nProblem axioms: {problem.axioms}\n{'═'*60}") for round_idx in range(self.MAX_WISDOM_ROUNDS): - # Proposer generates candidates - candidates = self._generate_candidates( - current_domain, current_witness, round_idx) - - # Critic scores each - scored = [] - for domain, action in candidates: - score, unmapped, false_asm = self._critic_score( - problem, domain) - scored.append((score, domain, action, unmapped, false_asm)) + candidates = self._generate_candidates(current_domain, current_witness, round_idx) + scored = [(self._critic_score(problem, d)[0], d, act, *self._critic_score(problem, d)[1:]) for d, act in candidates] scored.sort(key=lambda x: x[0]) - # Pick winner — but avoid repeating a domain we already tried tried_names = {r.domain.name for r in self.history} - winner_score, winner_domain, action, unmapped, false_asm = None, None, None, None, None - for score, domain, act, um, fa in scored: - if domain.name not in tried_names: - winner_score = score - winner_domain = domain - action = act - unmapped = um - false_asm = fa - break - if winner_domain is None: - winner_score, winner_domain, action, unmapped, false_asm = scored[0] - - print(f"\n[R{round_idx+1}] Proposer selects: {winner_domain.name}") - print(f" Action: {action}") - print(f" Critic score: {winner_score:.1f} " - f"(unmapped={unmapped}, false={false_asm})") - - # Translate to PSL - psl_text = self.translator.compile(winner_domain, problem) - print(f" Compiled PSL: {psl_text.count('EQ') + psl_text.count('GEQ')} constraints") + winner_score, winner_domain, action, unmapped, false_asm = next((s for s in scored if s[1].name not in tried_names), scored[0]) - # Run numerical solver + print(f"\n[R{round_idx+1}] Proposer selects: {winner_domain.name}\n Action: {action}\n Critic score: {winner_score:.1f} (unmapped={unmapped}, false={false_asm})") + + psl_text = self.translator.compile(winner_domain, problem) witness = solver_fn(psl_text, problem.variables) + prev_ce = self.history[-1].witness.ce if self.history else None + witness.parse_axiom_evidence(prev_ce=prev_ce) - print(f" CE={witness.ce:.5f} | " - f"Solved={witness.solved} | " - f"Topology={witness.topology}") - - # Record round - round_rec = DebateRound( - round_idx=round_idx, - domain=winner_domain, - psl_text=psl_text, - ce=witness.ce, - solved=witness.solved, - witness=witness, - action=action) - self.history.append(round_rec) - - # Update best - if witness.ce < self.best_ce: - self.best_ce = witness.ce - self.best_binding = dict(witness.binding) - self.best_domain = winner_domain - - if witness.solved: - print(f"\n✓ WISDOM SOLVED in round {round_idx+1}") - break - - # Update state for next round - current_domain = winner_domain - current_witness = witness - - print(f"\n{'─'*60}") - print(f"WISDOM RESULT: CE={self.best_ce:.5f} | " - f"Domain={self.best_domain.name if self.best_domain else 'None'}") - print(f"{'─'*60}") - return self.best_binding, self.best_ce, self.history + print(f" CE={witness.ce:.5f} | Solved={witness.solved} | Topology={witness.topology}") - def _initial_domain(self, problem: ProblemSpace) -> AbstractDomain: - """Pick the library domain with the lowest critic score as starting point.""" - best = min( - self.library, - key=lambda d: self._critic_score(problem, d)[0]) - return best + self.history.append(DebateRound(round_idx, winner_domain, psl_text, witness.ce, witness.solved, witness, action)) + if witness.ce < self.best_ce: self.best_ce = witness.ce; self.best_binding = dict(witness.binding); self.best_domain = winner_domain + if witness.solved: print(f"\n✓ WISDOM SOLVED in round {round_idx+1}"); break + current_domain = winner_domain; current_witness = witness -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 7: NUMERICAL ENGINE (3.11 core, condensed) -# ══════════════════════════════════════════════════════════════════════════ -# [Full 3.11 numerical engine retained here] -# For brevity the following shows the integration interface. -# In deployment paste the complete Section 2-8 of 3.11 above this line. - -SOLVE_THRESHOLD = 0.05 -TOTAL_BUDGET = 600 -MIN_WIDTH = 1e-6 -CONTRACT_EPS = 1e-9 -TRANSFER_SLACK_COUP= 0.04 -TRANSFER_SLACK_HUB = 0.02 -HUB_BUDGET_MULT = 2.0 -HUB_RESIDUAL_MULT = 2.0 -CONSENSUS_MAX_ITER = 8 -CONSENSUS_PATIENCE = 3 -NEAR_SOLVED_CE = 0.25 -HUB_DEGREE_RATIO = 0.6 -PERTURB_BASE_SCALE = 0.05 -CYCLE_WINDOW = 3 -MIN_HYP_FRAC = 0.15 -HC4_VOLUME_FLOOR = 0.20 -L9_CONTRIB_THRESHOLD = 1e-8 -N_PERTURB_RESTARTS = 4 -STRUCT_PROBE_COUNT = 5 -MERGE_WEIGHT_EXP = 2.0 -MIN_SWEEP_BUDGET = 40 -PROJECTION_CACHE: Dict[str, Dict] = {} - -TIER_EASY="easy"; TIER_HARD="hard" -TIER_MANIFOLD="manifold"; TIER_DECEPTIVE="deceptive" - -# [Paste complete IV class, compile_iv, PSL parser, Problem, -# Oracles, _hc4, _solve_region, _algebraic_snap, -# solve_ae, solve_consensus from 3.11 here] -# -# The integration point is _run_wisdom_trial() below which -# calls parse_psl → expand_psl → Problem → solve_consensus. + print(f"\n{'─'*60}\nWISDOM RESULT: CE={self.best_ce:.5f} | Domain={self.best_domain.name if self.best_domain else 'None'}\n{'─'*60}") + return self.best_binding, self.best_ce, self.history # ══════════════════════════════════════════════════════════════════════════ -# SECTION 8: SOLVER BRIDGE (PSL text → WitnessReport) +# SECTION 10: ENGINE BRIDGE & FASTAPI SERVER # ══════════════════════════════════════════════════════════════════════════ - -def make_solver_fn(budget: int = TOTAL_BUDGET): - """ - Returns a callable that: - 1. Parses PSL text into a Problem - 2. Runs solve_consensus - 3. Returns a WitnessReport - - This is the bridge between the Wisdom Layer and the Numerical Engine. - """ - def solver_fn(psl_text: str, - hint_variables: List[str]) -> WitnessReport: +def make_solver_fn_v2(budget: int = 600): + def solver_fn(psl_text: str, hint_variables: List[str]) -> WitnessReport: try: - # Parse PSL - from io import StringIO - prog = parse_psl(psl_text) - ep = expand_psl(prog) - - # If PSL produced no variables, fall back to hint variables + prog = parse_psl(psl_text); ep = expand_psl(prog) if not ep.variables: - # Build a minimal problem from hints - constraints = [] - bounds = {v: (-5.0, 5.0) for v in hint_variables} - prob = Problem( - pid="wisdom_fallback", - variables=hint_variables, - constraints=constraints, - bounds=bounds, - tier=TIER_HARD) - else: - prob = Problem( - pid="wisdom_trial", - variables=ep.variables, - constraints=ep.constraints, - bounds=ep.bounds, - tier=TIER_HARD, - scope_groups=ep.scope_groups, - scope_vars=ep.scope_vars, - scope_order=ep.scope_order) - - # Run consensus solver - result = solve_consensus(prob, budget=budget) - - # Build witness - active_scopes = [s for s in prob.scope_order - if s != "root" - and len(prob.scope_vars.get(s, [])) > 0] - l7 = TOPOLOGY_ORACLE.evaluate(prob, active_scopes) - l9 = RESIDUAL_ORACLE.evaluate( - result["binding"], prob, l7.hub_vars) + return WitnessReport(ce=50.0, solved=False, dominant_exprs=["NO_VARS_COMPILED"], hub_vars=[], topology="none", binding={v: 0.0 for v in hint_variables}) + prob = Problem(pid="wisdom_trial", variables=ep.variables, constraints=ep.constraints, bounds=ep.bounds, tier="hard", scope_groups=ep.scope_groups, scope_vars=ep.scope_vars, scope_order=ep.scope_order) + result = solve_by_hypothesis(prob, budget=budget) + + l7, l9 = result.get("l7"), result.get("l9") return WitnessReport( - ce=result["ce"], - solved=result["solved"], - dominant_exprs=l9.dominant_exprs, - hub_vars=l7.hub_vars, - topology=l7.topology_sig, - binding=result["binding"]) - + ce=result["ce"], solved=result["solved"], + dominant_exprs=l9.dominant_exprs if l9 else [], hub_vars=l7.hub_vars if l7 else [], + topology=l7.topology_sig if l7 else "none", binding=result["binding"]) except Exception as e: - print(f"[SOLVER BRIDGE ERROR] {e}") - return WitnessReport( - ce=999.0, solved=False, - dominant_exprs=[], hub_vars=[], - topology="none", binding={}) - + print(f"[BRIDGE ERROR] {type(e).__name__}: {e}") + return WitnessReport(ce=50.0 if "parse" in str(e).lower() else 20.0, solved=False, dominant_exprs=[f"ERROR:{type(e).__name__}"], hub_vars=[], topology="none", binding={v: 0.0 for v in hint_variables}) return solver_fn -# ══════════════════════════════════════════════════════════════════════════ -# SECTION 9: ARENA & DASHBOARD -# ══════════════════════════════════════════════════════════════════════════ - -# Pre-defined ProblemSpaces — LLM would generate these in production WISDOM_PROBLEMS = [ - ProblemSpace( - description="Coupled bilinear chain with global sum", - axioms={Axiom.DISCRETE, Axiom.ORDERED, Axiom.BILINEAR, - Axiom.CONSERVED, Axiom.NETWORK}, - variables=[f"x{i}" for i in range(1, 11)], - scope_hints=["chain"]), - - ProblemSpace( - description="Quantum norm manifold with phase coupling", - axioms={Axiom.CONTINUOUS, Axiom.QUADRATIC, Axiom.CONSERVED, - Axiom.METRIC, Axiom.SYMMETRIC}, - variables=[f"re_{i}" for i in range(5)] - + [f"im_{i}" for i in range(5)]), - - ProblemSpace( - description="Fully coupled sphere with pairwise sums", - axioms={Axiom.CONTINUOUS, Axiom.CONSERVED, Axiom.SYMMETRIC, - Axiom.QUADRATIC, Axiom.BILINEAR}, - variables=[f"z{i}" for i in range(10)]), - - ProblemSpace( - description="Phase-jump collider with OR constraints", - axioms={Axiom.DISCRETE, Axiom.MUTABLE, Axiom.SYMMETRIC, - Axiom.NETWORK, Axiom.CONSERVED}, - variables=["p1_x", "p2_x", "p1_y", "p2_y"]), + ProblemSpace(description="Coupled bilinear chain with global sum", axioms={Axiom.DISCRETE, Axiom.ORDERED, Axiom.BILINEAR, Axiom.CONSERVED, Axiom.NETWORK}, variables=[f"x{i}" for i in range(1, 11)]), + ProblemSpace(description="Quantum norm manifold with phase coupling", axioms={Axiom.CONTINUOUS, Axiom.QUADRATIC, Axiom.CONSERVED, Axiom.METRIC, Axiom.SYMMETRIC}, variables=[f"re_{i}" for i in range(5)] + [f"im_{i}" for i in range(5)]), + ProblemSpace(description="Fully coupled sphere with pairwise sums", axioms={Axiom.CONTINUOUS, Axiom.CONSERVED, Axiom.SYMMETRIC, Axiom.QUADRATIC, Axiom.BILINEAR}, variables=[f"z{i}" for i in range(10)]), + ProblemSpace(description="Phase-jump collider with OR constraints", axioms={Axiom.DISCRETE, Axiom.MUTABLE, Axiom.SYMMETRIC, Axiom.NETWORK, Axiom.CONSERVED}, variables=["p1_x", "p2_x", "p1_y", "p2_y"]), ] STATE_LOCK = threading.Lock() -WISDOM_DATA: Dict = { - "runs": 0, - "solved": 0, - "total_ce": 0.0, - "total_time": 0.0, - "history": [], # last N DebateRound summaries - "by_problem": defaultdict(lambda: { - "runs": 0, "solved": 0, "ce": 0.0, - "best_domain": "—", "rounds_to_solve": []}) -} - +WISDOM_DATA: Dict = {"runs": 0, "solved": 0, "total_ce": 0.0, "total_time": 0.0, "history": [], "by_problem": defaultdict(lambda: {"runs": 0, "solved": 0, "ce": 0.0, "best_domain": "—", "rounds_to_solve": []})} POOL = ThreadPoolExecutor(max_workers=2) DEBATE_ENGINE = WisdomDebate() -SOLVER_FN = None # set after numerical engine is imported +SOLVER_FN = None def _run_wisdom_trial(problem: ProblemSpace): global SOLVER_FN - if SOLVER_FN is None: - SOLVER_FN = make_solver_fn() - + if SOLVER_FN is None: SOLVER_FN = make_solver_fn_v2() t0 = time.time() try: binding, ce, rounds = DEBATE_ENGINE.debate(problem, SOLVER_FN) - elapsed = round(time.time() - t0, 2) - solved = ce < SOLVE_THRESHOLD - + elapsed = round(time.time() - t0, 2); solved = ce < SOLVE_THRESHOLD with STATE_LOCK: - WISDOM_DATA["runs"] += 1 - WISDOM_DATA["solved"] += int(solved) - WISDOM_DATA["total_ce"] += ce - WISDOM_DATA["total_time"] += elapsed - + WISDOM_DATA["runs"] += 1; WISDOM_DATA["solved"] += int(solved); WISDOM_DATA["total_ce"] += ce; WISDOM_DATA["total_time"] += elapsed ps = WISDOM_DATA["by_problem"][problem.description[:30]] - ps["runs"] += 1 - ps["solved"] += int(solved) - ps["ce"] += ce - if rounds: - ps["best_domain"] = rounds[-1].domain.name - if solved: - ps["rounds_to_solve"].append(len(rounds)) - - for r in rounds: - WISDOM_DATA["history"].append({ - "problem": problem.description[:25], - "round": r.round_idx + 1, - "domain": r.domain.name[:20], - "action": r.action[:20], - "ce": round(r.ce, 5), - "solved": r.solved, - "derivation": r.domain.derivation[:15], - }) + ps["runs"] += 1; ps["solved"] += int(solved); ps["ce"] += ce + if rounds: ps["best_domain"] = rounds[-1].domain.name + if solved: ps["rounds_to_solve"].append(len(rounds)) + for r in rounds: WISDOM_DATA["history"].append({"problem": problem.description[:25], "round": r.round_idx + 1, "domain": r.domain.name[:20], "action": r.action[:20], "ce": round(r.ce, 5), "solved": r.solved, "derivation": r.domain.derivation[:15]}) WISDOM_DATA["history"] = WISDOM_DATA["history"][-20:] - - except Exception as e: - print(f"[WISDOM TRIAL ERROR] {problem.description}: {e}") + except Exception as e: print(f"[WISDOM TRIAL ERROR] {problem.description}: {e}") async def wisdom_loop(): loop = asyncio.get_event_loop() while True: - problem = random.choice(WISDOM_PROBLEMS) - await loop.run_in_executor(POOL, _run_wisdom_trial, problem) + await loop.run_in_executor(POOL, _run_wisdom_trial, random.choice(WISDOM_PROBLEMS)) await asyncio.sleep(0.5) @asynccontextmanager @@ -789,129 +1282,39 @@ async def lifespan(app: FastAPI): yield POOL.shutdown(wait=False) -app = FastAPI(title="Practicality System 4.0", lifespan=lifespan) +app = FastAPI(title="Practicality System 10.0", lifespan=lifespan) @app.post("/wisdom/submit") async def submit_problem(request: Request): - """ - LLM posts a ProblemSpace JSON here. - Expected: {description, axioms: [...], variables: [...]} - """ try: data = await request.json() - ps = ProblemSpace( - description=data.get("description", "unnamed"), - axioms=set(data.get("axioms", [])), - variables=data.get("variables", [])) - loop = asyncio.get_event_loop() - loop.run_in_executor(POOL, _run_wisdom_trial, ps) + ps = ProblemSpace(description=data.get("description", "unnamed"), axioms=set(data.get("axioms", [])), variables=data.get("variables", [])) + asyncio.get_event_loop().run_in_executor(POOL, _run_wisdom_trial, ps) return JSONResponse({"ok": True, "queued": ps.description}) - except Exception as e: - return JSONResponse({"error": str(e)}, status_code=400) + except Exception as e: return JSONResponse({"error": str(e)}, status_code=400) @app.get("/", response_class=HTMLResponse) async def dashboard(): - with STATE_LOCK: - d = dict(WISDOM_DATA) - by_prob = dict(d["by_problem"]) - history = list(d["history"]) - - runs = max(1, d["runs"]) - solve_rate = round(d["solved"] / runs * 100, 1) - avg_ce = round(d["total_ce"] / runs, 5) - avg_time = round(d["total_time"] / runs, 2) - - def badge(label, val, col): - return (f"{label}: {val}") + with STATE_LOCK: d = dict(WISDOM_DATA); by_prob = dict(d["by_problem"]); history = list(d["history"]) + runs = max(1, d["runs"]); solve_rate = round(d["solved"] / runs * 100, 1); avg_ce = round(d["total_ce"] / runs, 5); avg_time = round(d["total_time"] / runs, 2) + def badge(label, val, col): return f"{label}: {val}" prob_rows = "" for desc, ps in by_prob.items(): - n = max(1, ps["runs"]) - sr = round(ps["solved"] / n * 100, 1) - ace = round(ps["ce"] / n, 4) - avg_r = (round(sum(ps["rounds_to_solve"]) / len(ps["rounds_to_solve"]), 1) - if ps["rounds_to_solve"] else "—") - prob_rows += ( - f"" - f"{desc}" - f"{sr}%" - f"{ace}" - f"{ps['best_domain']}" - f"{avg_r}" - f"{ps['runs']}" - f"") + n = max(1, ps["runs"]); sr = round(ps["solved"] / n * 100, 1); ace = round(ps["ce"] / n, 4); avg_r = (round(sum(ps["rounds_to_solve"]) / len(ps["rounds_to_solve"]), 1) if ps["rounds_to_solve"] else "—") + prob_rows += f"{desc}{sr}%{ace}{ps['best_domain']}{avg_r}{ps['runs']}" col_map = {True: "#4CAF50", False: "#FF5252"} - debate_rows = "" - for h in reversed(history): - sc = col_map.get(h["solved"], "#888") - debate_rows += ( - f"" - f"{h['problem']}" - f"{h['round']}" - f"{h['domain']}" - f"{h['derivation']}" - f"{h['action']}" - f"{h['ce']}" - f"" - f"{'✓' if h['solved'] else '·'}" - f"") - - html = f""" -Practicality System 4.0 - - -

⚗ Practicality System 4.0 — Wisdom Layer

-
- Neuro-Symbolic: Axiom debate selects mathematical domain → - PSL compiler instantiates constraints → - Numerical engine verifies → Witness feeds back → Domain mutates. -
- -
- {badge("Trials", runs, "#aaa")} - {badge("Solve Rate", f"{solve_rate}%", "#26C6DA")} - {badge("Avg CE", avg_ce, "#26C6DA")} - {badge("Avg Time", f"{avg_time}s", "#aaa")} - {badge("Library Size", len(DEBATE_ENGINE.library), "#FF9800")} -
- -

Per-Problem Wisdom Results

- - - - {prob_rows or - ""} -
ProblemSolve%Avg CEBest DomainAvg RoundsRuns
Warming up…
- -

Live Debate Log

- - - - {debate_rows or - ""} -
ProblemRoundDomainDerivationActionCE
Warming up…
- -

Domain Library ({len(DEBATE_ENGINE.library)} entries)

- - - {"".join(f"" - f"" - for d in DEBATE_ENGINE.library)} -
DomainAxioms
{d.name}" - f"{', '.join(sorted(d.axioms))}
+ debate_rows = "".join(f"{h['problem']}{h['round']}{h['domain']}{h['derivation']}{h['action']}{h['ce']}{'✓' if h['solved'] else '·'}" for h in reversed(history)) + + return f"""System 10.0 +

⚗ Practicality System 10.0 — Neuro-Symbolic Hypothesis Stack

+
Wisdom Layer (Axiom Deduction) → Isomorphism Compiler (PSL 2.0) → Algebraic Engine (AST Projections) → Local Polish
+
{badge("Trials", runs, "#aaa")}{badge("Solve Rate", f"{solve_rate}%", "#26C6DA")}{badge("Avg CE", avg_ce, "#26C6DA")}{badge("Avg Time", f"{avg_time}s", "#aaa")}{badge("Library Size", len(DEBATE_ENGINE.library), "#FF9800")}
+

Per-Problem Wisdom Results

{prob_rows or ""}
ProblemSolve%Avg CEBest DomainAvg RoundsRuns
Warming up…
+

Live Debate Log

{debate_rows or ""}
ProblemRoundDomainDerivationActionCE
Warming up…
+

Domain Library ({len(DEBATE_ENGINE.library)} entries)

{"".join(f"" for d in DEBATE_ENGINE.library)}
DomainAxioms
{d.name}{', '.join(sorted(d.axioms))}
""" - return html if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860, log_level="warning") \ No newline at end of file