"""jsonl-loader — the Phase-1 seam. The ONLY code permitted to touch raw JSONL.
`load(path) -> {'events': [...], 'turns': [...], 'session': {...}}`
Pure, deterministic, structural parse. NO model. NO provenance/heavy/loop/re-read
logic (that is Phase 2; the loader leaves those fields at neutral defaults).
Key rules (verified ground truth — see CLAUDE.md / TRACE-CONTRACT §3):
TURN BOUNDARY (START-ANCHORED): a row opens a new turn IFF
type == 'user' AND not isMeta AND message.content is a non-empty string
(after .strip()) AND the trimmed string does NOT start with any of:
'<command-name', '<command-message', '<local-command'.
This is a str.startswith() test, NEVER a substring-anywhere test.
'<task-notification>' rows ARE boundaries.
ORIGIN: 'system' if the trimmed prompt starts with '<task-notification>',
else 'human'.
SIDECHAIN FOLDING (sub-agent / Task work): a row with top-level
`isSidechain == true` is sub-agent activity, NOT a top-level query. Such a row
is FOLDED into the current parent turn — it never opens a new turn (even if its
message.content is a user-string that would otherwise look like a boundary), and
its tool_use / tool_result / token content rolls up into the parent turn exactly
like inline assistant/user rows. This keeps the Task's cost attributed to the
query that spawned it instead of leaking into a spurious turn or being dropped.
This fixture has 0 sidechains (every row is isSidechain == false), so the fold
is a strict NO-OP here and the regression oracle is byte-identical.
cwd / session metadata: trusted from INSIDE the file (the encoded dir name is
lossy). Non-negotiable #5.
"""
from __future__ import annotations
import json
import os
from typing import Any, Optional
from engine.contract import Event, Tokens, ToolCall, Turn
# START-anchored prefixes that DISQUALIFY a user-string row from being a turn.
# (Slash-command scaffolding rows — NOT real prompts.) Kept as a tuple so it can
# be handed straight to str.startswith(), which tests every prefix in one call.
_NON_PROMPT_PREFIXES = ("<command-name", "<command-message", "<local-command")

# A trimmed prompt starting with this marks a system-origin (background) turn;
# every other prompt is classified as human-origin (see _origin_for).
_SYSTEM_PREFIX = "<task-notification>"
# --------------------------------------------------------------------------- #
# small structural helpers
# --------------------------------------------------------------------------- #
def _is_turn_boundary(row: dict[str, Any]) -> tuple[bool, str]:
"""Return (is_boundary, trimmed_prompt_or_empty). START-anchored, never substring."""
if row.get("type") != "user":
return False, ""
if row.get("isMeta"):
return False, ""
content = row.get("message", {}).get("content")
if not isinstance(content, str):
return False, ""
s = content.strip()
if not s:
return False, ""
if s.startswith(_NON_PROMPT_PREFIXES):
return False, ""
return True, s
def _origin_for(prompt: str) -> str:
    """Classify a trimmed prompt as 'system' (background) or 'human'.

    The check is START-anchored: only a prompt that begins with
    ``_SYSTEM_PREFIX`` is system-origin; everything else is human-origin.
    """
    if prompt.startswith(_SYSTEM_PREFIX):
        return "system"
    return "human"
def _is_sidechain(row: dict[str, Any]) -> bool:
"""True iff this row is sub-agent (Task) activity to FOLD into the parent turn.
Sidechain rows must never open a new turn; their content rolls up into the
current parent turn. In this fixture every row is isSidechain == false, so this
returns False for all 1316 rows and the fold is a strict no-op.
"""
return bool(row.get("isSidechain"))
def _basename(path: Any) -> str:
if not isinstance(path, str) or not path:
return str(path)
return os.path.basename(path.rstrip("/")) or path
def _mcp_of(name: str, row: dict[str, Any]) -> Optional[dict[str, str]]:
"""Derive {server, tool} ONLY from an mcp__<server>__<tool> tool name — that
name IS the tool's identity.
The row's attributionMcpServer/attributionMcpTool is a turn-level *context*
attribution (which MCP server was in play for the message), NOT a claim that
this particular tool_use is that MCP call. Using it as a fallback mislabels
ordinary Bash/Read/Edit calls as MCP (68 of them in the fixture, all tagged
'claude.ai Hugging Face:hub_repo_details'). Tool identity comes from the name
alone; the legend's MCP tally and these labels then agree (true MCP = 1)."""
if isinstance(name, str) and name.startswith("mcp__"):
rest = name[len("mcp__"):]
server, sep, tool = rest.partition("__")
if sep:
return {"server": server, "tool": tool}
return {"server": rest, "tool": ""}
return None
def _summary(name: str, inp: Any, mcp: Optional[dict[str, str]]) -> str:
"""Human-scannable one-liner per tool. Deterministic, no model."""
inp = inp if isinstance(inp, dict) else {}
if name == "Read":
return f"Read {_basename(inp.get('file_path'))}"
if name in ("Edit", "Write"):
return f"Edit {_basename(inp.get('file_path'))}"
if name == "Bash":
cmd = str(inp.get("command", "") or "")
return f"Bash: {cmd[:60]}"
if name in ("Grep", "Glob"):
return f"{name} {inp.get('pattern', '')}"
if name == "Task":
desc = str(inp.get("description", "") or "")
return f"Task: {desc[:60]}"
if mcp is not None:
return f"{mcp.get('server', '')}:{mcp.get('tool', '')}"
return name
def _tool_result_text(block: dict[str, Any]) -> str:
"""Extract displayable text from a tool_result content block.
content is usually a str; sometimes a list of {type:'text'|...} blocks."""
content = block.get("content")
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for b in content:
if isinstance(b, dict):
if isinstance(b.get("text"), str):
parts.append(b["text"])
return "\n".join(parts)
if content is None:
return ""
return str(content)
def _visible_text(block: dict[str, Any]) -> Optional[str]:
"""Visible assistant text (kind='text'); thinking is NOT shown in reply."""
if block.get("type") == "text":
t = block.get("text")
if isinstance(t, str):
return t
return None
# --------------------------------------------------------------------------- #
# the seam
# --------------------------------------------------------------------------- #
def _message_of(row: dict[str, Any]) -> dict[str, Any]:
    """Return ``row["message"]`` when it is a JSON object, else an empty dict.

    Lets callers chain ``.get(...)`` without raising AttributeError on rows
    whose "message" is absent, JSON null, or not an object.
    """
    message = row.get("message")
    return message if isinstance(message, dict) else {}


