File size: 15,709 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"""jsonl-loader — the Phase-1 seam. The ONLY code permitted to touch raw JSONL.

`load(path) -> {'events': [...], 'turns': [...], 'session': {...}}`

Pure, deterministic, structural parse. NO model. NO provenance/heavy/loop/re-read
logic (that is Phase 2; the loader leaves those fields at neutral defaults).

Key rules (verified ground truth — see CLAUDE.md / TRACE-CONTRACT §3):

  TURN BOUNDARY (START-ANCHORED): a row opens a new turn IFF
    type == 'user' AND not isMeta AND message.content is a non-empty string
    (after .strip()) AND the trimmed string does NOT start with any of:
      '<command-name', '<command-message', '<local-command'.
    This is a str.startswith() test, NEVER a substring-anywhere test.
    '<task-notification>' rows ARE boundaries.

  ORIGIN: 'system' if the trimmed prompt starts with '<task-notification>',
          else 'human'.

  SIDECHAIN FOLDING (sub-agent / Task work): a row with top-level
  `isSidechain == true` is sub-agent activity, NOT a top-level query. Such a row
  is FOLDED into the current parent turn — it never opens a new turn (even if its
  message.content is a user-string that would otherwise look like a boundary), and
  its tool_use / tool_result / token content rolls up into the parent turn exactly
  like inline assistant/user rows. This keeps the Task's cost attributed to the
  query that spawned it instead of leaking into a spurious turn or being dropped.
  This fixture has 0 sidechains (every row is isSidechain == false), so the fold
  is a strict NO-OP here and the regression oracle is byte-identical.

  cwd / session metadata: trusted from INSIDE the file (the encoded dir name is
  lossy). Non-negotiable #5.
"""
from __future__ import annotations

import json
import os
from typing import Any, Optional

from engine.contract import Event, Tokens, ToolCall, Turn

# START-anchored prefixes that DISQUALIFY a user-string row from being a turn.
# (Slash-command scaffolding rows — NOT real prompts.) Kept as a tuple so it can
# be handed directly to str.startswith(), which accepts a tuple of prefixes.
_NON_PROMPT_PREFIXES = ("<command-name", "<command-message", "<local-command")
# A trimmed prompt starting with this marks a system-origin (background) turn.
_SYSTEM_PREFIX = "<task-notification>"


# --------------------------------------------------------------------------- #
# small structural helpers
# --------------------------------------------------------------------------- #
def _is_turn_boundary(row: dict[str, Any]) -> tuple[bool, str]:
    """Decide whether ``row`` opens a new turn. START-anchored, never substring.

    A row is a boundary only when it is a non-meta ``user`` row whose
    ``message.content`` is a string that is non-empty after ``.strip()`` and
    whose trimmed text does not start with any of ``_NON_PROMPT_PREFIXES``.

    Args:
        row: One parsed JSONL row.

    Returns:
        ``(True, trimmed_prompt)`` for a boundary row, ``(False, "")`` otherwise.
    """
    if row.get("type") != "user" or row.get("isMeta"):
        return False, ""
    # ``message`` can be absent, JSON null, or (in a malformed row) not an
    # object. None of those can carry a prompt, so report "not a boundary"
    # instead of raising AttributeError on ``.get``.
    message = row.get("message")
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, str):
        return False, ""
    s = content.strip()
    # str.startswith with a tuple tests every disqualifying prefix in one call.
    if not s or s.startswith(_NON_PROMPT_PREFIXES):
        return False, ""
    return True, s


def _origin_for(prompt: str) -> str:
    """Classify a trimmed prompt: 'system' when it starts with ``_SYSTEM_PREFIX``,
    'human' for everything else."""
    if prompt.startswith(_SYSTEM_PREFIX):
        return "system"
    return "human"


def _is_sidechain(row: dict[str, Any]) -> bool:
    """Report whether ``row`` is sub-agent (Task) activity.

    The answer is the truthiness of the row's top-level ``isSidechain`` field;
    a missing field counts as False. Callers use a True result to fold the row
    into the current parent turn instead of letting it open a new one.
    """
    flag = row.get("isSidechain")
    return True if flag else False


