"""discovery.py — Phase-5 multi-session discovery. Deterministic. NO model. Walks ``~/.claude/projects/*/*.jsonl`` and groups sessions by the REAL working directory read from INSIDE each file. The encoded ``~/.claude/projects/`` folder name is lossy (``/``, ``-``, ``_``, ``.`` all collapse to ``-``), so it is NEVER decoded — the cwd is trusted from the events (Non-negotiable #5). This module is pure path/IO bookkeeping over the SAME session-metadata rule the loader uses (the first ``user``/``assistant`` row's ``cwd``). It does not parse turns or run any engine logic, and it touches no model and no network (Non-negotiables #1, #2). It is a sibling of the loaders: a thin discovery layer that hands the rest of the engine a list of session files + their roots. Public API ---------- ``discover_sessions(projects_dir=None)`` -> list[SessionRef] Every session under ``~/.claude/projects`` with its real cwd. ``list_projects(projects_dir=None)`` -> list[ProjectRoot] Distinct cwd roots, each with its session count (the root browser feeds on this). ``sessions_under(root, projects_dir=None, sessions=None)`` -> list[SessionRef] All sessions whose cwd is ``root`` or a descendant of ``root`` (PREFIX match). ``attribute(roots, projects_dir=None, sessions=None)`` -> Attribution DEEPEST-folder-wins assignment of every session to the most specific ticked root. Nested ticks NEVER double-count: a session lands under exactly one root. Determinism: every list returned is sorted (sessions by (cwd, path); roots by cwd) so callers and gates get a stable order. """ from __future__ import annotations import glob import json import os from dataclasses import dataclass, field from typing import Any, Optional # The encoded folder name is lossy — we read cwd from INSIDE the file instead. # These are the only row types the loader trusts for session metadata; we use the # SAME rule (first user/assistant row carrying a cwd) so discovery and the loader # agree on every session's home directory. 
_CWD_BEARING_TYPES = ("user", "assistant")


# --------------------------------------------------------------------------- #
# shapes
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class SessionRef:
    """One discovered session file + the real cwd read from inside it.

    ``cwd`` is None when no user/assistant row carried a usable one (a
    malformed, unreadable or metadata-only file); such sessions are kept but
    never attributed to a root.
    """

    path: str  # absolute path to the .jsonl
    cwd: Optional[str]  # real working dir, trusted from inside the file
    sessionId: Optional[str] = None
    encodedDir: Optional[str] = None  # the lossy folder name (display only)
    startedAt: Optional[str] = None  # ISO ts of the first user/assistant row

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of this reference (JSON-friendly keys)."""
        return {
            "path": self.path,
            "cwd": self.cwd,
            "sessionId": self.sessionId,
            "encodedDir": self.encodedDir,
            "startedAt": self.startedAt,
        }


@dataclass(frozen=True)
class ProjectRoot:
    """A distinct cwd root and how many sessions live exactly at it."""

    cwd: str
    sessions: int

    def to_dict(self) -> dict[str, Any]:
        """Return ``{"cwd": ..., "sessions": ...}``."""
        return {"cwd": self.cwd, "sessions": self.sessions}


@dataclass
class Attribution:
    """Result of a DEEPEST-folder-wins multi-root tick.

    ``by_root`` maps each ticked root -> the sessions attributed to it (each
    session appears under at most ONE root). ``unattributed`` holds sessions
    whose cwd is under no ticked root (or has no cwd). The three counts are a
    partition: ``attributed + unattributed == total`` always holds.
    """

    by_root: dict[str, list[SessionRef]] = field(default_factory=dict)
    unattributed: list[SessionRef] = field(default_factory=list)

    @property
    def attributed_count(self) -> int:
        """Number of sessions that landed under some ticked root."""
        return sum(len(v) for v in self.by_root.values())

    @property
    def unattributed_count(self) -> int:
        """Number of sessions that landed under no ticked root."""
        return len(self.unattributed)

    @property
    def total(self) -> int:
        """Attributed plus unattributed sessions."""
        return self.attributed_count + self.unattributed_count

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view, including the three derived counts."""
        return {
            "by_root": {
                r: [s.to_dict() for s in ss] for r, ss in self.by_root.items()
            },
            "unattributed": [s.to_dict() for s in self.unattributed],
            "attributed_count": self.attributed_count,
            "unattributed_count": self.unattributed_count,
            "total": self.total,
        }


