from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Iterable

from spooler.store import get_recent_sessions, get_session_activity

SNAPSHOT_SCHEMA = "openclaw.session.v1"

# A session whose last event is older than this many seconds is reported as "stale".
_STALE_AFTER_SECONDS = 30 * 24 * 60 * 60

# More noisy tool results than this raises the "noisy_tool_outputs" signal, both in
# the health state and in the pressure flags.
_NOISY_TOOL_RESULT_THRESHOLD = 2

# Risk flag -> lowercase substrings that trigger it. Matching is a plain substring
# test against lowercased row text, so tokens must stay lowercase.
_RISK_TOKENS = {
    "approval_or_permission": ("approve", "approval", "permission"),
    "destructive_or_rollback_language": (
        "delete",
        "rm -rf",
        "drop table",
        "destroy",
        "rollback",
    ),
    "external_infra": ("public deploy", "dns", "domain", "cloud run", "gcloud"),
    "secret_sensitive": ("api key", "secret", "password", "token"),
}

log = logging.getLogger("session_snapshot")


def _now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _parse_ts(value: str | None) -> datetime | None:
    """Parse an ISO 8601 timestamp string, returning ``None`` when it is unusable.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing. Falsy values return
    ``None`` silently; values that cannot be parsed are logged at debug level and
    also return ``None``.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers truthy non-string values (e.g. numbers) that have
        # no ``.replace``; a bad timestamp must not abort the whole snapshot.
        log.debug("invalid session timestamp", extra={"timestamp_value": value})
        return None


def _age_seconds(value: str | None) -> int | None:
    """Return whole seconds elapsed since ``value``, or ``None`` if it cannot be parsed.

    Timestamps without timezone information are treated as UTC. Timestamps in the
    future are clamped to an age of ``0``.
    """
    ts = _parse_ts(value)
    if ts is None:
        return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - ts
    return max(0, int(elapsed.total_seconds()))


def _classify_activity(row: dict) -> dict:
    """Reduce one activity row to a compact event record.

    The event type and summary are derived from the row's ``role``:

    * ``toolResult``: ``tool_error`` or ``tool_result`` depending on ``is_error``,
      summarised as a cross or check mark followed by the tool name.
    * ``assistant`` / ``user``: ``assistant_text`` / ``user_message``, summarised
      by the first 140 characters of the preview.
    * ``system`` / ``developer``: the role itself, summarised by the first 120
      characters of the preview.
    * anything else: ``event``.

    The preview is the row's ``preview`` or, failing that, the first 300
    characters of ``clean_text``. The timestamp falls back to ``indexed_at``.
    """
    role = row.get("role") or ""
    tool_name = row.get("tool_name") or ""
    clean_text = row.get("clean_text") or ""
    preview = row.get("preview") or clean_text[:300]
    is_error = bool(row.get("is_error"))

    if role == "toolResult":
        event_type = "tool_error" if is_error else "tool_result"
        summary = f"✗ {tool_name or 'tool'}" if is_error else f"✓ {tool_name or 'tool'}"
    elif role == "assistant":
        event_type = "assistant_text"
        summary = preview[:140] or "assistant"
    elif role == "user":
        event_type = "user_message"
        summary = preview[:140] or "user"
    elif role in ("system", "developer"):
        event_type = role
        summary = preview[:120] or role
    else:
        event_type = "event"
        summary = preview[:120] or role or "event"

    return {
        "timestamp": row.get("timestamp") or row.get("indexed_at"),
        "entry_idx": row.get("entry_idx"),
        "event_type": event_type,
        "role": role,
        "tool_name": tool_name,
        "summary": summary,
        "is_error": is_error,
    }


def _risk_flags(activity: Iterable[dict]) -> list[str]:
    """Return the sorted risk flags whose trigger tokens occur in the activity text.

    Each row's ``clean_text`` and ``preview`` are lowercased and searched for the
    substrings in ``_RISK_TOKENS``. Matching is substring-based, so a token also
    matches inside longer words.
    """
    flags: set[str] = set()
    for row in activity:
        text = f"{row.get('clean_text') or ''}\n{row.get('preview') or ''}".lower()
        for flag, tokens in _RISK_TOKENS.items():
            if flag not in flags and any(token in text for token in tokens):
                flags.add(flag)
    return sorted(flags)


def _health_from_summary(summary: dict, last_event_age_seconds: int | None) -> dict:
    """Derive a coarse health state from a session summary.

    Exactly one state is reported, chosen in this order of precedence:
    ``stale`` (last event older than 30 days), ``warning`` (errors present),
    ``warning`` (more than two noisy tool results), ``unknown`` (no events),
    otherwise ``active``.

    Returns:
        A dict with ``state``, the list of ``reasons`` behind it, and the
        ``last_event_age_seconds`` that was passed in.
    """
    error_count = int(summary.get("error_count") or 0)
    event_count = int(summary.get("event_count") or 0)
    noisy = int(summary.get("noisy_tool_results") or 0)

    state = "active"
    reasons: list[str] = []
    if last_event_age_seconds is not None and last_event_age_seconds > _STALE_AFTER_SECONDS:
        state = "stale"
        reasons.append("no_recent_activity_30d")
    elif error_count > 0:
        state = "warning"
        reasons.append("errors_present")
    elif noisy > _NOISY_TOOL_RESULT_THRESHOLD:
        state = "warning"
        reasons.append("noisy_tool_outputs")
    elif event_count == 0:
        state = "unknown"
        reasons.append("no_spooled_events")

    return {
        "state": state,
        "reasons": reasons,
        "last_event_age_seconds": last_event_age_seconds,
    }


def build_session_snapshot(session_summary: dict, activity_limit: int = 80) -> dict:
    """Build a canonical, backend-neutral OpenClaw session snapshot.

    This is intentionally sidecar-local and read-only. It gives dashboards and
    reviewers one stable shape without requiring gateway core changes.

    Args:
        session_summary: Summary row for one session. The keys read here are
            ``session_id``, ``agent_id``, ``last_event_at``, ``last_entry_idx``,
            ``event_count``, ``tool_result_count``, ``error_count``,
            ``noisy_tool_results`` and ``hints``; missing counts are treated as 0.
        activity_limit: Passed to ``get_session_activity`` as its second argument.

    Returns:
        A dict tagged with ``SNAPSHOT_SCHEMA`` holding the state, health, risk and
        recent-event sections. Activity is not fetched when ``session_id`` is falsy.
    """
    session_id = session_summary.get("session_id")
    # Materialise the rows: they are walked twice below (event classification and
    # risk flags), which would silently yield no flags for a single-pass iterator.
    activity = list(get_session_activity(session_id, activity_limit)) if session_id else []
    recent_events = [_classify_activity(row) for row in activity]

    last_event_at = session_summary.get("last_event_at")
    age = _age_seconds(last_event_at)

    tool_result_count = int(session_summary.get("tool_result_count") or 0)
    event_count = int(session_summary.get("event_count") or 0)
    error_count = int(session_summary.get("error_count") or 0)
    # Guard the division: a session with no events has a ratio of 0.0.
    tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0

    pressure_flags = []
    if event_count > 1000:
        pressure_flags.append("long_running_session")
    if tool_ratio > 0.7:
        pressure_flags.append("tool_heavy")
    if int(session_summary.get("noisy_tool_results") or 0) > _NOISY_TOOL_RESULT_THRESHOLD:
        pressure_flags.append("noisy_tool_outputs")

    return {
        "schema": SNAPSHOT_SCHEMA,
        "generated_at": _now(),
        "id": session_id,
        "kind": "transcript_session",
        "owner": {"agent_id": session_summary.get("agent_id")},
        "state": _health_from_summary(session_summary, age),
        "runtime": {
            "adapter": "session-amplifier-spooler",
            "source": "spooled_transcript",
        },
        "health": {
            "event_count": event_count,
            "tool_result_count": tool_result_count,
            "error_count": error_count,
            "tool_ratio": tool_ratio,
            "pressure_flags": pressure_flags,
            "hints": session_summary.get("hints") or [],
        },
        "risk": {
            "flags": _risk_flags(activity),
            "policy": "signal_only_non_destructive",
        },
        "outputs": {
            "last_event_at": last_event_at,
            "last_entry_idx": session_summary.get("last_entry_idx"),
            "recent_events": recent_events,
        },
        "rollback": {
            "note": "Snapshot generation is read-only; remove session_snapshot.py and route imports to roll back this sidecar feature.",
        },
    }


def build_recent_session_snapshots(limit: int = 40, activity_limit: int = 80) -> dict:
    """Build snapshots for the sessions returned by ``get_recent_sessions(limit)``.

    Args:
        limit: Passed to ``get_recent_sessions``.
        activity_limit: Forwarded to ``build_session_snapshot`` for every session.

    Returns:
        A dict tagged ``"<SNAPSHOT_SCHEMA>.collection"`` with the generation time,
        the number of sessions, and one snapshot per session.
    """
    sessions = get_recent_sessions(limit)
    return {
        "schema": f"{SNAPSHOT_SCHEMA}.collection",
        "generated_at": _now(),
        "count": len(sessions),
        "snapshots": [
            build_session_snapshot(row, activity_limit=activity_limit) for row in sessions
        ],
    }