trace-field-notes / parser.py
JacobLinCool's picture
feat: implement trace field notes mvp
849ee7b verified
Raw
History Blame Contribute Delete
10.9 kB
"""Trace parsing and narrative-message extraction."""
from __future__ import annotations

import json
from itertools import islice
from pathlib import Path
from typing import Any, Iterable

from schemas import AgentType, NarrativeMessage
# Dict keys probed, in this order, by extract_text when looking for prose
# inside a content item; the first key that yields non-blank text wins.
TEXT_KEYS = ("text", "message", "summary", "transcript", "output", "body")
# Substrings that mark a content item as tool-related when they occur in its
# lower-cased "type" or "name" value (see is_toolish).
TOOLISH_TYPE_FRAGMENTS = (
"tool",
"function_call",
"function_result",
"command",
"exec",
"screenshot",
"image",
"patch",
"diff",
)
# Keys whose mere presence marks a content item as tool-related (see is_toolish).
TOOLISH_KEYS = (
"tool_call_id",
"tool_use_id",
"tool_calls",
"tool_results",
"function_call",
"arguments",
"input_json",
"output_json",
)
class TraceParseError(ValueError):
    """Error signalling that an uploaded trace could not be turned into narrative messages."""
def parse_trace(
    path: str | Path,
    *,
    include_user_context: bool = True,
    ignore_tool_calls: bool = True,
) -> tuple[list[NarrativeMessage], AgentType]:
    """Read a trace file and return its narrative messages with an agent guess.

    Args:
        path: Location of the uploaded trace.
        include_user_context: Keep ``"user"`` messages when true; assistant
            messages are always kept.
        ignore_tool_calls: Forwarded to ``normalize_record``.

    Returns:
        ``(messages, agent_type)``.  Messages are numbered consecutively from
        zero in the order they were kept.
    """
    trace_path = Path(path)
    records = load_records(trace_path)
    agent_type = guess_agent_type(records, trace_path)

    wanted_roles = {"assistant", "user"} if include_user_context else {"assistant"}
    messages: list[NarrativeMessage] = []
    for raw_index, record in enumerate(records):
        extracted = normalize_record(
            record,
            raw_index=raw_index,
            ignore_tool_calls=ignore_tool_calls,
        )
        for role, text, timestamp, source in extracted:
            cleaned = normalize_whitespace(text)
            # Drop entries that are blank after cleanup or whose role is filtered out.
            if not cleaned or role not in wanted_roles:
                continue
            message = NarrativeMessage(
                index=len(messages),
                role=role,
                text=cleaned,
                timestamp=timestamp,
                source=source,
            )
            messages.append(message)
    return messages, agent_type
def load_records(path: Path) -> list[Any]:
    """Load JSONL, JSON, or plain text records from disk.

    Files with a ``.json`` suffix must contain one valid JSON document.  Any
    other suffix is tried as JSON Lines first and falls back to plain-text
    parsing when that yields nothing.

    Args:
        path: File to read.

    Returns:
        The raw records found in the file.

    Raises:
        TraceParseError: If the file cannot be read, is blank, or has a
            ``.json`` suffix but is not valid JSON.
    """
    try:
        # "utf-8-sig" drops a leading byte-order mark.  Left in place, the BOM
        # makes json.loads reject the document and makes the first JSONL line
        # unparsable, which silently demotes the whole trace to plain text.
        text = path.read_text(encoding="utf-8-sig", errors="replace")
    except OSError as exc:
        raise TraceParseError(f"Could not read uploaded file: {exc}") from exc
    if not text.strip():
        raise TraceParseError("The uploaded trace is empty.")

    if path.suffix.lower() == ".json":
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as exc:
            raise TraceParseError(f"Invalid JSON: {exc}") from exc
        return records_from_json(parsed)

    # Every other extension (.jsonl, .log, .txt, none, or unknown) is handled
    # the same way: JSON Lines when possible, otherwise plain text.
    return try_jsonl(text) or records_from_plain_text(text)
