File size: 6,459 Bytes
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Iterable
from spooler.store import get_recent_sessions, get_session_activity
# Schema identifier stamped into every snapshot's "schema" field; the
# collection payload uses the same value with ".collection" appended.
SNAPSHOT_SCHEMA = "openclaw.session.v1"
# Module-level logger; used at debug level to report unparseable timestamps.
log = logging.getLogger("session_snapshot")
def _now() -> str:
    """Return the current UTC time as an offset-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def _parse_ts(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except (TypeError, ValueError):
log.debug("invalid session timestamp", extra={"timestamp_value": value})
return None
def _age_seconds(value: str | None) -> int | None:
    """Return how many whole seconds have elapsed since timestamp ``value``.

    Args:
        value: Timestamp text handed to ``_parse_ts``.

    Returns:
        The elapsed seconds, clamped to ``0`` for timestamps in the future,
        or ``None`` when ``_parse_ts`` yields no timestamp.
    """
    parsed = _parse_ts(value)
    if parsed is None:
        return None
    # Timestamps without an offset are interpreted as UTC.
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - aware
    return max(0, int(elapsed.total_seconds()))
def _classify_activity(row: dict) -> dict:
    """Reduce one activity row to a compact event descriptor.

    Args:
        row: Activity record. Keys read: ``role``, ``tool_name``,
            ``clean_text``, ``preview``, ``is_error``, ``timestamp``,
            ``indexed_at`` and ``entry_idx``; all are optional.

    Returns:
        A dict with ``timestamp`` (falling back to ``indexed_at``),
        ``entry_idx``, ``event_type``, ``role``, ``tool_name``, ``summary``
        and ``is_error``.
    """
    role = row.get("role") or ""
    tool_name = row.get("tool_name") or ""
    body = row.get("clean_text") or ""
    # Prefer the stored preview; otherwise use the head of the clean text.
    snippet = row.get("preview") or body[:300]
    failed = bool(row.get("is_error"))

    if role == "toolResult":
        # Tool results are summarised by outcome mark and tool name only.
        label = tool_name or "tool"
        if failed:
            event_type, summary = "tool_error", f"✗ {label}"
        else:
            event_type, summary = "tool_result", f"✓ {label}"
    else:
        # (role, event type, summary width) for the text-bearing roles.
        text_roles = (
            ("assistant", "assistant_text", 140),
            ("user", "user_message", 140),
            ("system", "system", 120),
            ("developer", "developer", 120),
        )
        # Defaults apply to any role not listed above.
        event_type, width, fallback = "event", 120, role or "event"
        for known_role, kind, limit in text_roles:
            if role == known_role:
                event_type, width, fallback = kind, limit, known_role
                break
        summary = snippet[:width] or fallback

    return {
        "timestamp": row.get("timestamp") or row.get("indexed_at"),
        "entry_idx": row.get("entry_idx"),
        "event_type": event_type,
        "role": role,
        "tool_name": tool_name,
        "summary": summary,
        "is_error": failed,
    }
def _risk_flags(activity: Iterable[dict]) -> list[str]:
    """Collect risk flags triggered by keywords in the activity text.

    For each row, ``clean_text`` and ``preview`` are joined with a newline,
    lower-cased, and scanned for plain substrings (not whole words).

    Args:
        activity: Rows to scan; only ``clean_text`` and ``preview`` are read.

    Returns:
        The distinct flag names that matched, sorted alphabetically.
    """
    # Each flag fires when any of its needles occurs in a row's text.
    rules = (
        ("approval_or_permission", ("approve", "approval", "permission")),
        ("destructive_or_rollback_language", ("delete", "rm -rf", "drop table", "destroy", "rollback")),
        ("external_infra", ("public deploy", "dns", "domain", "cloud run", "gcloud")),
        ("secret_sensitive", ("api key", "secret", "password", "token")),
    )
    found: set[str] = set()
    for row in activity:
        haystack = f"{row.get('clean_text') or ''}\n{row.get('preview') or ''}".lower()
        found.update(
            flag for flag, needles in rules if any(needle in haystack for needle in needles)
        )
    return sorted(found)
def _health_from_summary(summary: dict, last_event_age_seconds: int | None) -> dict:
error_count = int(summary.get("error_count") or 0)
event_count = int(summary.get("event_count") or 0)
noisy = int(summary.get("noisy_tool_results") or 0)
state = "active"
reasons: list[str] = []
if last_event_age_seconds is not None and last_event_age_seconds > 30 * 24 * 60 * 60:
state = "stale"
reasons.append("no_recent_activity_30d")
elif error_count > 0:
state = "warning"
reasons.append("errors_present")
elif noisy > 2:
state = "warning"
reasons.append("noisy_tool_outputs")
elif event_count == 0:
state = "unknown"
reasons.append("no_spooled_events")
return {
"state": state,
"reasons": reasons,
"last_event_age_seconds": last_event_age_seconds,
}
def build_session_snapshot(session_summary: dict, activity_limit: int = 80) -> dict:
    """Build a canonical, backend-neutral OpenClaw session snapshot.

    This is intentionally sidecar-local and read-only. It gives dashboards and
    reviewers one stable shape without requiring gateway core changes.

    Args:
        session_summary: Summary record for one session. Keys read:
            ``session_id``, ``agent_id``, ``last_event_at``,
            ``last_entry_idx``, ``event_count``, ``tool_result_count``,
            ``error_count``, ``noisy_tool_results`` and ``hints``. Missing
            or falsy counters are treated as 0 and missing hints as ``[]``.
        activity_limit: Forwarded to ``get_session_activity`` as its second
            argument. No activity is fetched when ``session_id`` is falsy.

    Returns:
        A dict with the keys ``schema``, ``generated_at``, ``id``, ``kind``,
        ``owner``, ``state``, ``runtime``, ``health``, ``risk``, ``outputs``
        and ``rollback``.
    """
    session_id = session_summary.get("session_id")
    if session_id:
        # The rows are walked twice below (classification, then risk scan).
        # Materialise them once so a lazy iterable is not exhausted by the
        # first pass, and treat a None result as "no activity".
        activity = list(get_session_activity(session_id, activity_limit) or [])
    else:
        activity = []
    recent_events = [_classify_activity(row) for row in activity]

    last_event_at = session_summary.get("last_event_at")
    age = _age_seconds(last_event_at)

    tool_result_count = int(session_summary.get("tool_result_count") or 0)
    event_count = int(session_summary.get("event_count") or 0)
    error_count = int(session_summary.get("error_count") or 0)
    noisy_count = int(session_summary.get("noisy_tool_results") or 0)
    # Guard the division: a session with no events has a ratio of 0.0.
    tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0

    pressure_flags = []
    if event_count > 1000:
        pressure_flags.append("long_running_session")
    if tool_ratio > 0.7:
        pressure_flags.append("tool_heavy")
    if noisy_count > 2:
        pressure_flags.append("noisy_tool_outputs")

    return {
        "schema": SNAPSHOT_SCHEMA,
        "generated_at": _now(),
        "id": session_id,
        "kind": "transcript_session",
        "owner": {"agent_id": session_summary.get("agent_id")},
        "state": _health_from_summary(session_summary, age),
        "runtime": {
            "adapter": "session-amplifier-spooler",
            "source": "spooled_transcript",
        },
        "health": {
            "event_count": event_count,
            "tool_result_count": tool_result_count,
            "error_count": error_count,
            "tool_ratio": tool_ratio,
            "pressure_flags": pressure_flags,
            "hints": session_summary.get("hints") or [],
        },
        "risk": {
            "flags": _risk_flags(activity),
            "policy": "signal_only_non_destructive",
        },
        "outputs": {
            "last_event_at": last_event_at,
            "last_entry_idx": session_summary.get("last_entry_idx"),
            "recent_events": recent_events,
        },
        "rollback": {
            "note": "Snapshot generation is read-only; remove session_snapshot.py and route imports to roll back this sidecar feature.",
        },
    }
def build_recent_session_snapshots(limit: int = 40, activity_limit: int = 80) -> dict:
    """Build snapshots for the sessions returned by ``get_recent_sessions``.

    Args:
        limit: Passed to ``get_recent_sessions``.
        activity_limit: Passed to ``build_session_snapshot`` for each session.

    Returns:
        A dict with ``schema`` (the snapshot schema plus ``.collection``),
        ``generated_at``, ``count`` (number of session rows) and
        ``snapshots`` (one snapshot per row, in the order returned).
    """
    rows = get_recent_sessions(limit)
    collection = {
        "schema": f"{SNAPSHOT_SCHEMA}.collection",
        "generated_at": _now(),
        "count": len(rows),
        "snapshots": [],
    }
    for row in rows:
        snapshot = build_session_snapshot(row, activity_limit=activity_limit)
        collection["snapshots"].append(snapshot)
    return collection
|