Spaces:
Running on Zero
Running on Zero
| """jsonl-loader — the Phase-1 seam. The ONLY code permitted to touch raw JSONL. | |
| `load(path) -> {'events': [...], 'turns': [...], 'session': {...}}` | |
| Pure, deterministic, structural parse. NO model. NO provenance/heavy/loop/re-read | |
| logic (that is Phase 2; the loader leaves those fields at neutral defaults). | |
| Key rules (verified ground truth — see CLAUDE.md / TRACE-CONTRACT §3): | |
| TURN BOUNDARY (START-ANCHORED): a row opens a new turn IFF | |
| type == 'user' AND not isMeta AND message.content is a non-empty string | |
| (after .strip()) AND the trimmed string does NOT start with any of: | |
| '<command-name', '<command-message', '<local-command'. | |
| This is a str.startswith() test, NEVER a substring-anywhere test. | |
| '<task-notification>' rows ARE boundaries. | |
| ORIGIN: 'system' if the trimmed prompt starts with '<task-notification>', | |
| else 'human'. | |
| SIDECHAIN FOLDING (sub-agent / Task work): a row with top-level | |
| `isSidechain == true` is sub-agent activity, NOT a top-level query. Such a row | |
| is FOLDED into the current parent turn — it never opens a new turn (even if its | |
| message.content is a user-string that would otherwise look like a boundary), and | |
| its tool_use / tool_result / token content rolls up into the parent turn exactly | |
| like inline assistant/user rows. This keeps the Task's cost attributed to the | |
| query that spawned it instead of leaking into a spurious turn or being dropped. | |
| This fixture has 0 sidechains (every row is isSidechain == false), so the fold | |
| is a strict NO-OP here and the regression oracle is byte-identical. | |
| cwd / session metadata: trusted from INSIDE the file (the encoded dir name is | |
| lossy). Non-negotiable #5. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from typing import Any, Optional | |
| from engine.contract import Event, Tokens, ToolCall, Turn | |
| # START-anchored prefixes that DISQUALIFY a user-string row from being a turn. | |
| # (Slash-command scaffolding rows — NOT real prompts.) | |
| _NON_PROMPT_PREFIXES = ("<command-name", "<command-message", "<local-command") | |
| # A trimmed prompt starting with this marks a system-origin (background) turn. | |
| _SYSTEM_PREFIX = "<task-notification>" | |
| # --------------------------------------------------------------------------- # | |
| # small structural helpers | |
| # --------------------------------------------------------------------------- # | |
| def _is_turn_boundary(row: dict[str, Any]) -> tuple[bool, str]: | |
| """Return (is_boundary, trimmed_prompt_or_empty). START-anchored, never substring.""" | |
| if row.get("type") != "user": | |
| return False, "" | |
| if row.get("isMeta"): | |
| return False, "" | |
| content = row.get("message", {}).get("content") | |
| if not isinstance(content, str): | |
| return False, "" | |
| s = content.strip() | |
| if not s: | |
| return False, "" | |
| if s.startswith(_NON_PROMPT_PREFIXES): | |
| return False, "" | |
| return True, s | |
def _origin_for(prompt: str) -> str:
    """Classify a trimmed prompt as 'system' (it starts with the
    task-notification marker) or 'human' (anything else)."""
    if prompt.startswith(_SYSTEM_PREFIX):
        return "system"
    return "human"
| def _is_sidechain(row: dict[str, Any]) -> bool: | |
| """True iff this row is sub-agent (Task) activity to FOLD into the parent turn. | |
| Sidechain rows must never open a new turn; their content rolls up into the | |
| current parent turn. In this fixture every row is isSidechain == false, so this | |
| returns False for all 1316 rows and the fold is a strict no-op. | |
| """ | |
| return bool(row.get("isSidechain")) | |
| def _basename(path: Any) -> str: | |
| if not isinstance(path, str) or not path: | |
| return str(path) | |
| return os.path.basename(path.rstrip("/")) or path | |
| def _mcp_of(name: str, row: dict[str, Any]) -> Optional[dict[str, str]]: | |
| """Derive {server, tool} ONLY from an mcp__<server>__<tool> tool name — that | |
| name IS the tool's identity. | |
| The row's attributionMcpServer/attributionMcpTool is a turn-level *context* | |
| attribution (which MCP server was in play for the message), NOT a claim that | |
| this particular tool_use is that MCP call. Using it as a fallback mislabels | |
| ordinary Bash/Read/Edit calls as MCP (68 of them in the fixture, all tagged | |
| 'claude.ai Hugging Face:hub_repo_details'). Tool identity comes from the name | |
| alone; the legend's MCP tally and these labels then agree (true MCP = 1).""" | |
| if isinstance(name, str) and name.startswith("mcp__"): | |
| rest = name[len("mcp__"):] | |
| server, sep, tool = rest.partition("__") | |
| if sep: | |
| return {"server": server, "tool": tool} | |
| return {"server": rest, "tool": ""} | |
| return None | |
| def _summary(name: str, inp: Any, mcp: Optional[dict[str, str]]) -> str: | |
| """Human-scannable one-liner per tool. Deterministic, no model.""" | |
| inp = inp if isinstance(inp, dict) else {} | |
| if name == "Read": | |
| return f"Read {_basename(inp.get('file_path'))}" | |
| if name in ("Edit", "Write"): | |
| return f"Edit {_basename(inp.get('file_path'))}" | |
| if name == "Bash": | |
| cmd = str(inp.get("command", "") or "") | |
| return f"Bash: {cmd[:60]}" | |
| if name in ("Grep", "Glob"): | |
| return f"{name} {inp.get('pattern', '')}" | |
| if name == "Task": | |
| desc = str(inp.get("description", "") or "") | |
| return f"Task: {desc[:60]}" | |
| if mcp is not None: | |
| return f"{mcp.get('server', '')}:{mcp.get('tool', '')}" | |
| return name | |
| def _tool_result_text(block: dict[str, Any]) -> str: | |
| """Extract displayable text from a tool_result content block. | |
| content is usually a str; sometimes a list of {type:'text'|...} blocks.""" | |
| content = block.get("content") | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for b in content: | |
| if isinstance(b, dict): | |
| if isinstance(b.get("text"), str): | |
| parts.append(b["text"]) | |
| return "\n".join(parts) | |
| if content is None: | |
| return "" | |
| return str(content) | |
| def _visible_text(block: dict[str, Any]) -> Optional[str]: | |
| """Visible assistant text (kind='text'); thinking is NOT shown in reply.""" | |
| if block.get("type") == "text": | |
| t = block.get("text") | |
| if isinstance(t, str): | |
| return t | |
| return None | |
| # --------------------------------------------------------------------------- # | |
| # the seam | |
| # --------------------------------------------------------------------------- # | |
def _message_of(row: dict[str, Any]) -> dict[str, Any]:
    """Return ``row['message']`` when it is a JSON object, else an empty dict."""
    message = row.get("message")
    return message if isinstance(message, dict) else {}


