File size: 4,544 Bytes
7952f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Constraint checker dispatch.

Each constraint kind has a small ``_check_*`` function. ``check`` routes by
isinstance and ``evaluate_all`` reports which constraints from a list are
satisfied or not.

Behavioral / materialization constraints (currently just ``materializes``)
delegate to the materializer and validator subsystems.
"""

from __future__ import annotations

from dataclasses import dataclass, field

from graphforge.constraints.schema import (
    AcyclicImports,
    Constraint,
    EdgeExists,
    Materializes,
    ModuleCount,
    ModuleResponsibility,
    ModuleSizeMax,
    NodeAbsent,
    NodeExists,
    STRUCTURAL_KINDS,
)
from graphforge.graph.schema import Graph


@dataclass
class SatisfactionReport:
    satisfied: list[Constraint] = field(default_factory=list)
    unsatisfied: list[Constraint] = field(default_factory=list)

    @property
    def total(self) -> int:
        return len(self.satisfied) + len(self.unsatisfied)

    @property
    def all_satisfied(self) -> bool:
        return self.total > 0 and not self.unsatisfied

    def split_by_family(self) -> tuple["SatisfactionReport", "SatisfactionReport"]:
        """Split into (structural, behavioral) sub-reports.

        Useful for the reward engine, which scores the two families with
        different magnitudes per PROPOSAL.md §5.2.
        """
        sr = SatisfactionReport()
        br = SatisfactionReport()
        for c in self.satisfied:
            (sr if c.kind in STRUCTURAL_KINDS else br).satisfied.append(c)
        for c in self.unsatisfied:
            (sr if c.kind in STRUCTURAL_KINDS else br).unsatisfied.append(c)
        return sr, br

    def to_dict(self) -> dict[str, object]:
        return {
            "satisfied": [c.model_dump() for c in self.satisfied],
            "unsatisfied": [c.model_dump() for c in self.unsatisfied],
            "total": self.total,
            "all_satisfied": self.all_satisfied,
        }


# ---- per-kind checkers ----------------------------------------------


def _check_node_exists(g: Graph, c: NodeExists) -> bool:
    return g.find_node(c.name, c.module) is not None


def _check_node_absent(g: Graph, c: NodeAbsent) -> bool:
    return g.find_node(c.name, c.module) is None


def _check_edge_exists(g: Graph, c: EdgeExists) -> bool:
    return g.find_edge(c.caller, c.callee) is not None


def _check_module_count(g: Graph, c: ModuleCount) -> bool:
    return len(g.modules) == c.n


def _check_module_size_max(g: Graph, c: ModuleSizeMax) -> bool:
    return len(g.nodes_in_module(c.module)) <= c.n


def _check_module_responsibility(g: Graph, c: ModuleResponsibility) -> bool:
    m = g.find_module(c.module)
    return m is not None and m.responsibility == c.responsibility


def _check_acyclic_imports(g: Graph, _c: AcyclicImports) -> bool:
    return not g.has_module_cycle()


def _check_materializes(g: Graph, _c: Materializes) -> bool:
    # Imported lazily so that callers who don't use this checker don't pay
    # the cost of pulling the materializer/validator graph.
    from graphforge.materializer import materialize
    from graphforge.validator import full_check

    try:
        files = materialize(g)
    except Exception:
        return False
    return full_check(files).ok


# ---- dispatch --------------------------------------------------------


def check(graph: Graph, constraint: Constraint) -> bool:
    if isinstance(constraint, NodeExists):
        return _check_node_exists(graph, constraint)
    if isinstance(constraint, NodeAbsent):
        return _check_node_absent(graph, constraint)
    if isinstance(constraint, EdgeExists):
        return _check_edge_exists(graph, constraint)
    if isinstance(constraint, ModuleCount):
        return _check_module_count(graph, constraint)
    if isinstance(constraint, ModuleSizeMax):
        return _check_module_size_max(graph, constraint)
    if isinstance(constraint, ModuleResponsibility):
        return _check_module_responsibility(graph, constraint)
    if isinstance(constraint, AcyclicImports):
        return _check_acyclic_imports(graph, constraint)
    if isinstance(constraint, Materializes):
        return _check_materializes(graph, constraint)
    raise ValueError(f"unknown constraint kind: {constraint!r}")


def evaluate_all(graph: Graph, constraints: list[Constraint]) -> SatisfactionReport:
    rep = SatisfactionReport()
    for c in constraints:
        if check(graph, c):
            rep.satisfied.append(c)
        else:
            rep.unsatisfied.append(c)
    return rep