File size: 10,851 Bytes
849ee7b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""Trace parsing and narrative-message extraction."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Iterable

from schemas import AgentType, NarrativeMessage


# Keys looked up (in this order) when pulling prose out of a content mapping.
TEXT_KEYS = (
    "text",
    "message",
    "summary",
    "transcript",
    "output",
    "body",
)

# Substrings that mark an item's "type" or "name" as tool-related.
TOOLISH_TYPE_FRAGMENTS = (
    # function / tool invocation and results
    "tool", "function_call", "function_result",
    # shell execution
    "command", "exec",
    # visual artefacts
    "screenshot", "image",
    # code changes
    "patch", "diff",
)

# Keys whose mere presence marks a mapping as tool-related.
TOOLISH_KEYS = (
    "tool_call_id", "tool_use_id",
    "tool_calls", "tool_results",
    "function_call", "arguments",
    "input_json", "output_json",
)


class TraceParseError(ValueError):
    """Signal that an uploaded trace could not be turned into narrative messages.

    Subclasses ``ValueError`` so callers catching ``ValueError`` also catch it.
    """


def parse_trace(
    path: str | Path,
    *,
    include_user_context: bool = True,
    ignore_tool_calls: bool = True,
) -> tuple[list[NarrativeMessage], AgentType]:
    """Read a trace file and collect its narrative messages plus an agent guess.

    Args:
        path: Location of the uploaded trace.
        include_user_context: When true, user messages are kept alongside
            assistant messages; when false only assistant messages are kept.
        ignore_tool_calls: Forwarded to ``normalize_record``.

    Returns:
        A ``(messages, agent_type)`` pair. Each message's ``index`` is its
        position in the returned list, not the raw record position.
    """

    source_path = Path(path)
    raw_records = load_records(source_path)
    detected_agent = guess_agent_type(raw_records, source_path)

    if include_user_context:
        wanted_roles = {"assistant", "user"}
    else:
        wanted_roles = {"assistant"}

    narrative: list[NarrativeMessage] = []
    for position, raw in enumerate(raw_records):
        extracted = normalize_record(
            raw,
            raw_index=position,
            ignore_tool_calls=ignore_tool_calls,
        )
        for role, text, timestamp, source in extracted:
            body = normalize_whitespace(text)
            # Skip entries that are empty after trimming or whose role is unwanted.
            if not body or role not in wanted_roles:
                continue
            entry = NarrativeMessage(
                index=len(narrative),
                role=role,
                text=body,
                timestamp=timestamp,
                source=source,
            )
            narrative.append(entry)

    return narrative, detected_agent


def load_records(path: Path) -> list[Any]:
    """Load JSONL, JSON, or plain text records from disk.

    Files with a ``.json`` suffix must contain one valid JSON document. Every
    other suffix is first tried as JSON Lines and, if that yields nothing,
    split into plain-text records.

    Args:
        path: File to read.

    Returns:
        The list of raw records found in the file.

    Raises:
        TraceParseError: If the file cannot be read, is empty or
            whitespace-only, or has a ``.json`` suffix but is not valid JSON.
    """

    try:
        # "utf-8-sig" decodes ordinary UTF-8 unchanged but drops a leading
        # byte-order mark, which json.loads would otherwise reject.
        text = path.read_text(encoding="utf-8-sig", errors="replace")
    except OSError as exc:
        raise TraceParseError(f"Could not read uploaded file: {exc}") from exc

    if not text.strip():
        raise TraceParseError("The uploaded trace is empty.")

    if path.suffix.lower() == ".json":
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as exc:
            raise TraceParseError(f"Invalid JSON: {exc}") from exc
        return records_from_json(parsed)

    # Known text-like suffixes and unknown suffixes share the same strategy:
    # prefer JSON Lines, otherwise treat the content as plain text.
    return try_jsonl(text) or records_from_plain_text(text)


