"""jsonl-loader — the Phase-1 seam. The ONLY code permitted to touch raw JSONL. `load(path) -> {'events': [...], 'turns': [...], 'session': {...}}` Pure, deterministic, structural parse. NO model. NO provenance/heavy/loop/re-read logic (that is Phase 2; the loader leaves those fields at neutral defaults). Key rules (verified ground truth — see CLAUDE.md / TRACE-CONTRACT §3): TURN BOUNDARY (START-ANCHORED): a row opens a new turn IFF type == 'user' AND not isMeta AND message.content is a non-empty string (after .strip()) AND the trimmed string does NOT start with any of: '' rows ARE boundaries. ORIGIN: 'system' if the trimmed prompt starts with '', else 'human'. SIDECHAIN FOLDING (sub-agent / Task work): a row with top-level `isSidechain == true` is sub-agent activity, NOT a top-level query. Such a row is FOLDED into the current parent turn — it never opens a new turn (even if its message.content is a user-string that would otherwise look like a boundary), and its tool_use / tool_result / token content rolls up into the parent turn exactly like inline assistant/user rows. This keeps the Task's cost attributed to the query that spawned it instead of leaking into a spurious turn or being dropped. This fixture has 0 sidechains (every row is isSidechain == false), so the fold is a strict NO-OP here and the regression oracle is byte-identical. cwd / session metadata: trusted from INSIDE the file (the encoded dir name is lossy). Non-negotiable #5. """ from __future__ import annotations import json import os from typing import Any, Optional from engine.contract import Event, Tokens, ToolCall, Turn # START-anchored prefixes that DISQUALIFY a user-string row from being a turn. # (Slash-command scaffolding rows — NOT real prompts.) _NON_PROMPT_PREFIXES = (" tuple[bool, str]: """Return (is_boundary, trimmed_prompt_or_empty). START-anchored, never substring.""" if row.get("type") != "user": return False, "" if row.get("isMeta"): return False, "" content = row.get("message", {}).get("content") if not isinstance(content, str): return False, "" s = content.strip() if not s: return False, "" if s.startswith(_NON_PROMPT_PREFIXES): return False, "" return True, s def _origin_for(prompt: str) -> str: return "system" if prompt.startswith(_SYSTEM_PREFIX) else "human" def _is_sidechain(row: dict[str, Any]) -> bool: """True iff this row is sub-agent (Task) activity to FOLD into the parent turn. Sidechain rows must never open a new turn; their content rolls up into the current parent turn. In this fixture every row is isSidechain == false, so this returns False for all 1316 rows and the fold is a strict no-op. """ return bool(row.get("isSidechain")) def _basename(path: Any) -> str: if not isinstance(path, str) or not path: return str(path) return os.path.basename(path.rstrip("/")) or path def _mcp_of(name: str, row: dict[str, Any]) -> Optional[dict[str, str]]: """Derive {server, tool} ONLY from an mcp____ tool name — that name IS the tool's identity. The row's attributionMcpServer/attributionMcpTool is a turn-level *context* attribution (which MCP server was in play for the message), NOT a claim that this particular tool_use is that MCP call. Using it as a fallback mislabels ordinary Bash/Read/Edit calls as MCP (68 of them in the fixture, all tagged 'claude.ai Hugging Face:hub_repo_details'). Tool identity comes from the name alone; the legend's MCP tally and these labels then agree (true MCP = 1).""" if isinstance(name, str) and name.startswith("mcp__"): rest = name[len("mcp__"):] server, sep, tool = rest.partition("__") if sep: return {"server": server, "tool": tool} return {"server": rest, "tool": ""} return None def _summary(name: str, inp: Any, mcp: Optional[dict[str, str]]) -> str: """Human-scannable one-liner per tool. Deterministic, no model.""" inp = inp if isinstance(inp, dict) else {} if name == "Read": return f"Read {_basename(inp.get('file_path'))}" if name in ("Edit", "Write"): return f"Edit {_basename(inp.get('file_path'))}" if name == "Bash": cmd = str(inp.get("command", "") or "") return f"Bash: {cmd[:60]}" if name in ("Grep", "Glob"): return f"{name} {inp.get('pattern', '')}" if name == "Task": desc = str(inp.get("description", "") or "") return f"Task: {desc[:60]}" if mcp is not None: return f"{mcp.get('server', '')}:{mcp.get('tool', '')}" return name def _tool_result_text(block: dict[str, Any]) -> str: """Extract displayable text from a tool_result content block. content is usually a str; sometimes a list of {type:'text'|...} blocks.""" content = block.get("content") if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for b in content: if isinstance(b, dict): if isinstance(b.get("text"), str): parts.append(b["text"]) return "\n".join(parts) if content is None: return "" return str(content) def _visible_text(block: dict[str, Any]) -> Optional[str]: """Visible assistant text (kind='text'); thinking is NOT shown in reply.""" if block.get("type") == "text": t = block.get("text") if isinstance(t, str): return t return None # --------------------------------------------------------------------------- # # the seam # --------------------------------------------------------------------------- # def load(path: str) -> dict[str, Any]: """Parse a Claude Code session .jsonl into the normalized contract. Returns {'events': [Event...], 'turns': [Turn...], 'session': {...}}. Signature kept clean so an hf-loader drops in behind the same contract. """ rows: list[dict[str, Any]] = [] with open(path, "r", encoding="utf-8") as fh: for line in fh: line = line.strip() if not line: continue rows.append(json.loads(line)) # --- session metadata: trust the file, not the folder name -------------- # session = { "cwd": None, "sessionId": None, "gitBranch": None, "version": None, "startedAt": None, "endedAt": None, "model": None, } for r in rows: if r.get("type") in ("user", "assistant"): session["cwd"] = r.get("cwd") session["sessionId"] = r.get("sessionId") session["gitBranch"] = r.get("gitBranch") session["version"] = r.get("version") break # --- model: the most-used assistant model id (for the report's "Model" line) -- # # Purely informational metadata (message.model on assistant rows); never affects # parsing or counts. Picks the modal model so a one-off sidechain model doesn't win. _models: dict[str, int] = {} for r in rows: if r.get("type") == "assistant": m = (r.get("message", {}) or {}).get("model") if m: _models[m] = _models.get(m, 0) + 1 if _models: session["model"] = max(_models, key=_models.get) # --- session span: first/last ISO timestamp across ANY row that has one --- # # Rows are in file order, so the FIRST row carrying a "timestamp" is the start # and the LAST is the end. Any row type counts (a meta/system row can open or # close the file). Purely structural — no effect on turn parsing or counts; # the regression oracle stays byte-identical (Shripal: tell sessions apart). for r in rows: ts = r.get("timestamp") if ts: if session["startedAt"] is None: session["startedAt"] = ts session["endedAt"] = ts # --- index tool_results by tool_use_id (239 ↔ 239, 1:1 in fixture) ------ # result_text_by_id: dict[str, str] = {} for r in rows: if r.get("type") != "user": continue content = r.get("message", {}).get("content") if not isinstance(content, list): continue for b in content: if isinstance(b, dict) and b.get("type") == "tool_result": tuid = b.get("tool_use_id") if tuid is not None: result_text_by_id[tuid] = _tool_result_text(b) # --- walk rows: assign every row to a turn; build events + turns -------- # turns: list[Turn] = [] events: list[Event] = [] cur: Optional[Turn] = None cur_req_ids: set[str] = set() reply_parts: list[str] = [] def _finalize(turn: Optional[Turn]) -> None: if turn is None: return turn.reqs = len(cur_req_ids) turn.reply = "\n".join(p for p in reply_parts if p).strip() for idx, r in enumerate(rows): rtype = r.get("type") # SIDECHAIN FOLD: sub-agent (Task) rows fold into the current parent turn. # Suppress their boundary candidacy so they can never open a turn; their # tool_use / tool_result / token content then rolls up below exactly like # any inline assistant/user row. (No-op on this fixture: all rows false.) sidechain = _is_sidechain(r) is_boundary, prompt = (False, "") if sidechain else _is_turn_boundary(r) if is_boundary: # close the previous turn, open a new one _finalize(cur) cur_req_ids = set() reply_parts = [] cur = Turn( i=len(turns), prompt=prompt, origin=_origin_for(prompt), ts=r.get("timestamp"), ) turns.append(cur) events.append( Event( id=str(r.get("uuid", f"row{idx}")), turn=cur.i, role="user", kind="prompt", ts=r.get("timestamp"), input=prompt, ) ) continue # rows before the first boundary belong to no turn (only mode / # permission-mode / file-history-snapshot precede row index 3) → skip. if cur is None: continue turn_i = cur.i if rtype == "assistant": msg = r.get("message", {}) or {} usage = msg.get("usage", {}) or {} req_id = r.get("requestId") if req_id is not None: cur_req_ids.add(req_id) # token rollup for the turn (sum across assistant rows) cur.tokens = cur.tokens.add( Tokens( in_=usage.get("input_tokens", 0) or 0, out=usage.get("output_tokens", 0) or 0, cacheRead=usage.get("cache_read_input_tokens", 0) or 0, cacheCreate=usage.get("cache_creation_input_tokens", 0) or 0, ) ) # point-in-time context-window occupancy (the "fuel gauge"): the prompt # size of THIS request = input + cacheRead + cacheCreate (output is the # reply, not occupancy). Sidechain rows run in a sub-agent's own window, so # they must NOT count toward the main thread's gauge — guard on `sidechain`. if not sidechain: occ = ( (usage.get("input_tokens", 0) or 0) + (usage.get("cache_read_input_tokens", 0) or 0) + (usage.get("cache_creation_input_tokens", 0) or 0) ) if occ: if not cur.ctxStart: cur.ctxStart = occ if occ > cur.ctxPeak: cur.ctxPeak = occ cur.ctxEnd = occ for b in msg.get("content", []) or []: if not isinstance(b, dict): continue btype = b.get("type") if btype == "tool_use": name = b.get("name", "") inp = b.get("input") mcp = _mcp_of(name, r) tuid = b.get("id") rtext = result_text_by_id.get(tuid) tc = ToolCall( name=name, input=inp, summary=_summary(name, inp, mcp), mcp=mcp, id=tuid, result_text=rtext, ts=r.get("timestamp"), # Phase-2 fills provenance/sourceTool/flowValue/errored ) cur.tools.append(tc) events.append( Event( id=str(tuid) if tuid is not None else f"row{idx}-tooluse", turn=turn_i, role="assistant", kind="tool_use", ts=r.get("timestamp"), tool=name, input=inp, mcp=mcp, ) ) elif btype == "text": vis = _visible_text(b) if vis is not None: reply_parts.append(vis) events.append( Event( id=str(r.get("uuid", f"row{idx}")) + "-text", turn=turn_i, role="assistant", kind="text", ts=r.get("timestamp"), input=vis, ) ) # 'thinking' blocks are not visible reply; they emit no event/text elif rtype == "user": # tool_result user rows (lists) — emit tool_result events content = r.get("message", {}).get("content") if isinstance(content, list): for b in content: if isinstance(b, dict) and b.get("type") == "tool_result": tuid = b.get("tool_use_id") events.append( Event( id=(str(tuid) + "-result") if tuid is not None else f"row{idx}-result", turn=turn_i, role="user", kind="tool_result", ts=r.get("timestamp"), resultText=_tool_result_text(b), ) ) # all other meta row types (system, mode, attachment, etc.) carry no # chain content → no event. _finalize(cur) return { "events": events, "turns": turns, "session": session, }