| import datetime |
| import json |
| import os |
| import re |
| import time |
| from collections import Counter, deque |
| from pathlib import Path |
| from zoneinfo import ZoneInfo |
|
|
| import streamlit as st |
|
|
|
|
# Root of the OpenClaw state tree: the Windows deployment path when running on
# Windows (or when that path resolves from the current directory), otherwise
# the container path.
if os.name == "nt" or Path("C:/openclaw-biz/state/openclaw").exists():
    OPENCLAW_ROOT = Path("C:/openclaw-biz/state/openclaw")
else:
    OPENCLAW_ROOT = Path("/home/node/.openclaw")

AGENTS_DIR = OPENCLAW_ROOT / "agents"
CRON_DIR = OPENCLAW_ROOT / "cron"
# Derived from OPENCLAW_ROOT so it follows the Windows/container switch above;
# on the container this is /home/node/.openclaw/workspace/ops/state.
STATE_DIR = OPENCLAW_ROOT / "workspace" / "ops" / "state"
SESSION_AMPLIFIER_BASE = os.environ.get("SESSION_AMPLIFIER_BASE_URL", "http://session-amplifier:8477")
# Timezone used for every human-readable timestamp on the dashboard.
LOCAL_TZ = ZoneInfo("America/New_York")
|
|
|
|
def parse_ts_any(ts):
    """Best-effort conversion of a loosely typed timestamp to an aware datetime.

    Accepted inputs:
      * ``datetime`` objects (naive ones are assumed to be UTC);
      * ints/floats holding epoch seconds, or epoch milliseconds when the
        value exceeds 1e11;
      * ISO-8601 strings (a trailing ``Z`` means UTC; naive results are
        assumed to be UTC).

    Returns ``None`` for ``None``, blank or unparseable input, and any other type.
    """
    utc = datetime.timezone.utc

    def as_aware(moment):
        # Naive values carry no zone; interpret them as UTC.
        if moment.tzinfo:
            return moment
        return moment.replace(tzinfo=utc)

    if isinstance(ts, datetime.datetime):
        return as_aware(ts)

    if isinstance(ts, (int, float)):
        try:
            epoch = float(ts)
            if epoch > 1e11:
                # Magnitudes this large are milliseconds, not seconds.
                epoch = epoch / 1000.0
            return datetime.datetime.fromtimestamp(epoch, tz=utc)
        except Exception:
            return None

    if isinstance(ts, str):
        text = ts.strip()
        if not text:
            return None
        if text.endswith("Z"):
            text = f"{text[:-1]}+00:00"
        try:
            return as_aware(datetime.datetime.fromisoformat(text))
        except Exception:
            return None

    return None
|
|
|
|
def fmt_local(ts):
    """Format any timestamp accepted by ``parse_ts_any`` in ``LOCAL_TZ``.

    Output looks like ``"2024-01-02 03:04:05 PM EST"``. Returns ``""`` when the
    input cannot be parsed, and the ISO-8601 form if the local conversion or
    formatting fails.
    """
    moment = parse_ts_any(ts)
    if moment is not None:
        try:
            local_moment = moment.astimezone(LOCAL_TZ)
            return local_moment.strftime("%Y-%m-%d %I:%M:%S %p %Z")
        except Exception:
            return moment.isoformat()
    return ""
|
|
|
|
def fmt_duration(seconds):
    """Render a number of seconds using its largest whole unit.

    Values are truncated, not rounded: 119 -> "1m", 7199 -> "1h", and
    anything of at least one day is expressed in days ("d").
    """
    for limit, divisor, suffix in ((60, 1, "s"), (3600, 60, "m"), (86400, 3600, "h")):
        if seconds < limit:
            return f"{int(seconds / divisor)}{suffix}"
    return f"{int(seconds / 86400)}d"
|
|
|
|
def read_json(path: Path):
    """Parse the UTF-8 JSON document at *path*.

    Returns the decoded value, or ``None`` if the file cannot be opened,
    read, decoded, or parsed.
    """
    try:
        with path.open(encoding="utf-8") as handle:
            return json.load(handle)
    except Exception:
        return None
|
|
|
|
def read_tail_lines(path: Path, count: int = 80):
    """Return up to the last *count* lines of a text file, newlines included.

    Bytes that are not valid UTF-8 are dropped. Any error opening or reading
    the file (or an invalid *count*) yields an empty list.
    """
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as handle:
            # A bounded deque keeps only the trailing window while streaming.
            window = deque(handle, maxlen=count)
    except Exception:
        return []
    return list(window)
|
|
|
|
def clean_session_label(value: str, *, fallback: str = "") -> str:
    """Normalize a raw session title into a short, readable label.

    Steps:
      1. Drop a leading ``Discord thread #<channel> › `` prefix.
      2. Drop a trailing ``channel id:<digits>`` suffix (case-insensitive).
      3. Trim spaces and ``#`` from both ends.
      4. Turn hyphens into spaces and collapse whitespace runs.
      5. Cap the result at 96 characters.

    Args:
        value: raw label. Falsy values yield *fallback*; non-string values are
            converted with ``str()``.
        fallback: returned when nothing readable remains after cleaning.

    Returns:
        The cleaned label, or *fallback*.
    """
    text = str(value).strip() if value else ""
    if not text:
        return fallback
    # U+203A (single right-pointing angle quotation mark) ends the Discord
    # prefix. It is written as an escape so the pattern survives re-encoding
    # of this file.
    text = re.sub(r"^Discord thread\s+#?[^\u203a]+\u203a\s*", "", text)
    text = re.sub(r"\s+channel id:\d+\s*$", "", text, flags=re.IGNORECASE)
    text = text.strip(" #").replace("-", " ")
    text = re.sub(r"\s+", " ", text)
    # Strip after truncation so edge hyphens (now spaces) or a cut just before
    # a space never leave leading/trailing whitespace.
    return text[:96].strip() or fallback
|
|
|
|
def session_display_label(row: dict) -> str:
    """Return a readable name for a session row.

    Prefers the cleaned ``label``. When that is empty, falls back to
    ``"<kind>: <id>"``, using ``kind`` (default ``"session"``) and the first
    32 characters of ``sessionKey``/``sessionId`` (default ``"session"``).
    """
    cleaned = clean_session_label(row.get("label") or "")
    if cleaned:
        return cleaned
    identifier = str(row.get("sessionKey") or row.get("sessionId") or "session")
    kind = row.get("kind") or "session"
    return f"{kind}: {identifier[:32]}"
