openclaw-session-amplifier / reviewer /session_snapshot.py
Ordo
Initial public release
63c75d5
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Iterable
from spooler.store import get_recent_sessions, get_session_activity
SNAPSHOT_SCHEMA = "openclaw.session.v1"
log = logging.getLogger("session_snapshot")
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _parse_ts(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except (TypeError, ValueError):
log.debug("invalid session timestamp", extra={"timestamp_value": value})
return None
def _age_seconds(value: str | None) -> int | None:
ts = _parse_ts(value)
if not ts:
return None
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return max(0, int((datetime.now(timezone.utc) - ts).total_seconds()))
def _classify_activity(row: dict) -> dict:
role = row.get("role") or ""
tool_name = row.get("tool_name") or ""
clean_text = row.get("clean_text") or ""
preview = row.get("preview") or clean_text[:300]
is_error = bool(row.get("is_error"))
if role == "toolResult":
event_type = "tool_error" if is_error else "tool_result"
summary = f"✗ {tool_name or 'tool'}" if is_error else f"✓ {tool_name or 'tool'}"
elif role == "assistant":
event_type = "assistant_text"
summary = preview[:140] or "assistant"
elif role == "user":
event_type = "user_message"
summary = preview[:140] or "user"
elif role in ("system", "developer"):
event_type = role
summary = preview[:120] or role
else:
event_type = "event"
summary = preview[:120] or role or "event"
return {
"timestamp": row.get("timestamp") or row.get("indexed_at"),
"entry_idx": row.get("entry_idx"),
"event_type": event_type,
"role": role,
"tool_name": tool_name,
"summary": summary,
"is_error": is_error,
}
def _risk_flags(activity: Iterable[dict]) -> list[str]:
flags: set[str] = set()
for row in activity:
text = f"{row.get('clean_text') or ''}\n{row.get('preview') or ''}".lower()
if any(token in text for token in ("approve", "approval", "permission")):
flags.add("approval_or_permission")
if any(token in text for token in ("delete", "rm -rf", "drop table", "destroy", "rollback")):
flags.add("destructive_or_rollback_language")
if any(token in text for token in ("public deploy", "dns", "domain", "cloud run", "gcloud")):
flags.add("external_infra")
if any(token in text for token in ("api key", "secret", "password", "token")):
flags.add("secret_sensitive")
return sorted(flags)
def _health_from_summary(summary: dict, last_event_age_seconds: int | None) -> dict:
error_count = int(summary.get("error_count") or 0)
event_count = int(summary.get("event_count") or 0)
noisy = int(summary.get("noisy_tool_results") or 0)
state = "active"
reasons: list[str] = []
if last_event_age_seconds is not None and last_event_age_seconds > 30 * 24 * 60 * 60:
state = "stale"
reasons.append("no_recent_activity_30d")
elif error_count > 0:
state = "warning"
reasons.append("errors_present")
elif noisy > 2:
state = "warning"
reasons.append("noisy_tool_outputs")
elif event_count == 0:
state = "unknown"
reasons.append("no_spooled_events")
return {
"state": state,
"reasons": reasons,
"last_event_age_seconds": last_event_age_seconds,
}
def build_session_snapshot(session_summary: dict, activity_limit: int = 80) -> dict:
"""Build a canonical, backend-neutral OpenClaw session snapshot.
This is intentionally sidecar-local and read-only. It gives dashboards and
reviewers one stable shape without requiring gateway core changes.
"""
session_id = session_summary.get("session_id")
activity = get_session_activity(session_id, activity_limit) if session_id else []
recent_events = [_classify_activity(row) for row in activity]
last_event_at = session_summary.get("last_event_at")
age = _age_seconds(last_event_at)
tool_result_count = int(session_summary.get("tool_result_count") or 0)
event_count = int(session_summary.get("event_count") or 0)
error_count = int(session_summary.get("error_count") or 0)
tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0
pressure_flags = []
if event_count > 1000:
pressure_flags.append("long_running_session")
if tool_ratio > 0.7:
pressure_flags.append("tool_heavy")
if int(session_summary.get("noisy_tool_results") or 0) > 2:
pressure_flags.append("noisy_tool_outputs")
return {
"schema": SNAPSHOT_SCHEMA,
"generated_at": _now(),
"id": session_id,
"kind": "transcript_session",
"owner": {"agent_id": session_summary.get("agent_id")},
"state": _health_from_summary(session_summary, age),
"runtime": {
"adapter": "session-amplifier-spooler",
"source": "spooled_transcript",
},
"health": {
"event_count": event_count,
"tool_result_count": tool_result_count,
"error_count": error_count,
"tool_ratio": tool_ratio,
"pressure_flags": pressure_flags,
"hints": session_summary.get("hints") or [],
},
"risk": {
"flags": _risk_flags(activity),
"policy": "signal_only_non_destructive",
},
"outputs": {
"last_event_at": last_event_at,
"last_entry_idx": session_summary.get("last_entry_idx"),
"recent_events": recent_events,
},
"rollback": {
"note": "Snapshot generation is read-only; remove session_snapshot.py and route imports to roll back this sidecar feature.",
},
}
def build_recent_session_snapshots(limit: int = 40, activity_limit: int = 80) -> dict:
sessions = get_recent_sessions(limit)
return {
"schema": f"{SNAPSHOT_SCHEMA}.collection",
"generated_at": _now(),
"count": len(sessions),
"snapshots": [build_session_snapshot(row, activity_limit=activity_limit) for row in sessions],
}