def records_from_json(parsed: Any) -> list[Any]:
    """Turn a decoded JSON document into a list of trace records.

    A top-level list is returned as-is.  For an object, the first of the
    ``messages``/``turns``/``events``/``records``/``items`` keys that holds a
    list supplies the records; if none does, the object itself is the single
    record.  Any other value is stringified into one assistant text record.
    """
    if isinstance(parsed, list):
        return parsed
    if not isinstance(parsed, dict):
        return [{"type": "text", "role": "assistant", "content": str(parsed)}]

    container_keys = ("messages", "turns", "events", "records", "items")
    nested = next(
        (parsed[name] for name in container_keys if isinstance(parsed.get(name), list)),
        None,
    )
    return [parsed] if nested is None else nested
def try_jsonl(text: str) -> list[Any]:
    """Parse *text* as JSON Lines, tolerating stray prose after the first JSON line.

    Blank lines are skipped.  If the first non-blank line is not valid JSON
    the text is not treated as JSONL and an empty list is returned.  Once at
    least one line has parsed, later unparsable lines are kept as assistant
    text records instead of aborting.
    """
    parsed_rows: list[Any] = []
    for raw_line in text.splitlines():
        if not raw_line.strip():
            continue
        try:
            value = json.loads(raw_line)
        except json.JSONDecodeError:
            # Rows are only ever added after a successful parse, so an empty
            # list here means no JSON has been seen yet.
            if not parsed_rows:
                return []
            parsed_rows.append({"type": "text", "role": "assistant", "content": raw_line})
        else:
            parsed_rows.append(value)
    return parsed_rows
def records_from_plain_text(text: str) -> list[Any]:
    """Split a plain-text transcript into role-tagged text records.

    A line starting (case-insensitively, after stripping) with ``assistant:``
    or ``agent:`` opens an assistant turn and one starting with ``user:``
    opens a user turn.  Other lines extend the current turn, which is the
    assistant's until a marker says otherwise.  Turns that are blank after
    stripping are dropped.  If nothing survives, the whole text becomes a
    single assistant record.
    """
    collected: list[Any] = []

    def emit(speaker: str, lines: list[str]) -> None:
        # Store one finished turn unless it is blank.
        body = "\n".join(lines).strip()
        if body:
            collected.append({"type": "text", "role": speaker, "content": body})

    speaker = "assistant"
    pending: list[str] = []
    for raw in text.splitlines():
        head = raw.strip().lower()
        if head.startswith(("assistant:", "agent:")):
            next_speaker = "assistant"
        elif head.startswith("user:"):
            next_speaker = "user"
        else:
            pending.append(raw)
            continue
        # A marker line closes the previous turn and seeds the next one with
        # whatever follows the first colon.
        emit(speaker, pending)
        speaker = next_speaker
        pending = [raw.split(":", 1)[1].strip()]
    emit(speaker, pending)

    if not collected:
        collected.append({"type": "text", "role": "assistant", "content": text})
    return collected
def guess_agent_type(records: Iterable[Any], path: Path | None = None) -> AgentType:
    """Guess which agent produced a trace from its path and its first records.

    The lower-cased path is checked first.  If it gives no answer, at most
    the first 20 records are inspected for format-specific markers.

    Args:
        records: Raw trace records.  Only the first 20 are consumed, whatever
            kind of iterable is passed.
        path: Optional location of the trace file.

    Returns:
        ``"codex"``, ``"claude_code"``, ``"pi"``, or ``"unknown"``.
    """
    path_text = str(path or "").lower()
    if ".codex" in path_text or "/codex/" in path_text:
        return "codex"
    if "claude" in path_text:
        return "claude_code"
    if ".pi" in path_text or "/pi/" in path_text:
        return "pi"

    # islice caps the sample for every iterable; previously only lists were
    # capped and generators were drained completely.
    for record in islice(records, 20):
        if not isinstance(record, dict):
            continue
        top_type = str(record.get("type", "")).lower()
        payload = record.get("payload")
        message = record.get("message")

        if top_type in {"session_meta", "turn_context", "response_item", "event_msg"}:
            return "codex"
        if isinstance(payload, dict) and (
            payload.get("originator") == "codex_cli"
            or str(payload.get("type", "")).startswith(("agent_", "user_"))
        ):
            return "codex"
        if "parentUuid" in record or "sessionId" in record or "userType" in record:
            return "claude_code"
        if isinstance(message, dict) and "claude" in str(message.get("model", "")).lower():
            return "claude_code"
        # Only the first 1000 characters of the serialized record are searched.
        if top_type.startswith("pi_") or "pi agent" in json.dumps(record, default=str).lower()[:1000]:
            return "pi"
    return "unknown"
