"""discovery.py — Phase-5 multi-session discovery. Deterministic. NO model.
Walks ``~/.claude/projects/*/*.jsonl`` and groups sessions by the REAL working
directory read from INSIDE each file. The encoded
``~/.claude/projects/<encoded-dir>`` folder name is lossy (``/``, ``-``, ``_``,
``.`` all collapse to ``-``), so it is
NEVER decoded — the cwd is trusted from the events (Non-negotiable #5).
This module is pure path/IO bookkeeping over the SAME session-metadata rule the
loader uses (the first ``user``/``assistant`` row's ``cwd``). It does not parse
turns or run any engine logic, and it touches no model and no network
(Non-negotiables #1, #2). It is a sibling of the loaders: a thin discovery layer
that hands the rest of the engine a list of session files + their roots.
Public API
----------
``discover_sessions(projects_dir=None)`` -> list[SessionRef]
Every session under ``~/.claude/projects`` with its real cwd.
``list_projects(projects_dir=None)`` -> list[ProjectRoot]
Distinct cwd roots, each with its session count (the root browser feeds on this).
``sessions_under(root, projects_dir=None, sessions=None)`` -> list[SessionRef]
All sessions whose cwd is ``root`` or a descendant of ``root`` (PREFIX match).
``attribute(roots, projects_dir=None, sessions=None)`` -> Attribution
DEEPEST-folder-wins assignment of every session to the most specific ticked
root. Nested ticks NEVER double-count: a session lands under exactly one root.
Determinism: every list returned is sorted (sessions by (cwd, path); roots by
cwd) so callers and gates get a stable order.
"""
from __future__ import annotations
import glob
import json
import os
from dataclasses import dataclass, field
from typing import Any, Optional
# Row types whose ``cwd`` field is accepted as session metadata.
# The encoded folder name is lossy, so the cwd is read from INSIDE the file
# instead: discovery takes the first row of one of these types that carries a
# non-empty ``cwd``. This is stated to be the SAME rule the loader applies (the
# loader is not visible from this module — confirm if either side changes), so
# discovery and the loader agree on every session's home directory.
_CWD_BEARING_TYPES = ("user", "assistant")
# --------------------------------------------------------------------------- #
# shapes
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class SessionRef:
    """A single discovered session file paired with the cwd found inside it.

    Instances are immutable. ``cwd`` is ``None`` when no user/assistant row in
    the file carried a working directory (malformed or metadata-only file);
    such sessions are still reported but are never attributed to a root.
    """

    path: str  # absolute path to the .jsonl
    cwd: Optional[str]  # real working dir, trusted from inside the file
    sessionId: Optional[str] = None
    encodedDir: Optional[str] = None  # the lossy folder name (kept for display only)
    startedAt: Optional[str] = None  # ISO ts of the first user/assistant row (start time)

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict, in declaration order."""
        names = ("path", "cwd", "sessionId", "encodedDir", "startedAt")
        return {name: getattr(self, name) for name in names}
@dataclass(frozen=True)
class ProjectRoot:
    """One distinct working directory and the number of sessions located exactly there."""

    cwd: str
    sessions: int

    def to_dict(self) -> dict[str, Any]:
        """Return ``{"cwd": ..., "sessions": ...}`` as a plain dict."""
        return dict(cwd=self.cwd, sessions=self.sessions)
@dataclass
class Attribution:
    """Outcome of a DEEPEST-folder-wins assignment across several ticked roots.

    ``by_root`` maps a ticked root to the sessions assigned to it; the producer
    is expected to place each session under at most ONE root. ``unattributed``
    collects the sessions that fell under no ticked root (or carry no cwd).
    ``total`` is defined as ``attributed_count + unattributed_count``, so the
    three counts always form a partition.
    """

    by_root: dict[str, list[SessionRef]] = field(default_factory=dict)
    unattributed: list[SessionRef] = field(default_factory=list)

    @property
    def attributed_count(self) -> int:
        """Number of sessions summed over every root bucket."""
        running = 0
        for bucket in self.by_root.values():
            running += len(bucket)
        return running

    @property
    def unattributed_count(self) -> int:
        """Number of sessions that landed under no ticked root."""
        return len(self.unattributed)

    @property
    def total(self) -> int:
        """Attributed plus unattributed sessions."""
        return self.attributed_count + self.unattributed_count

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view: both session groups plus the three counts."""
        grouped: dict[str, Any] = {}
        for root, members in self.by_root.items():
            grouped[root] = [member.to_dict() for member in members]
        leftovers = [member.to_dict() for member in self.unattributed]
        return {
            "by_root": grouped,
            "unattributed": leftovers,
            "attributed_count": self.attributed_count,
            "unattributed_count": self.unattributed_count,
            "total": self.total,
        }