def records_from_json(parsed: Any) -> list[Any]:
    """Turn a decoded JSON document into a list of raw records.

    A list is returned as-is. A dict yields the first list found under
    ``messages``, ``turns``, ``events``, ``records`` or ``items`` (checked in
    that order), or else the dict itself wrapped in a list. Any other value is
    wrapped as a single assistant text record holding ``str(parsed)``.
    """
    if isinstance(parsed, list):
        return parsed
    if not isinstance(parsed, dict):
        return [{"type": "text", "role": "assistant", "content": str(parsed)}]

    container_keys = ("messages", "turns", "events", "records", "items")
    nested = next(
        (parsed[name] for name in container_keys if isinstance(parsed.get(name), list)),
        None,
    )
    return [parsed] if nested is None else nested


def try_jsonl(text: str) -> list[Any]:
    """Parse ``text`` as JSON Lines, returning ``[]`` when it does not look like JSONL.

    Blank lines are skipped. If the first non-blank line is not valid JSON the
    whole input is rejected (empty list). A non-JSON line that appears after
    at least one parsed line is kept as an assistant text record.
    """
    parsed_lines: list[Any] = []
    for raw_line in text.splitlines():
        if raw_line.strip() == "":
            continue
        try:
            value = json.loads(raw_line)
        except json.JSONDecodeError:
            # Nothing parsed so far means this is not JSONL at all.
            if not parsed_lines:
                return []
            value = {"type": "text", "role": "assistant", "content": raw_line}
        parsed_lines.append(value)
    return parsed_lines


def records_from_plain_text(text: str) -> list[Any]:
    """Split free-form text into role-tagged text records.

    A line starting (case-insensitively, ignoring leading whitespace) with
    ``assistant:``, ``agent:`` or ``user:`` opens a new record. Following
    lines belong to that record until the next marker. Lines before the first
    marker are attributed to the assistant. If no record has any content, the
    whole input becomes one assistant record.
    """
    speaker_prefixes = (
        ("assistant:", "assistant"),
        ("agent:", "assistant"),
        ("user:", "user"),
    )

    # Each segment is (role, collected lines); the first one holds any preamble.
    segments: list[tuple[str, list[str]]] = [("assistant", [])]
    for line in text.splitlines():
        marker = line.strip().lower()
        speaker = next(
            (role for prefix, role in speaker_prefixes if marker.startswith(prefix)),
            None,
        )
        if speaker is None:
            segments[-1][1].append(line)
        else:
            remainder = line.split(":", 1)[1].strip()
            segments.append((speaker, [remainder]))

    records: list[Any] = []
    for role, lines in segments:
        content = "\n".join(lines).strip()
        if content:
            records.append({"type": "text", "role": role, "content": content})

    if not records:
        records.append({"type": "text", "role": "assistant", "content": text})
    return records


def guess_agent_type(records: Iterable[Any], path: Path | None = None) -> AgentType:
    """Guess which agent produced a trace from its path and first records.

    The path is checked first (case-insensitively, with backslashes treated as
    forward slashes). If it gives no hint, at most the first 20 records are
    inspected for format-specific keys and type names.

    Args:
        records: Raw trace records. Only the first 20 are read, so a one-shot
            iterator is advanced by at most 20 items.
        path: Optional location of the trace file.

    Returns:
        ``"codex"``, ``"claude_code"``, ``"pi"`` or ``"unknown"``.
    """
    # Normalise separators so the "/codex/" and "/pi/" checks also match
    # Windows-style paths.
    path_text = str(path or "").lower().replace("\\", "/")
    if ".codex" in path_text or "/codex/" in path_text:
        return "codex"
    if "claude" in path_text:
        return "claude_code"
    # NOTE(review): ".pi" is a plain substring test, so it also matches names
    # such as ".pid" or ".pickle" -- confirm whether that is intended.
    if ".pi" in path_text or "/pi/" in path_text:
        return "pi"

    codex_types = {"session_meta", "turn_context", "response_item", "event_msg"}
    claude_keys = ("parentUuid", "sessionId", "userType")

    # zip() stops as soon as range(20) is exhausted, so the sample is capped
    # at 20 records for lists and lazy iterables alike.
    for _, record in zip(range(20), records):
        if not isinstance(record, dict):
            continue
        top_type = str(record.get("type", "")).lower()
        payload = record.get("payload")
        message = record.get("message")
        if top_type in codex_types:
            return "codex"
        if isinstance(payload, dict) and (
            payload.get("originator") == "codex_cli"
            or str(payload.get("type", "")).startswith(("agent_", "user_"))
        ):
            return "codex"
        if any(key in record for key in claude_keys):
            return "claude_code"
        if isinstance(message, dict) and "claude" in str(message.get("model", "")).lower():
            return "claude_code"
        if top_type.startswith("pi_") or "pi agent" in json.dumps(record, default=str).lower()[:1000]:
            return "pi"
    return "unknown"


