# her / engine /loaders /jsonl_loader.py
# geekwrestler's picture
# Squash history (purge pre-scrub demo session blobs)
# 5f43c7d
"""jsonl-loader — the Phase-1 seam. The ONLY code permitted to touch raw JSONL.
`load(path) -> {'events': [...], 'turns': [...], 'session': {...}}`
Pure, deterministic, structural parse. NO model. NO provenance/heavy/loop/re-read
logic (that is Phase 2; the loader leaves those fields at neutral defaults).
Key rules (verified ground truth — see CLAUDE.md / TRACE-CONTRACT §3):
TURN BOUNDARY (START-ANCHORED): a row opens a new turn IFF
type == 'user' AND not isMeta AND message.content is a non-empty string
(after .strip()) AND the trimmed string does NOT start with any of:
'<command-name', '<command-message', '<local-command'.
This is a str.startswith() test, NEVER a substring-anywhere test.
'<task-notification>' rows ARE boundaries.
ORIGIN: 'system' if the trimmed prompt starts with '<task-notification>',
else 'human'.
SIDECHAIN FOLDING (sub-agent / Task work): a row with top-level
`isSidechain == true` is sub-agent activity, NOT a top-level query. Such a row
is FOLDED into the current parent turn — it never opens a new turn (even if its
message.content is a user-string that would otherwise look like a boundary), and
its tool_use / tool_result / token content rolls up into the parent turn exactly
like inline assistant/user rows. This keeps the Task's cost attributed to the
query that spawned it instead of leaking into a spurious turn or being dropped.
The reference fixture has 0 sidechain rows (every row has isSidechain == false),
so the fold is a strict NO-OP there and the regression oracle stays byte-identical.
cwd / session metadata: trusted from INSIDE the file (the encoded dir name is
lossy). Non-negotiable #5.
"""
from __future__ import annotations
import json
import os
from typing import Any, Optional
from engine.contract import Event, Tokens, ToolCall, Turn
# Start-anchored prefixes that DISQUALIFY a user-string row from being a turn:
# they mark slash-command scaffolding rows, not real prompts. Kept as a tuple
# so it can be handed directly to str.startswith().
_NON_PROMPT_PREFIXES = (
    "<command-name",
    "<command-message",
    "<local-command",
)
# A trimmed prompt that begins with this tag is a system-origin (background) turn.
_SYSTEM_PREFIX = "<task-notification>"
# --------------------------------------------------------------------------- #
# small structural helpers
# --------------------------------------------------------------------------- #
def _is_turn_boundary(row: dict[str, Any]) -> tuple[bool, str]:
    """Decide whether *row* opens a new top-level turn.

    A row is a boundary iff it is a non-meta ``user`` row whose
    ``message.content`` is a string that is non-empty after ``.strip()`` and
    does NOT start with any of ``_NON_PROMPT_PREFIXES``. The prefix check is
    START-anchored (``str.startswith``), never a substring-anywhere test.

    Returns:
        ``(True, trimmed_prompt)`` for a boundary row, otherwise ``(False, "")``.
    """
    if row.get("type") != "user" or row.get("isMeta"):
        return False, ""
    # ``message`` may be missing, null, or not an object. Treat all of those as
    # "no content" rather than raising AttributeError on ``None.get``.
    message = row.get("message")
    content = message.get("content") if isinstance(message, dict) else None
    # List content (tool_result rows) is never a prompt.
    if not isinstance(content, str):
        return False, ""
    prompt = content.strip()
    if not prompt or prompt.startswith(_NON_PROMPT_PREFIXES):
        return False, ""
    return True, prompt
def _origin_for(prompt: str) -> str:
    """Classify a trimmed prompt's origin.

    Returns ``"system"`` when the prompt begins with ``_SYSTEM_PREFIX`` (a
    background task notification), and ``"human"`` for everything else.
    """
    if prompt.startswith(_SYSTEM_PREFIX):
        return "system"
    return "human"
