File size: 10,816 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#!/usr/bin/env python3
"""Phase 5 gate — falsification test for multi-session discovery.

Runs against the REAL ``~/.claude/projects`` (Non-negotiable: read cwd from
INSIDE each file; never decode the lossy encoded folder name). Demonstrates:

  1. PREFIX SURFACING  — ticking a parent folder surfaces EVERY session whose
     real cwd is under it (the parent itself or any descendant).
  2. DEEPEST-WINS       — ticking that parent AND a nested child together: the
     sessions in the nested dir are attributed to the CHILD, not the parent.
  3. NO DOUBLE-COUNT    — the parent+child tick attributes each session exactly
     once; the union count equals the parent-alone count (the child's sessions
     just move down a level, none are duplicated or lost).

Roots are chosen data-driven from the real cwds, so the gate stays honest as the
session store changes:
  parent = deepest common ancestor of all discovered cwds (falling back to the
           shallowest cwd when nothing sits strictly below that ancestor).
  child  = the first session cwd, in sorted order, strictly below parent — a
           directory that actually owns sessions (a genuine nested tick).

Run:
  python3 "tools/phase5_gate.py"
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

# Repository root: two directory levels above this file (the module docstring
# runs it as tools/phase5_gate.py). It is put at the front of sys.path, once,
# so that ``from engine import discovery`` below resolves when the gate is
# launched directly as a script.
REPO = Path(__file__).resolve().parent.parent
if sys.path.count(str(REPO)) == 0:
    sys.path.insert(0, str(REPO))

from engine import discovery  # noqa: E402


def _line(label: str, got, want, ok: bool) -> bool:
    """Print one aligned result row for a check and return ``ok`` unchanged.

    The row carries an ``OK``/``DIFF`` flag, the label left-justified to 34
    columns, the repr of ``got`` right-justified to 10 columns, and the repr of
    ``want``. Handing ``ok`` back lets a caller report a check and record its
    outcome in a single expression.
    """
    if ok:
        flag = "OK  "
    else:
        flag = "DIFF"
    row = f"  [{flag}] {label:<34} got={got!r:>10}  want={want!r}"
    print(row)
    return ok


def _common_ancestor(paths: list[str]) -> str:
    """Return the deepest directory that contains (or equals) every path.

    The comparison walks the paths segment by segment after splitting on
    ``os.sep``; a character-level prefix (``os.path.commonprefix``) is avoided
    on purpose because it can stop in the middle of a directory name.

    When the paths share no leading segment beyond the empty one in front of a
    leading separator -- or when ``paths`` is empty -- the result is ``os.sep``.
    """
    split_paths = [p.split(os.sep) for p in paths]
    shared: list[str] = []
    # Each ``column`` holds the i-th segment of every path; zip stops at the
    # shortest path, so the ancestor can never be deeper than any input.
    for column in zip(*split_paths):
        if len(set(column)) != 1:
            break
        shared.append(column[0])
    joined = os.sep.join(shared)
    if joined:
        return joined
    return os.sep


def _pick_nested_child(parent: str, cwds: list[str]) -> str | None:
    """Return the first session cwd that lies strictly below ``parent``.

    Candidates are the distinct entries of ``cwds`` visited in sorted order, so
    the pick is deterministic for a given store. A candidate qualifies when
    ``discovery._is_under`` reports it under ``parent``, it is not ``parent``
    itself, and it sits at least one path segment deeper. Because the result is
    taken from ``cwds``, it is a directory that really owns sessions.

    Args:
        parent: The ticked parent root.
        cwds: Real working directories read from the discovered sessions.

    Returns:
        The chosen nested root, or ``None`` when no cwd is strictly below
        ``parent``.
    """

    def _depth(path: str) -> int:
        # Count only non-empty segments: "/".split("/") is ['', ''], so raw
        # list lengths make the filesystem root look one level deeper than it
        # is and would wrongly reject its direct children (e.g. "/tmp").
        return sum(1 for seg in path.split(os.sep) if seg)

    parent_depth = _depth(parent)
    for candidate in sorted(set(cwds)):
        if candidate == parent or not discovery._is_under(candidate, parent):
            continue
        # Redundant with the strict-descendant test for well-formed paths; kept
        # as a guard against separator-only differences such as a trailing sep.
        if _depth(candidate) - parent_depth >= 1:
            return candidate
    return None


def main() -> int:
    """Run the Phase 5 gate against the default projects dir and report.

    Discovers sessions, picks a parent root and a nested child root from the
    real cwds, then prints one row per check: prefix surfacing, deepest-wins,
    no double-count, pool partition and (when the store offers one) an
    intermediate-directory variant of deepest-wins. Ends with a ``GATE:``
    PASS/FAIL line and a ``SUMMARY`` dict.

    Returns:
        0 when every check passed; 1 when any check failed or the gate could
        not run (projects dir missing, no session with a cwd, or no nested
        child root available).
    """
    projects_dir = discovery.default_projects_dir()
    print("Her · हेर — Phase 5 gate (multi-session discovery)")
    print("=" * 70)
    print(f"projects : {projects_dir}")

    if not os.path.isdir(projects_dir):
        print(f"FAIL — projects dir not found: {projects_dir}")
        return 1

    # --- discover everything from the real store -------------------------- #
    sessions = discovery.discover_sessions(projects_dir)
    projects = discovery.list_projects(projects_dir)
    # Only sessions that expose a cwd can be matched against a ticked root.
    with_cwd = [s for s in sessions if s.cwd is not None]
    cwds = [s.cwd for s in with_cwd]

    print(f"sessions : {len(sessions)} total "
          f"({len(with_cwd)} with a real cwd, "
          f"{len(sessions) - len(with_cwd)} without)")
    print(f"projects : {len(projects)} distinct cwd roots")
    print("-" * 70)
    for p in projects:
        print(f"    {p.sessions:3d}  {p.cwd}")
    print("-" * 70)

    # One boolean per printed check row; the gate passes only if all are True.
    checks: list[bool] = []

    if not cwds:
        print("FAIL — no sessions with a readable cwd; cannot run the gate.")
        return 1

    # --- choose parent + nested child data-driven ------------------------- #
    parent = _common_ancestor(cwds)
    child = _pick_nested_child(parent, cwds)

    # If no cwd sits strictly below the common ancestor, retry with the
    # SHALLOWEST cwd (fewest separators, ties broken lexicographically) as the
    # parent, looking for a cwd nested below it.
    if child is None:
        parent = min(cwds, key=lambda c: (c.count(os.sep), c))  # shallowest cwd
        child = _pick_nested_child(parent, cwds)
    if child is None:
        print("FAIL — could not find a nested child root to demonstrate "
              "deepest-wins; the store has no nesting to test.")
        return 1

    print(f"parent root (tick) : {parent}")
    print(f"child  root (tick) : {child}   [nested under parent]")
    print(f"child is strictly under parent : "
          f"{discovery._is_under(child, parent) and child != parent}")
    print("-" * 70)

    # === 1) PREFIX SURFACING ============================================== #
    under_parent = discovery.sessions_under(parent, sessions=sessions)
    # Recompute the expected set here by filtering with discovery._is_under.
    # NOTE(review): this shares _is_under with the engine, so it cross-checks
    # sessions_under's filtering rather than the prefix predicate itself.
    expect_parent = [
        s for s in with_cwd if discovery._is_under(s.cwd, parent)
    ]
    # Compared by session path; an empty result never counts as a pass.
    surfacing_ok = (
        {s.path for s in under_parent} == {s.path for s in expect_parent}
        and len(under_parent) > 0
    )
    checks.append(_line("parent surfaces all under it",
                        len(under_parent), len(expect_parent), surfacing_ok))

    # Report how many distinct cwds the parent tick surfaced. More than one
    # shows a real prefix sweep rather than a single-folder lookup, but the
    # check itself only requires >= 1 because the store may genuinely hold a
    # single cwd under parent.
    distinct_cwds_surfaced = len({s.cwd for s in under_parent})
    checks.append(_line("surfaced cwds (prefix sweep)",
                        distinct_cwds_surfaced, ">=1",
                        distinct_cwds_surfaced >= 1))

    # === 2) DEEPEST-WINS + 3) NO DOUBLE-COUNT ============================= #
    attr = discovery.attribute([parent, child], sessions=sessions)
    # A root missing from by_root is treated as owning no sessions.
    in_parent = attr.by_root.get(parent, [])
    in_child = attr.by_root.get(child, [])

    # child must own exactly the sessions under child; parent owns the rest under
    # parent (i.e. under parent but NOT under child).
    expect_child = [s for s in with_cwd if discovery._is_under(s.cwd, child)]
    expect_parent_only = [
        s for s in expect_parent if not discovery._is_under(s.cwd, child)
    ]

    deepest_ok = (
        {s.path for s in in_child} == {s.path for s in expect_child}
        and {s.path for s in in_parent} == {s.path for s in expect_parent_only}
        and len(in_child) > 0
    )
    checks.append(_line("deepest-wins: child claims nested",
                        len(in_child), len(expect_child), deepest_ok))

    # NO DOUBLE-COUNT: every attributed session appears under exactly one root,
    # and the union equals parent-alone surfacing (nothing duplicated or dropped).
    all_attr_paths = [s.path for s in in_parent] + [s.path for s in in_child]
    no_dupes = len(all_attr_paths) == len(set(all_attr_paths))
    union_matches_parent_alone = (
        set(all_attr_paths) == {s.path for s in under_parent}
    )
    count_conserved = (len(in_parent) + len(in_child)) == len(under_parent)
    no_double_ok = no_dupes and union_matches_parent_alone and count_conserved
    checks.append(_line("no double-count (parent+child)",
                        len(in_parent) + len(in_child), len(under_parent),
                        no_double_ok))
    checks.append(_line("each session attributed once", no_dupes, True, no_dupes))

    # Partition sanity: attr.total must equal the size of the WHOLE discovered
    # pool (len(sessions)), not just the sessions under parent.
    # NOTE(review): presumably total = attributed + unattributed — confirm
    # against engine.discovery.
    partition_ok = attr.total == len(sessions)
    checks.append(_line("attribution partitions pool",
                        attr.total, len(sessions), partition_ok))

    print("-" * 70)
    print("attribution under parent+child tick (deepest-wins):")
    print(f"    parent {parent!r}")
    print(f"      -> {len(in_parent)} sessions "
          f"(cwds: {sorted({s.cwd for s in in_parent})})")
    print(f"    child  {child!r}")
    print(f"      -> {len(in_child)} sessions "
          f"(cwds: {sorted({s.cwd for s in in_child})})")
    print(f"    parent-alone tick surfaced {len(under_parent)} sessions; "
          f"parent+child attributes {len(in_parent) + len(in_child)} "
          f"(no double-count).")

    # === 4) INTERMEDIATE-DIR DEEPEST-WINS (multi-level, if the store has it) === #
    # Find a real intermediate directory that is BOTH under parent AND a strict
    # ancestor of some cwd (i.e. a dir between parent and a leaf cwd). Ticking
    # parent + that intermediate must pull the leaf's sessions DOWN to the
    # intermediate, never leaving them on the parent. This exercises the case
    # where the deeper tick is an ANCESTOR of a cwd (not a leaf cwd itself).
    mid = None
    for c in sorted(set(cwds)):
        # dirname of a cwd is its immediate parent directory; `d != c` rejects
        # paths whose dirname is themselves (e.g. the filesystem root).
        d = os.path.dirname(c)
        if d and d != c and discovery._is_under(d, parent) and d != parent:
            # d is a real intermediate ancestor of cwd c, strictly under parent
            mid = d
            break
    if mid is not None:
        attr2 = discovery.attribute([parent, mid], sessions=sessions)
        mid_hits = attr2.by_root.get(mid, [])
        expect_mid = [s for s in with_cwd if discovery._is_under(s.cwd, mid)]
        # Every attributed path across all roots, flattened for the dupe check.
        mid_paths = [s.path for r in attr2.by_root.values() for s in r]
        mid_ok = (
            {s.path for s in mid_hits} == {s.path for s in expect_mid}
            and len(mid_hits) > 0
            and len(mid_paths) == len(set(mid_paths))            # no dupes
            and attr2.total == len(sessions)                     # nothing lost
        )
        checks.append(_line("intermediate-dir deepest-wins",
                            len(mid_hits), len(expect_mid), mid_ok))
        print(f"    intermediate {mid!r}")
        print(f"      -> {len(mid_hits)} sessions pulled down from parent "
              f"(cwds: {sorted({s.cwd for s in mid_hits})})")
    else:
        # Skipping adds no entry to `checks`, so it cannot fail the gate.
        print("    (no intermediate-dir nesting in store; multi-level sub-check "
              "skipped — leaf-cwd deepest-wins above already proves it.)")

    print("-" * 70)
    ok = all(checks)
    print("GATE:", "PASS" if ok else "FAIL")

    # Summary for the orchestrator (the MULTI schema fields). It is printed as
    # a Python dict repr after the literal word SUMMARY, not as JSON.
    print("SUMMARY",
          {
              "sessions_found": len(sessions),
              "projects": len(projects),
              "deepest_wins": bool(deepest_ok),
              "no_double_count": bool(no_double_ok),
              "gate_ok": ok,
          })
    return 0 if ok else 1


# Script entry point: the gate's return code (0 = pass, 1 = fail) becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())