|
|
|
|
def normalize_exec_tool_label(content_block):
    """Return a display label for a ``toolCall`` content block.

    Labelling rules:
      * Tools other than ``exec`` are labelled by their ``name``
        (``"unknown_tool"`` when missing).
      * An ``exec`` call whose command mentions ``mcporter`` becomes
        ``"exec mcporter <target>"`` when a ``mcporter call <target>``
        invocation is found, else ``"exec mcporter"``.
      * Every other ``exec`` stays ``"exec"``.

    The command is read from ``arguments["command"]`` / ``arguments["cmd"]``
    when ``arguments`` is a dict, or from ``arguments`` itself when it is a
    string. Non-dict blocks yield ``"unknown_tool"``.
    """
    if not isinstance(content_block, dict):
        # Guard so callers always get a string label instead of an exception.
        return "unknown_tool"

    name = content_block.get("name") or "unknown_tool"
    if name != "exec":
        return name

    args = content_block.get("arguments")
    if isinstance(args, dict):
        cmd = args.get("command") or args.get("cmd")
    elif isinstance(args, str):
        cmd = args
    else:
        cmd = None

    if not isinstance(cmd, str) or "mcporter" not in cmd.lower():
        return "exec"

    # Match case-insensitively, consistent with the detection check above.
    match = re.search(
        r"(?:^|\s)mcporter\s+call\s+([A-Za-z0-9_\-\.]+)",
        cmd,
        flags=re.IGNORECASE,
    )
    if match:
        return f"exec mcporter {match.group(1)}"
    return "exec mcporter"
