File size: 8,807 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""The Event/Turn contract — the seam between loaders and the engine.

Non-negotiable #3: the engine consumes ONLY these normalized shapes; it never
reaches into raw JSONL. Loaders (jsonl now, hf later) emit these dataclasses.

Each dataclass carries a `to_dict()` that emits the CONTRACT JSON exactly:
  Event   = { id, turn, role, kind, tool?, input?, resultText?, tokens?, ts, mcp? }
  Turn    = { i, prompt, origin, reply, ts, tools:[ToolCall], tokens:Tokens,
              reqs, ctxStart, ctxPeak, ctxEnd, direct, indirect, heavy,
              overBudget, guide? }
  ToolCall= { id, name, input, summary, mcp?, provenance, sourceTool, flowValue, errored }
  Tokens  = { in, out, cacheRead, cacheCreate, cost }   # NB: JSON key is "in", not "in_"

Phase-1 note: provenance / direct / indirect / heavy / guide are populated by the
Phase-2 engine. The loader leaves them at their neutral defaults. NO model here.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Optional


# --------------------------------------------------------------------------- #
# Tokens
# --------------------------------------------------------------------------- #
# Anthropic cost weights, expressed as multiples of the base (uncached) input
# token price. They are UNIFORM across Opus / Sonnet / Haiku (output is 5x input,
# a cache write is 1.25x, a cache read is 0.1x on every Claude model), so a single
# weighted sum is a faithful, model-agnostic proxy for what the run actually costs.
# `cost()` returns "input-token-equivalents": multiply by the model's input $/token
# for dollars. cacheCreate uses the 5-minute-TTL write multiplier (Claude Code's
# default ephemeral cache); a 1-hour write would be 2.0, which the usage object
# does not let us distinguish, so 1.25 is the documented assumption.
#
# Key legend (the keys match the JSON keys emitted by Tokens.to_dict()):
#   "in"          -> weight applied to Tokens.in_
#   "cacheCreate" -> weight applied to Tokens.cacheCreate
#   "cacheRead"   -> weight applied to Tokens.cacheRead
#   "out"         -> weight applied to Tokens.out
COST_WEIGHTS = {"in": 1.0, "cacheCreate": 1.25, "cacheRead": 0.1, "out": 5.0}


@dataclass
class Tokens:
    """Token counts for one usage rollup.

    The input count is stored as `in_` because `in` is a Python keyword;
    `to_dict()` writes it back out under the JSON key "in".
    """

    in_: int = 0
    out: int = 0
    cacheRead: int = 0
    cacheCreate: int = 0

    def cost(self) -> int:
        """Return the cost-weighted token total in input-token-equivalents.

        Each count is multiplied by its entry in COST_WEIGHTS and the products
        are added up, then rounded to an int with the built-in `round`. This is
        the real-money signal, as opposed to raw cacheRead, which is cheap per
        token. The weighted sum is linear in the fields, so per-turn costs add
        up to the cost of the summed totals (up to the per-call rounding).
        """
        weights = COST_WEIGHTS
        # Accumulate in a fixed order (in, cacheCreate, cacheRead, out) so the
        # floating-point result is reproducible.
        weighted = self.in_ * weights["in"]
        weighted += self.cacheCreate * weights["cacheCreate"]
        weighted += self.cacheRead * weights["cacheRead"]
        weighted += self.out * weights["out"]
        return round(weighted)

    def to_dict(self) -> dict[str, int]:
        """Serialize to the contract JSON shape, including the derived `cost`."""
        payload = {
            "in": self.in_,
            "out": self.out,
            "cacheRead": self.cacheRead,
            "cacheCreate": self.cacheCreate,
        }
        # cost-weighted total (the ranking key)
        payload["cost"] = self.cost()
        return payload

    def add(self, other: "Tokens") -> "Tokens":
        """Return a new Tokens holding the field-wise sum; neither operand is mutated."""
        return Tokens(
            self.in_ + other.in_,
            self.out + other.out,
            self.cacheRead + other.cacheRead,
            self.cacheCreate + other.cacheCreate,
        )


# --------------------------------------------------------------------------- #
# ToolCall
# --------------------------------------------------------------------------- #
@dataclass
class ToolCall:
    """A single tool_use paired with its tool_result.

    `to_dict()` serializes id, name, input, summary, mcp, provenance,
    sourceTool, flowValue and errored (the UI needs `id` as the stable node
    id). `result_text` and `ts` are carried on the instance for later phases
    but are not part of the serialized output.
    """

    name: str
    input: Any
    summary: str
    # Contract fields that Phase 2 / the engine fills in; Phase 1 leaves them None.
    mcp: Optional[dict[str, str]] = None
    provenance: Optional[str] = None  # 'direct' | 'indirect'  (Phase 2)
    sourceTool: Optional[str] = None  # (Phase 2)
    flowValue: Optional[str] = None  # (Phase 2)
    errored: Optional[bool] = None  # (Phase 2)
    # Internal carriers (also handy for the UI / Phase 2).
    id: Optional[str] = None
    result_text: Optional[str] = None
    ts: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """Return the serialized form; every listed key is always present, even when None."""
        # The tuple order is the emitted key order.
        emitted = (
            "id",
            "name",
            "input",
            "summary",
            "mcp",
            "provenance",
            "sourceTool",
            "flowValue",
            "errored",
        )
        return {key: getattr(self, key) for key in emitted}


