| from __future__ import annotations |
|
|
| import logging |
| from datetime import datetime, timezone |
| from typing import Iterable |
|
|
| from spooler.store import get_recent_sessions, get_session_activity |
|
|
|
|
# Schema identifier stamped on every snapshot; the collection payload reuses
# it with a ".collection" suffix.
SNAPSHOT_SCHEMA: str = "openclaw.session.v1"

# Module-level logger shared by the helpers in this file.
log: logging.Logger = logging.getLogger("session_snapshot")
|
|
|
|
def _now() -> str:
    """Return the current UTC time as an ISO 8601 string (with ``+00:00``)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
| def _parse_ts(value: str | None) -> datetime | None: |
| if not value: |
| return None |
| try: |
| return datetime.fromisoformat(value.replace("Z", "+00:00")) |
| except (TypeError, ValueError): |
| log.debug("invalid session timestamp", extra={"timestamp_value": value}) |
| return None |
|
|
|
|
def _age_seconds(value: str | None) -> int | None:
    """Return how many whole seconds ago the given timestamp occurred.

    Args:
        value: Timestamp text handed to ``_parse_ts``.

    Returns:
        Elapsed seconds truncated to an ``int`` and clamped so that a
        timestamp in the future yields ``0``; ``None`` when ``_parse_ts``
        cannot produce a ``datetime``.
    """
    parsed = _parse_ts(value)
    if parsed is None:
        return None
    # Timestamps without an offset are interpreted as UTC.
    aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - aware
    seconds = int(elapsed.total_seconds())
    return seconds if seconds > 0 else 0
|
|
|
|
def _classify_activity(row: dict) -> dict:
    """Reduce one activity row to a compact event record.

    Args:
        row: Mapping read via ``.get`` for ``role``, ``tool_name``,
            ``clean_text``, ``preview``, ``is_error``, ``timestamp``,
            ``indexed_at`` and ``entry_idx``. Missing or falsy text fields
            are treated as empty strings.

    Returns:
        A dict with ``timestamp``, ``entry_idx``, ``event_type``, ``role``,
        ``tool_name``, ``summary`` and ``is_error``.
    """
    role = row.get("role") or ""
    tool_name = row.get("tool_name") or ""
    is_error = bool(row.get("is_error"))
    # Prefer the stored preview; otherwise use the first 300 characters of
    # the cleaned text.
    preview = row.get("preview") or (row.get("clean_text") or "")[:300]

    if role == "toolResult":
        # Tool results are summarised by outcome mark plus tool name.
        mark, event_type = ("✗", "tool_error") if is_error else ("✓", "tool_result")
        summary = f"{mark} {tool_name or 'tool'}"
    elif role in ("assistant", "user"):
        event_type = "assistant_text" if role == "assistant" else "user_message"
        summary = preview[:140] or role
    else:
        # system/developer keep their role as the event type; anything else
        # is a generic "event".
        event_type = role if role in ("system", "developer") else "event"
        summary = preview[:120] or role or "event"

    return {
        "timestamp": row.get("timestamp") or row.get("indexed_at"),
        "entry_idx": row.get("entry_idx"),
        "event_type": event_type,
        "role": role,
        "tool_name": tool_name,
        "summary": summary,
        "is_error": is_error,
    }
|
|
|
|
# Each rule pairs a flag name with the lowercase substrings that raise it.
_RISK_RULES = (
    ("approval_or_permission", ("approve", "approval", "permission")),
    (
        "destructive_or_rollback_language",
        ("delete", "rm -rf", "drop table", "destroy", "rollback"),
    ),
    ("external_infra", ("public deploy", "dns", "domain", "cloud run", "gcloud")),
    ("secret_sensitive", ("api key", "secret", "password", "token")),
)


def _risk_flags(activity: Iterable[dict]) -> list[str]:
    """Collect risk flags raised by keyword hits in activity text.

    Each row's ``clean_text`` and ``preview`` are joined, lowercased and
    searched for plain substrings, so matching is case-insensitive and not
    word-bounded.

    Args:
        activity: Rows read via ``.get("clean_text")`` and ``.get("preview")``.

    Returns:
        The distinct flag names that matched, sorted alphabetically.
    """
    raised: set[str] = set()
    for row in activity:
        haystack = f"{row.get('clean_text') or ''}\n{row.get('preview') or ''}".lower()
        raised.update(
            flag
            for flag, needles in _RISK_RULES
            if any(needle in haystack for needle in needles)
        )
    return sorted(raised)
|
|
|
|
| def _health_from_summary(summary: dict, last_event_age_seconds: int | None) -> dict: |
| error_count = int(summary.get("error_count") or 0) |
| event_count = int(summary.get("event_count") or 0) |
| noisy = int(summary.get("noisy_tool_results") or 0) |
| state = "active" |
| reasons: list[str] = [] |
|
|
| if last_event_age_seconds is not None and last_event_age_seconds > 30 * 24 * 60 * 60: |
| state = "stale" |
| reasons.append("no_recent_activity_30d") |
| elif error_count > 0: |
| state = "warning" |
| reasons.append("errors_present") |
| elif noisy > 2: |
| state = "warning" |
| reasons.append("noisy_tool_outputs") |
| elif event_count == 0: |
| state = "unknown" |
| reasons.append("no_spooled_events") |
|
|
| return { |
| "state": state, |
| "reasons": reasons, |
| "last_event_age_seconds": last_event_age_seconds, |
| } |
|
|
|
|
def build_session_snapshot(session_summary: dict, activity_limit: int = 80) -> dict:
    """Build a canonical, backend-neutral OpenClaw session snapshot.

    This is intentionally sidecar-local and read-only. It gives dashboards and
    reviewers one stable shape without requiring gateway core changes.

    Args:
        session_summary: Summary mapping for one session. Keys read here are
            ``session_id``, ``agent_id``, ``last_event_at``,
            ``last_entry_idx``, ``event_count``, ``tool_result_count``,
            ``error_count``, ``noisy_tool_results`` and ``hints``; missing or
            falsy counters count as 0.
        activity_limit: Forwarded as the second argument to
            ``get_session_activity`` (presumably a row cap; confirm against
            the store).

    Returns:
        A dict with the keys ``schema``, ``generated_at``, ``id``, ``kind``,
        ``owner``, ``state``, ``runtime``, ``health``, ``risk``, ``outputs``
        and ``rollback``.
    """
    session_id = session_summary.get("session_id")
    # Materialise once: the rows are walked twice below (event classification
    # and risk scanning), so a single-pass iterator from the store would
    # otherwise leave the risk scan with nothing to read. ``or []`` also
    # tolerates a store that returns None.
    activity = (
        list(get_session_activity(session_id, activity_limit) or [])
        if session_id
        else []
    )
    recent_events = [_classify_activity(row) for row in activity]

    last_event_at = session_summary.get("last_event_at")
    age = _age_seconds(last_event_at)

    tool_result_count = int(session_summary.get("tool_result_count") or 0)
    event_count = int(session_summary.get("event_count") or 0)
    error_count = int(session_summary.get("error_count") or 0)
    noisy_tool_results = int(session_summary.get("noisy_tool_results") or 0)
    # Guard the division: a session with no events has a ratio of 0.0.
    tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0

    pressure_flags = []
    if event_count > 1000:
        pressure_flags.append("long_running_session")
    if tool_ratio > 0.7:
        pressure_flags.append("tool_heavy")
    if noisy_tool_results > 2:
        pressure_flags.append("noisy_tool_outputs")

    return {
        "schema": SNAPSHOT_SCHEMA,
        "generated_at": _now(),
        "id": session_id,
        "kind": "transcript_session",
        "owner": {"agent_id": session_summary.get("agent_id")},
        "state": _health_from_summary(session_summary, age),
        "runtime": {
            "adapter": "session-amplifier-spooler",
            "source": "spooled_transcript",
        },
        "health": {
            "event_count": event_count,
            "tool_result_count": tool_result_count,
            "error_count": error_count,
            "tool_ratio": tool_ratio,
            "pressure_flags": pressure_flags,
            "hints": session_summary.get("hints") or [],
        },
        "risk": {
            "flags": _risk_flags(activity),
            "policy": "signal_only_non_destructive",
        },
        "outputs": {
            "last_event_at": last_event_at,
            "last_entry_idx": session_summary.get("last_entry_idx"),
            "recent_events": recent_events,
        },
        "rollback": {
            "note": "Snapshot generation is read-only; remove session_snapshot.py and route imports to roll back this sidecar feature.",
        },
    }
|
|
|
|
def build_recent_session_snapshots(limit: int = 40, activity_limit: int = 80) -> dict:
    """Build snapshots for the most recent sessions as one collection payload.

    Args:
        limit: Forwarded to ``get_recent_sessions``.
        activity_limit: Forwarded to ``build_session_snapshot`` for every
            session.

    Returns:
        A dict with ``schema`` (the snapshot schema plus ``.collection``),
        ``generated_at``, ``count`` (number of sessions returned by the
        store) and ``snapshots`` (one snapshot per session, in store order).
    """
    sessions = get_recent_sessions(limit)
    # Stamp the collection before building its members, so the collection
    # timestamp is never later than any member's own timestamp.
    generated_at = _now()
    total = len(sessions)
    snapshots = [
        build_session_snapshot(summary, activity_limit=activity_limit)
        for summary in sessions
    ]
    return {
        "schema": f"{SNAPSHOT_SCHEMA}.collection",
        "generated_at": generated_at,
        "count": total,
        "snapshots": snapshots,
    }
|
|