def normalize_record(
    record: Any,
    *,
    raw_index: int,
    ignore_tool_calls: bool,
) -> list[tuple[str, str, str | None, str]]:
    """Return zero or more role/text/timestamp/source tuples from one raw record.

    Text is looked for in three places, in this order: the ``payload``
    mapping, the ``message`` field, and the record's own top-level keys.
    Candidates that resolve to the same ``(role, text)`` pair are emitted once.
    A candidate whose role cannot be determined is attributed to the assistant.

    Args:
        record: One raw record. Strings and other non-dict values become a
            single assistant ``plain_text`` entry.
        raw_index: Position of the record in the trace. Accepted for the
            caller's convenience; not used by this function.
        ignore_tool_calls: When true, tool-related content is skipped. This
            covers content items (via ``extract_text``) and any container
            whose explicit ``role`` is ``"tool"``.

    Returns:
        A list of ``(role, text, timestamp, source)`` tuples where ``role`` is
        ``"assistant"`` or ``"user"``.
    """

    if isinstance(record, str):
        return [("assistant", record, None, "plain_text")]
    if not isinstance(record, dict):
        return [("assistant", str(record), None, "plain_text")]

    def pick_role(role_value: Any, *fallbacks: Any) -> str | None:
        """Resolve a role from ``role_value``, then from each fallback label."""
        # An explicit tool role is kept as "tool" so the role filter below
        # drops it instead of defaulting it to "assistant".
        if ignore_tool_calls and str(role_value or "").lower() == "tool":
            return "tool"
        for label in (role_value, *fallbacks):
            resolved = normalize_role(label)
            if resolved is not None:
                return resolved
        return None

    timestamp = find_timestamp(record)
    candidates: list[tuple[str | None, Any, str]] = []

    payload = record.get("payload")
    if isinstance(payload, dict):
        role = pick_role(payload.get("role"))
        if role is None:
            # Fall back to the payload type prefix ("agent..." / "user...").
            payload_type = str(payload.get("type", "")).lower()
            if payload_type.startswith("agent"):
                role = "assistant"
            elif payload_type.startswith("user"):
                role = "user"
        for key in ("content", "message", "summary", "text"):
            if key in payload:
                candidates.append((role, payload[key], f"payload.{key}"))

    message = record.get("message")
    if isinstance(message, dict):
        role = pick_role(message.get("role"), record.get("type"))
        for key in ("content", "text", "message"):
            if key in message:
                candidates.append((role, message[key], f"message.{key}"))
    elif message is not None:
        role = pick_role(record.get("role"), record.get("type"))
        candidates.append((role, message, "message"))

    role = pick_role(record.get("role"), record.get("type"))
    for key in ("content", "text", "summary", "body"):
        if key in record:
            candidates.append((role, record[key], key))

    normalized: list[tuple[str, str, str | None, str]] = []
    seen: set[tuple[str, str]] = set()
    for maybe_role, content, source in candidates:
        role = maybe_role or "assistant"
        if role not in {"assistant", "user"}:
            continue
        text = extract_text(content, ignore_tool_calls=ignore_tool_calls)
        if not text:
            continue
        key = (role, text)
        if key in seen:
            continue
        seen.add(key)
        normalized.append((role, text, timestamp, source))

    return normalized