def load(path: str) -> dict[str, Any]:
    """Parse a Claude Code session .jsonl into the normalized contract.

    Signature kept clean so an hf-loader drops in behind the same contract.

    Args:
        path: filesystem path of the session ``.jsonl`` file (read as UTF-8,
            one JSON value per non-blank line).

    Returns:
        ``{'events': [Event...], 'turns': [Turn...], 'session': {...}}`` where
        ``session`` carries cwd / sessionId / gitBranch / version (from the
        first user-or-assistant row), startedAt / endedAt (first / last row
        timestamp) and model (the most frequent assistant ``message.model``).

    Raises:
        OSError: the file cannot be opened.
        json.JSONDecodeError: a non-blank line is not valid JSON.
    """
    rows: list[dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))

    # --- session metadata: trust the file, not the folder name -------------- #
    session: dict[str, Any] = {
        "cwd": None, "sessionId": None, "gitBranch": None, "version": None,
        "startedAt": None, "endedAt": None, "model": None,
    }
    for r in rows:
        if r.get("type") in ("user", "assistant"):
            session["cwd"] = r.get("cwd")
            session["sessionId"] = r.get("sessionId")
            session["gitBranch"] = r.get("gitBranch")
            session["version"] = r.get("version")
            break

    # --- model: the most-used assistant model id (for the report's "Model" line) -- #
    # Purely informational metadata (message.model on assistant rows); never affects
    # parsing or counts. Picks the modal model so a one-off sidechain model doesn't win.
    model_counts: dict[str, int] = {}
    for r in rows:
        if r.get("type") == "assistant":
            m = _message_of(r).get("model")
            if m:
                model_counts[m] = model_counts.get(m, 0) + 1
    if model_counts:
        session["model"] = max(model_counts, key=model_counts.get)

    # --- session span: first/last ISO timestamp across ANY row that has one --- #
    # Rows are in file order, so the FIRST row carrying a "timestamp" is the start
    # and the LAST is the end. Any row type counts (a meta/system row can open or
    # close the file). Purely structural — no effect on turn parsing or counts.
    for r in rows:
        ts = r.get("timestamp")
        if ts:
            if session["startedAt"] is None:
                session["startedAt"] = ts
            session["endedAt"] = ts

    # --- index tool_results by tool_use_id (239 ↔ 239, 1:1 in fixture) ------ #
    result_text_by_id: dict[str, str] = {}
    for r in rows:
        if r.get("type") != "user":
            continue
        content = _message_of(r).get("content")
        if not isinstance(content, list):
            continue
        for b in content:
            if isinstance(b, dict) and b.get("type") == "tool_result":
                tuid = b.get("tool_use_id")
                if tuid is not None:
                    result_text_by_id[tuid] = _tool_result_text(b)

    # --- walk rows: assign every row to a turn; build events + turns -------- #
    turns: list[Turn] = []
    events: list[Event] = []
    cur: Optional[Turn] = None
    cur_req_ids: set[str] = set()
    reply_parts: list[str] = []

    def _finalize(turn: Optional[Turn]) -> None:
        # Reads cur_req_ids / reply_parts from the enclosing scope at CALL time,
        # so it must run before those accumulators are reset for the next turn.
        if turn is None:
            return
        turn.reqs = len(cur_req_ids)
        turn.reply = "\n".join(p for p in reply_parts if p).strip()

    for idx, r in enumerate(rows):
        rtype = r.get("type")
        # SIDECHAIN FOLD: sub-agent (Task) rows fold into the current parent turn.
        # Suppress their boundary candidacy so they can never open a turn; their
        # tool_use / tool_result / token content then rolls up below exactly like
        # any inline assistant/user row. (No-op on this fixture: all rows false.)
        sidechain = _is_sidechain(r)
        is_boundary, prompt = (False, "") if sidechain else _is_turn_boundary(r)
        if is_boundary:
            # close the previous turn, open a new one
            _finalize(cur)
            cur_req_ids = set()
            reply_parts = []
            cur = Turn(
                i=len(turns),
                prompt=prompt,
                origin=_origin_for(prompt),
                ts=r.get("timestamp"),
            )
            turns.append(cur)
            events.append(
                Event(
                    id=str(r.get("uuid", f"row{idx}")),
                    turn=cur.i,
                    role="user",
                    kind="prompt",
                    ts=r.get("timestamp"),
                    input=prompt,
                )
            )
            continue

        # rows before the first boundary belong to no turn (only mode /
        # permission-mode / file-history-snapshot precede row index 3) → skip.
        if cur is None:
            continue
        turn_i = cur.i

        if rtype == "assistant":
            msg = _message_of(r)
            usage = msg.get("usage")
            if not isinstance(usage, dict):
                usage = {}
            req_id = r.get("requestId")
            if req_id is not None:
                cur_req_ids.add(req_id)

            # Read each counter once; a missing or null counter counts as 0.
            tok_in = usage.get("input_tokens", 0) or 0
            tok_out = usage.get("output_tokens", 0) or 0
            tok_cache_read = usage.get("cache_read_input_tokens", 0) or 0
            tok_cache_create = usage.get("cache_creation_input_tokens", 0) or 0

            # token rollup for the turn (sum across assistant rows)
            cur.tokens = cur.tokens.add(
                Tokens(
                    in_=tok_in,
                    out=tok_out,
                    cacheRead=tok_cache_read,
                    cacheCreate=tok_cache_create,
                )
            )

            # point-in-time context-window occupancy (the "fuel gauge"): the prompt
            # size of THIS request = input + cacheRead + cacheCreate (output is the
            # reply, not occupancy). Sidechain rows run in a sub-agent's own window, so
            # they must NOT count toward the main thread's gauge — guard on `sidechain`.
            if not sidechain:
                occ = tok_in + tok_cache_read + tok_cache_create
                if occ:
                    if not cur.ctxStart:
                        cur.ctxStart = occ
                    if occ > cur.ctxPeak:
                        cur.ctxPeak = occ
                    cur.ctxEnd = occ

            blocks = msg.get("content")
            if not isinstance(blocks, list):
                # string / null content carries no tool_use or text blocks
                blocks = []
            for b in blocks:
                if not isinstance(b, dict):
                    continue
                btype = b.get("type")
                if btype == "tool_use":
                    name = b.get("name", "")
                    inp = b.get("input")
                    mcp = _mcp_of(name, r)
                    tuid = b.get("id")
                    rtext = result_text_by_id.get(tuid)
                    tc = ToolCall(
                        name=name,
                        input=inp,
                        summary=_summary(name, inp, mcp),
                        mcp=mcp,
                        id=tuid,
                        result_text=rtext,
                        ts=r.get("timestamp"),
                        # Phase-2 fills provenance/sourceTool/flowValue/errored
                    )
                    cur.tools.append(tc)
                    events.append(
                        Event(
                            id=str(tuid) if tuid is not None else f"row{idx}-tooluse",
                            turn=turn_i,
                            role="assistant",
                            kind="tool_use",
                            ts=r.get("timestamp"),
                            tool=name,
                            input=inp,
                            mcp=mcp,
                        )
                    )
                elif btype == "text":
                    vis = _visible_text(b)
                    if vis is not None:
                        reply_parts.append(vis)
                        events.append(
                            Event(
                                id=str(r.get("uuid", f"row{idx}")) + "-text",
                                turn=turn_i,
                                role="assistant",
                                kind="text",
                                ts=r.get("timestamp"),
                                input=vis,
                            )
                        )
                # 'thinking' blocks are not visible reply; they emit no event/text

        elif rtype == "user":
            # tool_result user rows (lists) — emit tool_result events
            content = _message_of(r).get("content")
            if isinstance(content, list):
                for b in content:
                    if isinstance(b, dict) and b.get("type") == "tool_result":
                        tuid = b.get("tool_use_id")
                        events.append(
                            Event(
                                id=(str(tuid) + "-result") if tuid is not None else f"row{idx}-result",
                                turn=turn_i,
                                role="user",
                                kind="tool_result",
                                ts=r.get("timestamp"),
                                resultText=_tool_result_text(b),
                            )
                        )
        # all other meta row types (system, mode, attachment, etc.) carry no
        # chain content → no event.

    _finalize(cur)
    return {
        "events": events,
        "turns": turns,
        "session": session,
    }
|