def load(path: str) -> dict[str, Any]:
    """Parse a Claude Code session .jsonl into the normalized contract.

    Pure structural parse: every non-blank line is decoded as one JSON row,
    then the rows are walked in file order to build turns and events.

    Args:
        path: Filesystem path of the session ``.jsonl`` file (read as UTF-8).

    Returns:
        ``{'events': [Event...], 'turns': [Turn...], 'session': {...}}`` where
        ``session`` carries ``cwd``, ``sessionId``, ``gitBranch``, ``version``
        (from the first user/assistant row), ``model`` (most frequent
        ``message.model`` on assistant rows) and ``startedAt`` / ``endedAt``
        (first / last truthy ``timestamp`` on any row). Missing values stay
        ``None``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.

    Signature kept clean so an hf-loader drops in behind the same contract.
    """
    rows: list[dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))

    # --- session metadata: trust the file, not the folder name -------------- #
    session = {
        "cwd": None, "sessionId": None, "gitBranch": None, "version": None,
        "startedAt": None, "endedAt": None, "model": None,
    }
    for r in rows:
        if r.get("type") in ("user", "assistant"):
            session["cwd"] = r.get("cwd")
            session["sessionId"] = r.get("sessionId")
            session["gitBranch"] = r.get("gitBranch")
            session["version"] = r.get("version")
            break

    # --- model: the most-used assistant model id (informational only) ------- #
    # Picks the modal model so a one-off sidechain model doesn't win; ties go
    # to the model seen first (dict insertion order + max()).
    _models: dict[str, int] = {}
    for r in rows:
        if r.get("type") == "assistant":
            m = _message_of(r).get("model")
            if m:
                _models[m] = _models.get(m, 0) + 1
    if _models:
        session["model"] = max(_models, key=_models.get)

    # --- session span: first/last timestamp across ANY row that has one ----- #
    # Rows are in file order, so the first row carrying a "timestamp" is the
    # start and the last is the end. Any row type counts.
    for r in rows:
        ts = r.get("timestamp")
        if ts:
            if session["startedAt"] is None:
                session["startedAt"] = ts
            session["endedAt"] = ts

    # --- index tool_results by tool_use_id ---------------------------------- #
    # Built up-front so each tool_use can be paired with its result text even
    # though the result row appears later in the file.
    result_text_by_id: dict[str, str] = {}
    for r in rows:
        if r.get("type") != "user":
            continue
        # _message_of tolerates `"message": null` / non-object messages.
        content = _message_of(r).get("content")
        if not isinstance(content, list):
            continue
        for b in content:
            if isinstance(b, dict) and b.get("type") == "tool_result":
                tuid = b.get("tool_use_id")
                if tuid is not None:
                    result_text_by_id[tuid] = _tool_result_text(b)

    # --- walk rows: assign every row to a turn; build events + turns -------- #
    turns: list[Turn] = []
    events: list[Event] = []
    cur: Optional[Turn] = None
    cur_req_ids: set[str] = set()
    reply_parts: list[str] = []

    def _finalize(turn: Optional[Turn]) -> None:
        # Reads cur_req_ids / reply_parts as they are bound at call time, so it
        # must run BEFORE those accumulators are reset for the next turn.
        if turn is None:
            return
        turn.reqs = len(cur_req_ids)
        turn.reply = "\n".join(p for p in reply_parts if p).strip()

    for idx, r in enumerate(rows):
        rtype = r.get("type")

        # SIDECHAIN FOLD: sub-agent (Task) rows fold into the current parent
        # turn. Suppress their boundary candidacy so they can never open a
        # turn; their tool_use / tool_result / token content then rolls up
        # below exactly like any inline assistant/user row.
        sidechain = _is_sidechain(r)
        is_boundary, prompt = (False, "") if sidechain else _is_turn_boundary(r)

        if is_boundary:
            # close the previous turn, open a new one
            _finalize(cur)
            cur_req_ids = set()
            reply_parts = []
            cur = Turn(
                i=len(turns),
                prompt=prompt,
                origin=_origin_for(prompt),
                ts=r.get("timestamp"),
            )
            turns.append(cur)
            events.append(
                Event(
                    id=str(r.get("uuid", f"row{idx}")),
                    turn=cur.i,
                    role="user",
                    kind="prompt",
                    ts=r.get("timestamp"),
                    input=prompt,
                )
            )
            continue

        # rows before the first boundary belong to no turn → skip.
        if cur is None:
            continue

        turn_i = cur.i

        if rtype == "assistant":
            msg = _message_of(r)
            usage = msg.get("usage", {}) or {}
            req_id = r.get("requestId")
            if req_id is not None:
                cur_req_ids.add(req_id)

            # Read each usage counter once; `or 0` maps null to zero.
            in_tok = usage.get("input_tokens", 0) or 0
            out_tok = usage.get("output_tokens", 0) or 0
            cache_read = usage.get("cache_read_input_tokens", 0) or 0
            cache_create = usage.get("cache_creation_input_tokens", 0) or 0

            # token rollup for the turn (sum across assistant rows)
            cur.tokens = cur.tokens.add(
                Tokens(
                    in_=in_tok,
                    out=out_tok,
                    cacheRead=cache_read,
                    cacheCreate=cache_create,
                )
            )

            # point-in-time context-window occupancy (the "fuel gauge"): the
            # prompt size of THIS request = input + cacheRead + cacheCreate
            # (output is the reply, not occupancy). Sidechain rows run in a
            # sub-agent's own window, so they must NOT count toward the main
            # thread's gauge — guard on `sidechain`.
            if not sidechain:
                occ = in_tok + cache_read + cache_create
                if occ:
                    if not cur.ctxStart:
                        cur.ctxStart = occ
                    if occ > cur.ctxPeak:
                        cur.ctxPeak = occ
                    cur.ctxEnd = occ

            for b in msg.get("content", []) or []:
                if not isinstance(b, dict):
                    continue
                btype = b.get("type")
                if btype == "tool_use":
                    name = b.get("name", "")
                    inp = b.get("input")
                    mcp = _mcp_of(name, r)
                    tuid = b.get("id")
                    rtext = result_text_by_id.get(tuid)
                    tc = ToolCall(
                        name=name,
                        input=inp,
                        summary=_summary(name, inp, mcp),
                        mcp=mcp,
                        id=tuid,
                        result_text=rtext,
                        ts=r.get("timestamp"),
                        # Phase-2 fills provenance/sourceTool/flowValue/errored
                    )
                    cur.tools.append(tc)
                    events.append(
                        Event(
                            id=str(tuid) if tuid is not None else f"row{idx}-tooluse",
                            turn=turn_i,
                            role="assistant",
                            kind="tool_use",
                            ts=r.get("timestamp"),
                            tool=name,
                            input=inp,
                            mcp=mcp,
                        )
                    )
                elif btype == "text":
                    vis = _visible_text(b)
                    if vis is not None:
                        reply_parts.append(vis)
                        events.append(
                            Event(
                                id=str(r.get("uuid", f"row{idx}")) + "-text",
                                turn=turn_i,
                                role="assistant",
                                kind="text",
                                ts=r.get("timestamp"),
                                input=vis,
                            )
                        )
                # 'thinking' blocks are not visible reply; they emit no event/text

        elif rtype == "user":
            # tool_result user rows (lists) — emit tool_result events
            content = _message_of(r).get("content")
            if isinstance(content, list):
                for b in content:
                    if isinstance(b, dict) and b.get("type") == "tool_result":
                        tuid = b.get("tool_use_id")
                        events.append(
                            Event(
                                id=(str(tuid) + "-result") if tuid is not None else f"row{idx}-result",
                                turn=turn_i,
                                role="user",
                                kind="tool_result",
                                ts=r.get("timestamp"),
                                resultText=_tool_result_text(b),
                            )
                        )
        # all other meta row types (system, mode, attachment, etc.) carry no
        # chain content → no event.

    _finalize(cur)
    return {
        "events": events,
        "turns": turns,
        "session": session,
    }