Ordo
Initial public release
6425ec7
import datetime
import json
import os
import re
import time
from collections import Counter, deque
from pathlib import Path
from zoneinfo import ZoneInfo
import streamlit as st
# Start from the Windows-style state directory and fall back to the container
# layout when we are neither on Windows nor able to see that directory.
OPENCLAW_ROOT = Path("C:/openclaw-biz/state/openclaw")
if os.name != "nt" and not OPENCLAW_ROOT.exists():
    OPENCLAW_ROOT = Path("/home/node/.openclaw")

# Sub-directories read by the dashboard.
AGENTS_DIR = OPENCLAW_ROOT / "agents"
CRON_DIR = OPENCLAW_ROOT / "cron"

# NOTE(review): STATE_DIR is a fixed path and does not follow OPENCLAW_ROOT —
# confirm this is intended for Windows hosts.
STATE_DIR = Path("/home/node/.openclaw/workspace/ops/state")

# Base URL of the Session Amplifier sidecar; overridable through the environment.
SESSION_AMPLIFIER_BASE = os.environ.get(
    "SESSION_AMPLIFIER_BASE_URL",
    "http://session-amplifier:8477",
)

# Time zone used when formatting timestamps for display.
LOCAL_TZ = ZoneInfo("America/New_York")
def parse_ts_any(ts):
    """Coerce a timestamp of several shapes into a timezone-aware datetime.

    Accepted inputs:
      * ``datetime`` - returned as-is when aware, otherwise tagged as UTC.
      * ``int`` / ``float`` - epoch value; anything above 1e11 is divided by
        1000 before conversion (treated as milliseconds).
      * ``str`` - ISO-8601 text; a trailing ``Z`` is rewritten to ``+00:00``
        and a naive result is tagged as UTC.

    Returns ``None`` for ``None``, blank strings, unsupported types, and any
    value that fails to convert.
    """
    utc = datetime.timezone.utc

    def _ensure_aware(moment):
        # Naive datetimes are assumed to already be expressed in UTC.
        return moment if moment.tzinfo else moment.replace(tzinfo=utc)

    if isinstance(ts, datetime.datetime):
        return _ensure_aware(ts)

    if isinstance(ts, (int, float)):
        try:
            epoch = float(ts)
            seconds = epoch / 1000.0 if epoch > 1e11 else epoch
            return datetime.datetime.fromtimestamp(seconds, tz=utc)
        except Exception:
            return None

    if isinstance(ts, str):
        iso_text = ts.strip()
        if not iso_text:
            return None
        if iso_text.endswith("Z"):
            iso_text = iso_text[:-1] + "+00:00"
        try:
            return _ensure_aware(datetime.datetime.fromisoformat(iso_text))
        except Exception:
            return None

    return None
def fmt_local(ts):
    """Format a timestamp for display in ``LOCAL_TZ``.

    The value is first normalised with ``parse_ts_any``. Returns an empty
    string when it cannot be parsed; if the timezone conversion or formatting
    fails, the ISO representation of the parsed datetime is returned instead.
    """
    moment = parse_ts_any(ts)
    if moment is not None:
        try:
            localized = moment.astimezone(LOCAL_TZ)
            return localized.strftime("%Y-%m-%d %I:%M:%S %p %Z")
        except Exception:
            return moment.isoformat()
    return ""
def fmt_duration(seconds):
    """Render a number of seconds as a single-unit duration (s, m, h or d).

    The value is truncated, not rounded: 119 seconds is ``"1m"``.
    """
    if seconds < 60:
        return f"{int(seconds)}s"
    # (exclusive upper bound in seconds, size of one unit, suffix)
    for upper_bound, unit_size, suffix in ((3600, 60, "m"), (86400, 3600, "h")):
        if seconds < upper_bound:
            return f"{int(seconds / unit_size)}{suffix}"
    return f"{int(seconds / 86400)}d"
def read_json(path: Path):
    """Return the parsed JSON content of ``path`` (read as UTF-8).

    Any failure - missing file, read error, invalid JSON - yields ``None``.
    """
    try:
        raw_text = path.read_text(encoding="utf-8")
        parsed = json.loads(raw_text)
    except Exception:
        return None
    return parsed
def read_tail_lines(path: Path, count: int = 80):
    """Return the last ``count`` lines of ``path`` as a list of strings.

    The file is read as UTF-8 with undecodable bytes ignored; line endings
    are kept. Any error while opening or reading yields an empty list.
    """
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as handle:
            # A bounded deque keeps only the newest ``count`` lines while streaming.
            tail = deque(handle, maxlen=count)
    except Exception:
        return []
    return list(tail)
def clean_session_label(value: str, *, fallback: str = "") -> str:
    """Turn a raw session title into a short human-readable label.

    Steps, in order:
      1. strip surrounding whitespace (``None`` / empty -> ``fallback``);
      2. drop a leading ``Discord thread #<channel> › `` prefix;
      3. drop a trailing ``channel id:<digits>`` suffix (case-insensitive);
      4. trim spaces and ``#`` from both ends, turn hyphens into spaces and
         collapse runs of whitespace;
      5. cap the result at 96 characters.

    Returns ``fallback`` when nothing is left after cleaning.
    """
    text = (value or "").strip()
    if not text:
        return fallback
    # The separator is U+203A (single right-pointing angle quotation mark).
    # It is written as an escape so the pattern survives a mis-decoded source
    # file; the previous pattern contained the mojibake form of that character
    # and therefore never matched a real label.
    text = re.sub(r"^Discord thread\s+#?[^\u203a]+\u203a\s*", "", text)
    text = re.sub(r"\s+channel id:\d+\s*$", "", text, flags=re.IGNORECASE)
    text = text.strip(" #")
    text = text.replace("-", " ")
    text = re.sub(r"\s+", " ", text)
    return text[:96] or fallback
def session_display_label(row: dict) -> str:
    """Pick the text shown for a session row in selection widgets.

    Uses the cleaned ``label`` when it is non-empty; otherwise falls back to
    ``"<kind>: <first 32 chars of sessionKey or sessionId>"``.
    """
    cleaned = clean_session_label(row.get("label") or "")
    if not cleaned:
        identifier = row.get("sessionKey") or row.get("sessionId") or "session"
        prefix = row.get("kind") or "session"
        return f"{prefix}: {str(identifier)[:32]}"
    return cleaned