# --------------------------------------------------------------------------- #
# path helpers — normalize WITHOUT resolving symlinks or touching the FS.
# Normalization is purely lexical so a session's cwd (a string written by a past
# run) is comparable to a root the user ticks now, even if that dir is gone.
# --------------------------------------------------------------------------- #
def _norm(path: Optional[str]) -> Optional[str]:
    """Lexically normalize an absolute-ish path.

    Strips a trailing separator and collapses redundant separators and '.'
    segments via ``os.path.normpath``. Nothing is resolved against the real
    filesystem (no symlink lookup; paths may no longer exist).

    Returns None for falsy or non-string input.
    """
    if not path or not isinstance(path, str):
        return None
    p = os.path.normpath(path)
    # POSIX normpath deliberately preserves exactly two leading slashes, so
    # '//a' would never compare equal to '/a' and '//' would be stripped to ''
    # below. Collapse them here; Windows UNC prefixes are left untouched.
    if os.sep == "/" and p.startswith("//"):
        p = "/" + p.lstrip("/")
    # for anything longer than the bare root, drop any trailing separator.
    if len(p) > 1:
        p = p.rstrip(os.sep)
    return p


def _is_under(cwd: str, root: str) -> bool:
    """True iff normalized ``cwd`` is ``root`` itself or a descendant of it.

    PREFIX match on PATH SEGMENTS — never a raw string ``startswith`` (which
    would falsely match ``/a/foo`` under ``/a/fo``). Both args must already be
    _norm'd.
    """
    if cwd == root:
        return True
    # ensure a segment boundary: '/a/b' is under '/a' (root + sep) but '/ab' is not
    prefix = root if root.endswith(os.sep) else root + os.sep
    return cwd.startswith(prefix)


def default_projects_dir() -> str:
    """``~/.claude/projects`` with ``~`` expanded. The standard Claude Code store."""
    return os.path.join(os.path.expanduser("~"), ".claude", "projects")


# --------------------------------------------------------------------------- #
# the seam: read the REAL cwd from inside one session file
# --------------------------------------------------------------------------- #
def _read_session_meta(
    path: str,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Return (cwd, sessionId, startedAt) read from the FIRST user/assistant row.

    Streams line-by-line and stops at the first row whose ``type`` is in
    ``_CWD_BEARING_TYPES`` and whose ``cwd`` is truthy — the whole file is never
    loaded just to find a home directory. ``startedAt`` is that same row's
    ``timestamp`` value; the returned cwd is passed through ``_norm``.

    Malformed lines (invalid JSON, or JSON that is not an object) are skipped.
    A file that cannot be opened, read, or decoded as UTF-8 yields
    (None, None, None) rather than raising, so one bad session can't break
    discovery.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except ValueError:  # json.JSONDecodeError is a ValueError
                    continue
                if not isinstance(row, dict):
                    continue
                if row.get("type") in _CWD_BEARING_TYPES and row.get("cwd"):
                    return (
                        _norm(row.get("cwd")),
                        row.get("sessionId"),
                        row.get("timestamp"),
                    )
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is raised by the file iterator itself (outside the
        # per-line try) when the file holds bytes that are not valid UTF-8; it
        # is not an OSError, so it must be named explicitly.
        return None, None, None
    return None, None, None


# --------------------------------------------------------------------------- #
# public API
# --------------------------------------------------------------------------- #
def discover_sessions(projects_dir: Optional[str] = None) -> list[SessionRef]:
    """Walk ``projects_dir/*/*.jsonl`` and return a SessionRef per file.

    ``projects_dir`` defaults to ``default_projects_dir()`` when None or empty.
    The real cwd is read from inside each file; the lossy encoded folder name
    is carried only for display. Result is sorted deterministically by
    (cwd, path) with None cwds (if any) sorted last.
    """
    base = _norm(projects_dir) or default_projects_dir()
    # Escape the base so glob metacharacters in the directory name ('[', ']',
    # '*', '?') are matched literally; only the two trailing components are
    # meant to be wildcards.
    pattern = os.path.join(glob.escape(base), "*", "*.jsonl")
    refs: list[SessionRef] = []
    # one level of project dirs, then session files — the standard CC layout.
    for fp in glob.glob(pattern):
        fp_abs = os.path.abspath(fp)
        cwd, sid, started = _read_session_meta(fp_abs)
        refs.append(
            SessionRef(
                path=fp_abs,
                cwd=cwd,
                sessionId=sid,
                encodedDir=os.path.basename(os.path.dirname(fp_abs)),
                startedAt=started,
            )
        )
    # deterministic order; None cwd last
    refs.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))
    return refs


