"""OpenClaw operations dashboard (Streamlit).

Reads agent session indexes, JSONL transcripts and cron job state from the
OpenClaw state directory, optionally enriches them with data from the Session
Amplifier sidecar (HTTP API plus on-disk artifacts) and model-ops JSON
snapshots, and renders the result as a multi-tab Streamlit page.
"""

import datetime
import json
import os
import re
import time
import urllib.parse
import urllib.request
from collections import Counter, deque
from pathlib import Path
from zoneinfo import ZoneInfo

import streamlit as st

# On Windows, or wherever this local state tree exists, use it; otherwise assume
# the container layout under /home/node.
if os.name == "nt" or Path("C:/openclaw-biz/state/openclaw").exists():
    OPENCLAW_ROOT = Path("C:/openclaw-biz/state/openclaw")
else:
    OPENCLAW_ROOT = Path("/home/node/.openclaw")

AGENTS_DIR = OPENCLAW_ROOT / "agents"
CRON_DIR = OPENCLAW_ROOT / "cron"
# NOTE(review): unlike the paths above this does not follow OPENCLAW_ROOT, so on
# the Windows layout it still points at the container path — confirm intended.
STATE_DIR = Path("/home/node/.openclaw/workspace/ops/state")
SESSION_AMPLIFIER_BASE = os.environ.get("SESSION_AMPLIFIER_BASE_URL", "http://session-amplifier:8477")
LOCAL_TZ = ZoneInfo("America/New_York")

# Sort key for rows without a timestamp: earlier than any real timestamp.
_MIN_UTC = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
# A cron job is only reported stale once it is this far past its nextRunAtMs.
_CRON_OVERDUE_GRACE_MS = 5 * 60 * 1000
# Display order of cron health states, most urgent first.
_HEALTH_ORDER = {"critical": 0, "error": 1, "warning": 2, "stale": 3, "running": 4, "healthy": 5, "disabled": 6}
_HEALTH_ICONS = {
    "critical": "🔴",
    "error": "🟠",
    "warning": "🟡",
    "stale": "⚪",
    "running": "🟢",
    "healthy": "✓",
    "disabled": "⊘",
}
# Cron health states that count as "needs attention" in the header and Cron tab.
_ATTENTION_STATES = ("critical", "error", "warning", "stale")

_DISCORD_THREAD_PREFIX_RE = re.compile(r"^Discord thread\s+#?[^›]+›\s*")
_CHANNEL_ID_SUFFIX_RE = re.compile(r"\s+channel id:\d+\s*$", flags=re.IGNORECASE)
_WHITESPACE_RUN_RE = re.compile(r"\s+")
_MCPORTER_CALL_RE = re.compile(r"(?:^|\s)mcporter\s+call\s+([A-Za-z0-9_\-\.]+)")

# Usage-field aliases accepted from different transcript producers, in priority order.
_COST_KEYS = ("cost", "total_cost", "usd", "costUsd")
_CACHE_HIT_KEYS = ("cache_hit", "cacheHit")
_CACHED_TOKEN_KEYS = ("cached_tokens", "cachedTokens", "cache_read_tokens", "cacheReadTokens")


# --------------------------------------------------------------------------- #
# Generic helpers
# --------------------------------------------------------------------------- #


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _is_number(value) -> bool:
    """Return True for int/float values (bool is excluded even though it subclasses int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _as_int(value) -> int:
    """Convert *value* to int, returning 0 for None or anything not convertible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


def _first_present(mapping, keys):
    """Return the first value among *keys* in *mapping* that is not None.

    Unlike an ``a or b`` chain, falsy values such as ``0`` and ``False`` are kept.
    """
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return None


def parse_ts_any(ts):
    """Parse a timestamp in any of the formats found in OpenClaw state files.

    Accepts ``datetime`` objects (naive values are assumed UTC), epoch numbers
    in seconds or milliseconds (values above 1e11 are treated as milliseconds),
    and ISO-8601 strings (a trailing ``Z`` is accepted).

    Returns:
        A timezone-aware ``datetime``, or ``None`` when *ts* is empty or cannot
        be parsed.
    """
    if ts is None:
        return None
    if isinstance(ts, datetime.datetime):
        return ts if ts.tzinfo else ts.replace(tzinfo=datetime.timezone.utc)
    if isinstance(ts, (int, float)):
        try:
            value = float(ts)
            # Epoch seconds stay below 1e11 until the year 5138, so larger values are ms.
            if value > 1e11:
                value /= 1000.0
            return datetime.datetime.fromtimestamp(value, tz=datetime.timezone.utc)
        except (OverflowError, OSError, ValueError):
            return None
    if isinstance(ts, str):
        text = ts.strip()
        if not text:
            return None
        # fromisoformat() only accepts "Z" from Python 3.11 on.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            dt = datetime.datetime.fromisoformat(text)
        except ValueError:
            return None
        return dt if dt.tzinfo else dt.replace(tzinfo=datetime.timezone.utc)
    return None


def fmt_local(ts):
    """Format any timestamp accepted by parse_ts_any() in LOCAL_TZ, or "" if unparseable."""
    dt = parse_ts_any(ts)
    if dt is None:
        return ""
    try:
        return dt.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %I:%M:%S %p %Z")
    except (OverflowError, OSError, ValueError):
        # Extreme dates cannot be shifted into the local zone; show them as-is.
        return dt.isoformat()


def fmt_duration(seconds):
    """Format seconds as a coarse duration using the largest fitting unit (s/m/h/d)."""
    if seconds < 60:
        return f"{int(seconds)}s"
    if seconds < 3600:
        return f"{int(seconds / 60)}m"
    if seconds < 86400:
        return f"{int(seconds / 3600)}h"
    return f"{int(seconds / 86400)}d"


def read_json(path: Path):
    """Parse the JSON file at *path*; return None if it is missing or unreadable."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None


def read_tail_lines(path: Path, count: int = 80):
    """Return the last *count* lines of *path* (newlines kept), or [] on read error."""
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as fh:
            return list(deque(fh, maxlen=count))
    except OSError:
        return []


# --------------------------------------------------------------------------- #
# Transcript parsing helpers
# --------------------------------------------------------------------------- #


def _iter_json_objects(lines):
    """Yield every line of *lines* that parses as a JSON object; skip blanks and anything else."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except ValueError:
            continue
        if isinstance(row, dict):
            yield row


