"""Append-only trace event log for live UI updates. Tool call args/results are captured by wrapping LiteForge tool callables. LiteForge's agent history only exposes step kinds, not tool I/O. """ from __future__ import annotations import json import re from dataclasses import dataclass, field from typing import Any, Literal TraceKind = Literal["tool_call", "tool_result", "tier_escalation", "final", "error"] _REDACTED = "[REDACTED]" _PREFIXES = ("sk-", "ghp_", "gho_", "ghs_", "github_pat_", "xoxb-", "xoxp-", "AKIA", "AIza", "glpat-") _SENSITIVE_KEYS = ( "api_key", "apikey", "token", "secret", "password", "passwd", "access_key", "client_secret", "private_key", ) @dataclass class TraceEvent: kind: TraceKind name: str detail: str step: int | None = None duration_ms: int | None = None tokens: int | None = None @dataclass class TraceCollector: """Thread-safe enough for asyncio single-task agent runs.""" events: list[TraceEvent] = field(default_factory=list) _tool_step: int = 0 def record(self, kind: TraceKind, name: str, detail: str, **meta) -> None: self.events.append(TraceEvent(kind=kind, name=name, detail=detail, **meta)) def record_tool_call(self, name: str, args: dict[str, Any]) -> None: self.record("tool_call", name, _format_payload(args), step=self._tool_step) def record_tool_result(self, name: str, result: dict[str, Any]) -> None: self.record("tool_result", name, _format_payload(result), step=self._tool_step) self._tool_step += 1 def record_escalation(self, from_tier: str, to_tier: str) -> None: self.record("tier_escalation", to_tier, f"escalated from {from_tier}") def record_final(self, text: str) -> None: self.record("final", "response", redact(text)) def record_error(self, text: str) -> None: self.record("error", "error", redact(text)) def snapshot(self) -> list[TraceEvent]: return list(self.events) def redact(text: str) -> str: """Conservative secret redaction for UI display.""" lines = [] for line in text.splitlines(keepends=True): content, nl = (line[:-1], "\n") if line.endswith("\n") else (line, "") lines.append(_redact_line(content) + nl) return "".join(lines) def _redact_line(line: str) -> str: out: list[str] = [] i = 0 while i < len(line): ch = line[i] if ch in "\"'`" or ch.isalnum() or ch in "_-": j = i while j < len(line) and not line[j].isspace() and line[j] not in ",;)]}": j += 1 token = line[i:j] if _looks_secret(token): out.append(_REDACTED) else: out.append(token) i = j continue if ch == "=" and i + 1 < len(line): key_start = i while key_start > 0 and (line[key_start - 1].isalnum() or line[key_start - 1] in "_-"): key_start -= 1 key = line[key_start:i].lower() if any(s in key for s in _SENSITIVE_KEYS): out.append(line[i : i + 1]) i += 1 j = i while j < len(line) and not line[j].isspace(): j += 1 out.append(_REDACTED) i = j continue out.append(ch) i += 1 return "".join(out) def _looks_secret(token: str) -> bool: for prefix in _PREFIXES: if token.startswith(prefix) and len(token) >= len(prefix) + 8: return True if len(token) >= 32 and re.fullmatch(r"[A-Za-z0-9_\-+/=]+", token): upper = sum(1 for c in token if c.isupper()) lower = sum(1 for c in token if c.islower()) digit = sum(1 for c in token if c.isdigit()) if upper >= 4 and lower >= 4 and digit >= 2: return True return False def _format_payload(data: dict[str, Any], *, max_content: int = 600) -> str: """JSON-format tool args/results, truncating large file content.""" out = dict(data) if "content" in out and isinstance(out["content"], str): text = out["content"] if len(text) > max_content: out["content"] = text[:max_content] + f"\n… ({len(text)} chars total)" raw = json.dumps(out, indent=2, ensure_ascii=False) return redact(raw)