Spaces:
Running on Zero
Running on Zero
File size: 8,807 Bytes
5f43c7d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 | """The Event/Turn contract — the seam between loaders and the engine.
Non-negotiable #3: the engine consumes ONLY these normalized shapes; it never
reaches into raw JSONL. Loaders (jsonl now, hf later) emit these dataclasses.
Each dataclass carries a `to_dict()` that emits the CONTRACT JSON exactly:
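Event = { id, turn, role, kind, tool?, input?, resultText?, tokens?, ts, mcp? }
Turn = { i, prompt, origin, reply, ts, tools:[ToolCall], tokens:Tokens,
reqs, ctxStart, ctxPeak, ctxEnd, direct, indirect, heavy, overBudget, guide? }
ToolCall= { id, name, input, summary, mcp, provenance, sourceTool, flowValue, errored }
Tokens = { in, out, cacheRead, cacheCreate, cost } # NB: JSON key is "in", not "in_";
cost is derived (the cost-weighted total), not a stored field.
("?" marks a key that is omitted when unset; ToolCall always emits every key, using null.)
Phase-1 note: provenance / sourceTool / flowValue / errored (ToolCall) and
direct / indirect / heavy / overBudget / guide (Turn) are populated by the
Phase-2 engine. The loader leaves them at their neutral defaults. NO model here.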
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Optional
# --------------------------------------------------------------------------- #
# Tokens
# --------------------------------------------------------------------------- #
# Per-token price multipliers, relative to one plain (uncached) input token.
# The ratios are the same for Opus, Sonnet and Haiku:
#   - generating output costs 5x an input token;
#   - writing to the prompt cache costs 1.25x;
#   - reading from the cache costs 0.1x.
# Because the ratios do not depend on the model, one weighted sum is an honest,
# model-independent stand-in for what a run really costs.
# `cost()` therefore yields "input-token-equivalents": multiply by the model's
# per-input-token dollar price to get money.
# The cacheCreate weight assumes the 5-minute-TTL cache write (Claude Code's
# default ephemeral cache). A 1-hour write is billed at 2.0x, but the usage
# object does not say which TTL was used, so 1.25 is the documented assumption.
COST_WEIGHTS: dict[str, float] = {
    "in": 1.0,
    "cacheCreate": 1.25,
    "cacheRead": 0.1,
    "out": 5.0,
}
@dataclass
class Tokens:
    """Token rollup for a request, a turn, or a whole session.

    The field `in_` has a trailing underscore only because `in` is a Python
    keyword; `to_dict()` serializes it under the JSON key "in".
    """

    in_: int = 0
    out: int = 0
    cacheRead: int = 0
    cacheCreate: int = 0

    def cost(self) -> int:
        """Return the cost-weighted token total, in input-token-equivalents.

        This is the real-money signal, NOT raw cacheRead. cacheRead is cheap
        (0.1x) but re-paid on every round-trip, while a turn's true cost is
        dominated by generation (5x). The weights come from the module-level
        `COST_WEIGHTS` table.

        The weighted sum is linear in the fields, but the result is rounded to
        an int with Python's `round()` (round-half-to-even). Summing per-turn
        costs therefore matches the cost of the summed totals only up to
        rounding: the gap can grow by about 0.5 per summed term. For example,
        `Tokens(cacheRead=5)` costs 0, yet two of them added together cost 1.
        For an exact total, `add()` the rollups first and call `cost()` once.
        """
        # The term order is fixed so the float accumulation, and hence the
        # rounding, is reproducible.
        return round(
            self.in_ * COST_WEIGHTS["in"]
            + self.cacheCreate * COST_WEIGHTS["cacheCreate"]
            + self.cacheRead * COST_WEIGHTS["cacheRead"]
            + self.out * COST_WEIGHTS["out"]
        )

    def to_dict(self) -> dict[str, int]:
        """Serialize to the contract JSON shape, plus the derived `cost` key."""
        return {
            "in": self.in_,
            "out": self.out,
            "cacheRead": self.cacheRead,
            "cacheCreate": self.cacheCreate,
            "cost": self.cost(),  # cost-weighted total (the ranking key)
        }

    def add(self, other: "Tokens") -> "Tokens":
        """Return a new Tokens holding the field-wise sum; neither operand is mutated."""
        return Tokens(
            in_=self.in_ + other.in_,
            out=self.out + other.out,
            cacheRead=self.cacheRead + other.cacheRead,
            cacheCreate=self.cacheCreate + other.cacheCreate,
        )

    def __add__(self, other: object) -> "Tokens":
        """Support `a + b` and `sum(rollups, Tokens())` as sugar for `add()`.

        Returns NotImplemented for non-Tokens operands, so Python raises the
        usual TypeError instead of silently mis-adding.
        """
        if not isinstance(other, Tokens):
            return NotImplemented
        return self.add(other)
# --------------------------------------------------------------------------- #
# ToolCall
# --------------------------------------------------------------------------- #
@dataclass
class ToolCall:
    """A single tool_use, paired with the tool_result it produced.

    `to_dict()` always emits every contract key, even when the value is still
    None:
      - `id` (the stable node id the UI needs);
      - `name`, `input`, `summary`;
      - `mcp`;
      - the Phase-2 slots `provenance`, `sourceTool`, `flowValue`, `errored`.

    `result_text` and `ts` ride along on the object for later phases but are
    NOT serialized.
    """

    name: str
    input: Any
    summary: str
    # Contract fields the engine / Phase 2 fills; they keep neutral (None)
    # defaults in Phase 1.
    mcp: Optional[dict[str, str]] = None
    provenance: Optional[str] = None  # 'direct' | 'indirect' (Phase 2)
    sourceTool: Optional[str] = None  # (Phase 2)
    flowValue: Optional[str] = None  # (Phase 2)
    errored: Optional[bool] = None  # (Phase 2)
    # Carriers for the UI / later phases (of these, only `id` is serialized).
    id: Optional[str] = None
    result_text: Optional[str] = None
    ts: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        # A fixed key order keeps the emitted JSON stable from run to run.
        contract_keys = (
            "id",
            "name",
            "input",
            "summary",
            "mcp",
            "provenance",
            "sourceTool",
            "flowValue",
            "errored",
        )
        return {key: getattr(self, key) for key in contract_keys}
# --------------------------------------------------------------------------- #
# Turn
# --------------------------------------------------------------------------- #
@dataclass
class Turn:
    """One reconstructed query: a real prompt plus everything it set in motion."""

    i: int
    prompt: str
    origin: str  # 'human' | 'system'
    reply: str = ""
    ts: Optional[str] = None
    tools: list[ToolCall] = field(default_factory=list)
    tokens: Tokens = field(default_factory=Tokens)
    reqs: int = 0
    # Context-WINDOW occupancy of one request (input + cacheRead + cacheCreate).
    # This is a point-in-time "fuel gauge", unlike the cumulative `tokens` sums
    # above.
    # A single request can never exceed the model's context window.
    # NOTE(review): the concrete limit varies by model; confirm before relying
    # on a specific figure such as 1M.
    # Sidechain (sub-agent) requests are excluded because they run in their own
    # window.
    # ctxStart / ctxEnd / ctxPeak are the first / last / max occupancy across
    # this turn's main-thread requests. A sharp drop between consecutive turns
    # signals a compaction. 0 means no usage was seen.
    ctxStart: int = 0
    ctxPeak: int = 0
    ctxEnd: int = 0
    # Phase-2 (engine) fields — neutral defaults in Phase 1.
    direct: int = 0
    indirect: int = 0
    heavy: bool = False  # top-N by COST (relative "heaviest" — drives the graph glow)
    overBudget: bool = False  # cost >= an absolute floor (every expensive turn, not just top-N)
    guide: Optional[dict[str, str]] = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to contract JSON.

        `guide` is included only when set; every other key is always present.
        """
        payload: dict[str, Any] = dict(
            i=self.i,
            prompt=self.prompt,
            origin=self.origin,
            reply=self.reply,
            ts=self.ts,
            tools=[call.to_dict() for call in self.tools],
            tokens=self.tokens.to_dict(),
            reqs=self.reqs,
            ctxStart=self.ctxStart,
            ctxPeak=self.ctxPeak,
            ctxEnd=self.ctxEnd,
            direct=self.direct,
            indirect=self.indirect,
            heavy=self.heavy,
            overBudget=self.overBudget,
        )
        # `guide` appears ONLY when a pattern fires (Phase 2); silence otherwise.
        if self.guide is None:
            return payload
        payload["guide"] = self.guide
        return payload
# --------------------------------------------------------------------------- #
# Event
# --------------------------------------------------------------------------- #
@dataclass
class Event:
    """One flat, ordered step of the session timeline (prompt, text, tool_use or tool_result)."""

    id: str
    turn: int
    role: str  # 'user' | 'assistant'
    kind: str  # 'prompt' | 'tool_use' | 'tool_result' | 'text'
    ts: Optional[str] = None
    tool: Optional[str] = None
    input: Any = None
    resultText: Optional[str] = None
    tokens: Optional[Tokens] = None
    mcp: Optional[dict[str, str]] = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to contract JSON.

        The five core keys are always present (`ts` may be null). Each
        optional key is emitted only when its value is not None, which keeps
        the contract clean.
        """
        result: dict[str, Any] = {
            "id": self.id,
            "turn": self.turn,
            "role": self.role,
            "kind": self.kind,
            "ts": self.ts,
        }
        rolled_up = None if self.tokens is None else self.tokens.to_dict()
        optional_pairs = (
            ("tool", self.tool),
            ("input", self.input),
            ("resultText", self.resultText),
            ("tokens", rolled_up),
            ("mcp", self.mcp),
        )
        result.update((key, value) for key, value in optional_pairs if value is not None)
        return result
# --------------------------------------------------------------------------- #
# Helper
# --------------------------------------------------------------------------- #
def to_jsonable(obj: Any) -> Any:
    """Recursively turn contract dataclasses (and containers of them) into
    plain JSON-serializable structures. Used to dump load() output to JSON.

    Rules:
      - An instance exposing a callable `to_dict()` is serialized through it.
        The returned structure is walked again, so a contract object nested
        inside an `Any`-typed payload (such as an `input` field) is converted
        too.
      - Classes are never treated as instances, even if they define `to_dict`;
        calling the unbound method would raise TypeError. They are returned
        as-is.
      - dicts keep their keys and have their values converted.
      - lists and tuples become lists.
      - Anything else is returned unchanged. The caller must ensure such
        leaves are JSON-native.
    """
    if not isinstance(obj, type):
        to_dict = getattr(obj, "to_dict", None)
        if callable(to_dict):
            return to_jsonable(to_dict())
    if isinstance(obj, dict):
        return {k: to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_jsonable(v) for v in obj]
    return obj
|