def _basename(path: Any) -> str:
    """Return the last component of ``path`` for display.

    Trailing '/' characters are stripped before taking the basename. If that
    leaves nothing (e.g. ``"/"``), the original string is returned. Anything
    that is not a non-empty string is passed through ``str()`` unchanged.
    """
    if isinstance(path, str) and path:
        tail = os.path.basename(path.rstrip("/"))
        return tail if tail else path
    return str(path)


def _mcp_of(name: str, row: dict[str, Any]) -> Optional[dict[str, str]]:
    """Derive {server, tool} from an ``mcp__<server>__<tool>`` tool name only.

    The tool name is the sole source of identity. ``row`` is accepted but
    deliberately not consulted: its attributionMcpServer/attributionMcpTool
    fields describe message-level context, not this particular tool_use, and
    falling back to them would label ordinary Bash/Read/Edit calls as MCP.

    Returns None for any name that is not a string starting with 'mcp__'.
    When the remainder has no '__' separator, the whole remainder is the
    server and the tool is the empty string.
    """
    prefix = "mcp__"
    if not (isinstance(name, str) and name.startswith(prefix)):
        return None
    # Split on the FIRST '__' only, so a tool name may itself contain '__'.
    pieces = name[len(prefix):].split("__", 1)
    if len(pieces) == 2:
        return {"server": pieces[0], "tool": pieces[1]}
    return {"server": pieces[0], "tool": ""}


def _summary(name: str, inp: Any, mcp: Optional[dict[str, str]]) -> str:
    """Build a deterministic, human-scannable one-line label for a tool call.

    ``inp`` is treated as an empty mapping unless it is a dict. File tools show
    the basename of ``file_path`` (Write is labelled 'Edit', same as Edit).
    Bash and Task show at most the first 60 characters of their command or
    description. Grep/Glob show the pattern. Any other tool with MCP identity
    shows 'server:tool'. Everything else is labelled with the bare tool name.
    """
    fields = inp if isinstance(inp, dict) else {}

    if name == "Read" or name == "Edit" or name == "Write":
        verb = "Read" if name == "Read" else "Edit"
        return verb + " " + _basename(fields.get("file_path"))

    if name == "Bash":
        command = str(fields.get("command", "") or "")
        return "Bash: " + command[:60]

    if name == "Grep" or name == "Glob":
        return "{} {}".format(name, fields.get("pattern", ""))

    if name == "Task":
        description = str(fields.get("description", "") or "")
        return "Task: " + description[:60]

    # MCP labelling is checked last so the built-in tool names above always win.
    if mcp is None:
        return name
    return "{}:{}".format(mcp.get("server", ""), mcp.get("tool", ""))


def _tool_result_text(block: dict[str, Any]) -> str:
    """Pull the displayable text out of a tool_result content block.

    ``content`` is returned as-is when it is a string. When it is a list, the
    string ``text`` fields of its dict items are joined with newlines (items
    without a string ``text`` are skipped). A missing/None content yields "".
    Any other value is rendered with ``str()``.
    """
    payload = block.get("content")
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, list):
        return "\n".join(
            item["text"]
            for item in payload
            if isinstance(item, dict) and isinstance(item.get("text"), str)
        )
    return str(payload)


def _visible_text(block: dict[str, Any]) -> Optional[str]:
    """Return the text of a visible assistant block, or None.

    Only blocks with ``type == 'text'`` and a string ``text`` field qualify;
    every other block (thinking included) yields None.
    """
    if block.get("type") != "text":
        return None
    text = block.get("text")
    return text if isinstance(text, str) else None