def normalize_exec_tool_label(content_block):
    """Return a display label for a ``toolCall`` content block.

    * Non-``exec`` tools are labelled with their ``name`` (``"unknown_tool"``
      when the name is missing).
    * ``exec`` calls whose command mentions ``mcporter`` are labelled
      ``"exec mcporter <target>"`` when the command contains
      ``mcporter call <target>``, otherwise ``"exec mcporter"``.
    * Every other ``exec`` call is labelled ``"exec"``.

    The command is read from ``arguments["command"]`` / ``arguments["cmd"]``
    or from ``arguments`` itself when it is a string. Anything that is not a
    dict is labelled ``"unknown_tool"`` instead of raising.
    """
    if not isinstance(content_block, dict):
        # The old fallback path called .get() again and re-raised here.
        return "unknown_tool"

    name = content_block.get("name") or "unknown_tool"
    if name != "exec":
        return name

    args = content_block.get("arguments")
    if isinstance(args, dict):
        cmd = args.get("command") or args.get("cmd")
    elif isinstance(args, str):
        cmd = args
    else:
        cmd = None

    if not isinstance(cmd, str) or "mcporter" not in cmd.lower():
        return "exec"

    match = re.search(r"(?:^|\s)mcporter\s+call\s+([A-Za-z0-9_\-\.]+)", cmd)
    if match:
        return f"exec mcporter {match.group(1)}"
    return "exec mcporter"
