#!/usr/bin/env python3
"""Phase 5 gate — falsification test for multi-session discovery.
Runs against the REAL ``~/.claude/projects`` (Non-negotiable: read cwd from
INSIDE each file; never decode the lossy encoded folder name). Demonstrates:
1. PREFIX SURFACING — ticking a parent folder surfaces EVERY session whose
real cwd is under it (the parent itself or any descendant).
2. DEEPEST-WINS — ticking that parent AND a nested child together: the
sessions in the nested dir are attributed to the CHILD, not the parent.
3. NO DOUBLE-COUNT — the parent+child tick attributes each session exactly
once; the union count equals the parent-alone count (the child's sessions
just move down a level, none are duplicated or lost).
Roots are chosen data-driven from the real cwds, so the gate stays honest as the
session store changes:
parent = deepest common ancestor of all discovered cwds.
child = a real intermediate directory strictly between parent and some cwd
that actually owns sessions (a genuine nested tick).
Run:
python3 "tools/phase5_gate.py"
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
REPO = Path(__file__).resolve().parent.parent
if str(REPO) not in sys.path:
sys.path.insert(0, str(REPO))
from engine import discovery # noqa: E402
def _line(label: str, got, want, ok: bool) -> bool:
    """Print one aligned check row and hand ``ok`` straight back.

    The row carries an ``OK``/``DIFF`` flag, the label left-justified to 34
    columns, the ``repr`` of the observed value right-justified to 10 columns,
    and the ``repr`` of the expected value. Returning ``ok`` lets a caller
    report and record a check in a single expression.
    """
    if ok:
        flag = "OK "
    else:
        flag = "DIFF"
    got_text = format(repr(got), ">10")
    want_text = repr(want)
    print(f" [{flag}] {label:<34} got={got_text} want={want_text}")
    return ok
def _common_ancestor(paths: list[str]) -> str:
    """Return the deepest directory that contains (or equals) every path.

    The comparison walks whole ``os.sep``-separated segments in lockstep, so a
    shared prefix is never cut in the middle of a directory name (the trap of
    a raw character-wise common prefix). When nothing is shared — or ``paths``
    is empty — the result is ``os.sep``.
    """
    shared: list[str] = []
    # zip stops at the shortest path, so a path can be its own ancestor.
    for column in zip(*(p.split(os.sep) for p in paths)):
        if len(set(column)) != 1:
            break  # first level at which the paths diverge
        shared.append(column[0])
    return os.sep.join(shared) or os.sep
def _pick_nested_child(parent: str, cwds: list[str]) -> str | None:
    """Return a cwd that lies strictly below ``parent``, or ``None``.

    Distinct cwds are visited in sorted order and the first one that
    ``discovery._is_under`` reports as under ``parent``, is not ``parent``
    itself, and sits at least one path segment deeper is returned. Picking the
    first sorted candidate keeps the choice deterministic between runs.

    Depth is measured on non-empty path segments. A raw ``split(os.sep)`` of
    the filesystem root yields two empty segments, which made a direct child
    of the root look like depth 0 and get skipped; ignoring empty segments
    also makes a trailing separator harmless.

    Args:
        parent: Candidate parent root.
        cwds: Session working directories (duplicates allowed).

    Returns:
        The chosen nested cwd, or ``None`` when no cwd is strictly below
        ``parent``.
    """
    def _segments(path: str) -> list[str]:
        # Drop the empty pieces produced by leading/trailing/doubled separators.
        return [seg for seg in path.split(os.sep) if seg]

    parent_depth = len(_segments(parent))
    for cwd in sorted(set(cwds)):
        # NOTE(review): _is_under is assumed to be a segment-prefix containment
        # test that is also true for equal paths — confirm in engine.discovery.
        if cwd == parent or not discovery._is_under(cwd, parent):
            continue
        if len(_segments(cwd)) - parent_depth < 1:
            continue
        return cwd
    return None
def main() -> int:
    """Run the Phase 5 discovery gate and return a process exit code.

    Discovers sessions from the default projects directory, chooses a parent
    root and a nested child root from the cwds found, then prints one row per
    check: prefix surfacing, deepest-wins attribution, no double-counting,
    pool partitioning and — when an intermediate directory exists —
    multi-level deepest-wins. Finishes with a ``GATE:`` verdict and a
    ``SUMMARY`` dict.

    Returns:
        0 when every check passed; 1 when any check failed or the gate could
        not be set up (projects dir missing, no session with a cwd, or no
        nested child root available).
    """
    rule = "-" * 70

    def path_set(items) -> set:
        # Sessions are compared by their ``path`` attribute throughout.
        return {s.path for s in items}

    def shallowness(cwd: str):
        # Fewest separators first; the cwd itself breaks ties deterministically.
        return (cwd.count(os.sep), cwd)

    projects_dir = discovery.default_projects_dir()
    print("Her · हेर — Phase 5 gate (multi-session discovery)")
    print("=" * 70)
    print(f"projects : {projects_dir}")
    if not os.path.isdir(projects_dir):
        print(f"FAIL — projects dir not found: {projects_dir}")
        return 1

    # --- discovery: everything comes from the real store ------------------ #
    sessions = discovery.discover_sessions(projects_dir)
    projects = discovery.list_projects(projects_dir)
    located = [s for s in sessions if s.cwd is not None]
    cwds = [s.cwd for s in located]
    print(f"sessions : {len(sessions)} total "
          f"({len(located)} with a real cwd, "
          f"{len(sessions) - len(located)} without)")
    print(f"projects : {len(projects)} distinct cwd roots")
    print(rule)
    for proj in projects:
        print(f" {proj.sessions:3d} {proj.cwd}")
    print(rule)

    results: list[bool] = []
    if not cwds:
        print("FAIL — no sessions with a readable cwd; cannot run the gate.")
        return 1

    # --- roots: picked from the data, not hard-coded ---------------------- #
    parent = _common_ancestor(cwds)
    child = _pick_nested_child(parent, cwds)
    if child is None:
        # Nothing sits below the common ancestor: retry with the shallowest
        # cwd as the parent before giving up.
        parent = min(cwds, key=shallowness)
        child = _pick_nested_child(parent, cwds)
        if child is None:
            print("FAIL — could not find a nested child root to demonstrate "
                  "deepest-wins; the store has no nesting to test.")
            return 1

    print(f"parent root (tick) : {parent}")
    print(f"child root (tick) : {child} [nested under parent]")
    strictly_nested = discovery._is_under(child, parent) and child != parent
    print(f"child is strictly under parent : {strictly_nested}")
    print(rule)

    # === 1) PREFIX SURFACING ============================================== #
    under_parent = discovery.sessions_under(parent, sessions=sessions)
    # Expected set recomputed here, independently of sessions_under().
    expect_parent = [s for s in located if discovery._is_under(s.cwd, parent)]
    surfacing_ok = (
        path_set(under_parent) == path_set(expect_parent)
        and len(under_parent) > 0
    )
    results.append(_line("parent surfaces all under it",
                         len(under_parent), len(expect_parent), surfacing_ok))

    # Only a floor of one distinct cwd is enforced, because the store may
    # genuinely hold a single cwd under the parent.
    distinct_cwds_surfaced = len({s.cwd for s in under_parent})
    results.append(_line("surfaced cwds (prefix sweep)",
                         distinct_cwds_surfaced, ">=1",
                         distinct_cwds_surfaced >= 1))

    # === 2) DEEPEST-WINS + 3) NO DOUBLE-COUNT ============================= #
    attr = discovery.attribute([parent, child], sessions=sessions)
    in_parent = attr.by_root.get(parent, [])
    in_child = attr.by_root.get(child, [])

    # The child should own everything under it; the parent keeps the rest.
    expect_child = [s for s in located if discovery._is_under(s.cwd, child)]
    expect_parent_only = [
        s for s in expect_parent if not discovery._is_under(s.cwd, child)
    ]
    deepest_ok = (
        path_set(in_child) == path_set(expect_child)
        and path_set(in_parent) == path_set(expect_parent_only)
        and len(in_child) > 0
    )
    results.append(_line("deepest-wins: child claims nested",
                         len(in_child), len(expect_child), deepest_ok))

    # Each attributed session must appear under exactly one root, and the two
    # roots together must cover exactly what the parent alone surfaced.
    all_attr_paths = [s.path for group in (in_parent, in_child) for s in group]
    no_dupes = len(all_attr_paths) == len(set(all_attr_paths))
    union_matches_parent_alone = set(all_attr_paths) == path_set(under_parent)
    count_conserved = (len(in_parent) + len(in_child)) == len(under_parent)
    no_double_ok = no_dupes and union_matches_parent_alone and count_conserved
    results.append(_line("no double-count (parent+child)",
                         len(in_parent) + len(in_child), len(under_parent),
                         no_double_ok))
    results.append(_line("each session attributed once", no_dupes, True, no_dupes))

    # Partition sanity: the attribution total is compared with the whole pool.
    partition_ok = attr.total == len(sessions)
    results.append(_line("attribution partitions pool",
                         attr.total, len(sessions), partition_ok))

    print(rule)
    print("attribution under parent+child tick (deepest-wins):")
    print(f" parent {parent!r}")
    print(f" -> {len(in_parent)} sessions "
          f"(cwds: {sorted({s.cwd for s in in_parent})})")
    print(f" child {child!r}")
    print(f" -> {len(in_child)} sessions "
          f"(cwds: {sorted({s.cwd for s in in_child})})")
    print(f" parent-alone tick surfaced {len(under_parent)} sessions; "
          f"parent+child attributes {len(in_parent) + len(in_child)} "
          f"(no double-count).")

    # === 4) INTERMEDIATE-DIR DEEPEST-WINS (only if the store has one) ====== #
    # Look for the directory that directly contains some cwd and is itself
    # strictly under the parent; ticking parent + that directory must move the
    # sessions beneath it off the parent.
    dir_pairs = ((c, os.path.dirname(c)) for c in sorted(set(cwds)))
    mid = next(
        (d for c, d in dir_pairs
         if d and d != c and discovery._is_under(d, parent) and d != parent),
        None,
    )
    if mid is None:
        print(" (no intermediate-dir nesting in store; multi-level sub-check "
              "skipped — leaf-cwd deepest-wins above already proves it.)")
    else:
        attr2 = discovery.attribute([parent, mid], sessions=sessions)
        mid_hits = attr2.by_root.get(mid, [])
        expect_mid = [s for s in located if discovery._is_under(s.cwd, mid)]
        mid_paths = [s.path for r in attr2.by_root.values() for s in r]
        mid_ok = (
            path_set(mid_hits) == path_set(expect_mid)
            and len(mid_hits) > 0
            and len(mid_paths) == len(set(mid_paths))  # no duplicates
            and attr2.total == len(sessions)  # nothing lost
        )
        results.append(_line("intermediate-dir deepest-wins",
                             len(mid_hits), len(expect_mid), mid_ok))
        print(f" intermediate {mid!r}")
        print(f" -> {len(mid_hits)} sessions pulled down from parent "
              f"(cwds: {sorted({s.cwd for s in mid_hits})})")

    print(rule)
    ok = all(results)
    print("GATE:", "PASS" if ok else "FAIL")
    # Summary dict printed on one line after the verdict.
    summary = {
        "sessions_found": len(sessions),
        "projects": len(projects),
        "deepest_wins": bool(deepest_ok),
        "no_double_count": bool(no_double_ok),
        "gate_ok": ok,
    }
    print("SUMMARY", summary)
    return 0 if ok else 1
if __name__ == "__main__":
    # Script entry point: the gate verdict becomes the process exit status.
    raise SystemExit(main())
|