def normalize_role(value: Any) -> str | None:
    """Map a raw role or type label onto ``"assistant"``, ``"user"`` or ``None``.

    Matching is case-insensitive and exact; falsy values and unrecognised
    labels give ``None``.
    """
    label = str(value or "").lower()
    alias_table = (
        ("assistant", ("assistant", "agent", "agent_message", "response_item")),
        ("user", ("user", "human", "user_message")),
    )
    for canonical, aliases in alias_table:
        if label in aliases:
            return canonical
    return None


def find_timestamp(record: dict[str, Any]) -> str | None:
    """Return the first non-blank string timestamp found in ``record``.

    The record's own ``timestamp``, ``created_at``, ``time`` and ``date`` keys
    are checked first. If none holds a non-blank string, the ``payload``,
    ``message`` and ``snapshot`` sub-mappings are searched recursively in that
    order. The result is stripped of surrounding whitespace.
    """
    for field in ("timestamp", "created_at", "time", "date"):
        candidate = record.get(field)
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return trimmed

    for container in ("payload", "message", "snapshot"):
        child = record.get(container)
        if not isinstance(child, dict):
            continue
        found = find_timestamp(child)
        if found:
            return found
    return None


def extract_text(content: Any, *, ignore_tool_calls: bool) -> str:
    """Pull the visible prose out of a chat content value.

    Strings are returned unchanged and numbers/booleans are stringified.
    Lists are flattened by extracting each item and joining the non-blank
    results with a blank line. For mappings, the first ``TEXT_KEYS`` entry
    that yields non-blank text wins, falling back to a nested ``content``
    key. Mappings judged tool-related by ``is_toolish`` give ``""`` when
    ``ignore_tool_calls`` is true. Anything else, including ``None``, gives
    ``""``.
    """

    if isinstance(content, str):
        return content
    if isinstance(content, (int, float, bool)):
        return str(content)

    if isinstance(content, list):
        pieces: list[str] = []
        for item in content:
            piece = extract_text(item, ignore_tool_calls=ignore_tool_calls)
            if piece.strip():
                pieces.append(piece)
        return "\n\n".join(pieces)

    if not isinstance(content, dict):
        return ""

    if ignore_tool_calls and is_toolish(content):
        return ""

    for key in TEXT_KEYS:
        value = content.get(key)
        if value is None:
            continue
        found = extract_text(value, ignore_tool_calls=ignore_tool_calls)
        if found.strip():
            return found

    if "content" in content:
        return extract_text(content["content"], ignore_tool_calls=ignore_tool_calls)
    return ""


def is_toolish(item: dict[str, Any]) -> bool:
    """Report whether a content mapping looks tool-related.

    True when the role is ``"tool"`` (case-insensitive), when the lower-cased
    ``type`` or ``name`` contains any ``TOOLISH_TYPE_FRAGMENTS`` substring, or
    when the mapping has any key listed in ``TOOLISH_KEYS``.
    """
    if str(item.get("role", "")).lower() == "tool":
        return True

    labels = (
        str(item.get("type", "")).lower(),
        str(item.get("name", "")).lower(),
    )
    for label in labels:
        for fragment in TOOLISH_TYPE_FRAGMENTS:
            if fragment in label:
                return True

    for key in TOOLISH_KEYS:
        if key in item:
            return True
    return False


def normalize_whitespace(text: str) -> str:
    """Normalise line endings and trim blank edges from ``text``.

    ``\\r\\n`` and bare ``\\r`` become ``\\n``, trailing whitespace is removed
    from every line, and blank lines at the start and end are dropped.
    Leading indentation and interior blank lines are preserved.
    """
    lines = [line.rstrip() for line in text.replace("\r\n", "\n").replace("\r", "\n").split("\n")]

    # After rstrip() a blank line is exactly "", so truthiness is enough.
    # Trimming by index avoids the repeated O(n) shifts of list.pop(0).
    start, end = 0, len(lines)
    while start < end and not lines[start]:
        start += 1
    while end > start and not lines[end - 1]:
        end -= 1
    return "\n".join(lines[start:end])