def normalize_record(
    record: Any,
    *,
    raw_index: int,
    ignore_tool_calls: bool,
) -> list[tuple[str, str, str | None, str]]:
    """Return zero or more (role, text, timestamp, source) tuples from one raw record.

    Text is gathered from three places, in this order: the ``payload`` dict,
    the ``message`` value, and the record's own ``content``/``text``/
    ``summary``/``body`` keys.  Candidates that resolve to the same role and
    text are reported once.  A candidate with no recognisable role is
    attributed to the assistant.

    Args:
        record: One raw trace record.  Strings and other non-dict values are
            returned as a single assistant ``plain_text`` entry.
        raw_index: Position of the record in the trace.  Accepted for
            interface compatibility; not used here.
        ignore_tool_calls: When true, tool-related content is skipped.  This
            covers content items rejected by ``extract_text`` and any record,
            payload, or message whose own ``role`` is ``"tool"``.

    Returns:
        The extracted entries in discovery order; possibly empty.
    """
    if isinstance(record, str):
        return [("assistant", record, None, "plain_text")]
    if not isinstance(record, dict):
        return [("assistant", str(record), None, "plain_text")]

    def carries_tool_role(container: dict[str, Any]) -> bool:
        # Mirrors the ``role == "tool"`` rule in is_toolish.  Without this
        # check a tool-result message has no recognised role and would fall
        # through to the "assistant" default below.
        return ignore_tool_calls and str(container.get("role") or "").lower() == "tool"

    if carries_tool_role(record):
        return []

    timestamp = find_timestamp(record)
    candidates: list[tuple[str | None, Any, str]] = []

    payload = record.get("payload")
    if isinstance(payload, dict) and not carries_tool_role(payload):
        role = normalize_role(payload.get("role"))
        if role is None:
            # Fall back to the payload type prefix when no role is given.
            payload_type = str(payload.get("type", "")).lower()
            if payload_type.startswith("agent"):
                role = "assistant"
            elif payload_type.startswith("user"):
                role = "user"
        for key in ("content", "message", "summary", "text"):
            if key in payload:
                candidates.append((role, payload[key], f"payload.{key}"))

    record_role = normalize_role(record.get("role")) or normalize_role(record.get("type"))

    message = record.get("message")
    if isinstance(message, dict):
        if not carries_tool_role(message):
            role = normalize_role(message.get("role")) or normalize_role(record.get("type"))
            for key in ("content", "text", "message"):
                if key in message:
                    candidates.append((role, message[key], f"message.{key}"))
    elif message is not None:
        candidates.append((record_role, message, "message"))

    for key in ("content", "text", "summary", "body"):
        if key in record:
            candidates.append((record_role, record[key], key))

    normalized: list[tuple[str, str, str | None, str]] = []
    seen: set[tuple[str, str]] = set()
    for maybe_role, content, source in candidates:
        # Roles are already "assistant", "user", or None at this point.
        role = maybe_role or "assistant"
        text = extract_text(content, ignore_tool_calls=ignore_tool_calls)
        if not text:
            continue
        key = (role, text)
        if key in seen:
            continue
        seen.add(key)
        normalized.append((role, text, timestamp, source))
    return normalized