def _row_message(row):
    """Return the transcript row's ``message`` dict, or {} if absent or malformed."""
    return _as_dict(row.get("message"))


def _row_timestamp(row, msg):
    """Return the row's timestamp, checking row then message, ``timestamp`` before ``time``."""
    return parse_ts_any(row.get("timestamp") or msg.get("timestamp") or row.get("time") or msg.get("time"))


def _content_blocks(msg):
    """Return the dict-valued content blocks of *msg* (non-list content yields none)."""
    content = msg.get("content")
    if not isinstance(content, list):
        return []
    return [block for block in content if isinstance(block, dict)]


def _block_text(block):
    """Return the stripped ``text`` of a content block, or "" if it is not a string."""
    text = block.get("text")
    return text.strip() if isinstance(text, str) else ""


def _safe_mtime(path: Path) -> float:
    """Return the file's mtime, or 0 if it vanished or cannot be stat'ed."""
    try:
        return path.stat().st_mtime
    except OSError:
        return 0


def _recent_transcript_files(max_files):
    """Return up to *max_files* ``<agent>/sessions/*.jsonl`` transcripts, newest mtime first."""
    if not AGENTS_DIR.exists():
        return []
    return sorted(AGENTS_DIR.rglob("sessions/*.jsonl"), key=_safe_mtime, reverse=True)[:max_files]


def _agent_for_transcript(path: Path) -> str:
    """Return the agent directory name for ``<agent>/sessions/<id>.jsonl``."""
    return path.parts[-3] if len(path.parts) >= 3 else "unknown"


# --------------------------------------------------------------------------- #
# Labels
# --------------------------------------------------------------------------- #


def clean_session_label(value: str, *, fallback: str = "") -> str:
    """Normalize a session title for display.

    Strips a leading "Discord thread #… ›" prefix and a trailing "channel id:<n>"
    suffix, trims spaces/'#', turns hyphens into spaces, collapses whitespace and
    truncates to 96 characters. Returns *fallback* if nothing is left.
    """
    text = (value or "").strip()
    if not text:
        return fallback
    text = _DISCORD_THREAD_PREFIX_RE.sub("", text)
    text = _CHANNEL_ID_SUFFIX_RE.sub("", text)
    text = text.strip(" #")
    text = text.replace("-", " ")
    text = _WHITESPACE_RUN_RE.sub(" ", text)
    return text[:96] or fallback


def session_display_label(row: dict) -> str:
    """Return a row's cleaned label, or "<kind>: <key prefix>" when it has none."""
    label = clean_session_label(row.get("label") or "")
    if label:
        return label
    key = row.get("sessionKey") or row.get("sessionId") or "session"
    kind = row.get("kind") or "session"
    return f"{kind}: {str(key)[:32]}"


def normalize_exec_tool_label(content_block):
    """Return a display label for a toolCall content block.

    Non-``exec`` tools are labelled by name. ``exec`` calls whose command runs
    ``mcporter call <target>`` become "exec mcporter <target>"; other mcporter
    commands become "exec mcporter"; all remaining commands become "exec".
    """
    if not isinstance(content_block, dict):
        return "unknown_tool"
    name = content_block.get("name") or "unknown_tool"
    if name != "exec":
        return name
    args = content_block.get("arguments")
    if isinstance(args, dict):
        cmd = args.get("command") or args.get("cmd")
    elif isinstance(args, str):
        cmd = args
    else:
        cmd = None
    if not isinstance(cmd, str) or "mcporter" not in cmd.lower():
        return "exec"
    match = _MCPORTER_CALL_RE.search(cmd)
    return f"exec mcporter {match.group(1)}" if match else "exec mcporter"


# --------------------------------------------------------------------------- #
# Data collection
# --------------------------------------------------------------------------- #


def _session_kind(session_key: str) -> str:
    """Classify a session key by the first matching marker (subagent > cron > discord)."""
    for marker, kind in ((":subagent:", "subagent"), (":cron:", "cron"), (":discord:", "discord")):
        if marker in session_key:
            return kind
    return "session"


