Spaces:
Paused
Paused
| """Append-only trace event log for live UI updates. | |
| Tool call args/results are captured by wrapping LiteForge tool callables. | |
| LiteForge's agent history only exposes step kinds, not tool I/O. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Any, Literal | |
| TraceKind = Literal["tool_call", "tool_result", "tier_escalation", "final", "error"] | |
| _REDACTED = "[REDACTED]" | |
| _PREFIXES = ("sk-", "ghp_", "gho_", "ghs_", "github_pat_", "xoxb-", "xoxp-", "AKIA", "AIza", "glpat-") | |
| _SENSITIVE_KEYS = ( | |
| "api_key", "apikey", "token", "secret", "password", "passwd", | |
| "access_key", "client_secret", "private_key", | |
| ) | |
| class TraceEvent: | |
| kind: TraceKind | |
| name: str | |
| detail: str | |
| step: int | None = None | |
| duration_ms: int | None = None | |
| tokens: int | None = None | |
| class TraceCollector: | |
| """Thread-safe enough for asyncio single-task agent runs.""" | |
| events: list[TraceEvent] = field(default_factory=list) | |
| _tool_step: int = 0 | |
| def record(self, kind: TraceKind, name: str, detail: str, **meta) -> None: | |
| self.events.append(TraceEvent(kind=kind, name=name, detail=detail, **meta)) | |
| def record_tool_call(self, name: str, args: dict[str, Any]) -> None: | |
| self.record("tool_call", name, _format_payload(args), step=self._tool_step) | |
| def record_tool_result(self, name: str, result: dict[str, Any]) -> None: | |
| self.record("tool_result", name, _format_payload(result), step=self._tool_step) | |
| self._tool_step += 1 | |
| def record_escalation(self, from_tier: str, to_tier: str) -> None: | |
| self.record("tier_escalation", to_tier, f"escalated from {from_tier}") | |
| def record_final(self, text: str) -> None: | |
| self.record("final", "response", redact(text)) | |
| def record_error(self, text: str) -> None: | |
| self.record("error", "error", redact(text)) | |
| def snapshot(self) -> list[TraceEvent]: | |
| return list(self.events) | |
| def redact(text: str) -> str: | |
| """Conservative secret redaction for UI display.""" | |
| lines = [] | |
| for line in text.splitlines(keepends=True): | |
| content, nl = (line[:-1], "\n") if line.endswith("\n") else (line, "") | |
| lines.append(_redact_line(content) + nl) | |
| return "".join(lines) | |
| def _redact_line(line: str) -> str: | |
| out: list[str] = [] | |
| i = 0 | |
| while i < len(line): | |
| ch = line[i] | |
| if ch in "\"'`" or ch.isalnum() or ch in "_-": | |
| j = i | |
| while j < len(line) and not line[j].isspace() and line[j] not in ",;)]}": | |
| j += 1 | |
| token = line[i:j] | |
| if _looks_secret(token): | |
| out.append(_REDACTED) | |
| else: | |
| out.append(token) | |
| i = j | |
| continue | |
| if ch == "=" and i + 1 < len(line): | |
| key_start = i | |
| while key_start > 0 and (line[key_start - 1].isalnum() or line[key_start - 1] in "_-"): | |
| key_start -= 1 | |
| key = line[key_start:i].lower() | |
| if any(s in key for s in _SENSITIVE_KEYS): | |
| out.append(line[i : i + 1]) | |
| i += 1 | |
| j = i | |
| while j < len(line) and not line[j].isspace(): | |
| j += 1 | |
| out.append(_REDACTED) | |
| i = j | |
| continue | |
| out.append(ch) | |
| i += 1 | |
| return "".join(out) | |
| def _looks_secret(token: str) -> bool: | |
| for prefix in _PREFIXES: | |
| if token.startswith(prefix) and len(token) >= len(prefix) + 8: | |
| return True | |
| if len(token) >= 32 and re.fullmatch(r"[A-Za-z0-9_\-+/=]+", token): | |
| upper = sum(1 for c in token if c.isupper()) | |
| lower = sum(1 for c in token if c.islower()) | |
| digit = sum(1 for c in token if c.isdigit()) | |
| if upper >= 4 and lower >= 4 and digit >= 2: | |
| return True | |
| return False | |
| def _format_payload(data: dict[str, Any], *, max_content: int = 600) -> str: | |
| """JSON-format tool args/results, truncating large file content.""" | |
| out = dict(data) | |
| if "content" in out and isinstance(out["content"], str): | |
| text = out["content"] | |
| if len(text) > max_content: | |
| out["content"] = text[:max_content] + f"\n… ({len(text)} chars total)" | |
| raw = json.dumps(out, indent=2, ensure_ascii=False) | |
| return redact(raw) | |