# --------------------------------------------------------------------------- #
# path helpers — normalize WITHOUT resolving symlinks or touching the FS.
# Normalization is purely lexical so a session's cwd (a string written by a past
# run) is comparable to a root the user ticks now, even if that dir is gone.
# --------------------------------------------------------------------------- #
def _norm(path: Optional[str]) -> Optional[str]:
    """Lexically normalize a path string so two spellings compare equal.

    Uses ``os.path.normpath``: redundant separators and ``.`` segments are
    collapsed, ``..`` segments are folded lexically, and a trailing separator is
    dropped. Nothing is resolved against the real filesystem (no symlink
    resolution, no existence check), because the directory may no longer exist.

    Returns ``None`` for falsy or non-string input. Never returns an empty
    string: a path that is nothing but separators normalizes to ``os.sep``.
    """
    if not path or not isinstance(path, str):
        return None
    p = os.path.normpath(path)
    if len(p) > 1:
        # normpath keeps a leading double separator ("//" stays "//" on POSIX),
        # so stripping separators can leave "". An empty root would match every
        # absolute cwd downstream; treat a separators-only path as the root.
        p = p.rstrip(os.sep) or os.sep
    return p
def _is_under(cwd: str, root: str) -> bool:
    """Report whether ``cwd`` equals ``root`` or lies somewhere beneath it.

    The comparison is a prefix match on whole PATH SEGMENTS, not a raw string
    ``startswith`` — otherwise ``/a/foo`` would wrongly count as being under
    ``/a/fo``. Both arguments are expected to be already normalized by ``_norm``.
    """
    # Make the prefix end on a separator so it can only match at a segment
    # boundary: '/a/b' starts with '/a/' but '/ab' does not.
    segment_prefix = root
    if not segment_prefix.endswith(os.sep):
        segment_prefix += os.sep
    return cwd == root or cwd.startswith(segment_prefix)
def default_projects_dir() -> str:
    """Return ``~/.claude/projects`` with ``~`` expanded (the standard Claude Code store)."""
    home = os.path.expanduser("~")
    segments = (".claude", "projects")
    return os.path.join(home, *segments)