def _is_sidechain(row: dict[str, Any]) -> bool:
    """Report whether *row* is sub-agent (Task) activity.

    Any truthy top-level ``isSidechain`` value counts. Such rows are folded
    into the current parent turn and never open a turn of their own. In the
    reference fixture every row carries isSidechain == false, so this is
    False everywhere and the fold is a strict no-op.
    """
    flag = row.get("isSidechain")
    return True if flag else False
def _basename(path: Any) -> str:
    """Return the final path component of *path* for display.

    Non-string or empty inputs are simply stringified. Trailing slashes are
    ignored; if nothing remains after stripping them (e.g. ``"/"``), the
    original string is returned unchanged.
    """
    if isinstance(path, str) and path:
        tail = os.path.basename(path.rstrip("/"))
        return tail if tail else path
    return str(path)
def _mcp_of(name: str, row: dict[str, Any]) -> Optional[dict[str, str]]:
    """Derive ``{server, tool}`` from an ``mcp__<server>__<tool>`` tool name.

    The tool name alone is the tool's identity. The row's
    attributionMcpServer/attributionMcpTool fields are turn-level *context*
    (which MCP server was in play for the message). They do NOT mean that
    this particular tool_use is an MCP call, so *row* is deliberately ignored.
    Using those fields as a fallback mislabels ordinary Bash/Read/Edit calls
    as MCP (68 of them in the fixture).

    Returns None for non-MCP names. A name with no second ``__`` separator
    yields an empty ``tool``.
    """
    prefix = "mcp__"
    if not (isinstance(name, str) and name.startswith(prefix)):
        return None
    # partition() leaves tool == "" when there is no separator, so a single
    # shape covers both the "server__tool" and "server-only" cases.
    server, _sep, tool = name[len(prefix):].partition("__")
    return {"server": server, "tool": tool}
def _summary(name: str, inp: Any, mcp: Optional[dict[str, str]]) -> str:
    """Build a deterministic, human-scannable one-liner for a tool call (no model).

    Read/Edit/Write show the file's basename (Write is labelled "Edit"). Bash
    and Task show at most 60 characters of the command or description.
    Grep/Glob show the pattern. MCP tools show ``server:tool``. Anything else
    falls back to the bare tool name.
    """
    args = inp if isinstance(inp, dict) else {}
    if name in ("Read", "Edit", "Write"):
        verb = "Read" if name == "Read" else "Edit"
        return f"{verb} {_basename(args.get('file_path'))}"
    if name == "Bash":
        return "Bash: " + str(args.get("command", "") or "")[:60]
    if name in ("Grep", "Glob"):
        return f"{name} {args.get('pattern', '')}"
    if name == "Task":
        return "Task: " + str(args.get("description", "") or "")[:60]
    if mcp is None:
        return name
    return f"{mcp.get('server', '')}:{mcp.get('tool', '')}"
def _tool_result_text(block: dict[str, Any]) -> str:
    """Extract displayable text from a tool_result content block.

    ``content`` is usually a plain string. It may also be a list of blocks, in
    which case the string ``text`` fields of dict entries are joined with
    newlines and everything else is skipped. A missing or null ``content``
    gives ``""``; any other value is stringified.
    """
    payload = block.get("content")
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, list):
        return "\n".join(
            item["text"]
            for item in payload
            if isinstance(item, dict) and isinstance(item.get("text"), str)
        )
    return str(payload)
def _visible_text(block: dict[str, Any]) -> Optional[str]:
    """Return the visible text of an assistant ``text`` block, else None.

    Thinking blocks (and every other non-text block) yield None because they
    are not part of the shown reply. A text block whose ``text`` is not a
    string also yields None.
    """
    if block.get("type") != "text":
        return None
    text = block.get("text")
    return text if isinstance(text, str) else None