def list_projects(projects_dir: Optional[str] = None) -> list[ProjectRoot]:
    """Distinct cwd roots and their session counts.

    A count covers sessions sitting EXACTLY at that cwd. This is the raw
    browser inventory — the UI builds a folder tree over these. Sessions with
    no cwd are excluded. Sorted by cwd.
    """
    counts: dict[str, int] = {}
    for s in discover_sessions(projects_dir):
        if s.cwd is None:
            continue
        counts[s.cwd] = counts.get(s.cwd, 0) + 1
    return [ProjectRoot(cwd=c, sessions=n) for c, n in sorted(counts.items())]


def sessions_under(
    root: str,
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> list[SessionRef]:
    """All sessions whose real cwd is ``root`` or a descendant of it.

    PREFIX match on path segments. Pass a pre-computed ``sessions`` list to
    avoid re-walking the disk; otherwise ``discover_sessions(projects_dir)`` is
    called. Returns [] when ``root`` is empty or not a string.

    Sorted by (cwd, path).
    """
    root_n = _norm(root)
    if root_n is None:
        return []
    pool = sessions if sessions is not None else discover_sessions(projects_dir)
    hits = [s for s in pool if s.cwd is not None and _is_under(s.cwd, root_n)]
    hits.sort(key=lambda s: (s.cwd or "", s.path))
    return hits


def attribute(
    roots: list[str],
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> Attribution:
    """DEEPEST-folder-wins attribution of every session to a ticked root.

    For each session, among the ticked roots that are ancestors of (or equal
    to) its cwd, the LONGEST such root wins — so a session in a nested
    directory is counted under the deepest ticked folder and a parent tick does
    not also claim it. Every session lands under at most one root: nested ticks
    NEVER double-count. Sessions under no ticked root (or with no cwd) go to
    ``unattributed``.

    Every normalized, de-duplicated root appears as a key of ``by_root`` even
    when no session landed under it.
    """
    # normalize + de-dup ticked roots; keep deterministic order for the output map
    seen: set[str] = set()
    norm_roots: list[str] = []
    for r in roots:
        rn = _norm(r)
        if rn is None or rn in seen:
            continue
        seen.add(rn)
        norm_roots.append(rn)
    norm_roots.sort()

    pool = sessions if sessions is not None else discover_sessions(projects_dir)
    by_root: dict[str, list[SessionRef]] = {r: [] for r in norm_roots}
    unattributed: list[SessionRef] = []
    for s in pool:
        if s.cwd is None:
            unattributed.append(s)
            continue
        # candidate ticked roots that are ancestors of (or equal to) this cwd
        ancestors = [r for r in norm_roots if _is_under(s.cwd, r)]
        if not ancestors:
            unattributed.append(s)
            continue
        # DEEPEST wins: all candidates are prefixes of the same cwd, so the
        # longest string is the most specific and distinct candidates can
        # never tie on length.
        winner = max(ancestors, key=len)
        by_root[winner].append(s)

    # deterministic order inside each bucket
    for bucket in by_root.values():
        bucket.sort(key=lambda s: (s.cwd or "", s.path))
    unattributed.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))
    return Attribution(by_root=by_root, unattributed=unattributed)