|
|
from __future__ import annotations |
|
|
|
|
|
from dataclasses import dataclass |
|
|
from typing import Any, Dict, List, Optional, Set, Tuple, Callable |
|
|
|
|
|
from patch_proof import ( |
|
|
Patch, |
|
|
apply_patch_to_counterexample, |
|
|
check_assumption_satisfied, |
|
|
recheck_formula_with_model_search, |
|
|
) |
|
|
|
|
|
Edge = Tuple[int, int] |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class PatchCandidate: |
|
|
patch: Patch |
|
|
patch_size: int |
|
|
assumption_satisfied: bool |
|
|
still_counterexample: Optional[bool] |
|
|
note: str |
|
|
|
|
|
|
|
|
def _edges_of_worlds(worlds: int) -> List[Edge]: |
|
|
"""世界数Nに対して取りうる全辺候補。MVPは全組み合わせ。""" |
|
|
return [(i, j) for i in range(worlds) for j in range(worlds)] |
|
|
|
|
|
|
|
|
def _patch_size(p: Patch) -> int: |
|
|
return len(p.add_edges) + len(p.remove_edges) |
|
|
|
|
|
|
|
|
def _merge_patch(a: Patch, b: Patch) -> Patch: |
|
|
|
|
|
add = set(a.add_edges) | set(b.add_edges) |
|
|
rem = set(a.remove_edges) | set(b.remove_edges) |
|
|
|
|
|
add = add - rem |
|
|
return Patch(add_edges=sorted(add), remove_edges=sorted(rem)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def candidate_patches_for_assumption(worlds: int, assumption: str, max_extra: int = 2) -> List[Patch]: |
|
|
""" |
|
|
「assumption を満たす方向」のパッチ候補を生成する。 |
|
|
max_extra: “最小っぽい候補”を少しだけ増やすための緩さ。 |
|
|
""" |
|
|
all_edges = _edges_of_worlds(worlds) |
|
|
|
|
|
if assumption == "assume:reflexive": |
|
|
|
|
|
base = Patch(add_edges=[(i, i) for i in range(worlds)], remove_edges=[]) |
|
|
|
|
|
return [base] |
|
|
|
|
|
if assumption == "assume:symmetric": |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return [Patch(add_edges=[], remove_edges=[])] |
|
|
|
|
|
if assumption == "assume:transitive": |
|
|
|
|
|
|
|
|
return [Patch(add_edges=[], remove_edges=[])] |
|
|
|
|
|
if assumption == "assume:euclidean": |
|
|
|
|
|
return [Patch(add_edges=[], remove_edges=[])] |
|
|
|
|
|
return [] |
|
|
|
|
|
|
|
|
def patch_to_satisfy_assumption_minimal( |
|
|
cex: Dict[str, Any], |
|
|
assumption: str, |
|
|
limit_k: int = 3, |
|
|
) -> List[Patch]: |
|
|
""" |
|
|
現反例モデル cex を、assumption を満たすように最小変更で修正する候補を返す。 |
|
|
MVP: |
|
|
- reflexive: 必要な self-loop を全部追加(定義上それが最小) |
|
|
- symmetric/transitive/euclidean: “不足している辺だけ add” を greedily 作る |
|
|
""" |
|
|
worlds = int(cex.get("n_worlds") or cex.get("worlds") or 0) |
|
|
|
|
|
raw_edges = cex.get("edges") or [] |
|
|
edges = set() |
|
|
for e in raw_edges: |
|
|
if len(e) >= 2: |
|
|
edges.add((int(e[0]), int(e[1]))) |
|
|
|
|
|
if assumption == "assume:reflexive": |
|
|
add = [(i, i) for i in range(worlds) if (i, i) not in edges] |
|
|
return [Patch(add_edges=add, remove_edges=[])] |
|
|
|
|
|
if assumption == "assume:symmetric": |
|
|
add: List[Edge] = [] |
|
|
for (a, b) in list(edges): |
|
|
if (b, a) not in edges: |
|
|
add.append((b, a)) |
|
|
|
|
|
if len(add) > limit_k: |
|
|
add = add[:limit_k] |
|
|
return [Patch(add_edges=add, remove_edges=[])] |
|
|
|
|
|
if assumption == "assume:transitive": |
|
|
|
|
|
missing: List[Edge] = [] |
|
|
for (x, y) in edges: |
|
|
for (y2, z) in edges: |
|
|
if y2 == y and (x, z) not in edges: |
|
|
missing.append((x, z)) |
|
|
|
|
|
missing = list(dict.fromkeys(missing)) |
|
|
if not missing: |
|
|
return [Patch(add_edges=[], remove_edges=[])] |
|
|
|
|
|
out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]] |
|
|
out.append(Patch(add_edges=missing[:limit_k], remove_edges=[])) |
|
|
return out |
|
|
|
|
|
if assumption == "assume:euclidean": |
|
|
missing: List[Edge] = [] |
|
|
for (a, b) in edges: |
|
|
for (a2, c) in edges: |
|
|
if a2 == a and (b, c) not in edges: |
|
|
missing.append((b, c)) |
|
|
missing = list(dict.fromkeys(missing)) |
|
|
if not missing: |
|
|
return [Patch(add_edges=[], remove_edges=[])] |
|
|
out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]] |
|
|
out.append(Patch(add_edges=missing[:limit_k], remove_edges=[])) |
|
|
return out |
|
|
|
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_minimal_patches( |
|
|
model_checker_fn: Callable[[str, Dict[str, Any]], bool], |
|
|
formula: str, |
|
|
base_assumptions: List[str], |
|
|
cex: Dict[str, Any], |
|
|
goal_assumption: str, |
|
|
limit_k: int = 3, |
|
|
max_results: int = 5, |
|
|
) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
goal_assumption を満たしつつ、同じ反例モデルがもう反例でなくなるパッチを列挙。 |
|
|
limit_k: transitive/euclidean の欠けedge候補の探索幅 |
|
|
""" |
|
|
|
|
|
patches = patch_to_satisfy_assumption_minimal(cex, goal_assumption, limit_k=limit_k) |
|
|
|
|
|
results: List[PatchCandidate] = [] |
|
|
|
|
|
for p in patches: |
|
|
patched = apply_patch_to_counterexample(cex, p) |
|
|
satisfied = check_assumption_satisfied(patched, goal_assumption) |
|
|
|
|
|
recheck = recheck_formula_with_model_search( |
|
|
model_checker_fn=model_checker_fn, |
|
|
formula=formula, |
|
|
assumptions=(base_assumptions + [goal_assumption]), |
|
|
patched_cex=patched, |
|
|
) |
|
|
|
|
|
results.append(PatchCandidate( |
|
|
patch=p, |
|
|
patch_size=_patch_size(p), |
|
|
assumption_satisfied=satisfied, |
|
|
still_counterexample=recheck.get("still_counterexample"), |
|
|
note=recheck.get("note", ""), |
|
|
)) |
|
|
|
|
|
|
|
|
|
|
|
def key(pc: PatchCandidate): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not pc.assumption_satisfied: |
|
|
rank = 2 |
|
|
elif pc.still_counterexample is False: |
|
|
rank = 0 |
|
|
else: |
|
|
rank = 1 |
|
|
return (rank, pc.patch_size) |
|
|
|
|
|
results.sort(key=key) |
|
|
|
|
|
out: List[Dict[str, Any]] = [] |
|
|
for pc in results[:max_results]: |
|
|
out.append({ |
|
|
"goal": f"make_assumption_true: {goal_assumption}", |
|
|
"patch": { |
|
|
"add_edges": [list(e) for e in pc.patch.add_edges], |
|
|
"remove_edges": [list(e) for e in pc.patch.remove_edges], |
|
|
}, |
|
|
"patch_size": pc.patch_size, |
|
|
"assumption_satisfied": pc.assumption_satisfied, |
|
|
"still_counterexample": pc.still_counterexample, |
|
|
"note": pc.note, |
|
|
}) |
|
|
return out |