# --------------------------------------------------------------------------- #
# the seam
# --------------------------------------------------------------------------- #
def _read_rows(path: str) -> list[dict[str, Any]]:
    """Return the parsed JSON value of every non-blank line in *path*, in file order.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    rows: list[dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows


def _message_of(row: dict[str, Any]) -> dict[str, Any]:
    """Return ``row['message']`` when it is a dict, otherwise ``{}``.

    Used for every ``message`` access so that a missing, null, or non-object
    message reads as "no content" instead of raising AttributeError.
    """
    msg = row.get("message")
    return msg if isinstance(msg, dict) else {}


def _session_meta(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Build session metadata from the file's own rows, never from the folder name.

    Purely informational: nothing here affects turn parsing or counts.
    """
    session: dict[str, Any] = {
        "cwd": None, "sessionId": None, "gitBranch": None, "version": None,
        "startedAt": None, "endedAt": None, "model": None,
    }
    # Identity fields come from the FIRST user/assistant row.
    for r in rows:
        if r.get("type") in ("user", "assistant"):
            for key in ("cwd", "sessionId", "gitBranch", "version"):
                session[key] = r.get(key)
            break
    # Model: the modal message.model across assistant rows, so a one-off
    # sidechain model doesn't win. max() returns the first maximal key, and
    # dicts keep insertion order, so ties go to the model seen first.
    counts: dict[str, int] = {}
    for r in rows:
        if r.get("type") == "assistant":
            m = _message_of(r).get("model")
            if m:
                counts[m] = counts.get(m, 0) + 1
    if counts:
        session["model"] = max(counts, key=counts.get)
    # Span: in file order, the first row of ANY type with a truthy timestamp is
    # the start and the last such row is the end.
    for r in rows:
        ts = r.get("timestamp")
        if ts:
            if session["startedAt"] is None:
                session["startedAt"] = ts
            session["endedAt"] = ts
    return session


def _index_tool_results(rows: list[dict[str, Any]]) -> dict[str, str]:
    """Map tool_use_id -> result text for every tool_result block on a user row.

    Only list-valued user content is scanned. Blocks without a tool_use_id are
    skipped, and a repeated id keeps the text of its LAST occurrence.
    """
    by_id: dict[str, str] = {}
    for r in rows:
        if r.get("type") != "user":
            continue
        content = _message_of(r).get("content")
        if not isinstance(content, list):
            continue
        for b in content:
            if isinstance(b, dict) and b.get("type") == "tool_result":
                tuid = b.get("tool_use_id")
                if tuid is not None:
                    by_id[tuid] = _tool_result_text(b)
    return by_id


def _finalize_turn(
    turn: Optional[Turn], req_ids: set[str], reply_parts: list[str]
) -> None:
    """Stamp a closing turn with its distinct-request count and joined visible reply.

    No-op when *turn* is None (no turn has been opened yet).
    """
    if turn is None:
        return
    turn.reqs = len(req_ids)
    turn.reply = "\n".join(p for p in reply_parts if p).strip()