def session_index_rows():
    """Build the session index from each agent's ``sessions/sessions.json``.

    Returns a ``(rows, lookup)`` tuple. ``rows`` is a list of row dicts sorted
    newest-first by ``updatedAt`` (rows without a timestamp sort last);
    ``lookup`` maps ``(agent, sessionId)`` to the same row object for every
    session that has a session id. Both are empty when ``AGENTS_DIR`` does
    not exist.
    """
    index_rows = []
    rows_by_agent_and_id = {}
    if not AGENTS_DIR.exists():
        return index_rows, rows_by_agent_and_id

    # Checked in order; the first marker found in the session key decides the kind.
    kind_markers = (
        (":subagent:", "subagent"),
        (":cron:", "cron"),
        (":discord:", "discord"),
    )

    def _dict_or_empty(candidate):
        return candidate if isinstance(candidate, dict) else {}

    for agent_dir in sorted(AGENTS_DIR.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith("."):
            continue
        index = read_json(agent_dir / "sessions" / "sessions.json")
        if not isinstance(index, dict):
            continue
        agent = agent_dir.name
        for session_key, meta in index.items():
            if not isinstance(meta, dict):
                continue
            raw_updated = meta.get("updatedAt") or meta.get("updatedAtMs")
            session_id = meta.get("sessionId") or ""
            delivery = _dict_or_empty(meta.get("deliveryContext"))
            origin = _dict_or_empty(meta.get("origin"))
            kind = next(
                (name for marker, name in kind_markers if marker in session_key),
                "session",
            )
            # First non-empty title-like field wins.
            raw_label = (
                meta.get("displayName")
                or meta.get("derivedTitle")
                or meta.get("title")
                or origin.get("label")
                or meta.get("groupChannel")
                or ""
            )
            entry = {
                "agent": agent,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": clean_session_label(str(raw_label), fallback=session_key),
                "updatedAt": parse_ts_any(raw_updated),
                "updated": fmt_local(raw_updated),
                "deliveryTo": delivery.get("to") or "",
                "kind": kind,
            }
            index_rows.append(entry)
            if session_id:
                rows_by_agent_and_id[(agent, session_id)] = entry

    oldest = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    index_rows.sort(key=lambda entry: entry["updatedAt"] or oldest, reverse=True)
    return index_rows, rows_by_agent_and_id
def transcript_activity_rows(session_lookup, max_files=80):
    """Summarise the most recently modified session transcripts.

    Scans ``AGENTS_DIR`` for ``sessions/*.jsonl`` files, keeps the
    ``max_files`` newest by modification time and, for each one, walks the
    last 120 lines backwards to find the latest timestamp and a one-line
    summary (first tool call or non-empty text block, else an error message).

    Args:
        session_lookup: mapping of ``(agent, sessionId)`` to either a row dict
            (with ``sessionKey`` / ``label``) or a plain session key.
        max_files: maximum number of transcript files to inspect.

    Returns:
        A list of row dicts sorted newest-first by ``updatedAt``; empty when
        ``AGENTS_DIR`` does not exist.
    """
    rows = []
    if not AGENTS_DIR.exists():
        return rows

    def _mtime(candidate):
        # A transcript may be removed between the directory scan and the
        # stat() call; treat that as "oldest" instead of raising.
        try:
            return candidate.stat().st_mtime
        except OSError:
            return 0

    jsonl_files = sorted(
        AGENTS_DIR.rglob("sessions/*.jsonl"),
        key=_mtime,
        reverse=True,
    )[:max_files]

    for path in jsonl_files:
        # Layout is <agent>/sessions/<sessionId>.jsonl
        agent = path.parts[-3] if len(path.parts) >= 3 else "unknown"
        session_id = path.stem
        session_meta = session_lookup.get((agent, session_id))
        if isinstance(session_meta, dict):
            session_key = session_meta.get("sessionKey")
            label = session_meta.get("label")
        else:
            session_key = session_meta
            label = ""
        session_key = session_key or session_id

        last_ts = None
        summary = ""
        role = ""
        # Newest line first, so the first hit for each field is the latest one.
        for line in reversed(read_tail_lines(path, 120)):
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except Exception:
                continue
            if not isinstance(row, dict):
                # Valid JSON that is not an object carries nothing we can read.
                continue
            msg = row.get("message") if isinstance(row.get("message"), dict) else {}
            ts = parse_ts_any(
                row.get("timestamp") or row.get("time") or msg.get("timestamp") or msg.get("time")
            )
            if ts and last_ts is None:
                last_ts = ts
            if not summary:
                role = msg.get("role") or row.get("type") or ""
                content = msg.get("content") if isinstance(msg.get("content"), list) else []
                for block in content:
                    if not isinstance(block, dict):
                        continue
                    if block.get("type") == "toolCall":
                        summary = f"tool: {normalize_exec_tool_label(block)}"
                        break
                    if block.get("type") == "text":
                        text = (block.get("text") or "").strip().replace("\n", " ")
                        if text:
                            summary = text[:160]
                            break
                if not summary and msg.get("errorMessage"):
                    summary = str(msg.get("errorMessage"))[:160]
            if last_ts and summary:
                break

        rows.append(
            {
                "agent": agent,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": label or clean_session_label(session_key, fallback=session_id),
                "updatedAt": last_ts,
                "updated": fmt_local(last_ts),
                "role": role,
                "summary": summary or "(no recent summary)",
                "file": str(path),
            }
        )

    oldest = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    rows.sort(key=lambda row: row["updatedAt"] or oldest, reverse=True)
    return rows
def list_cron_jobs(limit=100):
    """List cron jobs from ``CRON_DIR/jobs.json`` with a derived health status.

    For each of the first ``limit`` job entries a row is built containing the
    schedule, the parsed and formatted next/running/last run times, and a
    ``health`` value chosen in this priority order: ``disabled``,
    ``critical`` (3+ consecutive errors), ``warning`` (1-2 consecutive
    errors), ``error`` (last status was ``"error"``), ``stale`` (next run is
    more than five minutes overdue), ``running``, otherwise ``healthy``.

    Returns:
        Rows sorted by health severity, then by running/next/last run time.
        An empty list when the jobs file is missing or malformed.
    """
    data = read_json(CRON_DIR / "jobs.json")
    jobs = data.get("jobs", []) if isinstance(data, dict) else []
    if not isinstance(jobs, list):
        # A malformed file must not crash the slice below.
        jobs = []

    now_ms = time.time() * 1000
    grace_ms = 300000  # 5 min grace period before a job counts as overdue
    rows = []

    for job in jobs[:limit]:
        if not isinstance(job, dict):
            continue
        state = job.get("state") if isinstance(job.get("state"), dict) else {}
        schedule = job.get("schedule") if isinstance(job.get("schedule"), dict) else {}
        payload = job.get("payload") if isinstance(job.get("payload"), dict) else {}

        last_run_ms = state.get("lastRunAtMs")
        next_run_ms = state.get("nextRunAtMs")
        # A JSON null or non-numeric counter would make the comparisons below raise.
        raw_errors = state.get("consecutiveErrors", 0)
        consecutive_errors = raw_errors if isinstance(raw_errors, (int, float)) else 0
        last_status = state.get("lastStatus") or state.get("lastRunStatus") or ""

        # Staleness: the job has run before and its next run is past due.
        staleness = None
        if last_run_ms and next_run_ms and isinstance(next_run_ms, (int, float)):
            if now_ms > next_run_ms + grace_ms:
                staleness = fmt_duration((now_ms - next_run_ms) / 1000)

        health = "healthy"
        health_reason = ""
        if not job.get("enabled", True):
            health = "disabled"
        elif consecutive_errors >= 3:
            health = "critical"
            health_reason = f"{consecutive_errors} consecutive errors"
        elif consecutive_errors > 0:
            health = "warning"
            health_reason = f"{consecutive_errors} error(s)"
        elif last_status == "error":
            health = "error"
        elif staleness:
            health = "stale"
            health_reason = f"overdue by {staleness}"
        elif state.get("runningAtMs"):
            health = "running"

        next_raw = state.get("nextRunAtMs") or state.get("nextRunAt")
        running_raw = state.get("runningAtMs") or state.get("runningAt")
        last_raw = state.get("lastRunAtMs") or state.get("lastRunAt")
        rows.append(
            {
                "name": job.get("name") or job.get("id") or "unnamed",
                "jobId": job.get("id") or "",
                "enabled": bool(job.get("enabled", True)),
                "kind": payload.get("kind") or "",
                "model": payload.get("model") or "",
                "schedule": schedule.get("expr") or schedule.get("kind") or "",
                "tz": schedule.get("tz") or "",
                "nextRunAt": parse_ts_any(next_raw),
                "nextRun": fmt_local(next_raw),
                "runningAt": parse_ts_any(running_raw),
                "running": fmt_local(running_raw),
                "lastRunAt": parse_ts_any(last_raw),
                "lastRun": fmt_local(last_raw),
                "lastStatus": last_status,
                "sessionKey": job.get("sessionKey") or "",
                "consecutiveErrors": consecutive_errors,
                "health": health,
                "healthReason": health_reason,
                "staleness": staleness,
            }
        )

    # Lower rank sorts first; unknown health values go last.
    health_rank = {
        "critical": 0,
        "error": 1,
        "warning": 2,
        "stale": 3,
        "running": 4,
        "healthy": 5,
        "disabled": 6,
    }
    oldest = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    rows.sort(
        key=lambda row: (
            health_rank.get(row["health"], 7),
            row["runningAt"] or row["nextRunAt"] or row["lastRunAt"] or oldest,
        ),
    )
    return rows
def collect_metrics_last_24h(max_files=500):
    """Aggregate transcript metrics for events from the last 24 hours.

    Reads the ``max_files`` most recently modified ``sessions/*.jsonl`` files
    under ``AGENTS_DIR`` and, for every event whose timestamp falls within the
    last 24 hours, accumulates tool-call counts, error counts, token and cost
    totals, a cache-hit proxy, and the latest summary per agent.

    Returns:
        A dict with the keys ``tool_counts`` (Counter), ``error_rate``
        (percent), ``error_count``, ``total_events``, ``token_total``,
        ``input_tokens``, ``output_tokens``, ``cost_total``,
        ``cache_hit_proxy`` (percent, or ``None`` without samples),
        ``cache_samples`` and ``active_summaries`` (agents whose latest event
        is within the last 60 minutes, newest first).
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(hours=24)

    def _mtime(candidate):
        # The file may vanish between the directory scan and stat().
        try:
            return candidate.stat().st_mtime
        except OSError:
            return 0

    def _to_int(value):
        # One malformed usage field must not abort the rest of the transcript.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _first_present(mapping, keys):
        # Unlike an ``or`` chain this keeps falsy-but-meaningful values
        # (False, 0), which are exactly the cache-miss samples.
        for key in keys:
            value = mapping.get(key)
            if value is not None:
                return value
        return None

    jsonl_files = sorted(
        AGENTS_DIR.rglob("sessions/*.jsonl"),
        key=_mtime,
        reverse=True,
    )[:max_files]

    tool_counts = Counter()
    error_count = 0
    total_events = 0
    token_total = 0
    input_tokens = 0
    output_tokens = 0
    cost_total = 0.0
    cache_hits = 0
    cache_samples = 0
    latest_per_agent = {}

    for path in jsonl_files:
        # Layout is <agent>/sessions/<sessionId>.jsonl
        agent = path.parts[-3] if len(path.parts) >= 3 else "unknown"
        try:
            with path.open("r", encoding="utf-8", errors="ignore") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except Exception:
                        continue
                    if not isinstance(row, dict):
                        continue
                    msg = row.get("message") if isinstance(row.get("message"), dict) else {}
                    ts = parse_ts_any(
                        row.get("timestamp") or msg.get("timestamp") or row.get("time") or msg.get("time")
                    )
                    if ts is None or ts < cutoff:
                        continue

                    total_events += 1
                    if msg.get("errorMessage") or msg.get("stopReason") == "error":
                        error_count += 1

                    summary = None
                    content = msg.get("content") if isinstance(msg.get("content"), list) else []
                    for block in content:
                        if not isinstance(block, dict):
                            continue
                        if block.get("type") == "toolCall":
                            label = normalize_exec_tool_label(block)
                            tool_counts[label] += 1
                            summary = summary or f"tool: {label}"
                        elif block.get("type") == "text" and msg.get("role") == "assistant":
                            text = (block.get("text") or "").strip().replace("\n", " ")
                            if text and summary is None:
                                summary = text[:120]

                    usage = msg.get("usage") if isinstance(msg.get("usage"), dict) else {}
                    if usage:
                        # Token extraction - prefer explicit totals
                        total_tok = usage.get("total_tokens") or usage.get("totalTokens")
                        input_tok = usage.get("input_tokens") or usage.get("inputTokens") or usage.get("input")
                        output_tok = usage.get("output_tokens") or usage.get("outputTokens") or usage.get("output")
                        if total_tok:
                            token_total += _to_int(total_tok)
                        elif input_tok or output_tok:
                            token_total += _to_int(input_tok or 0) + _to_int(output_tok or 0)
                        if input_tok:
                            input_tokens += _to_int(input_tok)
                        if output_tok:
                            output_tokens += _to_int(output_tok)

                        # The keys are treated as alternative spellings of one
                        # figure: count the first numeric one only, so a record
                        # carrying two of them is not double-counted.
                        for key in ("cost", "total_cost", "usd", "costUsd"):
                            value = usage.get(key)
                            if isinstance(value, (int, float)) and not isinstance(value, bool):
                                cost_total += float(value)
                                break

                        cached = _first_present(usage, ("cache_hit", "cacheHit"))
                        cached_tokens = _first_present(
                            usage,
                            ("cached_tokens", "cachedTokens", "cache_read_tokens", "cacheReadTokens"),
                        )
                        if isinstance(cached, bool):
                            cache_samples += 1
                            if cached:
                                cache_hits += 1
                        elif isinstance(cached_tokens, (int, float)):
                            cache_samples += 1
                            if cached_tokens > 0:
                                cache_hits += 1

                    latest = latest_per_agent.get(agent)
                    if latest is None or ts > latest["ts"]:
                        latest_per_agent[agent] = {
                            "ts": ts,
                            "summary": summary or "No recent assistant/tool summary",
                        }
        except Exception:
            # Best effort: an unreadable transcript is skipped, not fatal.
            continue

    active_cutoff = now - datetime.timedelta(minutes=60)
    active_agents = [
        {"agent": agent, "ts": info["ts"], "updated": fmt_local(info["ts"]), "summary": info["summary"]}
        for agent, info in latest_per_agent.items()
        if info["ts"] >= active_cutoff
    ]
    active_agents.sort(key=lambda row: row["ts"], reverse=True)

    return {
        "tool_counts": tool_counts,
        "error_rate": (error_count / total_events * 100.0) if total_events else 0.0,
        "error_count": error_count,
        "total_events": total_events,
        "token_total": token_total,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_total": cost_total,
        "cache_hit_proxy": (cache_hits / cache_samples * 100.0) if cache_samples else None,
        "cache_samples": cache_samples,
        "active_summaries": active_agents,
    }
def get_live_session_activity(session_key, agent_name, max_lines=100):
    """Extract detailed recent activity from a specific session's JSONL.

    The transcript is looked up under ``AGENTS_DIR/<agent_name>/sessions``.
    The last ``:``-separated segment of ``session_key`` is tried as the file
    stem first; if no such file exists, the agent's ``sessions.json`` index is
    consulted and the ``sessionId`` recorded for ``session_key`` is used.

    Args:
        session_key: a session key (``agent:name:uuid`` style) or a bare
            session id.
        agent_name: directory name of the owning agent.
        max_lines: number of trailing transcript lines to inspect.

    Returns:
        A list of entry dicts (``timestamp``, ``time``, ``role``, ``type``,
        ``summary``, ``details``) in file order, one per transcript line that
        produced a type or summary. When a message has several content blocks
        the last recognised block determines the entry. Empty when the
        transcript cannot be located.
    """
    if not session_key or not AGENTS_DIR.exists():
        return []

    agent_dir = AGENTS_DIR / agent_name
    if not agent_dir.exists():
        return []

    sessions_dir = agent_dir / "sessions"
    session_key = str(session_key)

    # session_key format: agent:name:uuid or just uuid
    jsonl_path = sessions_dir / f"{session_key.split(':')[-1]}.jsonl"
    if not jsonl_path.exists():
        # Transcripts are named after the session id, which is not necessarily
        # the tail of the session key; resolve it through the session index.
        index = read_json(sessions_dir / "sessions.json")
        meta = index.get(session_key) if isinstance(index, dict) else None
        session_id = meta.get("sessionId") if isinstance(meta, dict) else None
        if not session_id:
            return []
        jsonl_path = sessions_dir / f"{session_id}.jsonl"
        if not jsonl_path.exists():
            return []

    activities = []
    for line in read_tail_lines(jsonl_path, max_lines):
        line = line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except Exception:
            continue
        if not isinstance(row, dict):
            continue

        msg = row.get("message") if isinstance(row.get("message"), dict) else {}
        ts = parse_ts_any(row.get("timestamp") or msg.get("timestamp") or row.get("time") or msg.get("time"))

        entry = {
            "timestamp": ts,
            "time": fmt_local(ts),
            "role": msg.get("role", ""),
            "type": "",
            "summary": "",
            "details": "",
        }

        content = msg.get("content") if isinstance(msg.get("content"), list) else []
        for block in content:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")

            if block_type == "thinking":
                entry["type"] = "thinking"
                thinking = str(block.get("thinking") or "")
                entry["summary"] = "Thinking..." if thinking else "(empty thinking)"
                entry["details"] = thinking[:500] if thinking else ""

            elif block_type == "toolCall":
                entry["type"] = "tool"
                label = normalize_exec_tool_label(block)
                entry["summary"] = f"β†’ {label}"
                args = block.get("arguments", {})
                if isinstance(args, dict):
                    # Summarize key arguments
                    entry["details"] = ", ".join(
                        f"{k}={str(v)[:50]}" for k, v in list(args.items())[:3]
                    )
                elif isinstance(args, str):
                    entry["details"] = args[:100]

            elif block_type == "toolResult":
                entry["type"] = "result"
                content_data = block.get("content")
                is_error = block.get("isError") or block.get("error")
                entry["summary"] = "βœ— Error" if is_error else "βœ“ Result"
                if isinstance(content_data, str):
                    entry["details"] = content_data[:200]
                elif isinstance(content_data, list) and content_data:
                    entry["details"] = str(content_data[0])[:200]

            elif block_type == "text":
                entry["type"] = "text"
                text = (block.get("text") or "").strip()
                entry["summary"] = text[:80].replace("\n", " ")
                entry["details"] = text[:300]

        if entry["type"] or entry["summary"]:
            activities.append(entry)

    return activities
def _load_sidecar_artifact(name: str):
    """Load a Session Amplifier artifact from ``STATE_DIR/session_amplifier``.

    Returns the parsed JSON, or ``None`` when the file does not exist or
    cannot be read or parsed.
    """
    artifact = STATE_DIR / "session_amplifier" / name
    if artifact.exists():
        try:
            raw_text = artifact.read_text(encoding="utf-8")
            return json.loads(raw_text)
        except Exception:
            return None
    return None
def _sidecar_reachable() -> bool:
    """Report whether the Session Amplifier ``/health`` endpoint answers.

    Issues a GET with a two second timeout. Any failure (connection error,
    timeout, HTTP error status) counts as unreachable.
    """
    try:
        import urllib.request

        req = urllib.request.Request(f"{SESSION_AMPLIFIER_BASE}/health", method="GET")
        # Use the response as a context manager so the connection is closed
        # rather than left for the garbage collector on every rerun.
        with urllib.request.urlopen(req, timeout=2):
            return True
    except Exception:
        return False
def _fetch_sidecar_json(path: str, default=None):
    """GET ``path`` from the sidecar API and decode the body as JSON.

    Uses a five second timeout. Returns ``default`` on any failure
    (network error, HTTP error status, undecodable or invalid body).
    """
    try:
        import urllib.request

        url = f"{SESSION_AMPLIFIER_BASE}{path}"
        request = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(request, timeout=5) as response:
            body = response.read()
        return json.loads(body.decode())
    except Exception:
        return default
def _load_ops_json(name: str, default=None):
    """Load the JSON artifact ``name`` from ``STATE_DIR``.

    Returns ``default`` when the file is missing, unreadable or not valid JSON.
    """
    artifact = STATE_DIR / name
    try:
        raw_text = artifact.read_text(encoding="utf-8")
        return json.loads(raw_text)
    except Exception:
        return default
def render_model_ops_tab(session_rows):
    """Render the "Model Ops" tab.

    Reads the model-ops snapshot, merged benchmark, HF benchmark and session
    cost artifacts from the ops state directory and draws: headline counts,
    benchmark source status, cost coverage quality, the costliest sessions
    (with a drill-down into the matching transcript), arena leaderboards,
    heuristic efficiency and the mapping-confidence breakdown.

    Args:
        session_rows: rows produced by ``session_index_rows`` and used to
            match a cost row to an indexed session.
    """

    def _as_dict(value):
        # Artifacts are external files; tolerate an unexpected top-level type.
        return value if isinstance(value, dict) else {}

    def _dict_items(value):
        return [item for item in (value or []) if isinstance(item, dict)] if isinstance(value, list) else []

    snapshot = _as_dict(_load_ops_json("model-ops-snapshot-latest.json", {}))
    merged = _as_dict(_load_ops_json("model-benchmarks-merged-latest.json", {}))
    hf_benchmarks = _as_dict(_load_ops_json("hf-benchmarks-latest.json", {}))
    cost_rows = _dict_items(_load_ops_json("session-cost-top50-latest.json", []))
    merged_models = _dict_items(merged.get("models"))

    st.subheader("Model Ops")
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("OpenRouter models", snapshot.get("market_model_count", 0))
    c2.metric("Merged benchmark models", len(merged_models))
    c3.metric("HF models", len(hf_benchmarks.get("models") or []))
    c4.metric("Top cost rows", len(cost_rows))

    st.markdown("### Benchmark sources")
    src_rows = [
        {"source": src.get("name"), "status": src.get("status")}
        for src in _dict_items(snapshot.get("benchmark_source_status"))
    ]
    if src_rows:
        st.dataframe(src_rows, use_container_width=True, hide_index=True)
    else:
        st.caption("No benchmark source metadata available")

    st.markdown("### Cost coverage quality")
    # Missing or empty cost_source values are counted under "unknown".
    coverage = Counter((row.get("cost_source") or "unknown") for row in cost_rows)
    trust_score = snapshot.get("cost_trust_score")
    if not isinstance(trust_score, (int, float)):
        trust_score = 0.0
    cc1, cc2, cc3, cc4 = st.columns(4)
    cc1.metric("Observed", coverage.get("observed", 0))
    cc2.metric("Estimated", coverage.get("estimated", 0))
    cc3.metric("Unknown", coverage.get("unknown", 0))
    cc4.metric("Trust score", f"{(trust_score * 100):.1f}%")

    st.markdown("### Highest current observed session cost")
    if cost_rows:
        agent_filter = st.selectbox(
            "Cost rows filter by agent",
            ["all"] + sorted({r.get('agent_id') or '' for r in cost_rows if r.get('agent_id')}),
            index=0,
        )
        filtered_cost = [r for r in cost_rows if agent_filter == 'all' or r.get('agent_id') == agent_filter]
        st.dataframe(filtered_cost[:15], use_container_width=True, hide_index=True)

        st.markdown("#### Inspect costly session")
        lookup = {
            f"{r.get('agent_id')}/{r.get('session_id')} β€” {r.get('primary_model')} β€” ${r.get('total_estimated_usd')}": r
            for r in filtered_cost[:25]
        }
        if lookup:
            selected = st.selectbox("Jump to session details", list(lookup.keys()), index=0)
            chosen = lookup[selected]
            agent = chosen.get('agent_id')
            session_id = chosen.get('session_id')
            # A cost row without a session id cannot match anything; guarding
            # here also avoids ``None in "<key>"`` raising TypeError.
            matching = [
                r for r in session_rows
                if session_id
                and r.get('agent') == agent
                and (r.get('sessionId') == session_id or str(session_id) in (r.get('sessionKey') or ''))
            ]
            if matching:
                match = matching[0]
                st.caption(f"Session key: {match.get('sessionKey')}")
                st.caption(f"Last updated: {match.get('updated')}")
                st.caption(f"Delivery: {match.get('deliveryTo')}")
                # Transcripts are stored per session id, so prefer it over the
                # session key when locating the JSONL file.
                transcript_ref = match.get('sessionId') or session_id or match.get('sessionKey')
                recent = get_live_session_activity(transcript_ref, agent, max_lines=40)
                if recent:
                    st.dataframe(
                        [
                            {
                                'time': a.get('time'),
                                'type': a.get('type'),
                                'summary': a.get('summary'),
                                'details': (a.get('details') or '')[:120],
                            }
                            for a in recent[-15:]
                        ],
                        use_container_width=True,
                        hide_index=True,
                    )
                else:
                    st.caption('No recent activity extracted from transcript.')
            else:
                st.caption('No matching session found in current session index.')
    else:
        st.caption("No session cost artifacts found")

    show_legacy = st.checkbox("Show legacy / non-orchestration benchmark leaders", value=False)

    def _arena_top(metric):
        # Top ten merged models that have a value for the given arena metric.
        ranked = [m for m in merged_models if (m.get('arena') or {}).get(metric) is not None]
        ranked.sort(key=lambda m: (m.get('arena') or {}).get(metric) or 0, reverse=True)
        return ranked[:10]

    st.markdown("### Top merged Arena Elo")
    arena_elo = _arena_top('elo') if show_legacy else _dict_items(snapshot.get("top_arena_elo"))
    if arena_elo:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "elo": (r.get("arena") or {}).get("elo"),
                    "mapping": (r.get("mapping") or {}).get("confidence"),
                }
                for r in arena_elo
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena elo rows")

    st.markdown("### Top merged Arena winrate")
    arena_wr = _arena_top('winrate') if show_legacy else _dict_items(snapshot.get("top_arena_winrate"))
    if arena_wr:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "winrate": (r.get("arena") or {}).get("winrate"),
                    "appearances": (r.get("arena") or {}).get("appearances"),
                    "mapping": (r.get("mapping") or {}).get("confidence"),
                }
                for r in arena_wr
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena winrate rows")

    st.markdown("### Highest heuristic efficiency")
    eff = _dict_items(snapshot.get("top_efficiency"))
    if eff:
        st.dataframe(
            [
                {
                    "model": r.get("id"),
                    "efficiency": r.get("efficiency_score"),
                    "input_per_1m": (r.get("pricing_per_1m") or {}).get("input"),
                    "context": r.get("context_length"),
                }
                for r in eff
            ],
            use_container_width=True,
            hide_index=True,
        )

    st.markdown("### Mapping confidence breakdown")
    counts = Counter(((m.get("mapping") or {}).get("confidence") or "unknown") for m in merged_models)
    st.json(dict(counts))
def main():
    """Render the OpenClaw Ops Dashboard page.

    Loads the Session Amplifier artifacts, the session index, recent
    transcript activity, cron jobs and 24h metrics, then draws a headline
    metric row followed by six tabs: Overview, Sessions, Cron, Activity,
    Live Session and Model Ops.
    """
    st.set_page_config(page_title="OpenClaw Ops Dashboard", layout="wide")
    st.title("OpenClaw Ops Dashboard")
    st.caption(f"Root: {OPENCLAW_ROOT}")

    # Load Session Amplifier artifacts if available
    sidecar_status = "unavailable"
    sidecar_report = _load_sidecar_artifact("review-latest.json")
    sidecar_skills = _load_sidecar_artifact("skills-latest.json")
    if sidecar_report or sidecar_skills:
        sidecar_status = "ok"
    elif _sidecar_reachable():
        sidecar_status = "reachable"

    session_rows, session_lookup = session_index_rows()
    activity_rows = transcript_activity_rows(session_lookup)
    cron_rows = list_cron_jobs()
    metrics = collect_metrics_last_24h()
    running_cron = [row for row in cron_rows if row["runningAt"]]
    subagent_rows = [row for row in session_rows if row["kind"] == "subagent"]

    # Health summary
    critical_jobs = [r for r in cron_rows if r["health"] == "critical"]
    warning_jobs = [r for r in cron_rows if r["health"] == "warning"]
    stale_jobs = [r for r in cron_rows if r["health"] == "stale"]

    c1, c2, c3, c4, c5 = st.columns(5)
    c1.metric("Agents", len({row["agent"] for row in session_rows}))
    c2.metric("Indexed sessions", len(session_rows))
    c3.metric("Running cron", len(running_cron))
    c4.metric("Sidecar", sidecar_status.title(), help="Session Amplifier sidecar availability")
    health_display = "βœ“" if not (critical_jobs or warning_jobs or stale_jobs) else f"⚠ {len(critical_jobs)}/{len(warning_jobs)}/{len(stale_jobs)}"
    c5.metric("Health", health_display, help="Critical/Warning/Stale cron jobs")

    tab_overview, tab_sessions, tab_cron, tab_activity, tab_live, tab_model_ops = st.tabs(
        ["Overview", "Sessions", "Cron", "Activity", "Live Session", "Model Ops"]
    )

    # ------------------------------------------------------------ Overview
    with tab_overview:
        left, right = st.columns([1, 1])
        with left:
            st.subheader("Active agent summaries (last 60m)")
            if metrics["active_summaries"]:
                st.dataframe(metrics["active_summaries"], use_container_width=True, hide_index=True)
            else:
                st.info("No active agent summaries in the last 60 minutes.")

            st.subheader("Recent sessions")
            if session_rows:
                st.dataframe(
                    [
                        {
                            "agent": row["agent"],
                            "kind": row["kind"],
                            "updated": row["updated"],
                            "sessionKey": row["sessionKey"],
                            "deliveryTo": row["deliveryTo"],
                        }
                        for row in session_rows[:20]
                    ],
                    use_container_width=True,
                    hide_index=True,
                )
            else:
                st.info("No indexed sessions found.")

        with right:
            st.subheader("Last 24h metrics")
            m1, m2, m3, m4 = st.columns(4)
            m1.metric("Error rate", f"{metrics['error_rate']:.2f}%")
            m2.metric("Errors / events", f"{metrics['error_count']} / {metrics['total_events']}")
            # Show tokens instead of cost (more reliable)
            token_display = f"{metrics['token_total']:,}" if metrics['token_total'] else "0"
            m3.metric("Tokens", token_display)
            m4.metric("Cost", f"${metrics['cost_total']:.4f}")
            st.caption(f"Input: {metrics['input_tokens']:,} | Output: {metrics['output_tokens']:,}")
            if metrics["cache_hit_proxy"] is None:
                st.caption("Cache-hit proxy unavailable in recent usage fields.")
            else:
                st.caption(f"Cache-hit proxy: {metrics['cache_hit_proxy']:.2f}% across {metrics['cache_samples']} samples.")

            st.subheader("Top tools (last 24h)")
            top_tools = [{"tool": tool, "count": count} for tool, count in metrics["tool_counts"].most_common(20)]
            if top_tools:
                st.dataframe(top_tools, use_container_width=True, hide_index=True)
            else:
                st.info("No tool invocation events found in the last 24 hours.")

            # Session Amplifier findings
            st.subheader("Session Amplifier")
            if sidecar_report:
                patterns = sidecar_report.get("failure_patterns", [])
                if patterns:
                    st.warning(f"{len(patterns)} failure pattern(s) detected β€” see Activity tab for details")
                    for p in patterns[:5]:
                        st.caption(f"β€’ {p.get('description', p.get('pattern', '?'))} ({p.get('count', 0)})")
                else:
                    st.success("No failure patterns detected")
                if sidecar_report.get("sessions_reviewed"):
                    st.caption(f"Reviewed {sidecar_report['sessions_reviewed']} sessions")
            elif sidecar_status == "reachable":
                st.info("Sidecar reachable but not yet warmed up β€” run spool to populate")
            else:
                st.caption("Session Amplifier unavailable β€” see docs to deploy sidecar/session-amplifier/")

            if sidecar_skills:
                missing = sidecar_skills.get("mcps_missing_skill_surface", [])
                if missing:
                    st.warning(f"{len(missing)} MCP(s) without skill surface: {', '.join(missing[:5])}")
                else:
                    st.caption("All registered MCPs have skill surfaces βœ“")

    # ------------------------------------------------------------ Sessions
    with tab_sessions:
        # Prefer sidecar session list when available
        sidecar_sessions = _fetch_sidecar_json("/sessions/recent?limit=50")
        sidecar_recent = sidecar_sessions.get("sessions") if isinstance(sidecar_sessions, dict) else None
        if sidecar_recent:
            st.subheader("Recent sessions (Session Amplifier)")
            st.caption(f"Fetched at {fmt_local(datetime.datetime.now(datetime.timezone.utc).isoformat())}")
            sess_rows = []
            for sr in sidecar_recent:
                if not isinstance(sr, dict):
                    continue
                # The sidecar may send explicit nulls, so do not rely on
                # .get() defaults before slicing or calling str methods.
                hints = sr.get("hints") or []
                sess_rows.append({
                    "agent": sr.get("agent_id", "?"),
                    "session_id": str(sr.get("session_id") or "?")[:32],
                    "events": sr.get("event_count", 0),
                    "tools": sr.get("tool_result_count", 0),
                    "errors": sr.get("error_count", 0),
                    "health": str(sr.get("health") or "ok").upper(),
                    "hints": "; ".join(str(h) for h in hints) if hints else "",
                    "last_event": fmt_local(sr.get("last_event_at")),
                })
            st.dataframe(sess_rows, use_container_width=True, hide_index=True)
        else:
            st.subheader("Session index")
            agent_names = ["all"] + sorted({row["agent"] for row in session_rows})
            selected_agent = st.selectbox("Filter agent", agent_names, index=0)
            selected_kind = st.selectbox("Filter kind", ["all", "session", "discord", "cron", "subagent"], index=0)
            filtered_rows = []
            for row in session_rows:
                if selected_agent != "all" and row["agent"] != selected_agent:
                    continue
                if selected_kind != "all" and row["kind"] != selected_kind:
                    continue
                filtered_rows.append(
                    {
                        "agent": row["agent"],
                        "kind": row["kind"],
                        "updated": row["updated"],
                        "sessionKey": row["sessionKey"],
                        "sessionId": row["sessionId"],
                        "deliveryTo": row["deliveryTo"],
                    }
                )
            st.dataframe(filtered_rows, use_container_width=True, hide_index=True)

        st.subheader("Recent child/subagent sessions")
        st.dataframe(
            [
                {
                    "agent": row["agent"],
                    "updated": row["updated"],
                    "sessionKey": row["sessionKey"],
                    "sessionId": row["sessionId"],
                }
                for row in subagent_rows[:50]
            ],
            use_container_width=True,
            hide_index=True,
        )

    # ---------------------------------------------------------------- Cron
    with tab_cron:
        # Health filter
        health_filter = st.selectbox(
            "Filter by health",
            ["all", "critical", "error", "warning", "stale", "running", "healthy", "disabled"],
            index=0
        )
        filtered_cron = cron_rows
        if health_filter != "all":
            filtered_cron = [r for r in cron_rows if r["health"] == health_filter]

        st.subheader(f"Cron jobs ({len(filtered_cron)} shown)")

        # Color-coded health display
        def health_color(health):
            return {
                "critical": "πŸ”΄",
                "error": "🟠",
                "warning": "🟑",
                "stale": "βšͺ",
                "running": "🟒",
                "healthy": "βœ“",
                "disabled": "⊘",
            }.get(health, "?")

        display_rows = []
        for row in filtered_cron:
            error_total = row["consecutiveErrors"] or 0
            display_rows.append({
                "health": f"{health_color(row['health'])} {row['health']}",
                "name": row["name"],
                "enabled": "βœ“" if row["enabled"] else "βœ—",
                "schedule": row["schedule"],
                "lastRun": row["lastRun"],
                "lastStatus": row["lastStatus"],
                "nextRun": row["nextRun"],
                "errors": error_total if error_total > 0 else "",
                "reason": row["healthReason"],
            })
        st.dataframe(display_rows, use_container_width=True, hide_index=True)

        # Quick stats
        if critical_jobs or warning_jobs or stale_jobs:
            st.error(f"**{len(critical_jobs)}** critical, **{len(warning_jobs)}** warning, **{len(stale_jobs)}** stale jobs need attention")
        else:
            st.success("All monitored cron jobs are healthy")

        st.subheader("Running now")
        if running_cron:
            st.dataframe(running_cron, use_container_width=True, hide_index=True)
        else:
            st.info("No cron jobs currently marked running.")

    # ------------------------------------------------------------ Activity
    with tab_activity:
        st.subheader("Recent transcript activity")
        st.dataframe(
            [
                {
                    "agent": row["agent"],
                    "updated": row["updated"],
                    "role": row["role"],
                    "sessionKey": row["sessionKey"],
                    "summary": row["summary"],
                    "file": row["file"],
                }
                for row in activity_rows[:50]
            ],
            use_container_width=True,
            hide_index=True,
        )

    # -------------------------------------------------------- Live Session
    with tab_live:
        st.subheader("Live Session Activity Stream")

        # Prefer sidecar-based session list for selection when available
        sidecar_sessions = _fetch_sidecar_json("/sessions/recent?limit=30") or {}
        sidecar_session_list = sidecar_sessions.get("sessions") if isinstance(sidecar_sessions, dict) else None
        sidecar_session_list = [s for s in (sidecar_session_list or []) if isinstance(s, dict)]

        # Show spooler health / readiness
        if sidecar_status == "reachable":
            st.info("Session Amplifier reachable but not yet warmed up β€” spool may be empty")
        elif sidecar_status == "unavailable":
            st.warning("Session Amplifier unavailable. Deploy: cd sidecar/session-amplifier && docker compose up -d")

        st.caption("Normalized activity feed from Session Amplifier. "
                   "For continuous streaming, use: python scripts/session_amplifier_live_monitor.py")

        # Build unified session options: sidecar sessions first, then fallback to dashboard sessions
        if sidecar_session_list:
            sidecar_opts = [
                f"[SA] {s.get('agent_id','?')}: {clean_session_label(s.get('display_title') or '', fallback=str(s.get('session_id') or '?')[:32])}"
                for s in sidecar_session_list[:20]
            ]
            sidecar_map = {opt: s for opt, s in zip(sidecar_opts, sidecar_session_list[:20])}
            use_sidecar = True
        else:
            sidecar_opts = []
            sidecar_map = {}
            use_sidecar = False

        # Fallback: dashboard-based session list
        recent_cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)
        recent_sessions = [r for r in session_rows if r["updatedAt"] and r["updatedAt"] > recent_cutoff]
        dash_opts = [f"{r['agent']}: {session_display_label(r)}" for r in recent_sessions[:20]]
        dash_map = {opt: r for opt, r in zip(dash_opts, recent_sessions[:20])}

        # Combine: sidecar sessions first, then dashboard sessions
        all_opts = sidecar_opts + dash_opts
        all_map = {**sidecar_map, **dash_map}

        if all_opts:
            selected = st.selectbox("Select session to monitor", all_opts, index=0)
            selected_item = all_map[selected]

            if use_sidecar and selected in sidecar_map:
                sel_session_id = str(selected_item.get("session_id") or "")
                sel_agent = selected_item.get("agent_id") or "?"
                sel_title = clean_session_label(selected_item.get("display_title") or "", fallback=sel_session_id)
                sel_updated = fmt_local(selected_item.get("last_event_at"))
                sel_health = str(selected_item.get("health") or "ok").upper()
                sel_hints = "; ".join(str(h) for h in (selected_item.get("hints") or []))
                st.caption(f"Health: {sel_health} | Events: {selected_item.get('event_count',0)} | Tools: {selected_item.get('tool_result_count',0)} | Errors: {selected_item.get('error_count',0)}")
                if sel_hints:
                    st.caption(f"Hints: {sel_hints}")
                # Fetch activity from sidecar
                activity_data = _fetch_sidecar_json(f"/session/{sel_session_id}/activity?limit=200")
                activities = activity_data.get("activity") if isinstance(activity_data, dict) else None
                activities = activities if isinstance(activities, list) else []
            else:
                if isinstance(selected_item, dict) and "sessionKey" in selected_item:
                    # Transcripts are stored per session id; fall back to the
                    # session key only when the index row has no id.
                    sel_session_id = selected_item.get("sessionId") or selected_item["sessionKey"]
                    sel_agent = selected_item["agent"]
                    sel_title = session_display_label(selected_item)
                    sel_updated = selected_item["updated"]
                else:
                    sel_session_id = "?"
                    sel_agent = "?"
                    sel_title = "?"
                    sel_updated = "?"
                st.caption("Dashboard session β€” may not appear in Session Amplifier spool")
                activities = get_live_session_activity(sel_session_id, sel_agent, max_lines=200)

            col1, col2, col3 = st.columns([1, 1, 1])
            with col1:
                st.write(f"**Agent:** {sel_agent}")
            with col2:
                st.write(f"**Session:** {sel_title}")
            with col3:
                st.write(f"**Last event:** {sel_updated}")

            if activities:
                current_bucket = None
                # Newest first, limited to the 50 most recent events.
                for act in reversed(activities[-50:]):
                    if not isinstance(act, dict):
                        continue
                    # Sidecar events arrive as JSON, so their timestamp is a
                    # string; normalise it so the 5-minute bucketing works for
                    # both sources.
                    ts = parse_ts_any(act.get("timestamp"))
                    if ts is not None:
                        try:
                            bucket = ts.replace(minute=(ts.minute // 5) * 5, second=0, microsecond=0)
                            if bucket != current_bucket:
                                current_bucket = bucket
                                st.divider()
                                st.caption(f"πŸ“ {fmt_local(bucket)}")
                        except Exception:
                            pass

                    evtype = act.get("type") or act.get("event_type", "")
                    # NOTE(review): sidecar events may not carry a preformatted
                    # "time" field - fall back to the parsed timestamp.
                    ts_str = act.get("time") or fmt_local(ts)
                    summary = str(act.get("summary") or "")
                    details = str(act.get("details") or "")

                    if evtype == "thinking":
                        with st.expander(f"🧠 {ts_str} β€” Thinking", expanded=False):
                            st.text(details[:1000] or "(no details)")
                    elif evtype == "tool":
                        st.markdown(f"**πŸ”§ {ts_str}** β€” {summary}")
                        if details:
                            st.code(details[:500], language="bash")
                    elif evtype == "tool_call":
                        st.markdown(f"**βš™ {ts_str}** β†’ {act.get('tool_name', '')} β€” {summary}")
                    elif evtype == "tool_result" or evtype == "result":
                        icon = "❌" if act.get("is_error") else "βœ…"
                        st.markdown(f"**{icon} {ts_str}** β€” {summary}")
                        if details:
                            st.text(details[:300])
                    elif evtype == "tool_error":
                        st.error(f"⚠ {ts_str} β€” {summary or 'tool error'}")
                        if details:
                            st.text(details[:300])
                    elif evtype == "assistant_meta":
                        st.markdown(f"**πŸ’‘ {ts_str}** β€” {summary[:120]}")
                    elif evtype == "assistant_thinking":
                        with st.expander(f"🧠 {ts_str} β€” Thinking", expanded=False):
                            st.text(details[:500] or "(no details)")
                    elif evtype == "assistant_text":
                        st.markdown(f"**πŸ’¬ {ts_str}** β€” {summary[:120]}")
                    elif evtype == "user_message":
                        with st.expander(f"πŸ‘€ {ts_str} β€” User message", expanded=False):
                            st.text(details[:300])
                    else:
                        st.caption(f"[{evtype or '?'}] {ts_str} β€” {summary[:80]}")
            else:
                st.info("No activity found for this session (may be archived or not yet written)")
        else:
            st.info("No recent sessions found in the last 24 hours")

    # ----------------------------------------------------------- Model Ops
    with tab_model_ops:
        render_model_ops_tab(session_rows)
# Script entry point: render the dashboard when this module is run directly.
if __name__ == "__main__":
    main()