def session_index_rows():
    """Read every agent's ``sessions/sessions.json`` index.

    Returns:
        ``(rows, row_by_agent_and_id)``. ``rows`` holds one dict per session
        (agent, sessionKey, sessionId, label, updatedAt, updated, deliveryTo,
        kind), sorted most recently updated first. ``row_by_agent_and_id`` maps
        ``(agent, sessionId)`` to that same row dict, for sessions with an id.
    """
    rows = []
    row_by_agent_and_id = {}
    if not AGENTS_DIR.exists():
        return rows, row_by_agent_and_id
    for agent_dir in sorted(AGENTS_DIR.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith("."):
            continue
        data = read_json(agent_dir / "sessions" / "sessions.json")
        if not isinstance(data, dict):
            continue
        for session_key, meta in data.items():
            if not isinstance(meta, dict):
                continue
            updated_raw = meta.get("updatedAt") or meta.get("updatedAtMs")
            session_id = meta.get("sessionId") or ""
            delivery = _as_dict(meta.get("deliveryContext"))
            origin = _as_dict(meta.get("origin"))
            label = (
                meta.get("displayName")
                or meta.get("derivedTitle")
                or meta.get("title")
                or origin.get("label")
                or meta.get("groupChannel")
                or ""
            )
            row = {
                "agent": agent_dir.name,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": clean_session_label(str(label), fallback=session_key),
                "updatedAt": parse_ts_any(updated_raw),
                "updated": fmt_local(updated_raw),
                "deliveryTo": delivery.get("to") or "",
                "kind": _session_kind(session_key),
            }
            rows.append(row)
            if session_id:
                row_by_agent_and_id[(agent_dir.name, session_id)] = row
    rows.sort(key=lambda row: row["updatedAt"] or _MIN_UTC, reverse=True)
    return rows, row_by_agent_and_id


def _summarize_transcript_tail(path: Path):
    """Scan the last 120 lines of a transcript, newest first.

    Returns:
        ``(last_ts, role, summary)``: the newest timestamp seen, plus the role
        and summary of the newest message that has a tool call, non-empty text
        or an error message.
    """
    last_ts = None
    role = ""
    summary = ""
    for row in _iter_json_objects(reversed(read_tail_lines(path, 120))):
        msg = _row_message(row)
        ts = _row_timestamp(row, msg)
        if ts and last_ts is None:
            last_ts = ts
        if not summary:
            role = msg.get("role") or row.get("type") or ""
            for block in _content_blocks(msg):
                if block.get("type") == "toolCall":
                    summary = f"tool: {normalize_exec_tool_label(block)}"
                    break
                if block.get("type") == "text":
                    text = _block_text(block).replace("\n", " ")
                    if text:
                        summary = text[:160]
                        break
            if not summary and msg.get("errorMessage"):
                summary = str(msg.get("errorMessage"))[:160]
        if last_ts and summary:
            break
    return last_ts, role, summary


def transcript_activity_rows(session_lookup, max_files=80):
    """Summarize the most recently modified transcripts.

    Args:
        session_lookup: mapping of ``(agent, sessionId)`` to a session row dict
            (as returned by session_index_rows()) or to a session key string.
        max_files: maximum number of transcripts to scan, newest mtime first.

    Returns:
        One row per transcript (agent, sessionKey, sessionId, label, updatedAt,
        updated, role, summary, file), most recent activity first.
    """
    rows = []
    for path in _recent_transcript_files(max_files):
        agent = _agent_for_transcript(path)
        session_id = path.stem
        session_meta = session_lookup.get((agent, session_id))
        if isinstance(session_meta, dict):
            session_key = session_meta.get("sessionKey")
            label = session_meta.get("label")
        else:
            session_key = session_meta
            label = ""
        session_key = session_key or session_id
        last_ts, role, summary = _summarize_transcript_tail(path)
        rows.append(
            {
                "agent": agent,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": label or clean_session_label(session_key, fallback=session_id),
                "updatedAt": last_ts,
                "updated": fmt_local(last_ts),
                "role": role,
                "summary": summary or "(no recent summary)",
                "file": str(path),
            }
        )
    rows.sort(key=lambda row: row["updatedAt"] or _MIN_UTC, reverse=True)
    return rows


def _cron_health(enabled, running, consecutive_errors, last_status, staleness):
    """Return ``(health, reason)`` for a cron job; earlier checks take precedence."""
    if not enabled:
        return "disabled", ""
    if consecutive_errors >= 3:
        return "critical", f"{consecutive_errors} consecutive errors"
    if consecutive_errors > 0:
        return "warning", f"{consecutive_errors} error(s)"
    if last_status == "error":
        return "error", "last run status: error"
    if staleness:
        return "stale", f"overdue by {staleness}"
    if running:
        return "running", ""
    return "healthy", ""


def list_cron_jobs(limit=100):
    """List cron jobs from ``jobs.json`` with health status and staleness.

    A job is "stale" when it has run before and is more than five minutes past
    its ``nextRunAtMs``.

    Args:
        limit: maximum number of jobs read from the file.

    Returns:
        One row per job, most urgent health first, then by running/next/last
        run time ascending.
    """
    data = read_json(CRON_DIR / "jobs.json")
    jobs = data.get("jobs") if isinstance(data, dict) else None
    if not isinstance(jobs, list):
        jobs = []
    now_ms = time.time() * 1000
    rows = []
    for job in jobs[:limit]:
        if not isinstance(job, dict):
            continue
        state = _as_dict(job.get("state"))
        schedule = _as_dict(job.get("schedule"))
        payload = _as_dict(job.get("payload"))

        last_run_ms = state.get("lastRunAtMs")
        next_run_ms = state.get("nextRunAtMs")
        # Coerced so a null/string value cannot break the comparisons in _cron_health().
        consecutive_errors = _as_int(state.get("consecutiveErrors"))
        last_status = state.get("lastStatus") or state.get("lastRunStatus") or ""

        # TODO: estimate staleness from the cron expression when nextRunAtMs is
        # missing (needs cron parsing).
        staleness = None
        if last_run_ms and _is_number(next_run_ms) and now_ms > next_run_ms + _CRON_OVERDUE_GRACE_MS:
            staleness = fmt_duration((now_ms - next_run_ms) / 1000)

        health, health_reason = _cron_health(
            job.get("enabled", True), state.get("runningAtMs"), consecutive_errors, last_status, staleness
        )
        next_raw = state.get("nextRunAtMs") or state.get("nextRunAt")
        running_raw = state.get("runningAtMs") or state.get("runningAt")
        last_raw = state.get("lastRunAtMs") or state.get("lastRunAt")
        rows.append(
            {
                "name": job.get("name") or job.get("id") or "unnamed",
                "jobId": job.get("id") or "",
                "enabled": bool(job.get("enabled", True)),
                "kind": payload.get("kind") or "",
                "model": payload.get("model") or "",
                "schedule": schedule.get("expr") or schedule.get("kind") or "",
                "tz": schedule.get("tz") or "",
                "nextRunAt": parse_ts_any(next_raw),
                "nextRun": fmt_local(next_raw),
                "runningAt": parse_ts_any(running_raw),
                "running": fmt_local(running_raw),
                "lastRunAt": parse_ts_any(last_raw),
                "lastRun": fmt_local(last_raw),
                "lastStatus": last_status,
                "sessionKey": job.get("sessionKey") or "",
                "consecutiveErrors": consecutive_errors,
                "health": health,
                "healthReason": health_reason,
                "staleness": staleness,
            }
        )
    rows.sort(
        key=lambda row: (
            _HEALTH_ORDER.get(row["health"], 7),
            row["runningAt"] or row["nextRunAt"] or row["lastRunAt"] or _MIN_UTC,
        )
    )
    return rows


def _usage_cost(usage) -> float:
    """Return the cost of one usage record from the first cost alias that holds a number.

    The aliases are treated as alternative spellings of the same amount, so
    only one is counted. A mapping under an alias contributes its numeric
    ``total`` (assumed breakdown schema — confirm against the producer).
    """
    for key in _COST_KEYS:
        value = usage.get(key)
        if isinstance(value, dict):
            value = value.get("total")
        if _is_number(value):
            return float(value)
    return 0.0


def collect_metrics_last_24h(max_files=500):
    """Aggregate transcript metrics for events timestamped in the last 24 hours.

    Scans up to *max_files* transcripts (newest mtime first). Lines that are not
    JSON objects or carry malformed usage fields are skipped individually, so
    they no longer hide the rest of their file.

    Returns:
        A dict with ``tool_counts`` (Counter of tool labels), ``error_rate``
        (percent of events), ``error_count``, ``total_events``, token totals,
        ``cost_total``, ``cache_hit_proxy`` (percent of usage records reporting
        a cache hit, or None without samples), ``cache_samples``, and
        ``active_summaries`` (agents with an event in the last 60 minutes,
        newest first).
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(hours=24)
    tool_counts = Counter()
    error_count = 0
    total_events = 0
    token_total = 0
    input_tokens = 0
    output_tokens = 0
    cost_total = 0.0
    cache_hits = 0
    cache_samples = 0
    latest_per_agent = {}

    for path in _recent_transcript_files(max_files):
        agent = _agent_for_transcript(path)
        try:
            with path.open("r", encoding="utf-8", errors="ignore") as fh:
                for row in _iter_json_objects(fh):
                    msg = _row_message(row)
                    ts = _row_timestamp(row, msg)
                    if ts is None or ts < cutoff:
                        continue
                    total_events += 1
                    if msg.get("errorMessage") or msg.get("stopReason") == "error":
                        error_count += 1

                    summary = None
                    for block in _content_blocks(msg):
                        block_type = block.get("type")
                        if block_type == "toolCall":
                            label = normalize_exec_tool_label(block)
                            tool_counts[label] += 1
                            summary = summary or f"tool: {label}"
                        elif block_type == "text" and msg.get("role") == "assistant":
                            text = _block_text(block).replace("\n", " ")
                            if text and summary is None:
                                summary = text[:120]

                    usage = _as_dict(msg.get("usage"))
                    if usage:
                        # Prefer an explicit total; otherwise add input + output.
                        total_tok = _as_int(usage.get("total_tokens") or usage.get("totalTokens"))
                        input_tok = _as_int(
                            usage.get("input_tokens") or usage.get("inputTokens") or usage.get("input")
                        )
                        output_tok = _as_int(
                            usage.get("output_tokens") or usage.get("outputTokens") or usage.get("output")
                        )
                        token_total += total_tok or (input_tok + output_tok)
                        input_tokens += input_tok
                        output_tokens += output_tok
                        cost_total += _usage_cost(usage)

                        # _first_present keeps explicit False/0 so cache misses count as samples.
                        cached = _first_present(usage, _CACHE_HIT_KEYS)
                        cached_tokens = _first_present(usage, _CACHED_TOKEN_KEYS)
                        if isinstance(cached, bool):
                            cache_samples += 1
                            if cached:
                                cache_hits += 1
                        elif _is_number(cached_tokens):
                            cache_samples += 1
                            if cached_tokens > 0:
                                cache_hits += 1

                    latest = latest_per_agent.get(agent)
                    if latest is None or ts > latest["ts"]:
                        latest_per_agent[agent] = {"ts": ts, "summary": summary or "No recent assistant/tool summary"}
        except OSError:
            # Unreadable transcript: keep whatever was counted before the failure.
            continue

    active_cutoff = now - datetime.timedelta(minutes=60)
    active_agents = [
        {"agent": agent, "ts": info["ts"], "updated": fmt_local(info["ts"]), "summary": info["summary"]}
        for agent, info in latest_per_agent.items()
        if info["ts"] >= active_cutoff
    ]
    active_agents.sort(key=lambda row: row["ts"], reverse=True)

    return {
        "tool_counts": tool_counts,
        "error_rate": (error_count / total_events * 100.0) if total_events else 0.0,
        "error_count": error_count,
        "total_events": total_events,
        "token_total": token_total,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_total": cost_total,
        "cache_hit_proxy": (cache_hits / cache_samples * 100.0) if cache_samples else None,
        "cache_samples": cache_samples,
        "active_summaries": active_agents,
    }


def _activity_entry(block, ts, role):
    """Build one activity-stream entry for a content block.

    Returns None for block types the stream does not display (anything other
    than thinking, toolCall, toolResult and text).
    """
    entry = {"timestamp": ts, "time": fmt_local(ts), "role": role, "type": "", "summary": "", "details": ""}
    block_type = block.get("type")
    if block_type == "thinking":
        thinking = block.get("thinking")
        thinking = thinking if isinstance(thinking, str) else ""
        entry["type"] = "thinking"
        entry["summary"] = "Thinking..." if thinking else "(empty thinking)"
        entry["details"] = thinking[:500]
    elif block_type == "toolCall":
        entry["type"] = "tool"
        entry["summary"] = f"→ {normalize_exec_tool_label(block)}"
        args = block.get("arguments", {})
        if isinstance(args, dict):
            # Show at most the first three arguments, each value cut to 50 characters.
            entry["details"] = ", ".join(f"{key}={str(value)[:50]}" for key, value in list(args.items())[:3])
        elif isinstance(args, str):
            entry["details"] = args[:100]
    elif block_type == "toolResult":
        content_data = block.get("content")
        is_error = bool(block.get("isError") or block.get("error"))
        entry["type"] = "result"
        entry["summary"] = "✗ Error" if is_error else "✓ Result"
        # Lets the stream renderer pick the ❌/✅ icon for dashboard-sourced results.
        entry["is_error"] = is_error
        if isinstance(content_data, str):
            entry["details"] = content_data[:200]
        elif isinstance(content_data, list) and content_data:
            entry["details"] = str(content_data[0])[:200]
    elif block_type == "text":
        text = _block_text(block)
        entry["type"] = "text"
        entry["summary"] = text[:80].replace("\n", " ")
        entry["details"] = text[:300]
    else:
        return None
    return entry


def get_live_session_activity(session_key, agent_name, max_lines=100):
    """Extract recent activity entries from one session transcript.

    Args:
        session_key: a session key or bare session id. Only the part after the
            last ':' is used as the ``<agent>/sessions/<id>.jsonl`` file stem.
        agent_name: the agent directory name under AGENTS_DIR.
        max_lines: number of trailing transcript lines to read.

    Returns:
        One entry per displayable content block (oldest first), each with
        timestamp, time, role, type, summary and details. Returns [] when the
        transcript does not exist.
    """
    if not session_key or not agent_name or not AGENTS_DIR.exists():
        return []
    agent_dir = AGENTS_DIR / agent_name
    if not agent_dir.exists():
        return []
    session_id = str(session_key).split(":")[-1]
    jsonl_path = agent_dir / "sessions" / f"{session_id}.jsonl"
    if not jsonl_path.exists():
        return []

    activities = []
    for row in _iter_json_objects(read_tail_lines(jsonl_path, max_lines)):
        msg = _row_message(row)
        ts = _row_timestamp(row, msg)
        role = msg.get("role", "")
        # One entry per block, so a message like [thinking, text, toolCall] shows all three.
        for block in _content_blocks(msg):
            entry = _activity_entry(block, ts, role)
            if entry is not None:
                activities.append(entry)
    return activities


# --------------------------------------------------------------------------- #
# Session Amplifier sidecar / ops artifacts
# --------------------------------------------------------------------------- #


def _load_sidecar_artifact(name: str):
    """Load a Session Amplifier JSON artifact from the state directory.

    Returns the parsed JSON, or None when the file is missing or unreadable.
    """
    return read_json(STATE_DIR / "session_amplifier" / name)


def _sidecar_reachable() -> bool:
    """Return True if the sidecar's /health endpoint answers within two seconds."""
    try:
        req = urllib.request.Request(f"{SESSION_AMPLIFIER_BASE}/health", method="GET")
        with urllib.request.urlopen(req, timeout=2):
            return True
    except Exception:  # network boundary: any failure (URL, socket, HTTP, protocol) means "not reachable"
        return False


def _fetch_sidecar_json(path: str, default=None):
    """GET *path* from the sidecar API; return parsed JSON, or *default* on any failure."""
    try:
        req = urllib.request.Request(f"{SESSION_AMPLIFIER_BASE}{path}", method="GET")
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read().decode())
    except Exception:  # network boundary: best-effort fetch
        return default


