File size: 12,317 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""discovery.py — Phase-5 multi-session discovery. Deterministic. NO model.

Walks ``~/.claude/projects/*/*.jsonl`` and groups sessions by the REAL working
directory read from INSIDE each file. The encoded ``~/.claude/projects/<dir>``
folder name is lossy (``/``, ``-``, ``_``, ``.`` all collapse to ``-``), so it is
NEVER decoded — the cwd is trusted from the events (Non-negotiable #5).

This module is pure path/IO bookkeeping over the SAME session-metadata rule the
loader uses (the first ``user``/``assistant`` row's ``cwd``). It does not parse
turns or run any engine logic, and it touches no model and no network
(Non-negotiables #1, #2). It is a sibling of the loaders: a thin discovery layer
that hands the rest of the engine a list of session files + their roots.

Public API
----------
``discover_sessions(projects_dir=None)`` -> list[SessionRef]
    Every session under ``~/.claude/projects`` with its real cwd.
``list_projects(projects_dir=None)`` -> list[ProjectRoot]
    Distinct cwd roots, each with its session count (the root browser feeds on this).
``sessions_under(root, projects_dir=None, sessions=None)`` -> list[SessionRef]
    All sessions whose cwd is ``root`` or a descendant of ``root`` (PREFIX match).
``attribute(roots, projects_dir=None, sessions=None)`` -> Attribution
    DEEPEST-folder-wins assignment of every session to the most specific ticked
    root. Nested ticks NEVER double-count: a session lands under exactly one root.

Determinism: every list returned is sorted (sessions by (cwd, path); roots by
cwd) so callers and gates get a stable order.
"""
from __future__ import annotations

import glob
import json
import os
from dataclasses import dataclass, field
from typing import Any, Optional

# The encoded folder name is lossy — we read cwd from INSIDE the file instead.
# These are the only row types the loader trusts for session metadata; we use the
# SAME rule (first user/assistant row carrying a cwd) so discovery and the loader
# agree on every session's home directory.
_CWD_BEARING_TYPES = ("user", "assistant")


# --------------------------------------------------------------------------- #
# shapes
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class SessionRef:
    """One discovered session file + the real cwd read from inside it.

    ``cwd`` is None only when no user/assistant row carried one (a malformed or
    metadata-only file); such sessions are kept but never attributed to a root.
    """

    path: str                      # absolute path to the .jsonl
    cwd: Optional[str]             # real working dir, trusted from inside the file
    sessionId: Optional[str] = None
    encodedDir: Optional[str] = None  # the lossy folder name (kept for display only)
    startedAt: Optional[str] = None   # ISO ts of the first user/assistant row (start time)

    def to_dict(self) -> dict[str, Any]:
        return {
            "path": self.path,
            "cwd": self.cwd,
            "sessionId": self.sessionId,
            "encodedDir": self.encodedDir,
            "startedAt": self.startedAt,
        }


@dataclass(frozen=True)
class ProjectRoot:
    """A distinct cwd root and how many sessions live exactly at it."""

    cwd: str
    sessions: int

    def to_dict(self) -> dict[str, Any]:
        return {"cwd": self.cwd, "sessions": self.sessions}


@dataclass
class Attribution:
    """Result of a DEEPEST-folder-wins multi-root tick.

    ``by_root`` maps each ticked root -> the sessions attributed to it (each
    session appears under at most ONE root). ``unattributed`` holds sessions
    whose cwd is under no ticked root (or has no cwd). The three counts are a
    partition: ``attributed + unattributed == total`` always holds.
    """

    by_root: dict[str, list[SessionRef]] = field(default_factory=dict)
    unattributed: list[SessionRef] = field(default_factory=list)

    @property
    def attributed_count(self) -> int:
        return sum(len(v) for v in self.by_root.values())

    @property
    def unattributed_count(self) -> int:
        return len(self.unattributed)

    @property
    def total(self) -> int:
        return self.attributed_count + self.unattributed_count

    def to_dict(self) -> dict[str, Any]:
        return {
            "by_root": {
                r: [s.to_dict() for s in ss] for r, ss in self.by_root.items()
            },
            "unattributed": [s.to_dict() for s in self.unattributed],
            "attributed_count": self.attributed_count,
            "unattributed_count": self.unattributed_count,
            "total": self.total,
        }


# --------------------------------------------------------------------------- #
# path helpers — normalize WITHOUT resolving symlinks or touching the FS.
# Normalization is purely lexical so a session's cwd (a string written by a past
# run) is comparable to a root the user ticks now, even if that dir is gone.
# --------------------------------------------------------------------------- #
def _norm(path: Optional[str]) -> Optional[str]:
    """Lexically normalize an absolute-ish path: strip a trailing slash, collapse
    redundant separators / '.' segments. Does NOT resolve symlinks or '..' against
    the real FS (paths may no longer exist). Returns None for falsy input."""
    if not path or not isinstance(path, str):
        return None
    p = os.path.normpath(path)
    # normpath turns '/' into '/' already; for non-root, drop any trailing sep.
    if len(p) > 1:
        p = p.rstrip(os.sep)
    return p


def _is_under(cwd: str, root: str) -> bool:
    """True iff normalized ``cwd`` is ``root`` itself or a descendant of ``root``.

    PREFIX match on PATH SEGMENTS — never a raw string ``startswith`` (which would
    falsely match ``/a/foo`` under ``/a/fo``). Both args must already be _norm'd.
    """
    if cwd == root:
        return True
    # ensure a segment boundary: '/a/b' is under '/a' (root + sep) but '/ab' is not
    prefix = root if root.endswith(os.sep) else root + os.sep
    return cwd.startswith(prefix)


def default_projects_dir() -> str:
    """``~/.claude/projects`` with ``~`` expanded. The standard Claude Code store."""
    return os.path.join(os.path.expanduser("~"), ".claude", "projects")


# --------------------------------------------------------------------------- #
# the seam: read the REAL cwd from inside one session file
# --------------------------------------------------------------------------- #
def _read_session_meta(
    path: str,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Return (cwd, sessionId, startedAt) read from the FIRST user/assistant row.

    Mirrors the loader's session-metadata rule exactly. Streams line-by-line and
    stops at the first qualifying row — we never load the whole file just to find
    a home directory. ``startedAt`` is that same row's ISO ``timestamp`` (the
    session start time, so Shripal can tell two sessions apart at a glance); it is
    free to grab while we are already on the row. Malformed lines are skipped; an
    unreadable file yields (None, None, None) rather than raising, so one bad
    session can't break discovery.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    r = json.loads(line)
                except (ValueError, json.JSONDecodeError):
                    continue
                if not isinstance(r, dict):
                    continue
                if r.get("type") in _CWD_BEARING_TYPES and r.get("cwd"):
                    return _norm(r.get("cwd")), r.get("sessionId"), r.get("timestamp")
    except OSError:
        return None, None, None
    return None, None, None


# --------------------------------------------------------------------------- #
# public API
# --------------------------------------------------------------------------- #
def discover_sessions(projects_dir: Optional[str] = None) -> list[SessionRef]:
    """Walk ``<projects_dir>/*/*.jsonl`` and return a SessionRef per file.

    The real cwd is read from inside each file; the lossy encoded folder name is
    carried only for display. Result is sorted deterministically by (cwd, path)
    with None cwds (if any) sorted last.
    """
    base = _norm(projects_dir) or default_projects_dir()
    refs: list[SessionRef] = []
    # one level of project dirs, then session files — the standard CC layout.
    for fp in glob.glob(os.path.join(base, "*", "*.jsonl")):
        fp_abs = os.path.abspath(fp)
        cwd, sid, started = _read_session_meta(fp_abs)
        refs.append(
            SessionRef(
                path=fp_abs,
                cwd=cwd,
                sessionId=sid,
                encodedDir=os.path.basename(os.path.dirname(fp_abs)),
                startedAt=started,
            )
        )
    # deterministic order; None cwd last
    refs.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))
    return refs


def list_projects(projects_dir: Optional[str] = None) -> list[ProjectRoot]:
    """Distinct cwd roots and their session counts (sessions sitting EXACTLY at
    that cwd). This is the raw browser inventory — the UI builds a folder tree
    over these. Sessions with no cwd are excluded. Sorted by cwd."""
    counts: dict[str, int] = {}
    for s in discover_sessions(projects_dir):
        if s.cwd is None:
            continue
        counts[s.cwd] = counts.get(s.cwd, 0) + 1
    return [ProjectRoot(cwd=c, sessions=n) for c, n in sorted(counts.items())]


def sessions_under(
    root: str,
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> list[SessionRef]:
    """All sessions whose real cwd is ``root`` or a descendant of it (PREFIX
    match on path segments, deepest-folder-aware). Pass a pre-computed
    ``sessions`` list to avoid re-walking the disk. Sorted by (cwd, path)."""
    root_n = _norm(root)
    if root_n is None:
        return []
    pool = sessions if sessions is not None else discover_sessions(projects_dir)
    hits = [s for s in pool if s.cwd is not None and _is_under(s.cwd, root_n)]
    hits.sort(key=lambda s: (s.cwd or "", s.path))
    return hits


def attribute(
    roots: list[str],
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> Attribution:
    """DEEPEST-folder-wins attribution of every session to the most specific
    ticked root.

    For each session, among the ticked roots that are ancestors of (or equal to)
    its cwd, the LONGEST such root wins — so a session in a nested directory is
    counted under the deepest ticked folder and a parent tick does not also claim
    it. Every session lands under at most one root: nested ticks NEVER
    double-count. Sessions under no ticked root go to ``unattributed``.
    """
    # normalize + de-dup ticked roots; keep deterministic order for the output map
    seen: set[str] = set()
    norm_roots: list[str] = []
    for r in roots:
        rn = _norm(r)
        if rn is None or rn in seen:
            continue
        seen.add(rn)
        norm_roots.append(rn)
    norm_roots.sort()

    pool = sessions if sessions is not None else discover_sessions(projects_dir)

    by_root: dict[str, list[SessionRef]] = {r: [] for r in norm_roots}
    unattributed: list[SessionRef] = []

    for s in pool:
        if s.cwd is None:
            unattributed.append(s)
            continue
        # candidate ticked roots that are ancestors of (or equal to) this cwd
        ancestors = [r for r in norm_roots if _is_under(s.cwd, r)]
        if not ancestors:
            unattributed.append(s)
            continue
        # DEEPEST wins: longest root path string == most path segments == most
        # specific. Ties are impossible (roots are de-duped distinct paths).
        winner = max(ancestors, key=len)
        by_root[winner].append(s)

    # deterministic order inside each bucket
    for r in by_root:
        by_root[r].sort(key=lambda s: (s.cwd or "", s.path))
    unattributed.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))

    return Attribution(by_root=by_root, unattributed=unattributed)