def normalize_role(value: Any) -> str | None:
    """Map a raw role or type label to ``"assistant"``, ``"user"``, or ``None``.

    Matching is case-insensitive.  Falsy values and unrecognised labels give
    ``None``.
    """
    aliases = {
        "assistant": "assistant",
        "agent": "assistant",
        "agent_message": "assistant",
        "response_item": "assistant",
        "user": "user",
        "human": "user",
        "user_message": "user",
    }
    label = str(value or "").lower()
    return aliases.get(label)
def find_timestamp(record: dict[str, Any]) -> str | None:
    """Return the first non-blank string timestamp found in *record*, stripped.

    The record's own ``timestamp``/``created_at``/``time``/``date`` keys are
    checked first.  Then the ``payload``, ``message``, and ``snapshot``
    sub-dicts are searched recursively in that order.  Non-string values are
    ignored.  Returns ``None`` when nothing qualifies.
    """
    for field in ("timestamp", "created_at", "time", "date"):
        candidate = record.get(field)
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return trimmed

    containers = (record.get(name) for name in ("payload", "message", "snapshot"))
    for container in containers:
        if not isinstance(container, dict):
            continue
        found = find_timestamp(container)
        if found:
            return found
    return None
def extract_text(content: Any, *, ignore_tool_calls: bool) -> str:
    """Pull the visible prose out of a chat content value.

    Strings are returned unchanged and numbers (including booleans) are
    stringified.  A list yields the non-blank results of its items joined
    by blank lines.  A dict yields the first non-blank text found under
    ``TEXT_KEYS``, falling back to its ``content`` key; when
    *ignore_tool_calls* is true, a dict that ``is_toolish`` flags yields
    nothing.  Every other value yields ``""``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, (int, float)):  # bool is a subclass of int
        return str(content)

    if isinstance(content, list):
        pieces = []
        for item in content:
            piece = extract_text(item, ignore_tool_calls=ignore_tool_calls)
            if piece.strip():
                pieces.append(piece)
        return "\n\n".join(pieces)

    if not isinstance(content, dict):
        # Covers None and any unsupported type.
        return ""
    if ignore_tool_calls and is_toolish(content):
        return ""

    for field in TEXT_KEYS:
        nested = content.get(field)
        if nested is None:
            continue
        rendered = extract_text(nested, ignore_tool_calls=ignore_tool_calls)
        if rendered.strip():
            return rendered
    if "content" in content:
        return extract_text(content["content"], ignore_tool_calls=ignore_tool_calls)
    return ""
def is_toolish(item: dict[str, Any]) -> bool:
    """Report whether a content dict looks like tool traffic rather than prose.

    A dict qualifies when its ``role`` is ``"tool"``, when its lower-cased
    ``type`` or ``name`` contains any of ``TOOLISH_TYPE_FRAGMENTS``, or when
    it carries any of ``TOOLISH_KEYS``.
    """
    kind = str(item.get("type", "")).lower()
    speaker = str(item.get("role", "")).lower()
    label = str(item.get("name", "")).lower()

    if speaker == "tool":
        return True
    for fragment in TOOLISH_TYPE_FRAGMENTS:
        if fragment in kind or fragment in label:
            return True
    for marker in TOOLISH_KEYS:
        if marker in item:
            return True
    return False
def normalize_whitespace(text: str) -> str:
    """Normalise line endings and trim blank edges from *text*.

    ``\\r\\n`` and lone ``\\r`` become ``\\n``, trailing whitespace is removed
    from every line, and blank lines at the start and end are dropped.
    Leading indentation and interior blank lines are kept.
    """
    lines = [
        line.rstrip()
        for line in text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    ]
    # After rstrip a blank line is exactly "".  Trimming by index and slicing
    # once avoids repeated list.pop(0), which is quadratic on long blank runs.
    start, end = 0, len(lines)
    while start < end and not lines[start]:
        start += 1
    while end > start and not lines[end - 1]:
        end -= 1
    return "\n".join(lines[start:end])