# --------------------------------------------------------------------------- #
# the seam
# --------------------------------------------------------------------------- #
def load(path: str) -> dict[str, Any]:
    """Parse a Claude Code session .jsonl into the normalized contract.

    Signature kept clean so an hf-loader drops in behind the same contract.

    Args:
        path: Path of the session file. It is read as UTF-8, one JSON value
            per non-blank line.

    Returns:
        {'events': [Event...], 'turns': [Turn...], 'session': {...}} where
        ``session`` carries cwd / sessionId / gitBranch / version / startedAt /
        endedAt / model, each None when the file does not provide it.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """

    def _message_of(row: dict[str, Any]) -> dict[str, Any]:
        # ``message`` may be absent, JSON null, or not an object on a malformed
        # row. Normalise all of those to {} so every ``.get`` below is safe,
        # for user rows as well as assistant rows.
        message = row.get("message")
        return message if isinstance(message, dict) else {}

    rows: list[dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))

    # --- session metadata: trust the file, not the folder name -------------- #
    session = {
        "cwd": None, "sessionId": None, "gitBranch": None, "version": None,
        "startedAt": None, "endedAt": None, "model": None,
    }
    # Only the FIRST user/assistant row is consulted for these four fields.
    for r in rows:
        if r.get("type") in ("user", "assistant"):
            session["cwd"] = r.get("cwd")
            session["sessionId"] = r.get("sessionId")
            session["gitBranch"] = r.get("gitBranch")
            session["version"] = r.get("version")
            break

    # --- model: the most-used assistant model id (for the report's "Model" line) -- #
    # Purely informational metadata (message.model on assistant rows); never affects
    # parsing or counts. Picks the modal model so a one-off sidechain model doesn't win.
    _models: dict[str, int] = {}
    for r in rows:
        if r.get("type") == "assistant":
            m = _message_of(r).get("model")
            if m:
                _models[m] = _models.get(m, 0) + 1
    if _models:
        session["model"] = max(_models, key=_models.get)

    # --- session span: first/last ISO timestamp across ANY row that has one --- #
    # Rows are in file order, so the FIRST row carrying a "timestamp" is the start
    # and the LAST is the end. Any row type counts (a meta/system row can open or
    # close the file). Purely structural — no effect on turn parsing or counts.
    for r in rows:
        ts = r.get("timestamp")
        if ts:
            if session["startedAt"] is None:
                session["startedAt"] = ts
            session["endedAt"] = ts

    # --- index tool_results by tool_use_id (239 ↔ 239, 1:1 in fixture) ------ #
    # Built up front so a tool_use can be paired with its result text even though
    # the result row appears later in the file. A repeated id keeps the last text.
    result_text_by_id: dict[str, str] = {}
    for r in rows:
        if r.get("type") != "user":
            continue
        content = _message_of(r).get("content")
        if not isinstance(content, list):
            continue
        for b in content:
            if isinstance(b, dict) and b.get("type") == "tool_result":
                tuid = b.get("tool_use_id")
                if tuid is not None:
                    result_text_by_id[tuid] = _tool_result_text(b)

    # --- walk rows: assign every row to a turn; build events + turns -------- #
    turns: list[Turn] = []
    events: list[Event] = []
    cur: Optional[Turn] = None
    cur_req_ids: set[str] = set()
    reply_parts: list[str] = []

    def _finalize(turn: Optional[Turn]) -> None:
        # Reads cur_req_ids / reply_parts from the enclosing scope at call time,
        # so it must run BEFORE those accumulators are reset for the next turn.
        if turn is None:
            return
        turn.reqs = len(cur_req_ids)
        turn.reply = "\n".join(p for p in reply_parts if p).strip()

    for idx, r in enumerate(rows):
        rtype = r.get("type")
        # SIDECHAIN FOLD: sub-agent (Task) rows fold into the current parent turn.
        # Suppress their boundary candidacy so they can never open a turn; their
        # tool_use / tool_result / token content then rolls up below exactly like
        # any inline assistant/user row. (No-op on this fixture: all rows false.)
        sidechain = _is_sidechain(r)
        is_boundary, prompt = (False, "") if sidechain else _is_turn_boundary(r)

        if is_boundary:
            # close the previous turn, open a new one
            _finalize(cur)
            cur_req_ids = set()
            reply_parts = []
            cur = Turn(
                i=len(turns),
                prompt=prompt,
                origin=_origin_for(prompt),
                ts=r.get("timestamp"),
            )
            turns.append(cur)
            events.append(
                Event(
                    id=str(r.get("uuid", f"row{idx}")),
                    turn=cur.i,
                    role="user",
                    kind="prompt",
                    ts=r.get("timestamp"),
                    input=prompt,
                )
            )
            continue

        # rows before the first boundary belong to no turn (only mode /
        # permission-mode / file-history-snapshot precede row index 3) → skip.
        if cur is None:
            continue

        turn_i = cur.i

        if rtype == "assistant":
            msg = _message_of(r)
            usage = msg.get("usage", {}) or {}
            req_id = r.get("requestId")
            if req_id is not None:
                cur_req_ids.add(req_id)

            # Read each counter once; missing or null counters count as 0.
            in_tokens = usage.get("input_tokens", 0) or 0
            out_tokens = usage.get("output_tokens", 0) or 0
            cache_read = usage.get("cache_read_input_tokens", 0) or 0
            cache_create = usage.get("cache_creation_input_tokens", 0) or 0

            # token rollup for the turn (sum across assistant rows)
            cur.tokens = cur.tokens.add(
                Tokens(
                    in_=in_tokens,
                    out=out_tokens,
                    cacheRead=cache_read,
                    cacheCreate=cache_create,
                )
            )

            # point-in-time context-window occupancy (the "fuel gauge"): the prompt
            # size of THIS request = input + cacheRead + cacheCreate (output is the
            # reply, not occupancy). Sidechain rows run in a sub-agent's own window, so
            # they must NOT count toward the main thread's gauge — guard on `sidechain`.
            if not sidechain:
                occ = in_tokens + cache_read + cache_create
                if occ:
                    if not cur.ctxStart:
                        cur.ctxStart = occ
                    if occ > cur.ctxPeak:
                        cur.ctxPeak = occ
                    cur.ctxEnd = occ

            for b in msg.get("content", []) or []:
                if not isinstance(b, dict):
                    continue
                btype = b.get("type")
                if btype == "tool_use":
                    name = b.get("name", "")
                    inp = b.get("input")
                    mcp = _mcp_of(name, r)
                    tuid = b.get("id")
                    rtext = result_text_by_id.get(tuid)
                    tc = ToolCall(
                        name=name,
                        input=inp,
                        summary=_summary(name, inp, mcp),
                        mcp=mcp,
                        id=tuid,
                        result_text=rtext,
                        ts=r.get("timestamp"),
                        # Phase-2 fills provenance/sourceTool/flowValue/errored
                    )
                    cur.tools.append(tc)
                    events.append(
                        Event(
                            id=str(tuid) if tuid is not None else f"row{idx}-tooluse",
                            turn=turn_i,
                            role="assistant",
                            kind="tool_use",
                            ts=r.get("timestamp"),
                            tool=name,
                            input=inp,
                            mcp=mcp,
                        )
                    )
                elif btype == "text":
                    vis = _visible_text(b)
                    if vis is not None:
                        reply_parts.append(vis)
                        events.append(
                            Event(
                                id=str(r.get("uuid", f"row{idx}")) + "-text",
                                turn=turn_i,
                                role="assistant",
                                kind="text",
                                ts=r.get("timestamp"),
                                input=vis,
                            )
                        )
                # 'thinking' blocks are not visible reply; they emit no event/text

        elif rtype == "user":
            # tool_result user rows (lists) — emit tool_result events
            content = _message_of(r).get("content")
            if isinstance(content, list):
                for b in content:
                    if isinstance(b, dict) and b.get("type") == "tool_result":
                        tuid = b.get("tool_use_id")
                        events.append(
                            Event(
                                id=(str(tuid) + "-result") if tuid is not None else f"row{idx}-result",
                                turn=turn_i,
                                role="user",
                                kind="tool_result",
                                ts=r.get("timestamp"),
                                resultText=_tool_result_text(b),
                            )
                        )
        # all other meta row types (system, mode, attachment, etc.) carry no
        # chain content → no event.

    _finalize(cur)

    return {
        "events": events,
        "turns": turns,
        "session": session,
    }