verantyx-logic-math / patch_search.py
kofdai's picture
Initial upload of Verantyx Logic Engine (v1.0)
29b87da verified
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple, Callable
from patch_proof import (
Patch,
apply_patch_to_counterexample,
check_assumption_satisfied,
recheck_formula_with_model_search,
)
Edge = Tuple[int, int]
@dataclass
class PatchCandidate:
patch: Patch
patch_size: int
assumption_satisfied: bool
still_counterexample: Optional[bool]
note: str
def _edges_of_worlds(worlds: int) -> List[Edge]:
"""世界数Nに対して取りうる全辺候補。MVPは全組み合わせ。"""
return [(i, j) for i in range(worlds) for j in range(worlds)]
def _patch_size(p: Patch) -> int:
return len(p.add_edges) + len(p.remove_edges)
def _merge_patch(a: Patch, b: Patch) -> Patch:
# setで正規化して重複除去(順序は最後に整列)
add = set(a.add_edges) | set(b.add_edges)
rem = set(a.remove_edges) | set(b.remove_edges)
# 同じ辺を add と remove の両方に入れない(remove優先)
add = add - rem
return Patch(add_edges=sorted(add), remove_edges=sorted(rem))
# -----------------------------
# Assumption -> Patch generator
# -----------------------------
def candidate_patches_for_assumption(worlds: int, assumption: str, max_extra: int = 2) -> List[Patch]:
"""
「assumption を満たす方向」のパッチ候補を生成する。
max_extra: “最小っぽい候補”を少しだけ増やすための緩さ。
"""
all_edges = _edges_of_worlds(worlds)
if assumption == "assume:reflexive":
# 必須:全 i に (i,i)
base = Patch(add_edges=[(i, i) for i in range(worlds)], remove_edges=[])
# worlds=1ならadd 1本。worlds>1のときは多いが、reflexiveの定義上必要。
return [base]
if assumption == "assume:symmetric":
# ある辺 (a,b) があれば (b,a) が必要
# “追加だけで”対称性を満たす候補:全辺を対称化する
# MVP: この1個を返す(removeを混ぜる最適化はPhase 13で)
# engine側で「不足分だけ add」へ絞るので、生成は空でもOK
return [Patch(add_edges=[], remove_edges=[])] # engine側で不足分生成
if assumption == "assume:transitive":
# 推移性は “欠けている (x,z) を追加する” で満たせる
# ただし欠けている辺は現モデル依存なので、ここでも空を返して engine側で生成
return [Patch(add_edges=[], remove_edges=[])]
if assumption == "assume:euclidean":
# ユークリッド性も現モデル依存
return [Patch(add_edges=[], remove_edges=[])]
return []
def patch_to_satisfy_assumption_minimal(
cex: Dict[str, Any],
assumption: str,
limit_k: int = 3,
) -> List[Patch]:
"""
現反例モデル cex を、assumption を満たすように最小変更で修正する候補を返す。
MVP:
- reflexive: 必要な self-loop を全部追加(定義上それが最小)
- symmetric/transitive/euclidean: “不足している辺だけ add” を greedily 作る
"""
worlds = int(cex.get("n_worlds") or cex.get("worlds") or 0)
# Ensure edges are tuples
raw_edges = cex.get("edges") or []
edges = set()
for e in raw_edges:
if len(e) >= 2:
edges.add((int(e[0]), int(e[1])))
if assumption == "assume:reflexive":
add = [(i, i) for i in range(worlds) if (i, i) not in edges]
return [Patch(add_edges=add, remove_edges=[])]
if assumption == "assume:symmetric":
add: List[Edge] = []
for (a, b) in list(edges):
if (b, a) not in edges:
add.append((b, a))
# 追加数が大きすぎたら切る(MVP)
if len(add) > limit_k:
add = add[:limit_k]
return [Patch(add_edges=add, remove_edges=[])]
if assumption == "assume:transitive":
# 欠けてる (x,z) を列挙して少数返す
missing: List[Edge] = []
for (x, y) in edges:
for (y2, z) in edges:
if y2 == y and (x, z) not in edges:
missing.append((x, z))
# できるだけ少なく
missing = list(dict.fromkeys(missing)) # preserve order unique
if not missing:
return [Patch(add_edges=[], remove_edges=[])]
# 最小っぽく 1本ずつ試す + まとめて試す
out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]]
out.append(Patch(add_edges=missing[:limit_k], remove_edges=[]))
return out
if assumption == "assume:euclidean":
missing: List[Edge] = []
for (a, b) in edges:
for (a2, c) in edges:
if a2 == a and (b, c) not in edges:
missing.append((b, c))
missing = list(dict.fromkeys(missing))
if not missing:
return [Patch(add_edges=[], remove_edges=[])]
out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]]
out.append(Patch(add_edges=missing[:limit_k], remove_edges=[]))
return out
return []
# -----------------------------
# Main search API
# -----------------------------
def find_minimal_patches(
model_checker_fn: Callable[[str, Dict[str, Any]], bool],
formula: str,
base_assumptions: List[str],
cex: Dict[str, Any],
goal_assumption: str,
limit_k: int = 3,
max_results: int = 5,
) -> List[Dict[str, Any]]:
"""
goal_assumption を満たしつつ、同じ反例モデルがもう反例でなくなるパッチを列挙。
limit_k: transitive/euclidean の欠けedge候補の探索幅
"""
# 1) goal_assumption を満たす候補 patch を作る(最小寄り)
patches = patch_to_satisfy_assumption_minimal(cex, goal_assumption, limit_k=limit_k)
results: List[PatchCandidate] = []
for p in patches:
patched = apply_patch_to_counterexample(cex, p)
satisfied = check_assumption_satisfied(patched, goal_assumption)
recheck = recheck_formula_with_model_search(
model_checker_fn=model_checker_fn,
formula=formula,
assumptions=(base_assumptions + [goal_assumption]),
patched_cex=patched,
)
results.append(PatchCandidate(
patch=p,
patch_size=_patch_size(p),
assumption_satisfied=satisfied,
still_counterexample=recheck.get("still_counterexample"),
note=recheck.get("note", ""),
))
# 2) ソート:小さいほど良い、反例が潰れているほど良い
# Prioritize: Satisfied assumptions AND destroyed counterexample (still_cex=False)
def key(pc: PatchCandidate):
# Rank:
# 0: Satisfied + Not Still CEX (Best)
# 1: Satisfied + Still CEX (Intermediate)
# 2: Not Satisfied (Worst)
if not pc.assumption_satisfied:
rank = 2
elif pc.still_counterexample is False:
rank = 0
else:
rank = 1
return (rank, pc.patch_size)
results.sort(key=key)
out: List[Dict[str, Any]] = []
for pc in results[:max_results]:
out.append({
"goal": f"make_assumption_true: {goal_assumption}",
"patch": {
"add_edges": [list(e) for e in pc.patch.add_edges],
"remove_edges": [list(e) for e in pc.patch.remove_edges],
},
"patch_size": pc.patch_size,
"assumption_satisfied": pc.assumption_satisfied,
"still_counterexample": pc.still_counterexample,
"note": pc.note,
})
return out