Spaces:
Running on Zero
Running on Zero
| """discovery.py — Phase-5 multi-session discovery. Deterministic. NO model. | |
| Walks ``~/.claude/projects/*/*.jsonl`` and groups sessions by the REAL working | |
| directory read from INSIDE each file. The encoded ``~/.claude/projects/<dir>`` | |
| folder name is lossy (``/``, ``-``, ``_``, ``.`` all collapse to ``-``), so it is | |
| NEVER decoded — the cwd is trusted from the events (Non-negotiable #5). | |
| This module is pure path/IO bookkeeping over the SAME session-metadata rule the | |
| loader uses (the first ``user``/``assistant`` row's ``cwd``). It does not parse | |
| turns or run any engine logic, and it touches no model and no network | |
| (Non-negotiables #1, #2). It is a sibling of the loaders: a thin discovery layer | |
| that hands the rest of the engine a list of session files + their roots. | |
| Public API | |
| ---------- | |
| ``discover_sessions(projects_dir=None)`` -> list[SessionRef] | |
| Every session under ``~/.claude/projects`` with its real cwd. | |
| ``list_projects(projects_dir=None)`` -> list[ProjectRoot] | |
| Distinct cwd roots, each with its session count (the root browser feeds on this). | |
| ``sessions_under(root, projects_dir=None, sessions=None)`` -> list[SessionRef] | |
| All sessions whose cwd is ``root`` or a descendant of ``root`` (PREFIX match). | |
| ``attribute(roots, projects_dir=None, sessions=None)`` -> Attribution | |
| DEEPEST-folder-wins assignment of every session to the most specific ticked | |
| root. Nested ticks NEVER double-count: a session lands under exactly one root. | |
| Determinism: every list returned is sorted (sessions by (cwd, path); roots by | |
| cwd) so callers and gates get a stable order. | |
| """ | |
| from __future__ import annotations | |
| import glob | |
| import json | |
| import os | |
| from dataclasses import dataclass, field | |
| from typing import Any, Optional | |
# Row types whose ``cwd`` field is accepted as session metadata.
# The encoded folder name is lossy — we read cwd from INSIDE the file instead.
# These are the only row types the loader trusts for session metadata; we use the
# SAME rule (first user/assistant row carrying a cwd) so discovery and the loader
# agree on every session's home directory. Consulted by ``_read_session_meta``.
_CWD_BEARING_TYPES = ("user", "assistant")
| # --------------------------------------------------------------------------- # | |
| # shapes | |
| # --------------------------------------------------------------------------- # | |
@dataclass
class SessionRef:
    """One discovered session file plus the real cwd read from inside it.

    ``cwd`` is None only when no user/assistant row carried one (a malformed or
    metadata-only file); such sessions are kept but never attributed to a root.

    The ``@dataclass`` decorator is load-bearing: the fields below are bare
    annotations, and instances are built by keyword
    (``SessionRef(path=..., cwd=...)``), which needs the generated ``__init__``.
    """

    path: str  # absolute path to the .jsonl
    cwd: Optional[str]  # real working dir, trusted from inside the file
    sessionId: Optional[str] = None
    encodedDir: Optional[str] = None  # the lossy folder name (kept for display only)
    startedAt: Optional[str] = None  # ISO ts of the first user/assistant row (start time)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain ``dict`` with one key per field, named as the field."""
        return {
            "path": self.path,
            "cwd": self.cwd,
            "sessionId": self.sessionId,
            "encodedDir": self.encodedDir,
            "startedAt": self.startedAt,
        }
@dataclass
class ProjectRoot:
    """A distinct cwd root and how many sessions live exactly at it.

    Declared as a dataclass so it can be built by keyword
    (``ProjectRoot(cwd=..., sessions=...)``); without the decorator the
    annotated fields produce no ``__init__`` and construction fails.
    """

    cwd: str  # normalized working directory
    sessions: int  # number of sessions whose cwd equals ``cwd``

    def to_dict(self) -> dict[str, Any]:
        """Return ``{"cwd": ..., "sessions": ...}`` for serialization."""
        return {"cwd": self.cwd, "sessions": self.sessions}
