"""Trace parsing and narrative-message extraction.""" from __future__ import annotations import json from pathlib import Path from typing import Any, Iterable from schemas import AgentType, NarrativeMessage TEXT_KEYS = ("text", "message", "summary", "transcript", "output", "body") TOOLISH_TYPE_FRAGMENTS = ( "tool", "function_call", "function_result", "command", "exec", "screenshot", "image", "patch", "diff", ) TOOLISH_KEYS = ( "tool_call_id", "tool_use_id", "tool_calls", "tool_results", "function_call", "arguments", "input_json", "output_json", ) class TraceParseError(ValueError): """Raised when an uploaded trace cannot be parsed into narrative messages.""" def parse_trace( path: str | Path, *, include_user_context: bool = True, ignore_tool_calls: bool = True, ) -> tuple[list[NarrativeMessage], AgentType]: """Parse an uploaded trace and return visible narrative messages plus agent guess.""" trace_path = Path(path) records = load_records(trace_path) agent_type = guess_agent_type(records, trace_path) messages: list[NarrativeMessage] = [] for raw_index, record in enumerate(records): for role, text, timestamp, source in normalize_record( record, raw_index=raw_index, ignore_tool_calls=ignore_tool_calls, ): cleaned = normalize_whitespace(text) if not cleaned: continue if role == "assistant" or (role == "user" and include_user_context): messages.append( NarrativeMessage( index=len(messages), role=role, text=cleaned, timestamp=timestamp, source=source, ) ) return messages, agent_type def load_records(path: Path) -> list[Any]: """Load JSONL, JSON, or plain text records from disk.""" try: text = path.read_text(encoding="utf-8", errors="replace") except OSError as exc: raise TraceParseError(f"Could not read uploaded file: {exc}") from exc if not text.strip(): raise TraceParseError("The uploaded trace is empty.") suffix = path.suffix.lower() if suffix == ".json": try: parsed = json.loads(text) except json.JSONDecodeError as exc: raise TraceParseError(f"Invalid JSON: {exc}") from exc return records_from_json(parsed) if suffix in {".jsonl", ".log", ".txt", ""}: records = try_jsonl(text) if records: return records return records_from_plain_text(text) records = try_jsonl(text) return records if records else records_from_plain_text(text) def records_from_json(parsed: Any) -> list[Any]: if isinstance(parsed, list): return parsed if isinstance(parsed, dict): for key in ("messages", "turns", "events", "records", "items"): value = parsed.get(key) if isinstance(value, list): return value return [parsed] return [{"type": "text", "role": "assistant", "content": str(parsed)}] def try_jsonl(text: str) -> list[Any]: records: list[Any] = [] saw_json = False for line in text.splitlines(): if not line.strip(): continue try: records.append(json.loads(line)) saw_json = True except json.JSONDecodeError: if saw_json: records.append({"type": "text", "role": "assistant", "content": line}) else: return [] return records if saw_json else [] def records_from_plain_text(text: str) -> list[Any]: records: list[Any] = [] current_role = "assistant" buffer: list[str] = [] def flush() -> None: nonlocal buffer content = "\n".join(buffer).strip() if content: records.append({"type": "text", "role": current_role, "content": content}) buffer = [] for line in text.splitlines(): lowered = line.strip().lower() if lowered.startswith(("assistant:", "agent:")): flush() current_role = "assistant" buffer.append(line.split(":", 1)[1].strip()) elif lowered.startswith("user:"): flush() current_role = "user" buffer.append(line.split(":", 1)[1].strip()) else: buffer.append(line) flush() if not records: records.append({"type": "text", "role": "assistant", "content": text}) return records def guess_agent_type(records: Iterable[Any], path: Path | None = None) -> AgentType: path_text = str(path or "").lower() if ".codex" in path_text or "/codex/" in path_text: return "codex" if ".claude" in path_text or "claude" in path_text: return "claude_code" if ".pi" in path_text or "/pi/" in path_text: return "pi" sample = list(records[:20] if isinstance(records, list) else records) for record in sample: if not isinstance(record, dict): continue top_type = str(record.get("type", "")).lower() payload = record.get("payload") message = record.get("message") if top_type in {"session_meta", "turn_context", "response_item", "event_msg"}: return "codex" if isinstance(payload, dict) and ( payload.get("originator") == "codex_cli" or str(payload.get("type", "")).startswith(("agent_", "user_")) ): return "codex" if "parentUuid" in record or "sessionId" in record or "userType" in record: return "claude_code" if isinstance(message, dict) and "claude" in str(message.get("model", "")).lower(): return "claude_code" if top_type.startswith("pi_") or "pi agent" in json.dumps(record, default=str).lower()[:1000]: return "pi" return "unknown" def normalize_record( record: Any, *, raw_index: int, ignore_tool_calls: bool, ) -> list[tuple[str, str, str | None, str]]: """Return zero or more role/text/timestamp/source tuples from one raw record.""" if isinstance(record, str): return [("assistant", record, None, "plain_text")] if not isinstance(record, dict): return [("assistant", str(record), None, "plain_text")] timestamp = find_timestamp(record) candidates: list[tuple[str | None, Any, str]] = [] payload = record.get("payload") if isinstance(payload, dict): role = normalize_role(payload.get("role")) if role is None and str(payload.get("type", "")).lower().startswith("agent"): role = "assistant" if role is None and str(payload.get("type", "")).lower().startswith("user"): role = "user" for key in ("content", "message", "summary", "text"): if key in payload: candidates.append((role, payload[key], f"payload.{key}")) message = record.get("message") if isinstance(message, dict): role = normalize_role(message.get("role")) or normalize_role(record.get("type")) for key in ("content", "text", "message"): if key in message: candidates.append((role, message[key], f"message.{key}")) elif message is not None: role = normalize_role(record.get("role")) or normalize_role(record.get("type")) candidates.append((role, message, "message")) role = normalize_role(record.get("role")) or normalize_role(record.get("type")) for key in ("content", "text", "summary", "body"): if key in record: candidates.append((role, record[key], key)) normalized: list[tuple[str, str, str | None, str]] = [] seen: set[tuple[str, str]] = set() for maybe_role, content, source in candidates: role = maybe_role or "assistant" if role not in {"assistant", "user"}: continue text = extract_text(content, ignore_tool_calls=ignore_tool_calls) if not text: continue key = (role, text) if key in seen: continue seen.add(key) normalized.append((role, text, timestamp, source)) return normalized def normalize_role(value: Any) -> str | None: role = str(value or "").lower() if role in {"assistant", "agent", "agent_message", "response_item"}: return "assistant" if role in {"user", "human", "user_message"}: return "user" return None def find_timestamp(record: dict[str, Any]) -> str | None: for key in ("timestamp", "created_at", "time", "date"): value = record.get(key) if isinstance(value, str) and value.strip(): return value.strip() for key in ("payload", "message", "snapshot"): value = record.get(key) if isinstance(value, dict): nested = find_timestamp(value) if nested: return nested return None def extract_text(content: Any, *, ignore_tool_calls: bool) -> str: """Extract visible prose from known chat content shapes.""" if content is None: return "" if isinstance(content, str): return content if isinstance(content, (int, float, bool)): return str(content) if isinstance(content, list): parts = [extract_text(item, ignore_tool_calls=ignore_tool_calls) for item in content] return "\n\n".join(part for part in parts if part.strip()) if isinstance(content, dict): if ignore_tool_calls and is_toolish(content): return "" for key in TEXT_KEYS: value = content.get(key) if value is not None: text = extract_text(value, ignore_tool_calls=ignore_tool_calls) if text.strip(): return text if "content" in content: return extract_text(content["content"], ignore_tool_calls=ignore_tool_calls) return "" def is_toolish(item: dict[str, Any]) -> bool: item_type = str(item.get("type", "")).lower() role = str(item.get("role", "")).lower() name = str(item.get("name", "")).lower() if role == "tool": return True if any(fragment in item_type for fragment in TOOLISH_TYPE_FRAGMENTS): return True if any(fragment in name for fragment in TOOLISH_TYPE_FRAGMENTS): return True return any(key in item for key in TOOLISH_KEYS) def normalize_whitespace(text: str) -> str: lines = [line.rstrip() for line in text.replace("\r\n", "\n").replace("\r", "\n").split("\n")] while lines and not lines[0].strip(): lines.pop(0) while lines and not lines[-1].strip(): lines.pop() return "\n".join(lines)