def load(path: str) -> dict[str, Any]:
    """Parse a Claude Code session .jsonl into the normalized contract.

    Returns {'events': [Event...], 'turns': [Turn...], 'session': {...}}.
    Signature kept clean so an hf-loader drops in behind the same contract.

    Every row is assigned to the turn opened by the most recent boundary row.
    Rows before the first boundary are dropped. Sidechain rows never open a
    turn and roll up into the current parent turn.

    Raises:
        OSError: if *path* cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    rows = _read_rows(path)
    session = _session_meta(rows)
    result_text_by_id = _index_tool_results(rows)

    turns: list[Turn] = []
    events: list[Event] = []
    cur: Optional[Turn] = None
    # Per-turn accumulators, reset whenever a new turn opens.
    cur_req_ids: set[str] = set()
    reply_parts: list[str] = []

    for idx, r in enumerate(rows):
        rtype = r.get("type")
        # SIDECHAIN FOLD: sub-agent (Task) rows can never open a turn; their
        # tool_use / tool_result / token content rolls up below exactly like
        # any inline assistant/user row.
        sidechain = _is_sidechain(r)
        is_boundary, prompt = (False, "") if sidechain else _is_turn_boundary(r)

        if is_boundary:
            # Close the previous turn with ITS accumulators, then start fresh.
            _finalize_turn(cur, cur_req_ids, reply_parts)
            cur_req_ids = set()
            reply_parts = []
            cur = Turn(
                i=len(turns),
                prompt=prompt,
                origin=_origin_for(prompt),
                ts=r.get("timestamp"),
            )
            turns.append(cur)
            events.append(
                Event(
                    id=str(r.get("uuid", f"row{idx}")),
                    turn=cur.i,
                    role="user",
                    kind="prompt",
                    ts=r.get("timestamp"),
                    input=prompt,
                )
            )
            continue

        # Rows before the first boundary belong to no turn (in the fixture only
        # mode / permission-mode / file-history-snapshot rows precede it) -> skip.
        if cur is None:
            continue
        turn_i = cur.i

        if rtype == "assistant":
            msg = _message_of(r)
            usage = msg.get("usage", {}) or {}
            req_id = r.get("requestId")
            if req_id is not None:
                cur_req_ids.add(req_id)

            in_tok = usage.get("input_tokens", 0) or 0
            out_tok = usage.get("output_tokens", 0) or 0
            cache_read = usage.get("cache_read_input_tokens", 0) or 0
            cache_create = usage.get("cache_creation_input_tokens", 0) or 0
            # Token rollup for the turn: sum across assistant rows.
            cur.tokens = cur.tokens.add(
                Tokens(
                    in_=in_tok,
                    out=out_tok,
                    cacheRead=cache_read,
                    cacheCreate=cache_create,
                )
            )
            # Point-in-time context-window occupancy (the "fuel gauge"): the
            # prompt size of THIS request = input + cacheRead + cacheCreate.
            # Output is the reply, not occupancy. Sidechain rows run in a
            # sub-agent's own window, so they must not move the main gauge.
            if not sidechain:
                occ = in_tok + cache_read + cache_create
                if occ:
                    if not cur.ctxStart:
                        cur.ctxStart = occ
                    if occ > cur.ctxPeak:
                        cur.ctxPeak = occ
                    cur.ctxEnd = occ

            for b in msg.get("content", []) or []:
                if not isinstance(b, dict):
                    continue
                btype = b.get("type")
                if btype == "tool_use":
                    name = b.get("name", "")
                    inp = b.get("input")
                    mcp = _mcp_of(name, r)
                    tuid = b.get("id")
                    tc = ToolCall(
                        name=name,
                        input=inp,
                        summary=_summary(name, inp, mcp),
                        mcp=mcp,
                        id=tuid,
                        result_text=result_text_by_id.get(tuid),
                        ts=r.get("timestamp"),
                        # Phase-2 fills provenance/sourceTool/flowValue/errored
                    )
                    cur.tools.append(tc)
                    events.append(
                        Event(
                            id=str(tuid) if tuid is not None else f"row{idx}-tooluse",
                            turn=turn_i,
                            role="assistant",
                            kind="tool_use",
                            ts=r.get("timestamp"),
                            tool=name,
                            input=inp,
                            mcp=mcp,
                        )
                    )
                elif btype == "text":
                    vis = _visible_text(b)
                    if vis is not None:
                        reply_parts.append(vis)
                        events.append(
                            Event(
                                id=str(r.get("uuid", f"row{idx}")) + "-text",
                                turn=turn_i,
                                role="assistant",
                                kind="text",
                                ts=r.get("timestamp"),
                                input=vis,
                            )
                        )
                # 'thinking' blocks are not visible reply; they emit no event/text.

        elif rtype == "user":
            # tool_result user rows (list content) emit tool_result events.
            content = _message_of(r).get("content")
            if isinstance(content, list):
                for b in content:
                    if isinstance(b, dict) and b.get("type") == "tool_result":
                        tuid = b.get("tool_use_id")
                        events.append(
                            Event(
                                id=(str(tuid) + "-result") if tuid is not None else f"row{idx}-result",
                                turn=turn_i,
                                role="user",
                                kind="tool_result",
                                ts=r.get("timestamp"),
                                resultText=_tool_result_text(b),
                            )
                        )
        # All other row types (system, mode, attachment, etc.) carry no chain
        # content -> no event.

    _finalize_turn(cur, cur_req_ids, reply_parts)
    return {
        "events": events,
        "turns": turns,
        "session": session,
    }