@dataclass
class Attribution:
    """Result of a DEEPEST-folder-wins multi-root tick.

    ``by_root`` maps each ticked root -> the sessions attributed to it (each
    session appears under at most ONE root). ``unattributed`` holds sessions
    whose cwd is under no ticked root (or has no cwd). The two buckets form a
    partition of the input: ``attributed_count + unattributed_count == total``
    always holds, because ``total`` is defined as exactly that sum.

    ``@dataclass`` turns the ``field(default_factory=...)`` declarations into
    real per-instance defaults and provides the keyword constructor. The three
    counts are read-only properties: ``total`` and ``to_dict`` read them as
    attributes, never as calls.
    """

    by_root: dict[str, list[SessionRef]] = field(default_factory=dict)
    unattributed: list[SessionRef] = field(default_factory=list)

    @property
    def attributed_count(self) -> int:
        """Number of sessions placed under some ticked root."""
        return sum(len(v) for v in self.by_root.values())

    @property
    def unattributed_count(self) -> int:
        """Number of sessions that matched no ticked root (or had no cwd)."""
        return len(self.unattributed)

    @property
    def total(self) -> int:
        """All sessions considered: attributed plus unattributed."""
        return self.attributed_count + self.unattributed_count

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view: both buckets (sessions as dicts) plus the counts."""
        return {
            "by_root": {
                r: [s.to_dict() for s in ss] for r, ss in self.by_root.items()
            },
            "unattributed": [s.to_dict() for s in self.unattributed],
            "attributed_count": self.attributed_count,
            "unattributed_count": self.unattributed_count,
            "total": self.total,
        }
| # --------------------------------------------------------------------------- # | |
| # path helpers — normalize WITHOUT resolving symlinks or touching the FS. | |
| # Normalization is purely lexical so a session's cwd (a string written by a past | |
| # run) is comparable to a root the user ticks now, even if that dir is gone. | |
| # --------------------------------------------------------------------------- # | |
| def _norm(path: Optional[str]) -> Optional[str]: | |
| """Lexically normalize an absolute-ish path: strip a trailing slash, collapse | |
| redundant separators / '.' segments. Does NOT resolve symlinks or '..' against | |
| the real FS (paths may no longer exist). Returns None for falsy input.""" | |
| if not path or not isinstance(path, str): | |
| return None | |
| p = os.path.normpath(path) | |
| # normpath turns '/' into '/' already; for non-root, drop any trailing sep. | |
| if len(p) > 1: | |
| p = p.rstrip(os.sep) | |
| return p | |
| def _is_under(cwd: str, root: str) -> bool: | |
| """True iff normalized ``cwd`` is ``root`` itself or a descendant of ``root``. | |
| PREFIX match on PATH SEGMENTS — never a raw string ``startswith`` (which would | |
| falsely match ``/a/foo`` under ``/a/fo``). Both args must already be _norm'd. | |
| """ | |
| if cwd == root: | |
| return True | |
| # ensure a segment boundary: '/a/b' is under '/a' (root + sep) but '/ab' is not | |
| prefix = root if root.endswith(os.sep) else root + os.sep | |
| return cwd.startswith(prefix) | |
def default_projects_dir() -> str:
    """Return the standard Claude Code session store, ``~/.claude/projects``,
    with ``~`` expanded to the current user's home directory."""
    home = os.path.expanduser("~")
    return os.path.join(home, ".claude", "projects")
| # --------------------------------------------------------------------------- # | |
| # the seam: read the REAL cwd from inside one session file | |
| # --------------------------------------------------------------------------- # | |
| def _read_session_meta( | |
| path: str, | |
| ) -> tuple[Optional[str], Optional[str], Optional[str]]: | |
| """Return (cwd, sessionId, startedAt) read from the FIRST user/assistant row. | |
| Mirrors the loader's session-metadata rule exactly. Streams line-by-line and | |
| stops at the first qualifying row — we never load the whole file just to find | |
| a home directory. ``startedAt`` is that same row's ISO ``timestamp`` (the | |
| session start time, so Shripal can tell two sessions apart at a glance); it is | |
| free to grab while we are already on the row. Malformed lines are skipped; an | |
| unreadable file yields (None, None, None) rather than raising, so one bad | |
| session can't break discovery. | |
| """ | |
| try: | |
| with open(path, "r", encoding="utf-8") as fh: | |
| for line in fh: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| r = json.loads(line) | |
| except (ValueError, json.JSONDecodeError): | |
| continue | |
| if not isinstance(r, dict): | |
| continue | |
| if r.get("type") in _CWD_BEARING_TYPES and r.get("cwd"): | |
| return _norm(r.get("cwd")), r.get("sessionId"), r.get("timestamp") | |
| except OSError: | |
| return None, None, None | |
| return None, None, None | |
| # --------------------------------------------------------------------------- # | |
| # public API | |
| # --------------------------------------------------------------------------- # | |
def discover_sessions(projects_dir: Optional[str] = None) -> list[SessionRef]:
    """Walk ``<projects_dir>/*/*.jsonl`` and return a SessionRef per file.

    ``projects_dir`` defaults to ``default_projects_dir()`` when it is None or
    empty. The real cwd is read from inside each file; the lossy encoded folder
    name is carried only for display. Result is sorted deterministically by
    (cwd, path) with None cwds (if any) sorted last.
    """
    base = _norm(projects_dir) or default_projects_dir()
    refs: list[SessionRef] = []
    # The base directory is a literal path, not a pattern: escape it so glob
    # metacharacters in a directory name ('[', ']', '*', '?') are matched
    # verbatim. Only the two trailing components are wildcards — one level of
    # project dirs, then session files (the standard CC layout).
    pattern = os.path.join(glob.escape(base), "*", "*.jsonl")
    for fp in glob.glob(pattern):
        fp_abs = os.path.abspath(fp)
        cwd, sid, started = _read_session_meta(fp_abs)
        refs.append(
            SessionRef(
                path=fp_abs,
                cwd=cwd,
                sessionId=sid,
                encodedDir=os.path.basename(os.path.dirname(fp_abs)),
                startedAt=started,
            )
        )
    # deterministic order; None cwd last
    refs.sort(key=lambda s: (s.cwd is None, s.cwd or "", s.path))
    return refs