def _fetch_recent_sidecar_sessions(limit):
    """Return the sidecar's recent-session dicts (up to *limit*), or [] if unavailable."""
    payload = _fetch_sidecar_json(f"/sessions/recent?limit={limit}")
    sessions = payload.get("sessions") if isinstance(payload, dict) else None
    if not isinstance(sessions, list):
        return []
    return [sess for sess in sessions if isinstance(sess, dict)]


def _load_ops_json(name: str, default=None):
    """Parse ``STATE_DIR/<name>``; return *default* if it is missing or invalid JSON."""
    path = STATE_DIR / name
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return default


# --------------------------------------------------------------------------- #
# Model Ops tab
# --------------------------------------------------------------------------- #


def _top_arena_models(models, metric, limit=10):
    """Return up to *limit* models with a non-null ``arena[metric]``, highest first."""
    scored = [m for m in models if isinstance(m, dict) and (m.get("arena") or {}).get(metric) is not None]
    return sorted(scored, key=lambda m: (m.get("arena") or {}).get(metric) or 0, reverse=True)[:limit]


def _render_cost_session_details(chosen, session_rows):
    """Show index metadata and recent transcript activity for one cost row."""
    agent = chosen.get("agent_id")
    session_id = chosen.get("session_id")
    # An empty id would match every key via "in", so require one.
    matching = [
        r
        for r in session_rows
        if session_id
        and r.get("agent") == agent
        and (r.get("sessionId") == session_id or str(session_id) in (r.get("sessionKey") or ""))
    ]
    if not matching:
        st.caption("No matching session found in current session index.")
        return
    match = matching[0]
    st.caption(f"Session key: {match.get('sessionKey')}")
    st.caption(f"Last updated: {match.get('updated')}")
    st.caption(f"Delivery: {match.get('deliveryTo')}")
    # Transcript files are named after the sessionId, so prefer it over the key.
    recent = get_live_session_activity(match.get("sessionId") or match.get("sessionKey"), agent, max_lines=40)
    if recent:
        st.dataframe(
            [
                {
                    "time": a.get("time"),
                    "type": a.get("type"),
                    "summary": a.get("summary"),
                    "details": (a.get("details") or "")[:120],
                }
                for a in recent[-15:]
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No recent activity extracted from transcript.")


def render_model_ops_tab(session_rows):
    """Render the Model Ops tab from model-ops JSON snapshots in STATE_DIR.

    Args:
        session_rows: rows from session_index_rows(), used to link a costly
            session back to its transcript.
    """
    snapshot = _as_dict(_load_ops_json("model-ops-snapshot-latest.json", {}))
    merged = _as_dict(_load_ops_json("model-benchmarks-merged-latest.json", {}))
    hf_benchmarks = _as_dict(_load_ops_json("hf-benchmarks-latest.json", {}))
    cost_rows = _load_ops_json("session-cost-top50-latest.json", []) or []
    cost_rows = [row for row in cost_rows if isinstance(row, dict)] if isinstance(cost_rows, list) else []
    merged_models = merged.get("models") or []

    st.subheader("Model Ops")
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("OpenRouter models", snapshot.get("market_model_count", 0))
    c2.metric("Merged benchmark models", len(merged_models))
    c3.metric("HF models", len(hf_benchmarks.get("models") or []))
    c4.metric("Top cost rows", len(cost_rows))

    st.markdown("### Benchmark sources")
    src_rows = [
        {"source": src.get("name"), "status": src.get("status")}
        for src in snapshot.get("benchmark_source_status") or []
        if isinstance(src, dict)
    ]
    if src_rows:
        st.dataframe(src_rows, use_container_width=True, hide_index=True)
    else:
        st.caption("No benchmark source metadata available")

    st.markdown("### Cost coverage quality")
    coverage = Counter((row.get("cost_source") or "unknown") for row in cost_rows)
    trust_score = snapshot.get("cost_trust_score")
    if not _is_number(trust_score):
        trust_score = 0.0
    cc1, cc2, cc3, cc4 = st.columns(4)
    cc1.metric("Observed", coverage.get("observed", 0))
    cc2.metric("Estimated", coverage.get("estimated", 0))
    cc3.metric("Unknown", coverage.get("unknown", 0) + coverage.get("", 0))
    cc4.metric("Trust score", f"{(trust_score * 100):.1f}%")

    st.markdown("### Highest current observed session cost")
    if cost_rows:
        agents = sorted({r.get("agent_id") for r in cost_rows if r.get("agent_id")})
        agent_filter = st.selectbox("Cost rows filter by agent", ["all"] + agents, index=0)
        filtered_cost = [r for r in cost_rows if agent_filter == "all" or r.get("agent_id") == agent_filter]
        st.dataframe(filtered_cost[:15], use_container_width=True, hide_index=True)
        st.markdown("#### Inspect costly session")
        lookup = {
            f"{r.get('agent_id')}/{r.get('session_id')} — {r.get('primary_model')} — ${r.get('total_estimated_usd')}": r
            for r in filtered_cost[:25]
        }
        if lookup:
            selected = st.selectbox("Jump to session details", list(lookup), index=0)
            _render_cost_session_details(lookup[selected], session_rows)
    else:
        st.caption("No session cost artifacts found")

    show_legacy = st.checkbox("Show legacy / non-orchestration benchmark leaders", value=False)

    st.markdown("### Top merged Arena Elo")
    arena_elo = _top_arena_models(merged_models, "elo") if show_legacy else snapshot.get("top_arena_elo", [])
    if arena_elo:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "elo": (r.get("arena") or {}).get("elo"),
                    "mapping": (r.get("mapping") or {}).get("confidence"),
                }
                for r in arena_elo
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena elo rows")

    st.markdown("### Top merged Arena winrate")
    arena_wr = _top_arena_models(merged_models, "winrate") if show_legacy else snapshot.get("top_arena_winrate", [])
    if arena_wr:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "winrate": (r.get("arena") or {}).get("winrate"),
                    "appearances": (r.get("arena") or {}).get("appearances"),
                    "mapping": (r.get("mapping") or {}).get("confidence"),
                }
                for r in arena_wr
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena winrate rows")

    st.markdown("### Highest heuristic efficiency")
    eff = snapshot.get("top_efficiency", [])
    if eff:
        st.dataframe(
            [
                {
                    "model": r.get("id"),
                    "efficiency": r.get("efficiency_score"),
                    "input_per_1m": (r.get("pricing_per_1m") or {}).get("input"),
                    "context": r.get("context_length"),
                }
                for r in eff
            ],
            use_container_width=True,
            hide_index=True,
        )

    st.markdown("### Mapping confidence breakdown")
    counts = Counter(
        ((m.get("mapping") or {}).get("confidence") or "unknown") for m in merged_models if isinstance(m, dict)
    )
    st.json(dict(counts))