# --------------------------------------------------------------------------- #
# Turn
# --------------------------------------------------------------------------- #
@dataclass
class Turn:
    """One reconstructed query: a real prompt together with everything it triggered."""

    i: int
    prompt: str
    origin: str  # 'human' | 'system'
    reply: str = ""
    ts: Optional[str] = None
    tools: list[ToolCall] = field(default_factory=list)
    tokens: Tokens = field(default_factory=Tokens)
    reqs: int = 0
    # Context-WINDOW occupancy at a point in time (input + cacheRead + cacheCreate
    # of ONE request) — the "fuel gauge", as opposed to the cumulative sums held in
    # `tokens`. It is bounded by the model's window (≤1M on Opus 4.8), which a single
    # request can never exceed. Sidechain (sub-agent) requests are excluded because
    # they run in their own window. ctxStart / ctxEnd / ctxPeak are the first / last /
    # max occupancy over this turn's main-thread requests; a sharp drop between turns
    # signals a compaction. 0 means no usage was seen.
    ctxStart: int = 0
    ctxPeak: int = 0
    ctxEnd: int = 0
    # Fields owned by the Phase-2 engine; Phase 1 keeps the neutral defaults.
    direct: int = 0
    indirect: int = 0
    heavy: bool = False  # top-N by COST (relative "heaviest" — drives the graph glow)
    overBudget: bool = False  # cost >= an absolute floor (every expensive turn, not just top-N)
    guide: Optional[dict[str, str]] = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the turn; `tools` and `tokens` are serialized via their own to_dict()."""
        payload: dict[str, Any] = dict(
            i=self.i,
            prompt=self.prompt,
            origin=self.origin,
            reply=self.reply,
            ts=self.ts,
            tools=[call.to_dict() for call in self.tools],
            tokens=self.tokens.to_dict(),
            reqs=self.reqs,
            ctxStart=self.ctxStart,
            ctxPeak=self.ctxPeak,
            ctxEnd=self.ctxEnd,
            direct=self.direct,
            indirect=self.indirect,
            heavy=self.heavy,
            overBudget=self.overBudget,
        )
        # The "guide" key exists ONLY when a pattern fired (Phase 2); otherwise it is omitted.
        if self.guide is None:
            return payload
        payload["guide"] = self.guide
        return payload


# --------------------------------------------------------------------------- #
# Event
# --------------------------------------------------------------------------- #
@dataclass
class Event:
    """One flat, ordered atom of the session timeline (prompt/text/tool_use/tool_result)."""

    id: str
    turn: int
    role: str  # 'user' | 'assistant'
    kind: str  # 'prompt' | 'tool_use' | 'tool_result' | 'text'
    ts: Optional[str] = None
    tool: Optional[str] = None
    input: Any = None
    resultText: Optional[str] = None
    tokens: Optional[Tokens] = None
    mcp: Optional[dict[str, str]] = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the event.

        id, turn, role, kind and ts are always emitted. tool, input, resultText,
        tokens and mcp are emitted only when they are not None (falsy values
        such as "" or 0 are kept), which keeps the contract output clean.
        """
        payload: dict[str, Any] = {
            "id": self.id,
            "turn": self.turn,
            "role": self.role,
            "kind": self.kind,
            "ts": self.ts,
        }
        # Optionals copied through verbatim, in emission order.
        for key in ("tool", "input", "resultText"):
            value = getattr(self, key)
            if value is not None:
                payload[key] = value
        # tokens is the one optional that needs its own serialization step.
        if self.tokens is not None:
            payload["tokens"] = self.tokens.to_dict()
        if self.mcp is not None:
            payload["mcp"] = self.mcp
        return payload


# --------------------------------------------------------------------------- #
# Helper
# --------------------------------------------------------------------------- #
def to_jsonable(obj: Any) -> Any:
    """Convert contract objects, and containers holding them, to plain structures.

    Used to dump load() output to JSON. Rules, checked in this order:
      * an object with a callable `to_dict` attribute -> the result of `to_dict()`
        (returned as-is, not walked further);
      * a dict -> a new dict with the same keys and converted values;
      * a list or tuple -> a new list of converted items;
      * anything else -> returned unchanged.
    """
    serializer = getattr(obj, "to_dict", None)
    if callable(serializer):
        return serializer()

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = to_jsonable(value)
        return converted

    if isinstance(obj, (list, tuple)):
        return list(map(to_jsonable, obj))

    return obj