Spaces:
Sleeping
Sleeping
File size: 4,544 Bytes
7952f32 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | """Constraint checker dispatch.
Each constraint kind has a small ``_check_*`` function. ``check`` routes by
isinstance and ``evaluate_all`` reports which constraints from a list are
satisfied or not.
Behavioral / materialization constraints (currently just ``materializes``)
delegate to the materializer and validator subsystems.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from graphforge.constraints.schema import (
AcyclicImports,
Constraint,
EdgeExists,
Materializes,
ModuleCount,
ModuleResponsibility,
ModuleSizeMax,
NodeAbsent,
NodeExists,
STRUCTURAL_KINDS,
)
from graphforge.graph.schema import Graph
@dataclass
class SatisfactionReport:
satisfied: list[Constraint] = field(default_factory=list)
unsatisfied: list[Constraint] = field(default_factory=list)
@property
def total(self) -> int:
return len(self.satisfied) + len(self.unsatisfied)
@property
def all_satisfied(self) -> bool:
return self.total > 0 and not self.unsatisfied
def split_by_family(self) -> tuple["SatisfactionReport", "SatisfactionReport"]:
"""Split into (structural, behavioral) sub-reports.
Useful for the reward engine, which scores the two families with
different magnitudes per PROPOSAL.md §5.2.
"""
sr = SatisfactionReport()
br = SatisfactionReport()
for c in self.satisfied:
(sr if c.kind in STRUCTURAL_KINDS else br).satisfied.append(c)
for c in self.unsatisfied:
(sr if c.kind in STRUCTURAL_KINDS else br).unsatisfied.append(c)
return sr, br
def to_dict(self) -> dict[str, object]:
return {
"satisfied": [c.model_dump() for c in self.satisfied],
"unsatisfied": [c.model_dump() for c in self.unsatisfied],
"total": self.total,
"all_satisfied": self.all_satisfied,
}
# ---- per-kind checkers ----------------------------------------------
def _check_node_exists(g: Graph, c: NodeExists) -> bool:
return g.find_node(c.name, c.module) is not None
def _check_node_absent(g: Graph, c: NodeAbsent) -> bool:
return g.find_node(c.name, c.module) is None
def _check_edge_exists(g: Graph, c: EdgeExists) -> bool:
return g.find_edge(c.caller, c.callee) is not None
def _check_module_count(g: Graph, c: ModuleCount) -> bool:
return len(g.modules) == c.n
def _check_module_size_max(g: Graph, c: ModuleSizeMax) -> bool:
return len(g.nodes_in_module(c.module)) <= c.n
def _check_module_responsibility(g: Graph, c: ModuleResponsibility) -> bool:
m = g.find_module(c.module)
return m is not None and m.responsibility == c.responsibility
def _check_acyclic_imports(g: Graph, _c: AcyclicImports) -> bool:
return not g.has_module_cycle()
def _check_materializes(g: Graph, _c: Materializes) -> bool:
# Imported lazily so that callers who don't use this checker don't pay
# the cost of pulling the materializer/validator graph.
from graphforge.materializer import materialize
from graphforge.validator import full_check
try:
files = materialize(g)
except Exception:
return False
return full_check(files).ok
# ---- dispatch --------------------------------------------------------
def check(graph: Graph, constraint: Constraint) -> bool:
if isinstance(constraint, NodeExists):
return _check_node_exists(graph, constraint)
if isinstance(constraint, NodeAbsent):
return _check_node_absent(graph, constraint)
if isinstance(constraint, EdgeExists):
return _check_edge_exists(graph, constraint)
if isinstance(constraint, ModuleCount):
return _check_module_count(graph, constraint)
if isinstance(constraint, ModuleSizeMax):
return _check_module_size_max(graph, constraint)
if isinstance(constraint, ModuleResponsibility):
return _check_module_responsibility(graph, constraint)
if isinstance(constraint, AcyclicImports):
return _check_acyclic_imports(graph, constraint)
if isinstance(constraint, Materializes):
return _check_materializes(graph, constraint)
raise ValueError(f"unknown constraint kind: {constraint!r}")
def evaluate_all(graph: Graph, constraints: list[Constraint]) -> SatisfactionReport:
rep = SatisfactionReport()
for c in constraints:
if check(graph, c):
rep.satisfied.append(c)
else:
rep.unsatisfied.append(c)
return rep
|