# --------------------------------------------------------------------------- #
# Other tabs
# --------------------------------------------------------------------------- #


def _render_sidecar_findings(sidecar_status, sidecar_report, sidecar_skills):
    """Show Session Amplifier review findings and skill-surface coverage."""
    st.subheader("Session Amplifier")
    if sidecar_report:
        patterns = [p for p in (sidecar_report.get("failure_patterns") or []) if isinstance(p, dict)]
        if patterns:
            st.warning(f"{len(patterns)} failure pattern(s) detected")
            for pattern in patterns[:5]:
                st.caption(f"• {pattern.get('description', pattern.get('pattern', '?'))} ({pattern.get('count', 0)})")
        else:
            st.success("No failure patterns detected")
        if sidecar_report.get("sessions_reviewed"):
            st.caption(f"Reviewed {sidecar_report['sessions_reviewed']} sessions")
    elif sidecar_skills:
        # Status is "ok" because of the skills artifact, but there is no review report.
        st.caption("No Session Amplifier review report found")
    elif sidecar_status == "reachable":
        st.info("Sidecar reachable but not yet warmed up — run spool to populate")
    else:
        st.caption("Session Amplifier unavailable — see docs to deploy sidecar/session-amplifier/")

    if sidecar_skills:
        missing = sidecar_skills.get("mcps_missing_skill_surface") or []
        if missing:
            st.warning(f"{len(missing)} MCP(s) without skill surface: {', '.join(str(name) for name in missing[:5])}")
        else:
            st.caption("All registered MCPs have skill surfaces ✓")