def list_projects(projects_dir: Optional[str] = None) -> list[ProjectRoot]:
    """Inventory the distinct session cwds with a per-cwd session count.

    A session is counted only at its EXACT cwd (no roll-up into parents), and
    sessions without a cwd are left out. The returned roots are ordered by cwd.
    """
    tally: dict[str, int] = {}
    for ref in discover_sessions(projects_dir):
        home = ref.cwd
        if home is not None:
            tally[home] = tally.get(home, 0) + 1

    roots: list[ProjectRoot] = []
    for home in sorted(tally):
        roots.append(ProjectRoot(cwd=home, sessions=tally[home]))
    return roots
def sessions_under(
    root: str,
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> list[SessionRef]:
    """Select the sessions whose real cwd is ``root`` or nested below it.

    Matching is a path-segment prefix test against the normalized ``root``;
    sessions with no cwd never match. Supply ``sessions`` to reuse an existing
    discovery result instead of walking the disk again. An unusable ``root``
    (empty / non-string) gives an empty list. Output is ordered by (cwd, path).
    """
    anchor = _norm(root)
    if anchor is None:
        return []

    candidates = discover_sessions(projects_dir) if sessions is None else sessions
    return sorted(
        (
            ref
            for ref in candidates
            if ref.cwd is not None and _is_under(ref.cwd, anchor)
        ),
        key=lambda ref: (ref.cwd or "", ref.path),
    )
def attribute(
    roots: list[str],
    projects_dir: Optional[str] = None,
    sessions: Optional[list[SessionRef]] = None,
) -> Attribution:
    """Assign each session to the single most specific ticked root.

    Of all ticked roots that equal or contain a session's cwd, the LONGEST one
    claims the session, so a parent tick never also counts a session that a
    deeper tick already owns. Sessions with no cwd, or with a cwd outside every
    ticked root, are collected in ``unattributed``. Supply ``sessions`` to reuse
    an existing discovery result instead of walking the disk.
    """
    # Normalized, de-duplicated, sorted ticks: the sorted order fixes the key
    # order of the output map.
    ticked = sorted({n for n in map(_norm, roots) if n is not None})

    candidates = discover_sessions(projects_dir) if sessions is None else sessions
    buckets: dict[str, list[SessionRef]] = {root: [] for root in ticked}
    leftovers: list[SessionRef] = []

    for ref in candidates:
        if ref.cwd is None:
            matching: list[str] = []
        else:
            matching = [root for root in ticked if _is_under(ref.cwd, root)]

        if matching:
            # Every match is a prefix of the same cwd, so distinct matches have
            # distinct lengths and the longest is the deepest folder.
            buckets[max(matching, key=len)].append(ref)
        else:
            leftovers.append(ref)

    # Stable ordering inside every bucket; cwd-less sessions go last.
    for members in buckets.values():
        members.sort(key=lambda ref: (ref.cwd or "", ref.path))
    leftovers.sort(key=lambda ref: (ref.cwd is None, ref.cwd or "", ref.path))

    return Attribution(by_root=buckets, unattributed=leftovers)