# --------------------------------------------------------------------------- #
# the seam: read the REAL cwd from inside one session file
# --------------------------------------------------------------------------- #
def _read_session_meta(
    path: str,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Return ``(cwd, sessionId, startedAt)`` from the FIRST qualifying row.

    A row qualifies when it is a JSON object whose ``type`` is one of
    ``_CWD_BEARING_TYPES`` and whose ``cwd`` is truthy. The file is streamed
    line by line and reading stops at that row, so the whole file is never
    loaded just to find a home directory.

    Returns:
        ``cwd`` passed through ``_norm``, plus the row's ``sessionId`` and
        ``timestamp`` values as found (not validated). ``(None, None, None)``
        when no row qualifies or the file cannot be opened/read.

    Malformed lines — invalid UTF-8, invalid JSON, or JSON that is not an
    object — are skipped. Nothing is raised for a bad file, so one broken
    session cannot abort discovery.
    """
    try:
        # Read bytes and decode per line. In text mode a single invalid byte
        # raises UnicodeDecodeError from the file iterator itself, outside any
        # per-line handler, and it is not an OSError — it would escape this
        # function entirely.
        with open(path, "rb") as fh:
            for raw in fh:
                try:
                    text = raw.decode("utf-8").strip()
                    row = json.loads(text) if text else None
                except ValueError:
                    # UnicodeDecodeError and json.JSONDecodeError are both
                    # ValueError subclasses: either way the line is malformed.
                    continue
                if not isinstance(row, dict):
                    continue
                if row.get("type") in _CWD_BEARING_TYPES and row.get("cwd"):
                    return _norm(row.get("cwd")), row.get("sessionId"), row.get("timestamp")
    except OSError:
        return None, None, None
    return None, None, None
# --------------------------------------------------------------------------- #
# public API
# --------------------------------------------------------------------------- #
def discover_sessions(projects_dir: Optional[str] = None) -> list[SessionRef]:
    """Walk ``<projects_dir>/*/*.jsonl`` and return one SessionRef per match.

    Args:
        projects_dir: Directory holding the per-project folders. Falls back to
            ``default_projects_dir()`` when ``None`` or empty.

    Returns:
        A list sorted by ``(cwd, path)``, with sessions whose cwd is ``None``
        placed last. The cwd comes from inside each file via
        ``_read_session_meta``; the folder name is carried in ``encodedDir``
        for display only and is never decoded.
    """
    base = _norm(projects_dir) or default_projects_dir()
    # Escape the base so glob metacharacters in the directory name itself
    # ('[', ']', '*', '?') are matched literally; only the two trailing
    # components are real wildcards (project dirs, then session files).
    pattern = os.path.join(glob.escape(base), "*", "*.jsonl")
    refs: list[SessionRef] = []
    for fp in glob.glob(pattern):
        fp_abs = os.path.abspath(fp)
        cwd, sid, started = _read_session_meta(fp_abs)
        refs.append(
            SessionRef(
                path=fp_abs,
                cwd=cwd,
                sessionId=sid,
                encodedDir=os.path.basename(os.path.dirname(fp_abs)),
                startedAt=started,
            )
        )
    # Deterministic order; the leading boolean pushes None cwds to the end.
    refs.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))
    return refs
def list_projects(
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> list[ProjectRoot]:
    """Return the distinct cwd roots with the number of sessions EXACTLY at each.

    Args:
        projects_dir: Store to walk when ``sessions`` is not supplied.
        sessions: Optional pre-computed session list (as accepted by
            ``sessions_under`` and ``attribute``) to avoid re-walking the disk.

    Returns:
        One ``ProjectRoot`` per distinct cwd, sorted by cwd. Sessions with no
        cwd are excluded. Counts are exact-match only: a session in a
        subdirectory is counted under its own cwd, not under its parent.
    """
    pool = sessions if sessions is not None else discover_sessions(projects_dir)
    counts: dict[str, int] = {}
    for s in pool:
        if s.cwd is None:
            continue
        counts[s.cwd] = counts.get(s.cwd, 0) + 1
    return [ProjectRoot(cwd=c, sessions=n) for c, n in sorted(counts.items())]
def sessions_under(
    root: str,
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> list[SessionRef]:
    """Return every session whose cwd is ``root`` or a descendant of it.

    Matching is a path-segment prefix match (see ``_is_under``) against the
    normalized ``root``. Supply ``sessions`` to reuse an already-discovered
    list; otherwise the store at ``projects_dir`` is walked. Sessions without a
    cwd never match. An empty or non-string ``root`` yields ``[]``. The result
    is sorted by ``(cwd, path)``.
    """
    target = _norm(root)
    if target is None:
        return []
    if sessions is None:
        candidates = discover_sessions(projects_dir)
    else:
        candidates = sessions
    return sorted(
        (
            ref
            for ref in candidates
            if ref.cwd is not None and _is_under(ref.cwd, target)
        ),
        key=lambda ref: (ref.cwd or "", ref.path),
    )
def attribute(
    roots: list[str],
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> Attribution:
    """Assign each session to the most specific ticked root (DEEPEST wins).

    Among the ticked roots that equal or contain a session's cwd, the longest
    one claims the session, so a parent tick never also counts a session that a
    nested tick already owns. Each session therefore lands under at most one
    root. Sessions with no cwd, or under no ticked root, go to
    ``unattributed``.

    Supply ``sessions`` to reuse an already-discovered list; otherwise the store
    at ``projects_dir`` is walked. ``by_root`` has one key per distinct
    normalized root, in sorted order (including roots that received nothing),
    and every bucket is sorted by ``(cwd, path)``.
    """
    # Normalize and de-duplicate the ticks; sorted order fixes the output map.
    ticked = sorted({n for n in map(_norm, roots) if n is not None})
    # Scan order for matching: longest root first, so the first hit is the
    # deepest ancestor. Two distinct ancestors of the same cwd cannot have the
    # same length, so there is never a tie to break.
    deepest_first = sorted(ticked, key=len, reverse=True)

    if sessions is None:
        pool = discover_sessions(projects_dir)
    else:
        pool = sessions

    by_root: dict[str, list[SessionRef]] = {root: [] for root in ticked}
    unattributed: list[SessionRef] = []
    for ref in pool:
        owner = None
        if ref.cwd is not None:
            owner = next(
                (root for root in deepest_first if _is_under(ref.cwd, root)),
                None,
            )
        if owner is None:
            unattributed.append(ref)
        else:
            by_root[owner].append(ref)

    # Deterministic order inside every bucket; None cwds last in the leftovers.
    for bucket in by_root.values():
        bucket.sort(key=lambda ref: (ref.cwd or "", ref.path))
    unattributed.sort(key=lambda ref: (ref.cwd is None, ref.cwd or "", ref.path))
    return Attribution(by_root=by_root, unattributed=unattributed)