Spaces:
Running on Zero
Running on Zero
| #!/usr/bin/env python3 | |
| """Phase 5 gate — falsification test for multi-session discovery. | |
| Runs against the REAL ``~/.claude/projects`` (Non-negotiable: read cwd from | |
| INSIDE each file; never decode the lossy encoded folder name). Demonstrates: | |
| 1. PREFIX SURFACING — ticking a parent folder surfaces EVERY session whose | |
| real cwd is under it (the parent itself or any descendant). | |
| 2. DEEPEST-WINS — ticking that parent AND a nested child together: the | |
| sessions in the nested dir are attributed to the CHILD, not the parent. | |
| 3. NO DOUBLE-COUNT — the parent+child tick attributes each session exactly | |
| once; the union count equals the parent-alone count (the child's sessions | |
| just move down a level, none are duplicated or lost). | |
| Roots are chosen data-driven from the real cwds, so the gate stays honest as the | |
| session store changes: | |
| parent = deepest common ancestor of all discovered cwds. | |
| child = a real intermediate directory strictly between parent and some cwd | |
| that actually owns sessions (a genuine nested tick). | |
| Run: | |
| python3 "tools/phase5_gate.py" | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sys | |
| from pathlib import Path | |
| REPO = Path(__file__).resolve().parent.parent | |
| if str(REPO) not in sys.path: | |
| sys.path.insert(0, str(REPO)) | |
| from engine import discovery # noqa: E402 | |
| def _line(label: str, got, want, ok: bool) -> bool: | |
| flag = "OK " if ok else "DIFF" | |
| print(f" [{flag}] {label:<34} got={got!r:>10} want={want!r}") | |
| return ok | |
| def _common_ancestor(paths: list[str]) -> str: | |
| """Deepest directory that is an ancestor of (or equal to) every path. | |
| Computed on PATH SEGMENTS (never raw os.path.commonprefix, which can cut a | |
| name mid-segment).""" | |
| seg_lists = [p.split(os.sep) for p in paths] | |
| common: list[str] = [] | |
| for parts in zip(*seg_lists): | |
| first = parts[0] | |
| if all(x == first for x in parts): | |
| common.append(first) | |
| else: | |
| break | |
| anc = os.sep.join(common) | |
| return anc or os.sep | |
def _pick_nested_child(parent: str, cwds: list[str]) -> str | None:
    """Pick a cwd that sits strictly below ``parent`` to act as the nested tick.

    Distinct cwds are visited in sorted order so the choice is deterministic.
    The first cwd that ``discovery._is_under`` reports as under ``parent``,
    that is not ``parent`` itself, and that is at least one real path segment
    deeper than ``parent`` is returned. The cwd itself is used as the nested
    root; no intermediate directory is synthesized here.

    Depth is counted on NON-EMPTY segments only. The filesystem root splits
    into two empty strings (``"/".split("/") == ["", ""]``), so counting raw
    segments would give every first-level directory a depth of 0 under the
    root and wrongly skip it; a trailing separator would similarly inflate
    the depth of a path that is really the same directory.

    Returns ``None`` when no cwd qualifies.
    """

    def depth_of(path: str) -> int:
        # Number of real (non-empty) segments in the path.
        return sum(1 for seg in path.split(os.sep) if seg)

    parent_depth = depth_of(parent)
    for cwd in sorted(set(cwds)):
        if not discovery._is_under(cwd, parent) or cwd == parent:
            continue
        if depth_of(cwd) - parent_depth < 1:
            # Same directory spelled differently (e.g. trailing separator).
            continue
        return cwd
    return None
def main() -> int:
    """Run the Phase 5 discovery gate and report the outcome.

    Prints a human-readable report followed by a ``GATE:`` verdict and a
    ``SUMMARY`` line. Returns 0 when every recorded check passed. Returns 1
    when any check failed, or earlier when the projects directory is missing,
    no session carries a cwd, or no nested child root can be found.
    """

    def rule(ch: str = "-") -> None:
        # Horizontal separator, 70 characters wide.
        print(ch * 70)

    def path_set(items) -> set:
        # Sessions are compared by their ``path`` attribute.
        return {s.path for s in items}

    def cwds_of(items) -> list:
        # Distinct cwds of the given sessions, sorted for stable output.
        return sorted({s.cwd for s in items})

    store = discovery.default_projects_dir()
    print("Her · हेर — Phase 5 gate (multi-session discovery)")
    rule("=")
    print(f"projects : {store}")
    if not os.path.isdir(store):
        print(f"FAIL — projects dir not found: {store}")
        return 1

    # --- discover everything from the real store -------------------------- #
    sessions = discovery.discover_sessions(store)
    projects = discovery.list_projects(store)
    located = [s for s in sessions if s.cwd is not None]
    cwds = [s.cwd for s in located]
    n_total = len(sessions)
    n_located = len(located)
    print(f"sessions : {n_total} total "
          f"({n_located} with a real cwd, "
          f"{n_total - n_located} without)")
    print(f"projects : {len(projects)} distinct cwd roots")
    rule()
    for proj in projects:
        print(f" {proj.sessions:3d} {proj.cwd}")
    rule()

    results: list[bool] = []
    if not cwds:
        print("FAIL — no sessions with a readable cwd; cannot run the gate.")
        return 1

    # --- choose parent + nested child data-driven ------------------------- #
    parent = _common_ancestor(cwds)
    child = _pick_nested_child(parent, cwds)
    if child is None:
        # Nothing nested under the common ancestor: retry with the SHALLOWEST
        # cwd (fewest separators, ties broken by name) as the parent root.
        parent = min(cwds, key=lambda c: (c.count(os.sep), c))
        child = _pick_nested_child(parent, cwds)
    if child is None:
        print("FAIL — could not find a nested child root to demonstrate "
              "deepest-wins; the store has no nesting to test.")
        return 1

    print(f"parent root (tick) : {parent}")
    print(f"child root (tick) : {child} [nested under parent]")
    strictly_nested = discovery._is_under(child, parent) and child != parent
    print(f"child is strictly under parent : {strictly_nested}")
    rule()

    # === 1) PREFIX SURFACING ============================================== #
    under_parent = discovery.sessions_under(parent, sessions=sessions)
    # Expected set recomputed here from the cwds, independently of
    # sessions_under.
    expect_parent = [s for s in located if discovery._is_under(s.cwd, parent)]
    surfaced_paths = path_set(under_parent)
    surfacing_ok = (
        surfaced_paths == path_set(expect_parent) and len(under_parent) > 0
    )
    results.append(_line("parent surfaces all under it",
                         len(under_parent), len(expect_parent), surfacing_ok))

    # The sweep must surface at least one distinct cwd under parent.
    n_surfaced_cwds = len({s.cwd for s in under_parent})
    results.append(_line("surfaced cwds (prefix sweep)",
                         n_surfaced_cwds, ">=1", n_surfaced_cwds >= 1))

    # === 2) DEEPEST-WINS + 3) NO DOUBLE-COUNT ============================= #
    attr = discovery.attribute([parent, child], sessions=sessions)
    in_parent = attr.by_root.get(parent, [])
    in_child = attr.by_root.get(child, [])
    # The child must own exactly the sessions under it; the parent keeps only
    # what is under parent but NOT under child.
    expect_child = [s for s in located if discovery._is_under(s.cwd, child)]
    expect_parent_only = [
        s for s in expect_parent if not discovery._is_under(s.cwd, child)
    ]
    deepest_ok = (
        path_set(in_child) == path_set(expect_child)
        and path_set(in_parent) == path_set(expect_parent_only)
        and len(in_child) > 0
    )
    results.append(_line("deepest-wins: child claims nested",
                         len(in_child), len(expect_child), deepest_ok))

    # No double-count: each attributed path appears once, and parent+child
    # together cover exactly what the parent-alone tick surfaced.
    attributed = [s.path for s in (*in_parent, *in_child)]
    n_attributed = len(in_parent) + len(in_child)
    no_dupes = len(attributed) == len(set(attributed))
    union_matches_parent_alone = set(attributed) == surfaced_paths
    count_conserved = n_attributed == len(under_parent)
    no_double_ok = no_dupes and union_matches_parent_alone and count_conserved
    results.append(_line("no double-count (parent+child)",
                         n_attributed, len(under_parent), no_double_ok))
    results.append(_line("each session attributed once",
                         no_dupes, True, no_dupes))

    # Partition sanity: the attribution total must equal the session count.
    partition_ok = attr.total == len(sessions)
    results.append(_line("attribution partitions pool",
                         attr.total, len(sessions), partition_ok))

    rule()
    print("attribution under parent+child tick (deepest-wins):")
    print(f" parent {parent!r}")
    print(f" -> {len(in_parent)} sessions "
          f"(cwds: {cwds_of(in_parent)})")
    print(f" child {child!r}")
    print(f" -> {len(in_child)} sessions "
          f"(cwds: {cwds_of(in_child)})")
    print(f" parent-alone tick surfaced {len(under_parent)} sessions; "
          f"parent+child attributes {n_attributed} "
          f"(no double-count).")

    # === 4) INTERMEDIATE-DIR DEEPEST-WINS (multi-level, if the store has it) === #
    # Look for the dirname of some cwd that lies strictly under parent, i.e. a
    # directory between parent and that cwd. Ticking parent + that directory
    # exercises the case where the deeper tick is an ANCESTOR of a cwd rather
    # than a cwd itself.
    mid = None
    for cwd in sorted(set(cwds)):
        holder = os.path.dirname(cwd)
        if not holder or holder == cwd:
            continue
        if discovery._is_under(holder, parent) and holder != parent:
            mid = holder
            break

    if mid is None:
        print(" (no intermediate-dir nesting in store; multi-level sub-check "
              "skipped — leaf-cwd deepest-wins above already proves it.)")
    else:
        attr2 = discovery.attribute([parent, mid], sessions=sessions)
        mid_hits = attr2.by_root.get(mid, [])
        expect_mid = [s for s in located if discovery._is_under(s.cwd, mid)]
        every_path = [s.path for owned in attr2.by_root.values() for s in owned]
        mid_ok = (
            path_set(mid_hits) == path_set(expect_mid)
            and len(mid_hits) > 0
            and len(every_path) == len(set(every_path))  # no duplicates
            and attr2.total == len(sessions)  # total still matches the pool
        )
        results.append(_line("intermediate-dir deepest-wins",
                             len(mid_hits), len(expect_mid), mid_ok))
        print(f" intermediate {mid!r}")
        print(f" -> {len(mid_hits)} sessions pulled down from parent "
              f"(cwds: {cwds_of(mid_hits)})")

    rule()
    ok = all(results)
    print("GATE:", "PASS" if ok else "FAIL")
    # Summary dict printed on one line after the SUMMARY tag.
    summary = {
        "sessions_found": len(sessions),
        "projects": len(projects),
        "deepest_wins": bool(deepest_ok),
        "no_double_count": bool(no_double_ok),
        "gate_ok": ok,
    }
    print("SUMMARY", summary)
    return 0 if ok else 1
if __name__ == "__main__":
    # Script entry point: the gate's return value becomes the exit status.
    raise SystemExit(main())