def _render_overview_tab(metrics, session_rows, sidecar_status, sidecar_report, sidecar_skills):
    """Render the Overview tab: active agents and recent sessions (left), metrics and sidecar findings (right)."""
    left, right = st.columns([1, 1])
    with left:
        st.subheader("Active agent summaries (last 60m)")
        if metrics["active_summaries"]:
            st.dataframe(metrics["active_summaries"], use_container_width=True, hide_index=True)
        else:
            st.info("No active agent summaries in the last 60 minutes.")
        st.subheader("Recent sessions")
        if session_rows:
            st.dataframe(
                [
                    {
                        "agent": row["agent"],
                        "kind": row["kind"],
                        "updated": row["updated"],
                        "sessionKey": row["sessionKey"],
                        "deliveryTo": row["deliveryTo"],
                    }
                    for row in session_rows[:20]
                ],
                use_container_width=True,
                hide_index=True,
            )
        else:
            st.info("No indexed sessions found.")

    with right:
        st.subheader("Last 24h metrics")
        m1, m2, m3, m4 = st.columns(4)
        m1.metric("Error rate", f"{metrics['error_rate']:.2f}%")
        m2.metric("Errors / events", f"{metrics['error_count']} / {metrics['total_events']}")
        m3.metric("Tokens", f"{metrics['token_total']:,}")
        m4.metric("Cost", f"${metrics['cost_total']:.4f}")
        st.caption(f"Input: {metrics['input_tokens']:,} | Output: {metrics['output_tokens']:,}")
        if metrics["cache_hit_proxy"] is None:
            st.caption("Cache-hit proxy unavailable in recent usage fields.")
        else:
            st.caption(
                f"Cache-hit proxy: {metrics['cache_hit_proxy']:.2f}% across {metrics['cache_samples']} samples."
            )

        st.subheader("Top tools (last 24h)")
        top_tools = [{"tool": tool, "count": count} for tool, count in metrics["tool_counts"].most_common(20)]
        if top_tools:
            st.dataframe(top_tools, use_container_width=True, hide_index=True)
        else:
            st.info("No tool invocation events found in the last 24 hours.")

        _render_sidecar_findings(sidecar_status, sidecar_report, sidecar_skills)


def _sidecar_session_row(sess):
    """Convert one sidecar recent-session record into a Sessions-tab table row."""
    hints = sess.get("hints") or []
    return {
        "agent": sess.get("agent_id", "?"),
        "session_id": str(sess.get("session_id") or "?")[:32],
        "events": sess.get("event_count", 0),
        "tools": sess.get("tool_result_count", 0),
        "errors": sess.get("error_count", 0),
        "health": str(sess.get("health") or "ok").upper(),
        "hints": "; ".join(str(hint) for hint in hints) if isinstance(hints, list) else str(hints),
        "last_event": fmt_local(sess.get("last_event_at")),
    }