|
|
|
|
def session_index_rows():
    """Index every agent's ``sessions/sessions.json`` into display rows.

    Returns:
        ``(rows, by_agent_and_id)``.

        ``rows`` is a list of dicts with keys agent, sessionKey, sessionId,
        label, updatedAt, updated, deliveryTo and kind. It is sorted
        newest-first by ``updatedAt``; rows without a timestamp sort last.

        ``by_agent_and_id`` maps ``(agent_name, sessionId)`` to the same row
        dict, for rows whose sessionId is non-empty.
    """
    rows = []
    by_agent_and_id = {}
    if not AGENTS_DIR.exists():
        return rows, by_agent_and_id

    # Substring markers checked in priority order; the first hit wins.
    kind_markers = (
        (":subagent:", "subagent"),
        (":cron:", "cron"),
        (":discord:", "discord"),
    )
    epoch_floor = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    for agent_dir in sorted(AGENTS_DIR.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith("."):
            continue
        index = read_json(agent_dir / "sessions" / "sessions.json")
        if not isinstance(index, dict):
            continue
        agent = agent_dir.name

        for session_key, meta in index.items():
            if not isinstance(meta, dict):
                continue

            delivery = meta.get("deliveryContext")
            if not isinstance(delivery, dict):
                delivery = {}
            origin = meta.get("origin")
            if not isinstance(origin, dict):
                origin = {}

            kind = next(
                (kind_name for marker, kind_name in kind_markers if marker in session_key),
                "session",
            )
            raw_label = (
                meta.get("displayName")
                or meta.get("derivedTitle")
                or meta.get("title")
                or origin.get("label")
                or meta.get("groupChannel")
                or ""
            )
            updated_raw = meta.get("updatedAt") or meta.get("updatedAtMs")
            session_id = meta.get("sessionId") or ""

            entry = {
                "agent": agent,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": clean_session_label(str(raw_label), fallback=session_key),
                "updatedAt": parse_ts_any(updated_raw),
                "updated": fmt_local(updated_raw),
                "deliveryTo": delivery.get("to") or "",
                "kind": kind,
            }
            rows.append(entry)
            if session_id:
                by_agent_and_id[(agent, session_id)] = entry

    rows.sort(key=lambda item: item["updatedAt"] or epoch_floor, reverse=True)
    return rows, by_agent_and_id
|
|
|
|
def transcript_activity_rows(session_lookup, max_files=80):
    """Summarize the most recently modified session transcripts.

    Scans ``AGENTS_DIR/**/sessions/*.jsonl`` and keeps the ``max_files``
    newest files by mtime. For each, it reads the last 120 lines newest-first
    to find the latest timestamp and a short summary: the first toolCall
    label, text block, or ``errorMessage`` encountered.

    Args:
        session_lookup: mapping of ``(agent, session_id)`` to either a session
            row dict (with ``sessionKey``/``label``) or a session-key string.
        max_files: maximum number of transcript files to inspect.

    Returns:
        A list of row dicts (agent, sessionKey, sessionId, label, updatedAt,
        updated, role, summary, file), sorted newest-first. Rows without a
        timestamp sort last.
    """
    rows = []
    if not AGENTS_DIR.exists():
        return rows

    def _mtime(path):
        # A transcript can disappear between the glob and the stat while
        # agents are running; sort such files last instead of raising.
        try:
            return path.stat().st_mtime
        except OSError:
            return 0

    jsonl_files = sorted(AGENTS_DIR.rglob("sessions/*.jsonl"), key=_mtime, reverse=True)[:max_files]

    for path in jsonl_files:
        # Expected layout: AGENTS_DIR/<agent>/sessions/<session_id>.jsonl
        agent = path.parts[-3] if len(path.parts) >= 3 else "unknown"
        session_id = path.stem
        session_meta = session_lookup.get((agent, session_id))
        if isinstance(session_meta, dict):
            session_key = session_meta.get("sessionKey")
            label = session_meta.get("label")
        else:
            session_key = session_meta
            label = ""
        session_key = session_key or session_id

        last_ts = None
        summary = ""
        role = ""
        # Walk the tail newest-first; stop once both a timestamp and a
        # summary have been found.
        for line in reversed(read_tail_lines(path, 120)):
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except Exception:
                continue
            if not isinstance(row, dict):
                # A bare JSON value (list/str/number) carries no message.
                continue
            msg = row.get("message") if isinstance(row.get("message"), dict) else {}
            ts = parse_ts_any(row.get("timestamp") or row.get("time") or msg.get("timestamp") or msg.get("time"))
            if ts and last_ts is None:
                last_ts = ts
            if not summary:
                role = msg.get("role") or row.get("type") or ""
                content = msg.get("content") if isinstance(msg.get("content"), list) else []
                for block in content:
                    if not isinstance(block, dict):
                        continue
                    if block.get("type") == "toolCall":
                        summary = f"tool: {normalize_exec_tool_label(block)}"
                        break
                    if block.get("type") == "text":
                        text = block.get("text")
                        text = text.strip().replace("\n", " ") if isinstance(text, str) else ""
                        if text:
                            summary = text[:160]
                            break
                if not summary and msg.get("errorMessage"):
                    summary = str(msg.get("errorMessage"))[:160]
            if last_ts and summary:
                break

        rows.append(
            {
                "agent": agent,
                "sessionKey": session_key,
                "sessionId": session_id,
                "label": label or clean_session_label(session_key, fallback=session_id),
                "updatedAt": last_ts,
                "updated": fmt_local(last_ts),
                "role": role,
                "summary": summary or "(no recent summary)",
                "file": str(path),
            }
        )

    rows.sort(
        key=lambda row: row["updatedAt"] or datetime.datetime.min.replace(tzinfo=datetime.timezone.utc),
        reverse=True,
    )
    return rows
|
|
|
|
def list_cron_jobs(limit=100):
    """Load cron jobs from ``CRON_DIR/jobs.json`` and classify their health.

    Args:
        limit: maximum number of entries, taken from the start of the
            ``jobs`` list, to inspect.

    Returns:
        A list of row dicts with keys name, jobId, enabled, kind, model,
        schedule, tz, next/running/last run (as datetimes and formatted
        strings), lastStatus, sessionKey, consecutiveErrors, health,
        healthReason and staleness. Rows are ordered by health severity, then
        by the running, next, or last run time (first one present).

    Health precedence, highest first:
        disabled > critical (>= 3 consecutive errors) > warning (1-2 errors)
        > error (last status "error") > stale (next run missed by more than
        5 minutes after a previous run) > running > healthy.
    """
    severity = {"critical": 0, "error": 1, "warning": 2, "stale": 3, "running": 4, "healthy": 5, "disabled": 6}
    stale_grace_ms = 5 * 60 * 1000
    epoch_floor = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _as_count(value):
        # The field may be missing, null or malformed; treat anything that is
        # not integer-like as 0 so the comparisons below cannot raise.
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _is_number(value):
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    data = read_json(CRON_DIR / "jobs.json")
    jobs = data.get("jobs") if isinstance(data, dict) else None
    if not isinstance(jobs, list):
        jobs = []

    now_ms = time.time() * 1000
    rows = []
    for job in jobs[:limit]:
        if not isinstance(job, dict):
            continue
        state = job.get("state") if isinstance(job.get("state"), dict) else {}
        schedule = job.get("schedule") if isinstance(job.get("schedule"), dict) else {}
        payload = job.get("payload") if isinstance(job.get("payload"), dict) else {}

        last_run_ms = state.get("lastRunAtMs")
        next_run_ms = state.get("nextRunAtMs")
        consecutive_errors = _as_count(state.get("consecutiveErrors"))
        last_status = state.get("lastStatus") or state.get("lastRunStatus") or ""

        # A job that has run before but whose scheduled next run lies more
        # than the grace period in the past is overdue.
        staleness = None
        if last_run_ms and next_run_ms and _is_number(next_run_ms) and now_ms > next_run_ms + stale_grace_ms:
            staleness = fmt_duration((now_ms - next_run_ms) / 1000)

        enabled = job.get("enabled", True)
        health = "healthy"
        health_reason = ""
        if not enabled:
            health = "disabled"
        elif consecutive_errors >= 3:
            health = "critical"
            health_reason = f"{consecutive_errors} consecutive errors"
        elif consecutive_errors > 0:
            health = "warning"
            health_reason = f"{consecutive_errors} error(s)"
        elif last_status == "error":
            health = "error"
            health_reason = "last run reported error"
        elif staleness:
            health = "stale"
            health_reason = f"overdue by {staleness}"
        elif state.get("runningAtMs"):
            health = "running"

        rows.append(
            {
                "name": job.get("name") or job.get("id") or "unnamed",
                "jobId": job.get("id") or "",
                "enabled": bool(enabled),
                "kind": payload.get("kind") or "",
                "model": payload.get("model") or "",
                "schedule": schedule.get("expr") or schedule.get("kind") or "",
                "tz": schedule.get("tz") or "",
                "nextRunAt": parse_ts_any(state.get("nextRunAtMs") or state.get("nextRunAt")),
                "nextRun": fmt_local(state.get("nextRunAtMs") or state.get("nextRunAt")),
                "runningAt": parse_ts_any(state.get("runningAtMs") or state.get("runningAt")),
                "running": fmt_local(state.get("runningAtMs") or state.get("runningAt")),
                "lastRunAt": parse_ts_any(state.get("lastRunAtMs") or state.get("lastRunAt")),
                "lastRun": fmt_local(state.get("lastRunAtMs") or state.get("lastRunAt")),
                "lastStatus": last_status,
                "sessionKey": job.get("sessionKey") or "",
                "consecutiveErrors": consecutive_errors,
                "health": health,
                "healthReason": health_reason,
                "staleness": staleness,
            }
        )

    rows.sort(
        key=lambda row: (
            severity.get(row["health"], 7),
            row["runningAt"] or row["nextRunAt"] or row["lastRunAt"] or epoch_floor,
        )
    )
    return rows
|
|
|
|
def _usage_number(value):
    """Coerce a usage field to a finite float.

    Returns None for missing values, booleans, non-numeric values and
    NaN/infinity, so callers can tell "absent" apart from a real 0.
    """
    import math

    if value is None or isinstance(value, bool):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _usage_first_number(usage, keys):
    """Return the first numeric value found in *usage* under *keys*, else None.

    Unlike an ``a or b`` chain, this keeps genuine zeros (e.g. 0 cached
    tokens), which must still count as a sample for the cache-hit ratio.
    """
    for key in keys:
        number = _usage_number(usage.get(key))
        if number is not None:
            return number
    return None


def _usage_cost_usd(usage):
    """Return the cost recorded in a usage dict, counted once.

    The first usable key among ``cost``, ``total_cost``, ``usd`` and
    ``costUsd`` wins, so payloads carrying several aliases are not
    double-counted. A dict-valued entry contributes its numeric ``total``, or
    the sum of its numeric components when ``total`` is absent.
    """
    for key in ("cost", "total_cost", "usd", "costUsd"):
        value = usage.get(key)
        if isinstance(value, dict):
            total = _usage_number(value.get("total"))
            if total is None:
                total = sum(n for n in map(_usage_number, value.values()) if n is not None)
            return float(total)
        number = _usage_number(value)
        if number is not None:
            return number
    return 0.0


def _usage_totals(usage):
    """Summarize one message ``usage`` dict.

    Returns:
        ``(tokens, input_tokens, output_tokens, cost, cache_sample, cache_hit)``.
        ``tokens`` is the reported total, or input + output when no non-zero
        total is reported. ``cache_sample`` is True when the usage carries any
        cache information; ``cache_hit`` is True when that information shows
        a hit.
    """
    total_tok = _usage_first_number(usage, ("total_tokens", "totalTokens"))
    input_tok = int(_usage_first_number(usage, ("input_tokens", "inputTokens", "input")) or 0)
    output_tok = int(_usage_first_number(usage, ("output_tokens", "outputTokens", "output")) or 0)
    tokens = int(total_tok) if total_tok else input_tok + output_tok

    cache_flag = next(
        (usage[key] for key in ("cache_hit", "cacheHit") if isinstance(usage.get(key), bool)),
        None,
    )
    if cache_flag is not None:
        cache_sample, cache_hit = True, cache_flag
    else:
        cached_tokens = _usage_first_number(
            usage,
            ("cached_tokens", "cachedTokens", "cache_read_tokens", "cacheReadTokens", "cacheRead"),
        )
        cache_sample = cached_tokens is not None
        cache_hit = bool(cached_tokens and cached_tokens > 0)

    return tokens, input_tok, output_tok, _usage_cost_usd(usage), cache_sample, cache_hit


def collect_metrics_last_24h(max_files=500):
    """Aggregate tool, error, token, cost and cache metrics over the last 24h.

    Reads up to ``max_files`` of the most recently modified
    ``sessions/*.jsonl`` transcripts under ``AGENTS_DIR``. Every event whose
    timestamp falls within the last 24 hours is counted.

    Returns:
        A dict with keys:
          * ``tool_counts``: Counter of tool labels.
          * ``error_rate``: percent of events with ``errorMessage`` or
            ``stopReason == "error"``.
          * ``error_count``, ``total_events``.
          * ``token_total``, ``input_tokens``, ``output_tokens``, ``cost_total``.
          * ``cache_hit_proxy``: percent of cache-reporting events that show a
            hit, or None when no event reported cache data.
          * ``cache_samples``.
          * ``active_summaries``: each agent's latest summary, for agents
            active in the last 60 minutes, newest first.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(hours=24)

    def _mtime(path):
        # Transcripts can vanish between the glob and the stat; sort them last.
        try:
            return path.stat().st_mtime
        except OSError:
            return 0

    jsonl_files = sorted(AGENTS_DIR.rglob("sessions/*.jsonl"), key=_mtime, reverse=True)[:max_files]

    tool_counts = Counter()
    error_count = 0
    total_events = 0
    token_total = 0
    input_tokens = 0
    output_tokens = 0
    cost_total = 0.0
    cache_hits = 0
    cache_samples = 0
    latest_per_agent = {}

    for path in jsonl_files:
        agent = path.parts[-3] if len(path.parts) >= 3 else "unknown"
        try:
            with path.open("r", encoding="utf-8", errors="ignore") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except Exception:
                        continue
                    if not isinstance(row, dict):
                        # A bare JSON value would otherwise raise on .get() and
                        # abandon the remainder of this file.
                        continue
                    msg = row.get("message") if isinstance(row.get("message"), dict) else {}
                    ts = parse_ts_any(row.get("timestamp") or msg.get("timestamp") or row.get("time") or msg.get("time"))
                    if ts is None or ts < cutoff:
                        continue
                    total_events += 1
                    if msg.get("errorMessage") or msg.get("stopReason") == "error":
                        error_count += 1

                    summary = None
                    content = msg.get("content") if isinstance(msg.get("content"), list) else []
                    for block in content:
                        if not isinstance(block, dict):
                            continue
                        if block.get("type") == "toolCall":
                            label = str(normalize_exec_tool_label(block))
                            tool_counts[label] += 1
                            summary = summary or f"tool: {label}"
                        elif block.get("type") == "text" and msg.get("role") == "assistant":
                            text = block.get("text")
                            text = text.strip().replace("\n", " ") if isinstance(text, str) else ""
                            if text and summary is None:
                                summary = text[:120]

                    usage = msg.get("usage")
                    if isinstance(usage, dict) and usage:
                        tokens, in_tok, out_tok, cost, sampled, hit = _usage_totals(usage)
                        token_total += tokens
                        input_tokens += in_tok
                        output_tokens += out_tok
                        cost_total += cost
                        if sampled:
                            cache_samples += 1
                            cache_hits += int(hit)

                    latest = latest_per_agent.get(agent)
                    if latest is None or ts > latest["ts"]:
                        latest_per_agent[agent] = {"ts": ts, "summary": summary or "No recent assistant/tool summary"}
        except Exception:
            # Best effort: an unreadable transcript is skipped rather than
            # failing the whole dashboard.
            continue

    active_cutoff = now - datetime.timedelta(minutes=60)
    active_agents = [
        {"agent": agent, "ts": info["ts"], "updated": fmt_local(info["ts"]), "summary": info["summary"]}
        for agent, info in latest_per_agent.items()
        if info["ts"] >= active_cutoff
    ]
    active_agents.sort(key=lambda row: row["ts"], reverse=True)
    return {
        "tool_counts": tool_counts,
        "error_rate": (error_count / total_events * 100.0) if total_events else 0.0,
        "error_count": error_count,
        "total_events": total_events,
        "token_total": token_total,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_total": cost_total,
        "cache_hit_proxy": (cache_hits / cache_samples * 100.0) if cache_samples else None,
        "cache_samples": cache_samples,
        "active_summaries": active_agents,
    }
|
|
|
|
def get_live_session_activity(session_key, agent_name, max_lines=100, session_id=None):
    """Extract per-message activity from the tail of one session transcript.

    The transcript read is
    ``AGENTS_DIR/<agent_name>/sessions/<session id>.jsonl``. The session id
    is resolved in this order:
      1. *session_id*, when given.
      2. The ``sessionId`` recorded for *session_key* in the agent's
         ``sessions/sessions.json``.
      3. The last ``:``-separated segment of *session_key* (the whole string
         when it has no ``:``, so a bare session id also works).

    Args:
        session_key: session key (or bare session id) to inspect.
        agent_name: agent directory name under ``AGENTS_DIR``.
        max_lines: number of trailing transcript lines to read.
        session_id: optional explicit session id (transcript file stem).

    Returns:
        A list of dicts with keys ``timestamp``, ``time``, ``role``, ``type``,
        ``summary`` and ``details``, in transcript order. When a message has
        several content blocks, the last recognized block determines
        ``type``/``summary``/``details``. Returns [] when nothing can be read.
    """
    if not agent_name or not AGENTS_DIR.exists():
        return []

    agent_dir = AGENTS_DIR / agent_name
    if not agent_dir.exists():
        return []
    sessions_dir = agent_dir / "sessions"

    key = str(session_key or "")
    if not session_id and key:
        # Transcript files are named after the sessionId stored in the session
        # index, which is not necessarily the key's last segment.
        index = read_json(sessions_dir / "sessions.json")
        meta = index.get(key) if isinstance(index, dict) else None
        if isinstance(meta, dict) and meta.get("sessionId"):
            session_id = meta.get("sessionId")
    if not session_id:
        session_id = key.split(":")[-1]
    if not session_id:
        return []

    arrow = "\N{RIGHTWARDS ARROW}"
    activities = []
    for line in read_tail_lines(sessions_dir / f"{session_id}.jsonl", max_lines):
        line = line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except Exception:
            continue
        if not isinstance(row, dict):
            continue

        msg = row.get("message") if isinstance(row.get("message"), dict) else {}
        ts = parse_ts_any(row.get("timestamp") or msg.get("timestamp") or row.get("time") or msg.get("time"))

        entry = {
            "timestamp": ts,
            "time": fmt_local(ts),
            "role": msg.get("role", ""),
            "type": "",
            "summary": "",
            "details": "",
        }

        content = msg.get("content") if isinstance(msg.get("content"), list) else []
        for block in content:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")

            if block_type == "thinking":
                entry["type"] = "thinking"
                thinking = block.get("thinking")
                thinking = thinking if isinstance(thinking, str) else ""
                entry["summary"] = "Thinking..." if thinking else "(empty thinking)"
                entry["details"] = thinking[:500]

            elif block_type == "toolCall":
                entry["type"] = "tool"
                entry["summary"] = f"{arrow} {normalize_exec_tool_label(block)}"
                args = block.get("arguments", {})
                if isinstance(args, dict):
                    # Show at most the first three arguments, each clipped.
                    entry["details"] = ", ".join(f"{k}={str(v)[:50]}" for k, v in list(args.items())[:3])
                elif isinstance(args, str):
                    entry["details"] = args[:100]

            elif block_type == "toolResult":
                entry["type"] = "result"
                content_data = block.get("content")
                is_error = block.get("isError") or block.get("error")
                entry["summary"] = "\N{BALLOT X} Error" if is_error else "\N{CHECK MARK} Result"
                if isinstance(content_data, str):
                    entry["details"] = content_data[:200]
                elif isinstance(content_data, list) and content_data:
                    entry["details"] = str(content_data[0])[:200]

            elif block_type == "text":
                entry["type"] = "text"
                text = block.get("text")
                text = text.strip() if isinstance(text, str) else ""
                entry["summary"] = text[:80].replace("\n", " ")
                entry["details"] = text[:300]

        if entry["type"] or entry["summary"]:
            activities.append(entry)

    return activities
|
|
|
|
def _load_sidecar_artifact(name: str):
    """Load a Session Amplifier JSON artifact from ``STATE_DIR/session_amplifier``.

    Args:
        name: artifact file name, e.g. ``"review-latest.json"``.

    Returns:
        The parsed JSON value, or ``None`` when the file is missing,
        unreadable, or not valid JSON.
    """
    return read_json(STATE_DIR / "session_amplifier" / name)
|
|
|
|
def _sidecar_reachable() -> bool:
    """Return True if the Session Amplifier ``/health`` endpoint answers.

    Any exception (connection failure, timeout, HTTP error status raised by
    ``urlopen``) counts as unreachable. The response is closed immediately so
    the connection is not left open until garbage collection.
    """
    try:
        import urllib.request

        req = urllib.request.Request(f"{SESSION_AMPLIFIER_BASE}/health", method="GET")
        with urllib.request.urlopen(req, timeout=2):
            return True
    except Exception:
        return False
|
|
|
|
def _fetch_sidecar_json(path: str, default=None):
    """GET ``SESSION_AMPLIFIER_BASE + path`` and decode the body as JSON.

    Times out after 5 seconds. Any failure (network error, HTTP error status,
    decode or parse error) yields *default*.
    """
    try:
        from urllib import request as urlrequest

        request = urlrequest.Request(f"{SESSION_AMPLIFIER_BASE}{path}", method="GET")
        with urlrequest.urlopen(request, timeout=5) as response:
            body = response.read()
        return json.loads(body.decode())
    except Exception:
        return default
|
|
|
|
def _load_ops_json(name: str, default=None):
    """Read ``STATE_DIR/<name>`` as UTF-8 JSON; return *default* if that fails."""
    target = STATE_DIR / name
    try:
        with target.open(encoding="utf-8") as handle:
            return json.load(handle)
    except Exception:
        return default
|
|
|
|
def render_model_ops_tab(session_rows):
    """Render the "Model Ops" tab from the model-ops state artifacts.

    Loads, via ``_load_ops_json``:
      * the model-ops snapshot,
      * the merged benchmark table,
      * the HF benchmark list,
      * the top-50 session-cost rows.

    Renders summary metrics, benchmark-source status, cost-coverage quality,
    a costly-session inspector cross-referenced against *session_rows*, arena
    Elo/winrate leaders, efficiency leaders and a mapping-confidence
    breakdown. Missing or malformed artifacts degrade to empty sections
    instead of raising.

    Args:
        session_rows: session index rows (dicts with ``agent``, ``sessionId``,
            ``sessionKey``, ``updated``, ``deliveryTo``) used to resolve a
            costly session back to its transcript.
    """

    def _as_dict(value):
        # Anything that is not a JSON object is treated as empty.
        return value if isinstance(value, dict) else {}

    def _dict_rows(value):
        # Keep only object entries of a JSON array; non-arrays become [].
        return [item for item in value if isinstance(item, dict)] if isinstance(value, list) else []

    snapshot = _as_dict(_load_ops_json("model-ops-snapshot-latest.json", {}))
    merged = _as_dict(_load_ops_json("model-benchmarks-merged-latest.json", {}))
    hf_benchmarks = _as_dict(_load_ops_json("hf-benchmarks-latest.json", {}))
    cost_rows = _dict_rows(_load_ops_json("session-cost-top50-latest.json", []))
    merged_models = _dict_rows(merged.get("models"))
    hf_models = hf_benchmarks.get("models")

    st.subheader("Model Ops")
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("OpenRouter models", snapshot.get("market_model_count", 0))
    c2.metric("Merged benchmark models", len(merged_models))
    c3.metric("HF models", len(hf_models) if isinstance(hf_models, list) else 0)
    c4.metric("Top cost rows", len(cost_rows))

    st.markdown("### Benchmark sources")
    src_rows = [
        {"source": src.get("name"), "status": src.get("status")}
        for src in _dict_rows(snapshot.get("benchmark_source_status"))
    ]
    if src_rows:
        st.dataframe(src_rows, use_container_width=True, hide_index=True)
    else:
        st.caption("No benchmark source metadata available")

    st.markdown("### Cost coverage quality")
    # Missing/empty cost_source values are already bucketed as "unknown".
    coverage = Counter((row.get("cost_source") or "unknown") for row in cost_rows)
    cc1, cc2, cc3, cc4 = st.columns(4)
    cc1.metric("Observed", coverage.get("observed", 0))
    cc2.metric("Estimated", coverage.get("estimated", 0))
    cc3.metric("Unknown", coverage.get("unknown", 0))
    trust = snapshot.get("cost_trust_score")
    if isinstance(trust, bool) or not isinstance(trust, (int, float)):
        # A null/missing/non-numeric score would otherwise crash the format.
        trust = 0.0
    cc4.metric("Trust score", f"{trust * 100:.1f}%")

    st.markdown("### Highest current observed session cost")
    if cost_rows:
        agent_options = sorted({r.get("agent_id") for r in cost_rows if r.get("agent_id")})
        agent_filter = st.selectbox("Cost rows filter by agent", ["all"] + agent_options, index=0)
        filtered_cost = [r for r in cost_rows if agent_filter == "all" or r.get("agent_id") == agent_filter]
        st.dataframe(filtered_cost[:15], use_container_width=True, hide_index=True)

        st.markdown("#### Inspect costly session")
        sep = " \N{EM DASH} "
        lookup = {
            f"{r.get('agent_id')}/{r.get('session_id')}{sep}{r.get('primary_model')}{sep}${r.get('total_estimated_usd')}": r
            for r in filtered_cost[:25]
        }
        if lookup:
            selected = st.selectbox("Jump to session details", list(lookup.keys()), index=0)
            chosen = lookup[selected]
            agent = chosen.get("agent_id")
            session_id = chosen.get("session_id")
            # Without a session id there is nothing to match on ("" would
            # substring-match every key, None would raise).
            matching = [
                r
                for r in session_rows
                if session_id
                and r.get("agent") == agent
                and (r.get("sessionId") == session_id or str(session_id) in (r.get("sessionKey") or ""))
            ]
            if matching:
                match = matching[0]
                st.caption(f"Session key: {match.get('sessionKey')}")
                st.caption(f"Last updated: {match.get('updated')}")
                st.caption(f"Delivery: {match.get('deliveryTo')}")
                # Transcripts are named by session id; passing it (rather than
                # the key, whose last ':' segment may differ) resolves the file.
                recent = get_live_session_activity(
                    match.get("sessionId") or match.get("sessionKey") or "",
                    agent,
                    max_lines=40,
                )
                if recent:
                    st.dataframe(
                        [
                            {
                                "time": a.get("time"),
                                "type": a.get("type"),
                                "summary": a.get("summary"),
                                "details": (a.get("details") or "")[:120],
                            }
                            for a in recent[-15:]
                        ],
                        use_container_width=True,
                        hide_index=True,
                    )
                else:
                    st.caption("No recent activity extracted from transcript.")
            else:
                st.caption("No matching session found in current session index.")
    else:
        st.caption("No session cost artifacts found")

    def _arena_value(model, field):
        return _as_dict(model.get("arena")).get(field)

    def _legacy_leaders(field):
        # Top 10 merged models by the given arena field, ignoring missing values.
        ranked = [m for m in merged_models if _arena_value(m, field) is not None]
        return sorted(ranked, key=lambda m: _arena_value(m, field) or 0, reverse=True)[:10]

    show_legacy = st.checkbox("Show legacy / non-orchestration benchmark leaders", value=False)
    st.markdown("### Top merged Arena Elo")
    arena_elo = _legacy_leaders("elo") if show_legacy else _dict_rows(snapshot.get("top_arena_elo"))
    if arena_elo:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "elo": _arena_value(r, "elo"),
                    "mapping": _as_dict(r.get("mapping")).get("confidence"),
                }
                for r in arena_elo
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena elo rows")

    st.markdown("### Top merged Arena winrate")
    arena_wr = _legacy_leaders("winrate") if show_legacy else _dict_rows(snapshot.get("top_arena_winrate"))
    if arena_wr:
        st.dataframe(
            [
                {
                    "model": r.get("model"),
                    "winrate": _arena_value(r, "winrate"),
                    "appearances": _arena_value(r, "appearances"),
                    "mapping": _as_dict(r.get("mapping")).get("confidence"),
                }
                for r in arena_wr
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No merged arena winrate rows")

    st.markdown("### Highest heuristic efficiency")
    eff = _dict_rows(snapshot.get("top_efficiency"))
    if eff:
        st.dataframe(
            [
                {
                    "model": r.get("id"),
                    "efficiency": r.get("efficiency_score"),
                    "input_per_1m": _as_dict(r.get("pricing_per_1m")).get("input"),
                    "context": r.get("context_length"),
                }
                for r in eff
            ],
            use_container_width=True,
            hide_index=True,
        )
    else:
        st.caption("No efficiency rows")

    st.markdown("### Mapping confidence breakdown")
    counts = Counter((_as_dict(m.get("mapping")).get("confidence") or "unknown") for m in merged_models)
    st.json(dict(counts))
|
|
|
|
| def main(): |
| st.set_page_config(page_title="OpenClaw Ops Dashboard", layout="wide") |
| st.title("OpenClaw Ops Dashboard") |
| st.caption(f"Root: {OPENCLAW_ROOT}") |
|
|
| |
| sidecar_status = "unavailable" |
| sidecar_report = _load_sidecar_artifact("review-latest.json") |
| sidecar_skills = _load_sidecar_artifact("skills-latest.json") |
| if sidecar_report or sidecar_skills: |
| sidecar_status = "ok" |
| elif _sidecar_reachable(): |
| sidecar_status = "reachable" |
|
|
| session_rows, session_lookup = session_index_rows() |
| activity_rows = transcript_activity_rows(session_lookup) |
| cron_rows = list_cron_jobs() |
| metrics = collect_metrics_last_24h() |
|
|
| running_cron = [row for row in cron_rows if row["runningAt"]] |
| subagent_rows = [row for row in session_rows if row["kind"] == "subagent"] |
| |
| |
| critical_jobs = [r for r in cron_rows if r["health"] == "critical"] |
| warning_jobs = [r for r in cron_rows if r["health"] == "warning"] |
| stale_jobs = [r for r in cron_rows if r["health"] == "stale"] |
|
|
| c1, c2, c3, c4, c5 = st.columns(5) |
| c1.metric("Agents", len({row["agent"] for row in session_rows})) |
| c2.metric("Indexed sessions", len(session_rows)) |
| c3.metric("Running cron", len(running_cron)) |
| c4.metric("Sidecar", sidecar_status.title(), help="Session Amplifier sidecar availability") |
| health_display = "β" if not (critical_jobs or warning_jobs or stale_jobs) else f"β {len(critical_jobs)}/{len(warning_jobs)}/{len(stale_jobs)}" |
| c5.metric("Health", health_display, help="Critical/Warning/Stale cron jobs") |
|
|
| tab_overview, tab_sessions, tab_cron, tab_activity, tab_live, tab_model_ops = st.tabs( |
| ["Overview", "Sessions", "Cron", "Activity", "Live Session", "Model Ops"] |
| ) |
|
|
| with tab_overview: |
| left, right = st.columns([1, 1]) |
| with left: |
| st.subheader("Active agent summaries (last 60m)") |
| if metrics["active_summaries"]: |
| st.dataframe(metrics["active_summaries"], use_container_width=True, hide_index=True) |
| else: |
| st.info("No active agent summaries in the last 60 minutes.") |
|
|
| st.subheader("Recent sessions") |
| if session_rows: |
| st.dataframe( |
| [ |
| { |
| "agent": row["agent"], |
| "kind": row["kind"], |
| "updated": row["updated"], |
| "sessionKey": row["sessionKey"], |
| "deliveryTo": row["deliveryTo"], |
| } |
| for row in session_rows[:20] |
| ], |
| use_container_width=True, |
| hide_index=True, |
| ) |
| else: |
| st.info("No indexed sessions found.") |
|
|
| with right: |
| st.subheader("Last 24h metrics") |
| m1, m2, m3, m4 = st.columns(4) |
| m1.metric("Error rate", f"{metrics['error_rate']:.2f}%") |
| m2.metric("Errors / events", f"{metrics['error_count']} / {metrics['total_events']}") |
| |
| |
| token_display = f"{metrics['token_total']:,}" if metrics['token_total'] else "0" |
| m3.metric("Tokens", token_display) |
| m4.metric("Cost", f"${metrics['cost_total']:.4f}") |
| |
| st.caption(f"Input: {metrics['input_tokens']:,} | Output: {metrics['output_tokens']:,}") |
| |
| if metrics["cache_hit_proxy"] is None: |
| st.caption("Cache-hit proxy unavailable in recent usage fields.") |
| else: |
| st.caption(f"Cache-hit proxy: {metrics['cache_hit_proxy']:.2f}% across {metrics['cache_samples']} samples.") |
|
|
| st.subheader("Top tools (last 24h)") |
| top_tools = [{"tool": tool, "count": count} for tool, count in metrics["tool_counts"].most_common(20)] |
| if top_tools: |
| st.dataframe(top_tools, use_container_width=True, hide_index=True) |
| else: |
| st.info("No tool invocation events found in the last 24 hours.") |
|
|
| |
| st.subheader("Session Amplifier") |
| if sidecar_report: |
| patterns = sidecar_report.get("failure_patterns", []) |
| if patterns: |
| st.warning(f"{len(patterns)} failure pattern(s) detected β see Activity tab for details") |
| for p in patterns[:5]: |
| st.caption(f"β’ {p.get('description', p.get('pattern', '?'))} ({p.get('count', 0)})") |
| else: |
| st.success("No failure patterns detected") |
| if sidecar_report.get("sessions_reviewed"): |
| st.caption(f"Reviewed {sidecar_report['sessions_reviewed']} sessions") |
| elif sidecar_status == "reachable": |
| st.info("Sidecar reachable but not yet warmed up β run spool to populate") |
| else: |
| st.caption("Session Amplifier unavailable β see docs to deploy sidecar/session-amplifier/") |
|
|
| if sidecar_skills: |
| missing = sidecar_skills.get("mcps_missing_skill_surface", []) |
| if missing: |
| st.warning(f"{len(missing)} MCP(s) without skill surface: {', '.join(missing[:5])}") |
| else: |
| st.caption("All registered MCPs have skill surfaces β") |
|
|
| with tab_sessions: |
| |
| sidecar_sessions = _fetch_sidecar_json("/sessions/recent?limit=50") |
| if sidecar_sessions and sidecar_sessions.get("sessions"): |
| st.subheader("Recent sessions (Session Amplifier)") |
| st.caption(f"Fetched at {fmt_local(datetime.datetime.now(datetime.timezone.utc).isoformat())}") |
| sess_rows = [] |
| for sr in sidecar_sessions["sessions"]: |
| health = sr.get("health", "ok") |
| hints = sr.get("hints", []) |
| hint_str = "; ".join(hints) if hints else "" |
| sess_rows.append({ |
| "agent": sr.get("agent_id", "?"), |
| "session_id": sr.get("session_id", "?")[:32], |
| "events": sr.get("event_count", 0), |
| "tools": sr.get("tool_result_count", 0), |
| "errors": sr.get("error_count", 0), |
| "health": health.upper(), |
| "hints": hint_str, |
| "last_event": fmt_local(sr.get("last_event_at")), |
| }) |
| st.dataframe(sess_rows, use_container_width=True, hide_index=True) |
| else: |
| st.subheader("Session index") |
| agent_names = ["all"] + sorted({row["agent"] for row in session_rows}) |
| selected_agent = st.selectbox("Filter agent", agent_names, index=0) |
| selected_kind = st.selectbox("Filter kind", ["all", "session", "discord", "cron", "subagent"], index=0) |
| filtered_rows = [] |
| for row in session_rows: |
| if selected_agent != "all" and row["agent"] != selected_agent: |
| continue |
| if selected_kind != "all" and row["kind"] != selected_kind: |
| continue |
| filtered_rows.append( |
| { |
| "agent": row["agent"], |
| "kind": row["kind"], |
| "updated": row["updated"], |
| "sessionKey": row["sessionKey"], |
| "sessionId": row["sessionId"], |
| "deliveryTo": row["deliveryTo"], |
| } |
| ) |
| st.dataframe(filtered_rows, use_container_width=True, hide_index=True) |
|
|
| st.subheader("Recent child/subagent sessions") |
| st.dataframe( |
| [ |
| { |
| "agent": row["agent"], |
| "updated": row["updated"], |
| "sessionKey": row["sessionKey"], |
| "sessionId": row["sessionId"], |
| } |
| for row in subagent_rows[:50] |
| ], |
| use_container_width=True, |
| hide_index=True, |
| ) |
|
|
| with tab_cron: |
| |
| health_filter = st.selectbox( |
| "Filter by health", |
| ["all", "critical", "error", "warning", "stale", "running", "healthy", "disabled"], |
| index=0 |
| ) |
| |
| filtered_cron = cron_rows |
| if health_filter != "all": |
| filtered_cron = [r for r in cron_rows if r["health"] == health_filter] |
| |
| st.subheader(f"Cron jobs ({len(filtered_cron)} shown)") |
| |
| |
| def health_color(health): |
| return { |
| "critical": "π΄", |
| "error": "π ", |
| "warning": "π‘", |
| "stale": "βͺ", |
| "running": "π’", |
| "healthy": "β", |
| "disabled": "β", |
| }.get(health, "?") |
| |
| display_rows = [] |
| for row in filtered_cron: |
| display_rows.append({ |
| "health": f"{health_color(row['health'])} {row['health']}", |
| "name": row["name"], |
| "enabled": "β" if row["enabled"] else "β", |
| "schedule": row["schedule"], |
| "lastRun": row["lastRun"], |
| "lastStatus": row["lastStatus"], |
| "nextRun": row["nextRun"], |
| "errors": row["consecutiveErrors"] if row["consecutiveErrors"] > 0 else "", |
| "reason": row["healthReason"], |
| }) |
| |
| st.dataframe(display_rows, use_container_width=True, hide_index=True) |
| |
| |
| if critical_jobs or warning_jobs or stale_jobs: |
| st.error(f"**{len(critical_jobs)}** critical, **{len(warning_jobs)}** warning, **{len(stale_jobs)}** stale jobs need attention") |
| else: |
| st.success("All monitored cron jobs are healthy") |
| |
| st.subheader("Running now") |
| if running_cron: |
| st.dataframe(running_cron, use_container_width=True, hide_index=True) |
| else: |
| st.info("No cron jobs currently marked running.") |
|
|
| with tab_activity: |
| st.subheader("Recent transcript activity") |
| st.dataframe( |
| [ |
| { |
| "agent": row["agent"], |
| "updated": row["updated"], |
| "role": row["role"], |
| "sessionKey": row["sessionKey"], |
| "summary": row["summary"], |
| "file": row["file"], |
| } |
| for row in activity_rows[:50] |
| ], |
| use_container_width=True, |
| hide_index=True, |
| ) |
|
|
| with tab_live: |
| st.subheader("Live Session Activity Stream") |
|
|
| |
| sidecar_sessions = _fetch_sidecar_json("/sessions/recent?limit=30") or {} |
| sidecar_session_list = sidecar_sessions.get("sessions") or [] |
|
|
| |
| if sidecar_status == "reachable": |
| st.info("Session Amplifier reachable but not yet warmed up β spool may be empty") |
| elif sidecar_status == "unavailable": |
| st.warning("Session Amplifier unavailable. Deploy: cd sidecar/session-amplifier && docker compose up -d") |
|
|
| st.caption("Normalized activity feed from Session Amplifier. " |
| "For continuous streaming, use: python scripts/session_amplifier_live_monitor.py") |
|
|
| |
| if sidecar_session_list: |
| sidecar_opts = [ |
| f"[SA] {s.get('agent_id','?')}: {clean_session_label(s.get('display_title') or '', fallback=s.get('session_id','?')[:32])}" |
| for s in sidecar_session_list[:20] |
| ] |
| sidecar_map = {opt: s for opt, s in zip(sidecar_opts, sidecar_session_list[:20])} |
| use_sidecar = True |
| else: |
| recent = [r for r in session_rows if r["updatedAt"] and r["updatedAt"] > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)] |
| sidecar_opts = [] |
| sidecar_map = {} |
| use_sidecar = False |
|
|
| |
| recent_sessions = [r for r in session_rows if r["updatedAt"] and r["updatedAt"] > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24)] |
| dash_opts = [f"{r['agent']}: {session_display_label(r)}" for r in recent_sessions[:20]] |
| dash_map = {opt: r for opt, r in zip(dash_opts, recent_sessions[:20])} |
|
|
| |
| all_opts = sidecar_opts + dash_opts |
| all_map = {**sidecar_map, **dash_map} |
|
|
| if all_opts: |
| selected = st.selectbox("Select session to monitor", all_opts, index=0) |
| selected_item = all_map[selected] |
|
|
| if use_sidecar and selected in sidecar_map: |
| sel_session_id = selected_item["session_id"] |
| sel_agent = selected_item["agent_id"] |
| sel_title = clean_session_label(selected_item.get("display_title") or "", fallback=sel_session_id) |
| sel_updated = fmt_local(selected_item.get("last_event_at")) |
| sel_health = selected_item.get("health", "ok").upper() |
| sel_hints = "; ".join(selected_item.get("hints", []) or []) |
| st.caption(f"Health: {sel_health} | Events: {selected_item.get('event_count',0)} | Tools: {selected_item.get('tool_result_count',0)} | Errors: {selected_item.get('error_count',0)}") |
| if sel_hints: |
| st.caption(f"Hints: {sel_hints}") |
|
|
| |
| activity_data = _fetch_sidecar_json(f"/session/{sel_session_id}/activity?limit=200") |
| activities = activity_data.get("activity", []) if activity_data else [] |
| else: |
| if isinstance(selected_item, dict) and "sessionKey" in selected_item: |
| sel_session_id = selected_item["sessionKey"] |
| sel_agent = selected_item["agent"] |
| sel_title = session_display_label(selected_item) |
| sel_updated = selected_item["updated"] |
| else: |
| sel_session_id = "?" |
| sel_agent = "?" |
| sel_title = "?" |
| sel_updated = "?" |
| st.caption(f"Dashboard session β may not appear in Session Amplifier spool") |
| activities = get_live_session_activity(sel_session_id, sel_agent, max_lines=200) |
|
|
| col1, col2, col3 = st.columns([1, 1, 1]) |
| with col1: |
| st.write(f"**Agent:** {sel_agent}") |
| with col2: |
| st.write(f"**Session:** {sel_title}") |
| with col3: |
| st.write(f"**Last event:** {sel_updated}") |
|
|
| if activities: |
| current_bucket = None |
| for act in reversed(activities[-50:]): |
| ts = act.get("timestamp") |
| if ts: |
| try: |
| bucket = ts.replace(minute=(ts.minute // 5) * 5, second=0, microsecond=0) |
| if bucket != current_bucket: |
| current_bucket = bucket |
| st.divider() |
| st.caption(f"π {fmt_local(bucket)}") |
| except Exception: |
| pass |
|
|
| evtype = act.get("type") or act.get("event_type", "") |
| ts_str = act.get("time") or "" |
|
|
| if evtype == "thinking": |
| with st.expander(f"π§ {ts_str} β Thinking", expanded=False): |
| st.text(act.get("details", "")[:1000] or "(no details)") |
| elif evtype == "tool": |
| st.markdown(f"**π§ {ts_str}** β {act.get('summary', '')}") |
| if act.get("details"): |
| st.code(act["details"][:500], language="bash") |
| elif evtype == "tool_call": |
| st.markdown(f"**β {ts_str}** β {act.get('tool_name', '')} β {act.get('summary', '')}") |
| elif evtype == "tool_result" or evtype == "result": |
| icon = "β" if act.get("is_error") else "β
" |
| st.markdown(f"**{icon} {ts_str}** β {act.get('summary', '')}") |
| if act.get("details"): |
| st.text(act["details"][:300]) |
| elif evtype == "tool_error": |
| st.error(f"β {ts_str} β {act.get('summary', 'tool error')}") |
| if act.get("details"): |
| st.text(act["details"][:300]) |
| elif evtype == "assistant_meta": |
| st.markdown(f"**π‘ {ts_str}** β {act.get('summary', '')[:120]}") |
| elif evtype == "assistant_thinking": |
| with st.expander(f"π§ {ts_str} β Thinking", expanded=False): |
| st.text(act.get("details", "")[:500] or "(no details)") |
| elif evtype == "assistant_text": |
| st.markdown(f"**π¬ {ts_str}** β {act.get('summary', '')[:120]}") |
| elif evtype == "user_message": |
| with st.expander(f"π€ {ts_str} β User message", expanded=False): |
| st.text(act.get("details", "")[:300]) |
| else: |
| st.caption(f"[{evtype or '?'}] {ts_str} β {act.get('summary', '')[:80]}") |
| else: |
| st.info("No activity found for this session (may be archived or not yet written)") |
| else: |
| st.info("No recent sessions found in the last 24 hours") |
|
|
| with tab_model_ops: |
| render_model_ops_tab(session_rows) |
|
|
|
|
# Entry point: render the dashboard via main() only when this module is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|
|