File size: 7,616 Bytes
6d1438c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Agent graph-growth: ``create_node`` action + relaxed agent-path caps.

Covers the validator (new action + same-batch edge resolution + cap threading),
the reducer (model_inferred node/claim + immediate visibility), and the
``apply_agent_actions`` density path (a multi-node turn beyond the legacy 4-action
cap actually lands).
"""

from __future__ import annotations

import time
import uuid

from loosecanvas import turn_logic
from loosecanvas.contracts import (
    Node,
    SceneAction,
    SceneActionType,
    ScenePlan,
    SceneState,
    SourceSnapshot,
)
from loosecanvas.graph_repository import LooseGraphRepository
from loosecanvas.reducer import reduce_turn
from loosecanvas.validator import validate_sceneplan

# ── Builders ──────────────────────────────────────────────────────────────────


def _node(node_id: str) -> Node:
    """Build a ``concept`` node whose label is the same string as its id."""
    concept = Node(id=node_id, kind="concept", label=node_id)
    return concept


def _graph(*node_ids: str) -> LooseGraphRepository:
    """Return a new repository with one ``_node`` upserted per given id."""
    seeded = LooseGraphRepository()
    concept_nodes = list(map(_node, node_ids))
    seeded.upsert_nodes(concept_nodes)
    return seeded


def _scene(
    *, visible_nodes: list[str] | None = None, fogged: list[str] | None = None
) -> SceneState:
    """Build a ``SceneState`` from optional visible / fogged node-id lists.

    A missing (or empty) argument is passed on as a new empty list.
    """
    visible_ids = visible_nodes if visible_nodes else []
    fogged_ids = fogged if fogged else []
    return SceneState(visible_node_ids=visible_ids, fogged_node_ids=fogged_ids)


def _create_node(node_id: str, label: str, note: str = "") -> SceneAction:
    """Build a ``create_node`` action whose ``target_id`` is ``node_id``."""
    payload = {"target_id": node_id, "label": label, "note": note}
    return SceneAction(type=SceneActionType.create_node, **payload)


# ── Validator: create_node ──────────────────────────────────────────────────


def test_create_node_valid() -> None:
    """A labelled create_node with an unused id is accepted with no rejections."""
    repo = _graph("a")
    state = _scene(visible_nodes=["a"])
    action = _create_node("concept::woody", "Woody Guthrie")
    outcome = validate_sceneplan(ScenePlan(actions=[action]), repo, state)
    accepted = outcome.valid_actions
    assert len(accepted) == 1
    assert outcome.rejected_actions == []


def test_create_node_missing_label_rejected() -> None:
    """An empty label gets the create_node rejected as a missing required field."""
    repo = _graph("a")
    state = _scene(visible_nodes=["a"])
    unlabeled = _create_node("concept::x", "")
    outcome = validate_sceneplan(ScenePlan(actions=[unlabeled]), repo, state)
    assert len(outcome.rejected_actions) == 1
    first_reason = outcome.rejection_reasons[0]
    assert "missing_required_field" in first_reason


def test_create_node_existing_id_rejected() -> None:
    """Re-using the id of a node already in the graph is rejected as ``id_exists``."""
    repo = _graph("a")
    state = _scene(visible_nodes=["a"])
    clobbering = _create_node("a", "Clobber")
    outcome = validate_sceneplan(ScenePlan(actions=[clobbering]), repo, state)
    assert len(outcome.rejected_actions) == 1
    first_reason = outcome.rejection_reasons[0]
    assert "id_exists" in first_reason


def test_create_edge_resolves_node_created_earlier_in_same_plan() -> None:
    """A create_edge may use, as its source, a node created earlier in the plan."""
    repo = _graph("dylan")
    state = _scene(visible_nodes=["dylan"])
    new_node = _create_node("concept::woody", "Woody Guthrie")
    # The edge's source is the node created by the preceding action.
    link = SceneAction(
        type=SceneActionType.create_edge,
        source_id="concept::woody",
        target_id="dylan",
        label="influenced",
    )
    batch = ScenePlan(actions=[new_node, link])
    outcome = validate_sceneplan(batch, repo, state)
    assert len(outcome.valid_actions) == 2
    assert outcome.rejected_actions == []


# ── Validator: cap threading ─────────────────────────────────────────────────


def test_default_action_cap_still_four() -> None:
    """Without ``max_actions`` only four of six create_node actions are accepted.

    Every recorded rejection reason must mention ``action_cap_exceeded``, and
    at least one reason must have been recorded.
    """
    graph = _graph("a")
    scene = _scene(visible_nodes=["a"])
    plan = ScenePlan(
        actions=[_create_node(f"concept::n{i}", f"N{i}") for i in range(6)]
    )
    result = validate_sceneplan(plan, graph, scene)
    # Legacy default unchanged: only the first 4 survive.
    assert len(result.valid_actions) == 4
    # all() is vacuously true on an empty iterable, so first require that a
    # rejection reason was actually recorded for the overflow.
    assert result.rejection_reasons
    assert all("action_cap_exceeded" in r for r in result.rejection_reasons)


def test_raised_action_cap_admits_more() -> None:
    """With ``max_actions=10`` all six create_node actions are accepted."""
    repo = _graph("a")
    state = _scene(visible_nodes=["a"])
    batch = []
    for idx in range(6):
        batch.append(_create_node(f"concept::n{idx}", f"N{idx}"))
    outcome = validate_sceneplan(
        ScenePlan(actions=batch), repo, state, max_actions=10
    )
    assert len(outcome.valid_actions) == 6
    assert outcome.rejected_actions == []


# ── Reducer: trust + visibility ──────────────────────────────────────────────


def test_create_node_emits_model_inferred_node_and_claim() -> None:
    """create_node yields a model_inferred node and a pending, unverified claim."""
    repo = _graph("a")
    state = _scene(visible_nodes=["a"])
    new_id = "concept::woody"
    request = _create_node(new_id, "Woody Guthrie", note="Folk forefather")
    graph_out, scene_out, _ignored = reduce_turn([request], repo, state)

    # The upserted node carries the label, the note as summary, and its origin.
    created = next(n for n in graph_out.upsert_nodes if n.id == new_id)
    assert created.kind == "concept"
    assert created.label == "Woody Guthrie"
    assert created.summary == "Folk forefather"
    assert created.properties.get("origin") == "model_inferred"

    # A claim targeting the new node is upserted alongside it.
    attached = next(c for c in graph_out.upsert_claims if c.target_id == new_id)
    assert attached.origin == "model_inferred"
    assert attached.support_state == "unverified"
    assert attached.review_state == "pending"

    # The new node is immediately visible so the agent can wire edges to it.
    assert new_id in scene_out.add_visible_node_ids


def test_create_node_then_edge_reveals_edge_same_batch() -> None:
    """A node and an edge created in one batch are both added to the visible sets."""
    repo = _graph("dylan")
    state = _scene(visible_nodes=["dylan"])
    make_node = _create_node("concept::woody", "Woody Guthrie")
    make_edge = SceneAction(
        type=SceneActionType.create_edge,
        source_id="concept::woody",
        target_id="dylan",
        label="influenced",
    )
    graph_out, scene_out, _ignored = reduce_turn([make_node, make_edge], repo, state)
    assert "concept::woody" in scene_out.add_visible_node_ids
    # The first upserted edge is expected to be listed as newly visible.
    first_edge = graph_out.upsert_edges[0]
    assert first_edge.id in scene_out.add_visible_edge_ids


# ── apply_agent_actions: density beyond the legacy cap ───────────────────────


def _make_session() -> str:
    """Register a single-node session in ``turn_logic.session_store``.

    The session's graph holds one ``dylan`` concept node, which is also the
    only visible node of scene ``s1``.

    Returns:
        The generated session id (a UUID4 string) under which the record is stored.
    """
    dylan = Node(id="dylan", kind="concept", label="Bob Dylan")
    graph = LooseGraphRepository.from_extraction([dylan], [], [], SourceSnapshot())
    initial_scene = SceneState(scene_id="s1", visible_node_ids=["dylan"])
    session_id = str(uuid.uuid4())
    # One timestamp is shared by created_at and last_seen.
    stamp = time.time()
    record = turn_logic.SessionRecord(
        session_id=session_id,
        graph=graph,
        scene=initial_scene,
        history=[],
        graph_version=1,
        created_at=stamp,
        last_seen=stamp,
        render_scene_id="s1",
    )
    turn_logic.session_store[session_id] = record
    return session_id


def test_apply_agent_actions_lands_dense_turn() -> None:
    """A 16-action agent turn (8 create_node + 8 create_edge) adds all 8 nodes.

    The session registered for the test is removed from
    ``turn_logic.session_store`` even when the call or an assertion fails.
    """
    sid = _make_session()
    try:
        record = turn_logic.session_store[sid]
        actions: list[SceneAction] = []
        for i in range(8):
            # Each new node is followed at once by an edge linking it to "dylan".
            actions.append(_create_node(f"concept::n{i}", f"Concept {i}"))
            actions.append(
                SceneAction(
                    type=SceneActionType.create_edge,
                    source_id=f"concept::n{i}",
                    target_id="dylan",
                    label="relates to",
                )
            )
        turn_logic.apply_agent_actions(record, actions, {}, "building a dense web")
        # All 8 new concept nodes landed (legacy 4-action cap would have dropped most).
        for i in range(8):
            assert record.graph.get_node(f"concept::n{i}") is not None
    finally:
        # Clean up unconditionally so a failure cannot leave this session behind
        # in the module-level store.
        del turn_logic.session_store[sid]