def _render_sessions_tab(session_rows, subagent_rows, sidecar_sessions):
    """Render the Sessions tab: sidecar sessions when available, else the filterable local index."""
    if sidecar_sessions:
        st.subheader("Recent sessions (Session Amplifier)")
        st.caption(f"Fetched at {fmt_local(datetime.datetime.now(datetime.timezone.utc))}")
        st.dataframe(
            [_sidecar_session_row(sess) for sess in sidecar_sessions],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.subheader("Session index")
        agent_names = ["all"] + sorted({row["agent"] for row in session_rows})
        selected_agent = st.selectbox("Filter agent", agent_names, index=0)
        selected_kind = st.selectbox("Filter kind", ["all", "session", "discord", "cron", "subagent"], index=0)
        filtered_rows = [
            {
                "agent": row["agent"],
                "kind": row["kind"],
                "updated": row["updated"],
                "sessionKey": row["sessionKey"],
                "sessionId": row["sessionId"],
                "deliveryTo": row["deliveryTo"],
            }
            for row in session_rows
            if (selected_agent == "all" or row["agent"] == selected_agent)
            and (selected_kind == "all" or row["kind"] == selected_kind)
        ]
        st.dataframe(filtered_rows, use_container_width=True, hide_index=True)

    st.subheader("Recent child/subagent sessions")
    st.dataframe(
        [
            {
                "agent": row["agent"],
                "updated": row["updated"],
                "sessionKey": row["sessionKey"],
                "sessionId": row["sessionId"],
            }
            for row in subagent_rows[:50]
        ],
        use_container_width=True,
        hide_index=True,
    )


def _render_cron_tab(cron_rows, running_cron, attention):
    """Render the Cron tab.

    Args:
        cron_rows: rows from list_cron_jobs().
        running_cron: the subset currently running.
        attention: mapping of each _ATTENTION_STATES health to its jobs.
    """
    health_filter = st.selectbox(
        "Filter by health",
        ["all", "critical", "error", "warning", "stale", "running", "healthy", "disabled"],
        index=0,
    )
    if health_filter == "all":
        filtered_cron = cron_rows
    else:
        filtered_cron = [r for r in cron_rows if r["health"] == health_filter]
    st.subheader(f"Cron jobs ({len(filtered_cron)} shown)")

    display_rows = [
        {
            "health": f"{_HEALTH_ICONS.get(row['health'], '?')} {row['health']}",
            "name": row["name"],
            "enabled": "✓" if row["enabled"] else "✗",
            "schedule": row["schedule"],
            "lastRun": row["lastRun"],
            "lastStatus": row["lastStatus"],
            "nextRun": row["nextRun"],
            "errors": row["consecutiveErrors"] if row["consecutiveErrors"] > 0 else "",
            "reason": row["healthReason"],
        }
        for row in filtered_cron
    ]
    st.dataframe(display_rows, use_container_width=True, hide_index=True)

    if any(attention.values()):
        st.error(
            f"**{len(attention['critical'])}** critical, **{len(attention['error'])}** error, "
            f"**{len(attention['warning'])}** warning, **{len(attention['stale'])}** stale jobs need attention"
        )
    else:
        st.success("All monitored cron jobs are healthy")

    st.subheader("Running now")
    if running_cron:
        st.dataframe(running_cron, use_container_width=True, hide_index=True)
    else:
        st.info("No cron jobs currently marked running.")


def _render_activity_tab(activity_rows):
    """Render the Activity tab: the 50 most recent transcript summaries."""
    st.subheader("Recent transcript activity")
    st.dataframe(
        [
            {
                "agent": row["agent"],
                "updated": row["updated"],
                "role": row["role"],
                "sessionKey": row["sessionKey"],
                "summary": row["summary"],
                "file": row["file"],
            }
            for row in activity_rows[:50]
        ],
        use_container_width=True,
        hide_index=True,
    )


def _render_activity_stream(activities):
    """Render up to 50 activity entries, newest first, with a divider per 5-minute bucket.

    Handles both dashboard entries (datetime ``timestamp``) and sidecar entries
    (timestamp strings, parsed here).
    """
    if not activities:
        st.info("No activity found for this session (may be archived or not yet written)")
        return
    current_bucket = None
    for act in reversed(activities[-50:]):
        if not isinstance(act, dict):
            continue
        ts = parse_ts_any(act.get("timestamp"))
        if ts is not None:
            bucket = ts.replace(minute=(ts.minute // 5) * 5, second=0, microsecond=0)
            if bucket != current_bucket:
                current_bucket = bucket
                st.divider()
                st.caption(f"📍 {fmt_local(bucket)}")

        evtype = act.get("type") or act.get("event_type", "")
        ts_str = act.get("time") or fmt_local(ts)
        summary = str(act.get("summary") or "")
        details = str(act.get("details") or "")

        if evtype == "thinking":
            with st.expander(f"🧠 {ts_str} — Thinking", expanded=False):
                st.text(details[:1000] or "(no details)")
        elif evtype == "tool":
            st.markdown(f"**🔧 {ts_str}** — {summary}")
            if details:
                st.code(details[:500], language="bash")
        elif evtype == "tool_call":
            st.markdown(f"**⚙ {ts_str}** → {act.get('tool_name') or ''} — {summary}")
        elif evtype in ("tool_result", "result"):
            icon = "❌" if act.get("is_error") else "✅"
            st.markdown(f"**{icon} {ts_str}** — {summary}")
            if details:
                st.text(details[:300])
        elif evtype == "tool_error":
            st.error(f"⚠ {ts_str} — {summary or 'tool error'}")
            if details:
                st.text(details[:300])
        elif evtype == "assistant_meta":
            st.markdown(f"**💡 {ts_str}** — {summary[:120]}")
        elif evtype == "assistant_thinking":
            with st.expander(f"🧠 {ts_str} — Thinking", expanded=False):
                st.text(details[:500] or "(no details)")
        elif evtype == "assistant_text":
            st.markdown(f"**💬 {ts_str}** — {summary[:120]}")
        elif evtype == "user_message":
            with st.expander(f"👤 {ts_str} — User message", expanded=False):
                st.text(details[:300])
        else:
            st.caption(f"[{evtype or '?'}] {ts_str} — {summary[:80]}")


def _render_live_tab(session_rows, sidecar_status, sidecar_sessions):
    """Render the Live Session tab: pick a session, then show its activity stream.

    Sidecar sessions (labelled "[SA] …") are listed first, then dashboard-indexed
    sessions updated in the last 24 hours; at most 20 of each.
    """
    st.subheader("Live Session Activity Stream")
    if sidecar_status == "reachable":
        st.info("Session Amplifier reachable but not yet warmed up — spool may be empty")
    elif sidecar_status == "unavailable":
        st.warning("Session Amplifier unavailable. Deploy: cd sidecar/session-amplifier && docker compose up -d")
    st.caption(
        "Normalized activity feed from Session Amplifier. "
        "For continuous streaming, use: python scripts/session_amplifier_live_monitor.py"
    )

    # Option label -> (source, record). On duplicate labels the first entry wins.
    options = {}
    for sess in sidecar_sessions[:20]:
        fallback = str(sess.get("session_id") or "?")[:32]
        title = clean_session_label(sess.get("display_title") or "", fallback=fallback)
        options.setdefault(f"[SA] {sess.get('agent_id', '?')}: {title}", ("sidecar", sess))
    recent_cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)
    recent_sessions = [row for row in session_rows if row["updatedAt"] and row["updatedAt"] > recent_cutoff]
    for row in recent_sessions[:20]:
        options.setdefault(f"{row['agent']}: {session_display_label(row)}", ("dashboard", row))

    if not options:
        st.info("No recent sessions found in the last 24 hours")
        return

    selected = st.selectbox("Select session to monitor", list(options), index=0)
    source, item = options[selected]
    if source == "sidecar":
        sel_session_id = str(item.get("session_id") or "?")
        sel_agent = item.get("agent_id") or "?"
        sel_title = clean_session_label(item.get("display_title") or "", fallback=sel_session_id)
        sel_updated = fmt_local(item.get("last_event_at"))
        sel_health = str(item.get("health") or "ok").upper()
        hints = item.get("hints") or []
        sel_hints = "; ".join(str(hint) for hint in hints) if isinstance(hints, list) else str(hints)
        st.caption(
            f"Health: {sel_health} | Events: {item.get('event_count', 0)} | "
            f"Tools: {item.get('tool_result_count', 0)} | Errors: {item.get('error_count', 0)}"
        )
        if sel_hints:
            st.caption(f"Hints: {sel_hints}")
        # The id becomes a URL path segment, so escape it.
        quoted_id = urllib.parse.quote(sel_session_id, safe="")
        activity_data = _fetch_sidecar_json(f"/session/{quoted_id}/activity?limit=200")
        activities = (activity_data.get("activity") or []) if isinstance(activity_data, dict) else []
        if not isinstance(activities, list):
            activities = []
    else:
        sel_agent = item["agent"]
        sel_title = session_display_label(item)
        sel_updated = item["updated"]
        st.caption("Dashboard session — may not appear in Session Amplifier spool")
        # Transcript files are named after the sessionId, so prefer it over the key.
        activities = get_live_session_activity(item["sessionId"] or item["sessionKey"], sel_agent, max_lines=200)

    col1, col2, col3 = st.columns([1, 1, 1])
    with col1:
        st.write(f"**Agent:** {sel_agent}")
    with col2:
        st.write(f"**Session:** {sel_title}")
    with col3:
        st.write(f"**Last event:** {sel_updated}")

    _render_activity_stream(activities)


# --------------------------------------------------------------------------- #
# Entry point
# --------------------------------------------------------------------------- #


def main():
    """Build the dashboard page: header metrics followed by one tab per view."""
    st.set_page_config(page_title="OpenClaw Ops Dashboard", layout="wide")
    st.title("OpenClaw Ops Dashboard")
    st.caption(f"Root: {OPENCLAW_ROOT}")

    # On-disk Session Amplifier artifacts take precedence; probe the live sidecar
    # only when neither artifact is available. Non-dict artifacts count as absent.
    sidecar_report = _as_dict(_load_sidecar_artifact("review-latest.json")) or None
    sidecar_skills = _as_dict(_load_sidecar_artifact("skills-latest.json")) or None
    if sidecar_report or sidecar_skills:
        sidecar_status = "ok"
    elif _sidecar_reachable():
        sidecar_status = "reachable"
    else:
        sidecar_status = "unavailable"

    session_rows, session_lookup = session_index_rows()
    activity_rows = transcript_activity_rows(session_lookup)
    cron_rows = list_cron_jobs()
    metrics = collect_metrics_last_24h()
    # Fetch once for both the Sessions and Live tabs; the Live tab uses the first 20
    # (assumes the endpoint returns most-recent first). Skip the request when the
    # health probe already failed, so a missing sidecar adds no timeouts.
    sidecar_sessions = [] if sidecar_status == "unavailable" else _fetch_recent_sidecar_sessions(limit=50)

    running_cron = [row for row in cron_rows if row["runningAt"]]
    subagent_rows = [row for row in session_rows if row["kind"] == "subagent"]
    attention = {state: [row for row in cron_rows if row["health"] == state] for state in _ATTENTION_STATES}

    c1, c2, c3, c4, c5 = st.columns(5)
    c1.metric("Agents", len({row["agent"] for row in session_rows}))
    c2.metric("Indexed sessions", len(session_rows))
    c3.metric("Running cron", len(running_cron))
    c4.metric("Sidecar", sidecar_status.title(), help="Session Amplifier sidecar availability")
    if any(attention.values()):
        health_display = "⚠ " + "/".join(str(len(attention[state])) for state in _ATTENTION_STATES)
    else:
        health_display = "✓"
    c5.metric("Health", health_display, help="Critical/Error/Warning/Stale cron jobs")

    tab_overview, tab_sessions, tab_cron, tab_activity, tab_live, tab_model_ops = st.tabs(
        ["Overview", "Sessions", "Cron", "Activity", "Live Session", "Model Ops"]
    )
    with tab_overview:
        _render_overview_tab(metrics, session_rows, sidecar_status, sidecar_report, sidecar_skills)
    with tab_sessions:
        _render_sessions_tab(session_rows, subagent_rows, sidecar_sessions)
    with tab_cron:
        _render_cron_tab(cron_rows, running_cron, attention)
    with tab_activity:
        _render_activity_tab(activity_rows)
    with tab_live:
        _render_live_tab(session_rows, sidecar_status, sidecar_sessions)
    with tab_model_ops:
        render_model_ops_tab(session_rows)


if __name__ == "__main__":
    main()