Ordo committed on
Commit
63c75d5
·
0 Parent(s):

Initial public release

.env.example ADDED
@@ -0,0 +1,11 @@
+ OPENCLAW_AGENTS_ROOT=~/.openclaw/agents
+ OPENCLAW_STATE_DIR=~/.openclaw/workspace/ops/state
+ SESSION_AMPLIFIER_DB_PATH=~/.openclaw/workspace/ops/state/session_amplifier.sqlite
+ AGENTS_ALLOWLIST=
+ SESSION_GLOB_EXCLUDE=
+ MAX_TOOLRESULT_CHARS=2000
+ SPOOLER_BATCH_SIZE=100
+ SPOOLER_REDACT_PATTERNS=api_key,path,base64
+ TOOL_NOISE_PATTERNS=ENOENT,no output,command exited
+ REVIEW_CONFIDENCE_THRESHOLD=0.5
+ API_PORT=8477
.gitignore ADDED
@@ -0,0 +1,14 @@
+ .env
+ .env.*
+ !.env.example
+ __pycache__/
+ *.py[cod]
+ .pytest_cache/
+ .mypy_cache/
+ .ruff_cache/
+ .venv/
+ venv/
+ *.db
+ *.sqlite
+ *.log
+ /state/
Dockerfile ADDED
@@ -0,0 +1,19 @@
+ FROM python:3.12-slim
+
+ WORKDIR /app
+
+ COPY requirements.txt .
+ RUN pip install --no-cache-dir -r requirements.txt
+
+ COPY . .
+
+ ENV OPENCLAW_AGENTS_ROOT=/openclaw/agents
+ ENV OPENCLAW_STATE_DIR=/data/session-amplifier
+ ENV MAX_TOOLRESULT_CHARS=2000
+ ENV SPOOLER_BATCH_SIZE=100
+ ENV REVIEW_CONFIDENCE_THRESHOLD=0.5
+ ENV API_PORT=8477
+
+ EXPOSE 8477
+
+ ENTRYPOINT ["python", "main.py"]
LICENSE ADDED
@@ -0,0 +1,21 @@
+ MIT License
+
+ Copyright (c) 2026 Patrick
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE.
README.md ADDED
@@ -0,0 +1,128 @@
+ # Session Amplifier
+
+ Lightweight OpenClaw sidecar for transcript spooling and session review.
+
+ ## What it does
+
+ - **Spooler** — walks `~/.openclaw/agents/<agent>/sessions/*.jsonl`, cleans/redacts tool results, stores summaries in SQLite
+ - **Reviewer** — scores sessions for quality/failure patterns, detects unused tools/skills, surfaces recommendations
+ - **Snapshot adapter** — exposes canonical `openclaw.session.v1` session snapshots for dashboards/reviewers without gateway core patches
+ - **API** — FastAPI server on port 8477; the endpoints are listed under "API endpoints"
+
+ ## Quick start
+
+ ```bash
+ # Install deps
+ pip install -r requirements.txt
+
+ # Init DB
+ python main.py init
+
+ # Run once (spool or review)
+ python main.py spool
+ python main.py review
+
+ # Start API server
+ python main.py serve
+
+ # Watch transcript files and spool on changes
+ python main.py watch --interval 5
+ ```
+
+ ## Running Tests
+
+ The application uses `pytest` for testing. The tests use an in-memory SQLite database and do not require filesystem access.
+
+ ```bash
+ # Run all tests
+ pytest tests/
+ ```
+
+ ## Docker / container run path
+
+ ```bash
+ cd sidecar/session-amplifier
+ docker compose up -d --build
+ ```
+
+ Manual equivalent:
+
+ ```bash
+ docker build -t session-amplifier sidecar/session-amplifier/
+ docker run -p 8477:8477 \
+   -v ~/.openclaw:/openclaw:ro \
+   -v session_amplifier_state:/data/session-amplifier \
+   -e OPENCLAW_AGENTS_ROOT=/openclaw/agents \
+   -e OPENCLAW_STATE_DIR=/data/session-amplifier \
+   session-amplifier serve
+ ```
+
+ Host note: the bare host Python environment may not have `uvicorn` / `fastapi` installed. The intended live-service path is the container.
+
+ Gateway integration note: when OpenClaw runs in Docker, gateway-side scripts should target the sidecar by container hostname, not `localhost`. Default wrapper target is `SESSION_AMPLIFIER_BASE_URL=http://session-amplifier:8477`. The sidecar compose file joins the external `librechat_default` network so the gateway container can resolve it.
+
+ ## API endpoints
+
+ | Method | Path | Description |
+ |--------|------|-------------|
+ | GET | `/health` | Container health + version |
+ | POST | `/spool` | Trigger incremental spooling |
+ | GET | `/review/report` | Fetch latest review report |
+ | GET | `/review/skills` | Fetch skill/MCP coverage report |
+ | GET | `/sessions/recent` | Recent sessions with activity/error counts |
+ | GET | `/sessions/snapshots` | Recent sessions as canonical `openclaw.session.v1` snapshots |
+ | GET | `/session/{id}/snapshot` | One canonical `openclaw.session.v1` snapshot |
+ | GET | `/session/{id}/activity` | Normalized per-session activity feed |
+
+ ## Config (env)
+
+ | Var | Default | Description |
+ |-----|---------|-------------|
+ | `OPENCLAW_AGENTS_ROOT` | `~/.openclaw/agents` | Transcript source |
+ | `OPENCLAW_STATE_DIR` | `~/.openclaw/workspace/ops/state` | SQLite + artifacts output |
+ | `MAX_TOOLRESULT_CHARS` | `2000` | Truncate threshold |
+ | `SPOOLER_BATCH_SIZE` | `100` | DB insert batch size |
+ | `REVIEW_CONFIDENCE_THRESHOLD` | `0.5` | Min confidence for recommendations |
+ | `API_PORT` | `8477` | HTTP server port |
+
+ ## Package layout
+
+ ```
+ session-amplifier/
+ ├── config.py               # Env/config loading
+ ├── main.py                 # CLI entrypoint (init/serve/serve-watch/spool/review/watch)
+ ├── requirements.txt
+ ├── Dockerfile
+ ├── spooler/
+ │   ├── processor.py        # JSONL → spooled rows
+ │   ├── redaction.py        # API key / path / base64 redaction
+ │   ├── noise_filter.py     # Drop known-noise tool output
+ │   └── store.py            # SQLite read/write
+ ├── reviewer/
+ │   ├── scorer.py           # Session quality scoring
+ │   ├── pattern_detector.py # Recurring failure detection
+ │   ├── skill_analyzer.py   # MCP/skill coverage
+ │   └── report.py           # Report generation + persistence
+ ├── api/
+ │   └── routes.py           # FastAPI route handlers
+ └── tests/
+     └── ...                 # pytest suite
+ ```
+
+ ## Architecture notes
+
+ - Reads only from `OPENCLAW_AGENTS_ROOT`; never modifies transcripts
+ - Idempotent spooling via `UNIQUE(session_id, entry_idx)` constraint
+ - **Incremental spooling**: tracks a per-file `last_entry_idx` so that only lines appended since the previous run are parsed, which avoids re-reading whole transcripts.
+ - Reviewer is deterministic (no LLM required in v1); recommendations are filtered by a confidence threshold (`REVIEW_CONFIDENCE_THRESHOLD`)
+ - Watch mode is polling-based in v1 for simplicity; `python main.py watch --interval 5` or `serve-watch`
+ - By default, `serve-watch` / `watch` only triggers spooling. To also trigger the reviewer, append `--review-every <N>`.
+ - Manual trigger remains available for cron/recovery: `POST /spool` → wait → `GET /review/report`
+ - A simple CLI-style live monitor is available at `/home/node/.openclaw/workspace/ops/scripts/session_amplifier_live_monitor.py`
+ - Snapshot endpoints are read-only and sidecar-local. Rollback is removing `reviewer/session_snapshot.py`, the two route handlers/imports in `api/routes.py`, and the snapshot smoke test; no gateway config change is required unless the service was rebuilt/redeployed.
+
+ ## Troubleshooting
+
+ - **API keys exposed in tool outputs?** Check `SPOOLER_REDACT_PATTERNS`.
+ - **Database lock errors?** Multiple cron instances might be racing. Restart with a clean volume or disable concurrent spool calls.
+ - **Reporting "No data"?** Ensure `OPENCLAW_AGENTS_ROOT` path exists inside the container and matches host volume bindings.
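Review note: the README's architecture notes describe idempotent, incremental spooling. A minimal sketch of how those two properties can combine is below. It assumes the store writes with `INSERT OR IGNORE`, which this commit does not show; the function name `spool_lines` and the trimmed schema are hypothetical and only illustrate the `UNIQUE(session_id, entry_idx)` idea.

```python
import sqlite3

# Trimmed-down version of the spooled_entries table; only the columns needed here.
SCHEMA = """
CREATE TABLE spooled_entries (
    id INTEGER PRIMARY KEY,
    session_id TEXT NOT NULL,
    entry_idx INTEGER NOT NULL,
    clean_text TEXT,
    UNIQUE(session_id, entry_idx)
);
"""


def spool_lines(conn, session_id, lines, start_idx=0):
    """Insert lines[start_idx:] and return (rows_inserted, next_start_idx).

    Hypothetical helper: replays are harmless because the UNIQUE constraint
    makes INSERT OR IGNORE skip rows that already exist.
    """
    inserted = 0
    for idx in range(start_idx, len(lines)):
        cur = conn.execute(
            "INSERT OR IGNORE INTO spooled_entries (session_id, entry_idx, clean_text)"
            " VALUES (?, ?, ?)",
            (session_id, idx, lines[idx]),
        )
        inserted += cur.rowcount  # 0 when the row was ignored as a duplicate
    conn.commit()
    return inserted, len(lines)
```

Passing the returned `next_start_idx` back in on the next run is what keeps the work proportional to the appended lines rather than the whole file.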
SECURITY.md ADDED
@@ -0,0 +1,5 @@
+ # Security
+
+ Session transcripts can contain secrets, personal data, and operational details. Use synthetic fixtures for public examples.
+
+ The sidecar redacts common API-key, path, and base64 patterns, but operators should still treat source transcript directories and generated SQLite databases as private.
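Review note: `spooler/redaction.py` is not part of the visible diff, so the following is only a sketch of what pattern-based redaction keyed by `SPOOLER_REDACT_PATTERNS` might look like. The regular expressions and the `[REDACTED:...]` marker are assumptions chosen for illustration, not the sidecar's actual rules.

```python
import re

# Hypothetical patterns; the real ones live in spooler/redaction.py.
_PATTERNS = {
    "api_key": re.compile(r"\b(?:sk|pk)-[A-Za-z0-9_-]{16,}\b"),
    "path": re.compile(r"/(?:home|Users)/[^\s/]+"),
    "base64": re.compile(r"\b[A-Za-z0-9+/]{64,}={0,2}"),
}


def redact(text, enabled="api_key,path,base64"):
    """Apply each enabled pattern in order; unknown names are skipped."""
    for name in (part.strip() for part in enabled.split(",")):
        pattern = _PATTERNS.get(name)
        if pattern is not None:
            text = pattern.sub(f"[REDACTED:{name}]", text)
    return text
```

Regex redaction of this kind is best-effort, which is why the advice above to keep transcripts and databases private still applies.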
api/__init__.py ADDED
@@ -0,0 +1 @@
+ # api package
api/routes.py ADDED
@@ -0,0 +1,355 @@
+ from fastapi import FastAPI, Query, HTTPException
+ from fastapi.responses import StreamingResponse
+ from datetime import datetime, timezone
+ from pydantic import BaseModel
+ import asyncio
+ import json
+ import sqlite3
+
+ from spooler.processor import run_spool
+ from spooler.store import get_recent_sessions, get_session_activity, get_conn, get_session_summary
+ from reviewer.report import generate_report, generate_skills_report
+ from reviewer.skill_analyzer import find_skill_candidates
+ from reviewer.intelligence import (
+     generate_context_pressure_report,
+     generate_failure_mode_report,
+     generate_intelligence_bundle,
+     generate_session_sprawl_report,
+ )
+ from reviewer.session_snapshot import build_recent_session_snapshots, build_session_snapshot
+ from api.scheduler import claim_lock, release_lock, get_job_status
+ from config import settings
+
+ app = FastAPI(title="Session Amplifier", version="0.1.0")
+
+
+ @app.get("/health")
+ def health():
+     conn = get_conn()
+     stats = conn.execute(
+         "SELECT COUNT(*) AS entries, COUNT(DISTINCT session_id) AS sessions FROM spooled_entries"
+     ).fetchone()
+     conn.close()
+     entries = int(stats["entries"]) if stats else 0
+     sessions = int(stats["sessions"]) if stats else 0
+     return {
+         "status": "ok",
+         "version": "0.1.0",
+         "db_path": str(settings.db_path),
+         "agents_root": str(settings.openclaw_agents_root),
+         "spool_ready": entries > 0,
+         "entries": entries,
+         "sessions": sessions,
+     }
+
+
+ @app.post("/spool")
+ def spool():
+     count, sessions = run_spool()
+     return {"entries_spooled": count, "sessions_updated": sessions}
+
+
+ @app.get("/review/report")
+ def review_report(since: str | None = Query(None)):
+     try:
+         report = generate_report(since=since)
+         return report
+     except Exception as exc:
+         raise HTTPException(status_code=500, detail=str(exc))
+
+
+ @app.get("/review/skills")
+ def review_skills():
+     return generate_skills_report()
+
+
+ @app.get("/review/skills/candidates")
+ def review_skill_candidates(
+     query: str = Query(..., min_length=3, max_length=500),
+     agent_id: str | None = Query(None, max_length=100),
+     limit: int = Query(8, ge=1, le=25),
+ ):
+     return find_skill_candidates(query=query, agent_id=agent_id, limit=limit)
+
+
+ @app.get("/sessions/recent")
+ def sessions_recent(limit: int = Query(25, ge=1, le=200)):
+     return {
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "sessions": get_recent_sessions(limit),
+     }
+
+
+ @app.get("/sessions/active-bulk")
+ def sessions_active_bulk(
+     limit: int = Query(40, ge=1, le=200),
+     activity_limit: int = Query(200, ge=1, le=1000),
+ ):
+     sessions = get_recent_sessions(limit)
+     return {
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "sessions": sessions,
+         "activity": {
+             row["session_id"]: _normalize_activity_rows(get_session_activity(row["session_id"], activity_limit))
+             for row in sessions
+         },
+     }
+
+
+ @app.get("/sessions/snapshots")
+ def sessions_snapshots(
+     limit: int = Query(40, ge=1, le=200),
+     activity_limit: int = Query(80, ge=1, le=500),
+ ):
+     """Return canonical OpenClaw session snapshots for recent sessions."""
+     return build_recent_session_snapshots(limit=limit, activity_limit=activity_limit)
+
+
+ @app.get("/session/{session_id}/snapshot")
+ def session_snapshot(session_id: str, activity_limit: int = Query(80, ge=1, le=500)):
+     """Return a canonical OpenClaw session snapshot for one session."""
+     summary = get_session_summary(session_id)
+     if not summary:
+         raise HTTPException(status_code=404, detail="session not found")
+     return build_session_snapshot(summary, activity_limit=activity_limit)
+
+
+ @app.get("/reports/session-sprawl")
+ def report_session_sprawl(
+     limit: int = Query(500, ge=1, le=5000),
+     stale_days: int = Query(30, ge=1, le=3650),
+ ):
+     return generate_session_sprawl_report(limit=limit, stale_days=stale_days)
+
+
+ @app.get("/reports/context-pressure")
+ def report_context_pressure(limit: int = Query(200, ge=1, le=5000)):
+     return generate_context_pressure_report(limit=limit)
+
+
+ @app.get("/reports/failure-modes")
+ def report_failure_modes(limit: int = Query(200, ge=1, le=1000)):
+     return generate_failure_mode_report(limit=limit)
+
+
+ @app.post("/review/run")
+ def review_run(kind: str = Query("light", pattern="^(light|deep)$")):
+     return generate_intelligence_bundle(kind=kind)
+
+
+ def _normalize_activity_rows(rows: list[dict]) -> list[dict]:
+     normalized = []
+     for row in rows:
+         role = row.get("role") or ""
+         tool_name = row.get("tool_name") or ""
+         clean_text = row.get("clean_text") or ""
+         is_error = bool(row.get("is_error"))
+         preview = row.get("preview") or ""
+
+         # Classify event type
+         if role == "toolResult":
+             if is_error:
+                 event_type = "tool_error"
+                 summary = f"✗ {tool_name or 'tool'}"
+             else:
+                 event_type = "tool_result"
+                 summary = f"✓ {tool_name}" if tool_name else (preview[:80] or "tool result")
+         elif tool_name and role in ("assistant", "user"):
+             event_type = "tool_call"
+             summary = f"→ {tool_name}"
+         elif role == "assistant":
+             lower = clean_text.strip().lower()
+             if lower.startswith(("using", "i'll use", "i will use")) or lower.startswith(("tool call", "calling")):
+                 event_type = "assistant_meta"
+                 summary = preview[:120] or "assistant planning"
+             elif any(kw in lower[:100] for kw in ("thinking", "reasoning", "analyzing")):
+                 event_type = "assistant_thinking"
+                 summary = preview[:120] or "thinking"
+             else:
+                 event_type = "assistant_text"
+                 summary = preview[:120] or "assistant"
+         elif role == "user":
+             event_type = "user_message"
+             summary = preview[:120] or "user"
+         elif role == "system":
+             event_type = "system"
+             summary = preview[:80] or "system"
+         else:
+             event_type = "event"
+             summary = preview[:80] or str(role) or "event"
+
+         normalized.append(
+             {
+                 "timestamp": row.get("timestamp") or row.get("indexed_at"),
+                 "session_id": row.get("session_id"),
+                 "agent_id": row.get("agent_id"),
+                 "event_type": event_type,
+                 "role": role,
+                 "tool_name": tool_name,
+                 "summary": summary,
+                 "details": clean_text[:500] if clean_text else "",
+                 "is_error": is_error,
+                 "entry_idx": row.get("entry_idx"),
+             }
+         )
+     return normalized
+
+
+ @app.get("/session/{session_id}/activity")
+ def session_activity(session_id: str, limit: int = Query(200, ge=1, le=1000)):
+     rows = get_session_activity(session_id, limit)
+     return {
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "session_id": session_id,
+         "activity": _normalize_activity_rows(rows),
+     }
+
+
+ # Pricing per 1M tokens (input, output). Mirror of session_context_report.py MODEL_PRICING.
+ _API_MODEL_PRICING = {
+     "gpt-5": (2.5, 10.0), "gpt-5-4o": (2.5, 10.0), "gpt-4o": (2.5, 10.0),
+     "gpt-4o-mini": (0.15, 0.6), "gpt-4.1": (2.0, 8.0),
+     "gpt-4-turbo": (10.0, 30.0), "gpt-4": (30.0, 60.0),
+     "claude-opus-4-6": (3.0, 15.0), "claude-sonnet-4-6": (3.0, 15.0),
+     "claude-haiku-4-6": (0.8, 4.0), "claude-3-5-sonnet": (3.0, 15.0),
+     "claude-3-opus": (15.0, 75.0), "claude-3-sonnet": (3.0, 15.0),
+     "deepseek-chat": (0.14, 0.28), "deepseek-reasoner": (0.55, 2.19),
+     "gemini-2.5-pro": (1.25, 5.0), "gemini-2.5-flash": (0.075, 0.30),
+     "gemini-2.5-flash-lite": (0.075, 0.15),
+     "mistral-large": (2.0, 6.0), "mistral-small": (0.15, 0.6),
+     "minimax-m2.7": (0.099, 0.396), "minimax-m2": (0.099, 0.396),
+     "qwen": (0.5, 2.0), "moonshotai/kimi-k2": (0.5, 1.5),
+     "default": (0.1, 0.4),
+ }
+
+
+ def _infer_pricing(model: str) -> tuple[float, float]:
+     lowered = model.lower()
+     if lowered in _API_MODEL_PRICING:
+         return _API_MODEL_PRICING[lowered]
+     # Longest match wins, so "gpt-4o-mini-2024" is not priced as "gpt-4o".
+     matches = [
+         key for key in _API_MODEL_PRICING
+         if key != "default" and (key in lowered or (lowered and lowered in key))
+     ]
+     if matches:
+         return _API_MODEL_PRICING[max(matches, key=len)]
+     return _API_MODEL_PRICING["default"]
+
+
+ @app.get("/session/{session_id}/cost_summary")
+ def session_cost_summary(session_id: str):
+     """Return token count and cost estimate for a session."""
+     import math
+     conn = get_conn()
+     conn.row_factory = sqlite3.Row  # return Row objects for dict-like access
+     rows = conn.execute(
+         """
+         SELECT role, tool_name, clean_text, is_error, entry_type
+         FROM spooled_entries
+         WHERE session_id = ? OR session_id LIKE ?
+         ORDER BY entry_idx ASC
+         """,
+         (session_id, session_id + "%"),
+     ).fetchall()
+     conn.close()  # rows are already materialized; release the connection
+
+     if not rows:
+         raise HTTPException(status_code=404, detail="session not found")
+
+     # Get model from model_change entries
+     model = "default"
+     for row in reversed(rows):
+         if row["entry_type"] == "model_change" or row["role"] == "model_change":
+             try:
+                 obj = json.loads(row["clean_text"] or "{}")
+                 model = obj.get("modelId", obj.get("provider", "default"))
+             except (ValueError, AttributeError, TypeError):
+                 # Malformed or non-object JSON: keep the default model
+                 pass
+             break
+
+     inp_price, out_price = _infer_pricing(model)
+
+     user_tokens = assistant_tokens = tool_result_tokens = error_count = 0
+     tool_usage = {}
+
+     for row in rows:
+         role = row["role"] or ""
+         text = row["clean_text"] or ""
+         tool_name = row["tool_name"] or ""
+         is_error = bool(row["is_error"])
+         tokens = max(1, math.ceil(len(text) / 4))
+
+         if role in ("user", "system"):
+             user_tokens += tokens
+         elif role == "assistant":
+             assistant_tokens += tokens
+         elif role == "toolResult":
+             tool_result_tokens += tokens
+             if is_error:
+                 error_count += 1
+             if tool_name:
+                 if tool_name not in tool_usage:
+                     tool_usage[tool_name] = {"calls": 0, "result_tokens": 0, "errors": 0}
+                 tool_usage[tool_name]["result_tokens"] += tokens
+                 if is_error:
+                     tool_usage[tool_name]["errors"] += 1
+
+     input_cost = (user_tokens / 1_000_000) * inp_price
+     output_cost = ((assistant_tokens + tool_result_tokens) / 1_000_000) * out_price
+
+     return {
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "session_id": session_id,
+         "model": model,
+         "pricing_per_1m": {"input": inp_price, "output": out_price},
+         "tokens": {
+             "user_input": user_tokens,
+             "assistant_output": assistant_tokens,
+             "tool_results": tool_result_tokens,
+             "total": user_tokens + assistant_tokens + tool_result_tokens,
+         },
+         "cost_usd": {
+             "input": round(input_cost, 4),
+             "output": round(output_cost, 4),
+             "total": round(input_cost + output_cost, 4),
+         },
+         "error_count": error_count,
+         "tool_usage": tool_usage,
+     }
+
+
+ @app.get("/session/{session_id}/stream")
+ async def session_stream(session_id: str):
+     async def event_generator():
+         last_idx = -1
+         while True:
+             data = session_activity(session_id, limit=200)
+             act = data.get("activity", [])
+             new_events = [e for e in act if e.get("entry_idx") is not None and e["entry_idx"] > last_idx]
+             if new_events:
+                 new_events.sort(key=lambda x: x["entry_idx"])
+                 for evt in new_events:
+                     yield f"data: {json.dumps(evt)}\n\n"
+                     last_idx = evt["entry_idx"]
+             await asyncio.sleep(1.0)
+
+     return StreamingResponse(event_generator(), media_type="text/event-stream")
+
+
+ class LockRequest(BaseModel):
+     owner: str
+     ttl_minutes: int = 60
+
+ class ReleaseRequest(BaseModel):
+     owner: str
+     status_msg: str = "completed"
+
+ @app.post("/jobs/{job_name}/lock")
+ def api_lock_job(job_name: str, req: LockRequest):
+     if claim_lock(job_name, req.owner, req.ttl_minutes):
+         return {"status": "ok", "message": "lock acquired"}
+     raise HTTPException(status_code=409, detail="lock held by another owner")
+
+ @app.post("/jobs/{job_name}/release")
+ def api_release_job(job_name: str, req: ReleaseRequest):
+     if release_lock(job_name, req.owner, req.status_msg):
+         return {"status": "ok", "message": "lock released"}
+     raise HTTPException(status_code=403, detail="lock not held by owner")
+
+ @app.get("/jobs")
+ def api_list_jobs():
+     return {"jobs": get_job_status()}
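Review note: the cost endpoint in `api/routes.py` relies on a rough heuristic of about four characters per token and on per-million-token prices. The arithmetic can be checked in isolation with a sketch like the one below. The helper names `estimate_tokens` and `estimate_cost` are hypothetical and do not exist in the commit; the numbers are an approximation, not a billing-grade figure.

```python
import math


def estimate_tokens(text):
    """Approximate token count: ceil(len / 4), never less than 1."""
    return max(1, math.ceil(len(text) / 4))


def estimate_cost(input_tokens, output_tokens, price_in, price_out):
    """Cost in USD given prices per 1M tokens, rounded to 4 decimals."""
    input_cost = input_tokens / 1_000_000 * price_in
    output_cost = output_tokens / 1_000_000 * price_out
    return round(input_cost + output_cost, 4)
```

Note that the endpoint bills tool results at the output price, so the `output_tokens` argument here would be the sum of assistant and tool-result tokens.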
api/scheduler.py ADDED
@@ -0,0 +1,78 @@
+ import time
+ from datetime import datetime, timezone, timedelta
+ from typing import Optional
+
+ from spooler.store import get_conn
+
+ def claim_lock(job_name: str, owner: str, ttl_minutes: int = 60) -> bool:
+     """Attempt to claim a distributed lock for a job name. Returns True if claimed."""
+     conn = get_conn()
+     now_iso = datetime.now(timezone.utc).isoformat()
+     until_iso = (datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes)).isoformat()
+
+     # Check if a valid lock exists
+     row = conn.execute(
+         "SELECT locked_until FROM scheduler_jobs WHERE job_name = ?",
+         (job_name,)
+     ).fetchone()
+
+     if row and row["locked_until"] and row["locked_until"] > now_iso:
+         # Lock is currently held and not expired
+         conn.close()
+         return False
+
+     # Claim the lock (insert if new, overwrite if expired)
+     conn.execute(
+         """
+         INSERT OR REPLACE INTO scheduler_jobs
+         (job_name, last_run_at, locked_until, locked_by, status)
+         VALUES (?, COALESCE((SELECT last_run_at FROM scheduler_jobs WHERE job_name = ?), ?), ?, ?, 'locked')
+         """,
+         (job_name, job_name, now_iso, until_iso, owner)
+     )
+     conn.commit()
+     conn.close()
+     return True
+
+ def release_lock(job_name: str, owner: str, status_msg: str = "completed") -> bool:
+     """Release a held lock, providing a status message."""
+     conn = get_conn()
+     now_iso = datetime.now(timezone.utc).isoformat()
+
+     # Ensure this owner actually holds the lock
+     row = conn.execute(
+         "SELECT locked_by FROM scheduler_jobs WHERE job_name = ?",
+         (job_name,)
+     ).fetchone()
+
+     if not row or row["locked_by"] != owner:
+         conn.close()
+         return False
+
+     conn.execute(
+         """
+         UPDATE scheduler_jobs
+         SET locked_until = NULL,
+             locked_by = NULL,
+             last_run_at = ?,
+             status = ?
+         WHERE job_name = ? AND locked_by = ?
+         """,
+         (now_iso, status_msg, job_name, owner)
+     )
+     conn.commit()
+     conn.close()
+     return True
+
+ def get_job_status(job_name: Optional[str] = None) -> list[dict]:
+     """Get status of all jobs or a specific job."""
+     conn = get_conn()
+     if job_name:
+         rows = conn.execute(
+             "SELECT * FROM scheduler_jobs WHERE job_name = ?",
+             (job_name,)
+         ).fetchall()
+     else:
+         rows = conn.execute("SELECT * FROM scheduler_jobs").fetchall()
+     conn.close()
+     return [dict(row) for row in rows]
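Review note: `claim_lock` in `api/scheduler.py` reads the lock row and then writes it in a separate statement, so two callers could both pass the check before either writes. One possible alternative, sketched below, folds the check into a single conditional `UPDATE` and uses the affected row count as the result. This is not the commit's implementation; the function name `claim_lock_atomic`, the `now` parameter, and the `'idle'` placeholder status are assumptions made for the example.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def claim_lock_atomic(conn, job_name, owner, ttl_minutes=60, now=None):
    """Claim the lock only if it is free or expired; True when claimed."""
    now = now or datetime.now(timezone.utc)
    now_iso = now.isoformat()
    until_iso = (now + timedelta(minutes=ttl_minutes)).isoformat()
    # Make sure the row exists without disturbing an existing lock.
    conn.execute(
        "INSERT OR IGNORE INTO scheduler_jobs (job_name, status) VALUES (?, 'idle')",
        (job_name,),
    )
    # Compare-and-set: the WHERE clause is evaluated in the same statement
    # that writes, so a held lock leaves zero rows updated.
    cur = conn.execute(
        """
        UPDATE scheduler_jobs
        SET locked_until = ?, locked_by = ?, status = 'locked'
        WHERE job_name = ? AND (locked_until IS NULL OR locked_until <= ?)
        """,
        (until_iso, owner, job_name, now_iso),
    )
    conn.commit()
    return cur.rowcount == 1
```

As in the original, expiry is decided by comparing ISO-8601 strings, which only orders correctly when every timestamp uses the same UTC format.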
config.py ADDED
@@ -0,0 +1,58 @@
+ import os
+ from dataclasses import dataclass
+ from pathlib import Path
+
+
+ @dataclass
+ class Settings:
+     # Paths
+     openclaw_agents_root: Path
+     openclaw_state_dir: Path
+     db_path: Path
+
+     # Agent allowlist (comma-separated; empty = watch all)
+     agents_allowlist: list[str]
+
+     # Glob exclusion patterns for session files (comma-separated; e.g. "*.trajectory.jsonl,*.lock")
+     session_glob_exclude: list[str]
+
+     # Spooler
+     max_toolresult_chars: int
+     spooler_batch_size: int
+     spooler_redact_patterns: str
+     tool_noise_patterns: str
+
+     # Reviewer
+     review_confidence_threshold: float
+
+     # API
+     api_port: int
+
+
+ def _env_path(name: str, default: str) -> Path:
+     return Path(os.environ.get(name, default)).expanduser()
+
+
+ def load_settings() -> Settings:
+     state_dir = _env_path("OPENCLAW_STATE_DIR", "~/.openclaw/workspace/ops/state")
+     state_dir.mkdir(parents=True, exist_ok=True)
+     raw_allowlist = os.environ.get("AGENTS_ALLOWLIST", "")
+     parsed_allowlist = [a.strip() for a in raw_allowlist.split(",") if a.strip()] if raw_allowlist else []
+     raw_exclude = os.environ.get("SESSION_GLOB_EXCLUDE", "")
+     parsed_exclude = [e.strip() for e in raw_exclude.split(",") if e.strip()] if raw_exclude else []
+     return Settings(
+         agents_allowlist=parsed_allowlist,
+         session_glob_exclude=parsed_exclude,
+         openclaw_agents_root=_env_path("OPENCLAW_AGENTS_ROOT", "~/.openclaw/agents"),
+         openclaw_state_dir=state_dir,
+         db_path=_env_path("SESSION_AMPLIFIER_DB_PATH", str(state_dir / "session_amplifier.sqlite")),
+         max_toolresult_chars=int(os.environ.get("MAX_TOOLRESULT_CHARS", "2000")),
+         spooler_batch_size=int(os.environ.get("SPOOLER_BATCH_SIZE", "100")),
+         spooler_redact_patterns=os.environ.get("SPOOLER_REDACT_PATTERNS", "api_key,path,base64"),
+         tool_noise_patterns=os.environ.get("TOOL_NOISE_PATTERNS", "ENOENT,no output,command exited"),
+         review_confidence_threshold=float(os.environ.get("REVIEW_CONFIDENCE_THRESHOLD", "0.5")),
+         api_port=int(os.environ.get("API_PORT", "8477")),
+     )
+
+
+ settings = load_settings()
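Review note: `load_settings` parses `AGENTS_ALLOWLIST` and `SESSION_GLOB_EXCLUDE` with the same comma-splitting expression twice. A small helper along the following lines would capture that behaviour in one place. The name `parse_csv_env` is hypothetical and not part of the commit; the sketch only reproduces the parsing rule visible in `config.py`.

```python
import os


def parse_csv_env(name, default=""):
    """Split a comma-separated env var, trimming blanks and dropping empties."""
    raw = os.environ.get(name, default)
    return [item.strip() for item in raw.split(",") if item.strip()]
```

Because `settings = load_settings()` runs at import time, any override of these variables has to be in the environment before `config` is first imported.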
docker-compose.yml ADDED
@@ -0,0 +1,32 @@
+ services:
+   session-amplifier:
+     build:
+       context: .
+     image: session-amplifier:local
+     container_name: session-amplifier
+     restart: unless-stopped
+     command: ["serve"]
+     ports:
+       - "8477:8477"
+     environment:
+       OPENCLAW_AGENTS_ROOT: /openclaw/agents
+       OPENCLAW_STATE_DIR: /data/session-amplifier
+       SESSION_AMPLIFIER_DB_PATH: /data/session-amplifier/session_amplifier.sqlite
+       MAX_TOOLRESULT_CHARS: "2000"
+       SPOOLER_BATCH_SIZE: "100"
+       REVIEW_CONFIDENCE_THRESHOLD: "0.5"
+       API_PORT: "8477"
+     volumes:
+       - session_amplifier_state:/data/session-amplifier
+       # Mount your OpenClaw home read-only so session and skill data are visible.
+       - ${OPENCLAW_HOME:-~/.openclaw}:/openclaw:ro
+     networks:
+       - librechat_default
+
+ networks:
+   librechat_default:
+     external: true
+     name: librechat_default
+
+ volumes:
+   session_amplifier_state:
main.py ADDED
@@ -0,0 +1,196 @@
+ """
+ Session Amplifier — OpenClaw sidecar
+ Lightweight transcript spooler + review worker.
+ """
+ import argparse
+ import os
+ import threading
+ import time
+ from pathlib import Path
+
+ from config import settings
+
+
+ def init_db():
+     """Create SQLite schema if not present."""
+     import sqlite3
+
+     conn = sqlite3.connect(settings.db_path)
+     conn.executescript("""
+         CREATE TABLE IF NOT EXISTS spooled_entries (
+             id INTEGER PRIMARY KEY,
+             session_id TEXT NOT NULL,
+             agent_id TEXT NOT NULL,
+             entry_idx INTEGER NOT NULL,
+             entry_type TEXT,
+             role TEXT,
+             timestamp TEXT,
+             tool_name TEXT,
+             clean_text TEXT,
+             original_length INTEGER,
+             preview TEXT,
+             is_error INTEGER DEFAULT 0,
+             indexed_at TEXT DEFAULT (datetime('now')),
+             UNIQUE(session_id, entry_idx)
+         );
+
+         CREATE TABLE IF NOT EXISTS spool_state (
+             key TEXT PRIMARY KEY,
+             value TEXT
+         );
+
+         CREATE TABLE IF NOT EXISTS review_reports (
+             id INTEGER PRIMARY KEY,
+             review_id TEXT UNIQUE,
+             generated_at TEXT,
+             period_from TEXT,
+             period_to TEXT,
+             report_json TEXT
+         );
+
+         CREATE TABLE IF NOT EXISTS scheduler_jobs (
+             job_name TEXT PRIMARY KEY,
+             last_run_at TEXT,
+             locked_until TEXT,
+             locked_by TEXT,
+             status TEXT
+         );
+     """)
+     conn.commit()
+     conn.close()
+     print(f"[init] DB ready at {settings.db_path}")
+
+
+ def run_server():
+     import uvicorn
+     from api.routes import app
+
+     uvicorn.run(app, host="0.0.0.0", port=settings.api_port)
+
+
+ def run_spooler():
+     from spooler.processor import run_spool
+
+     count, sessions = run_spool()
+     print(f"[spool] entries={count} sessions={sessions}")
+
+
+ def run_reviewer():
+     from reviewer.report import generate_report
+
+     report = generate_report()
+     print(f"[review] id={report.get('review_id','?')} sessions={report.get('sessions_reviewed',0)}")
+
+
+ def run_watcher(poll_interval: int = 5, review_every: int = 0):
+     """Poll transcript files and run spooler on changes.
+
+     Args:
+         poll_interval: seconds between filesystem polls.
+         review_every: if > 0, run reviewer every N spool runs. 0 = disabled.
+     """
+     from spooler.processor import run_spool
+
+     print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s"
+           f" (review_every={review_every})")
+     seen: dict[str, tuple[int, int]] = {}
+     spool_run_count = 0
+
+     while True:
+         changed = False
+         for transcript_file in settings.openclaw_agents_root.glob("*/sessions/*.jsonl"):
+             try:
+                 # Skip excluded glob patterns
+                 import fnmatch
+                 fname = transcript_file.name
+                 if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
+                     continue
+                 # Also keep hard exclude for trajectory files
+                 if ".trajectory.jsonl" in fname:
+                     continue
+                 parts = transcript_file.relative_to(settings.openclaw_agents_root).parts
+                 if len(parts) >= 2 and settings.agents_allowlist:
+                     agent_part = parts[0]  # agent id is first segment after root
+                     if agent_part not in settings.agents_allowlist:
+                         continue
+                 stat = transcript_file.stat()
+                 fingerprint = (int(stat.st_mtime), stat.st_size)
+             except (FileNotFoundError, ValueError):
+                 continue
+             key = str(transcript_file)
+             if seen.get(key) != fingerprint:
+                 seen[key] = fingerprint
+                 changed = True
+
+         if changed:
+             count, sessions = run_spool()
+             print(f"[watch] spool run entries={count} sessions={sessions}")
+             spool_run_count += 1
+
+             if review_every > 0 and spool_run_count % review_every == 0:
+                 try:
+                     from reviewer.report import generate_report
+                     report = generate_report()
+                     print(f"[watch] review run id={report.get('review_id','?')}"
+                           f" sessions={report.get('sessions_reviewed',0)}")
+                 except Exception as exc:
+                     print(f"[watch] review failed: {exc}")
+
+         time.sleep(poll_interval)
+
+
142
+ def run_serve_watch(poll_interval: int = 5, review_every: int = 0):
143
+ """Start API server and run file watcher in a background thread."""
144
+ watcher_thread = threading.Thread(
145
+ target=run_watcher,
146
+ args=(poll_interval, review_every),
147
+ daemon=True,
148
+ )
149
+ watcher_thread.start()
150
+ print(f"[serve-watch] watcher thread started (interval={poll_interval}s)")
151
+ run_server()
152
+
153
+
154
+ def main():
155
+ parser = argparse.ArgumentParser(prog="session-amplifier")
156
+ sub = parser.add_subparsers(dest="command", required=True)
157
+
158
+ sub.add_parser("init", help="Initialize SQLite schema")
159
+ sub.add_parser("serve", help="Run FastAPI server")
160
+ sub.add_parser("spool", help="Run spooler once")
161
+ sub.add_parser("review", help="Run reviewer once")
162
+
163
+ watch_parser = sub.add_parser("watch", help="Poll transcript files and run spooler on changes")
164
+ watch_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
165
+ watch_parser.add_argument("--review-every", type=int, default=0,
166
+ help="Run reviewer every N spool runs (0=disabled, default=0)")
167
+
168
+ sw_parser = sub.add_parser("serve-watch",
169
+ help="Run API server + file watcher in background thread")
170
+ sw_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")
171
+ sw_parser.add_argument("--review-every", type=int, default=0,
172
+ help="Run reviewer every N spool runs (0=disabled, default=0)")
173
+
174
+ args = parser.parse_args()
175
+
176
+ if args.command == "init":
177
+ init_db()
178
+ elif args.command == "serve":
179
+ init_db()
180
+ run_server()
181
+ elif args.command == "spool":
182
+ init_db()
183
+ run_spooler()
184
+ elif args.command == "review":
185
+ init_db()
186
+ run_reviewer()
187
+ elif args.command == "watch":
188
+ init_db()
189
+ run_watcher(args.interval, args.review_every)
190
+ elif args.command == "serve-watch":
191
+ init_db()
192
+ run_serve_watch(args.interval, args.review_every)
193
+
194
+
195
+ if __name__ == "__main__":
196
+ main()
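Review note: `run_watcher` decides whether to re-run the spooler by comparing a per-file `(mtime, size)` fingerprint against the value recorded on the previous poll. The standalone sketch below isolates that idea. The helper name `changed_files` is hypothetical, and it uses `st_mtime_ns` instead of the diff's `int(stat.st_mtime)`; both are assumptions made for illustration, not part of this commit.

```python
from pathlib import Path


def changed_files(paths, seen):
    """Return the paths whose (mtime_ns, size) fingerprint differs from `seen`.

    `seen` maps str(path) -> fingerprint and is updated in place.
    Hypothetical helper; the commit inlines this logic in run_watcher.
    """
    changed = []
    for path in paths:
        try:
            stat = Path(path).stat()
        except FileNotFoundError:
            continue  # file vanished between listing and stat; skip it
        fingerprint = (stat.st_mtime_ns, stat.st_size)
        key = str(path)
        if seen.get(key) != fingerprint:
            seen[key] = fingerprint
            changed.append(key)
    return changed
```

A nanosecond mtime may catch a same-size rewrite that lands within the same second as the previous poll, which a whole-second fingerprint would miss.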
requirements.txt ADDED
@@ -0,0 +1,3 @@
+ fastapi>=0.115.0
+ uvicorn[standard]>=0.30.0
+ pydantic>=2.0.0
reviewer/__init__.py ADDED
@@ -0,0 +1 @@
+ # reviewer package
reviewer/intelligence.py ADDED
@@ -0,0 +1,285 @@
+ from __future__ import annotations
+
+ from collections import Counter, defaultdict
+ from datetime import datetime, timezone
+ from difflib import SequenceMatcher
+ from pathlib import Path
+ from typing import Iterable
+
+ from config import settings
+ from spooler.store import get_conn, get_recent_sessions, get_session_activity
+
+
+ def _now() -> str:
+     return datetime.now(timezone.utc).isoformat()
+
+
+ def _write_artifact(relative_path: str, payload: dict) -> None:
+     base = settings.openclaw_state_dir / "session_amplifier"
+     target = base / relative_path
+     target.parent.mkdir(parents=True, exist_ok=True)
+     import json
+
+     target.write_text(json.dumps(payload, indent=2, sort_keys=True))
+
+
+ def _session_transcript_files() -> dict[str, Path]:
+     root = settings.openclaw_agents_root
+     if not root.exists():
+         return {}
+     out: dict[str, Path] = {}
+     for agent_dir in root.iterdir():
+         sessions = agent_dir / "sessions"
+         if not sessions.exists():
+             continue
+         for path in sessions.glob("*.jsonl"):
+             out.setdefault(path.stem, path)
+     return out
+
+
+ def generate_session_sprawl_report(limit: int = 500, stale_days: int = 30) -> dict:
+     """Return non-destructive session sprawl/archive candidates."""
+     conn = get_conn()
+     rows = conn.execute(
+         """
+         SELECT session_id, agent_id,
+                MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
+                COUNT(*) AS event_count,
+                SUM(COALESCE(original_length, 0)) AS original_chars,
+                SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
+                SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
+                MAX(entry_idx) AS last_entry_idx
+         FROM spooled_entries
+         GROUP BY session_id, agent_id
+         ORDER BY event_count DESC
+         LIMIT ?
+         """,
+         (limit,),
+     ).fetchall()
+     conn.close()
+
+     files = _session_transcript_files()
+     candidates = []
+     now_ts = datetime.now(timezone.utc).timestamp()
+     stale_seconds = stale_days * 24 * 60 * 60
+     for row in rows:
+         rd = dict(row)
+         path = files.get(rd["session_id"])
+         size = path.stat().st_size if path and path.exists() else None
+         mtime = path.stat().st_mtime if path and path.exists() else None
+         reasons = []
+         if (rd.get("event_count") or 0) > 2000:
+             reasons.append("very_high_event_count")
+         if (rd.get("original_chars") or 0) > 1_000_000:
+             reasons.append("very_large_transcript_content")
+         if size and size > 5_000_000:
+             reasons.append("large_file")
+         if mtime and now_ts - mtime > stale_seconds:
+             reasons.append("stale_file")
+         if reasons:
+             candidates.append({
+                 **rd,
+                 "file_path": str(path) if path else None,
+                 "file_size_bytes": size,
+                 "file_mtime": datetime.fromtimestamp(mtime, timezone.utc).isoformat() if mtime else None,
+                 "candidate_reasons": reasons,
+                 "action": "review_then_archive_or_summarize",
+             })
+
+     report = {
+         "generated_at": _now(),
+         "policy": "non_destructive_candidates_only",
+         "stale_days": stale_days,
+         "sessions_scanned": len(rows),
+         "candidate_count": len(candidates),
+         "candidates": candidates[:200],
+     }
+     _write_artifact("reports/session-sprawl-latest.json", report)
+     return report
+
+
+ def generate_context_pressure_report(limit: int = 200) -> dict:
+     """Find transcripts likely to bloat context or retrieval."""
+     conn = get_conn()
+     rows = conn.execute(
+         """
+         SELECT session_id, agent_id,
+                COUNT(*) AS event_count,
+                SUM(COALESCE(original_length, 0)) AS original_chars,
+                SUM(CASE WHEN role = 'toolResult' THEN COALESCE(original_length, 0) ELSE 0 END) AS tool_chars,
+                SUM(CASE WHEN role = 'assistant' THEN LENGTH(COALESCE(clean_text, '')) ELSE 0 END) AS assistant_chars,
+                SUM(CASE WHEN role = 'toolResult' AND COALESCE(original_length, 0) > 5000 THEN 1 ELSE 0 END) AS giant_tool_results,
+                SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count
+         FROM spooled_entries
+         GROUP BY session_id, agent_id
+         HAVING original_chars > 100000 OR giant_tool_results > 0 OR event_count > 1000
+         ORDER BY original_chars DESC
+         LIMIT ?
+         """,
+         (limit,),
+     ).fetchall()
+     conn.close()
+
+     sessions = []
+     for row in rows:
+         rd = dict(row)
+         flags = []
+         if (rd.get("giant_tool_results") or 0) > 0:
+             flags.append("giant_tool_results")
+         if (rd.get("event_count") or 0) > 1000:
+             flags.append("long_running_session")
+         if (rd.get("tool_chars") or 0) > max(1, (rd.get("assistant_chars") or 0)) * 3:
+             flags.append("tool_output_dominates")
+         rd["flags"] = flags
+         rd["recommendation"] = "summarize_before_reuse" if flags else "monitor"
+         sessions.append(rd)
+
+     report = {
+         "generated_at": _now(),
+         "sessions_scanned": len(rows),
+         "pressure_sessions": sessions,
+     }
+     _write_artifact("reports/context-pressure-latest.json", report)
+     return report
+
+
+ def generate_failure_mode_report(limit: int = 200) -> dict:
+     """Mine repeated operational failure modes from spooled transcript rows."""
+     conn = get_conn()
+     rows = conn.execute(
+         """
+         SELECT session_id, agent_id, role, tool_name, clean_text, preview, is_error, timestamp, indexed_at
+         FROM spooled_entries
+         WHERE is_error = 1
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%permission%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%approve%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%timeout%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%failover%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%fallback%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%no session found%'
+            OR LOWER(COALESCE(clean_text, '')) LIKE '%context limit%'
+         ORDER BY COALESCE(timestamp, indexed_at) DESC
+         LIMIT 5000
+         """
+     ).fetchall()
+     conn.close()
+
+     buckets: dict[str, list[dict]] = defaultdict(list)
+     tool_errors: Counter[str] = Counter()
+     for r in rows:
+         rd = dict(r)
+         text = (rd.get("clean_text") or rd.get("preview") or "").lower()
+         key = None
+         if rd.get("is_error"):
+             tool = rd.get("tool_name") or "unknown_tool"
+             key = f"tool_error:{tool}"
+             tool_errors[tool] += 1
+         else:
+             key = _classify_failure_text(text)
+         if key:
+             buckets[key].append({
+                 "session_id": rd.get("session_id"),
+                 "agent_id": rd.get("agent_id"),
+                 "tool_name": rd.get("tool_name"),
+                 "timestamp": rd.get("timestamp") or rd.get("indexed_at"),
+                 "preview": (rd.get("preview") or rd.get("clean_text") or "")[:240],
+             })
+
+     patterns = []
+     for key, hits in sorted(buckets.items(), key=lambda kv: len(kv[1]), reverse=True):
+         patterns.append({
+             "pattern": key,
+             "count": len(hits),
+             "sessions": sorted({h["session_id"] for h in hits if h.get("session_id")})[:20],
+             "examples": hits[:10],
+             "recommendation": _failure_recommendation(key),
+         })
+
+     report = {
+         "generated_at": _now(),
+         "patterns": patterns[:limit],
+         "top_error_tools": [{"tool": tool, "count": count} for tool, count in tool_errors.most_common(25)],
+     }
+     _write_artifact("reports/failure-modes-latest.json", report)
+     return report
+
+
+ def _failure_recommendation(pattern: str) -> str:
+     if pattern.startswith("tool_error:"):
+         return "inspect repeated tool failures and add guardrails or repair wrapper"
+     if pattern == "model_failover_or_fallback":
+         return "audit model routing/fallback logs and expose failover in user-visible status"
+     if pattern == "approval_or_permission_stall":
+         return "improve approval prompts and stale approval recovery"
+     if pattern == "timeout":
+         return "identify timeout source and add bounded wait/retry or progress heartbeat"
+     if pattern == "stale_session_reference":
+         return "repair session lifecycle references and stale session cleanup"
+     if pattern == "context_limit":
+         return "summarize/archive before continuing session"
+     return "review clustered examples"
+
+
+ def _classify_failure_text(text: str) -> str | None:
+     """Classify failure text while avoiding overly broad buckets.
+
+     The first intelligence pass bucketed any mention of "permission" or
+     "approve" as an approval stall. That was useful for discovery but too noisy
+     for ongoing degradation detection. Keep the public pattern names stable, but
+     require stronger textual evidence.
+     """
+     lowered = text.lower()
+     if "failover" in lowered or "fallback" in lowered:
+         return "model_failover_or_fallback"
+     if any(phrase in lowered for phrase in (
+         "approval pending",
+         "approval required",
+         "approve this",
+         "permission denied",
+         "requires permission",
+         "insufficient permission",
+         "not permitted",
+     )):
+         return "approval_or_permission_stall"
+     if any(phrase in lowered for phrase in (
+         "timed out",
+         "timeout",
+         "deadline exceeded",
+         "context deadline",
+     )):
+         return "timeout"
+     if "no session found" in lowered or "unknown session" in lowered:
+         return "stale_session_reference"
+     if "context limit" in lowered or "context length" in lowered:
+         return "context_limit"
+     return None
+
+
+ def generate_active_sessions_bulk(limit: int = 40, activity_limit: int = 200) -> dict:
+     """Bulk endpoint for visual clients: recent sessions plus normalized activity."""
+     sessions = get_recent_sessions(limit)
+     activity = {
+         row["session_id"]: get_session_activity(row["session_id"], activity_limit)
+         for row in sessions
+     }
+     return {
+         "generated_at": _now(),
+         "sessions": sessions,
+         "activity": activity,
+     }
+
+
+ def generate_intelligence_bundle(kind: str = "light") -> dict:
+     """Run the deterministic intelligence suite and write a compact bundle."""
+     sprawl_limit = 1000 if kind == "deep" else 300
+     context_limit = 500 if kind == "deep" else 150
+     failure_limit = 500 if kind == "deep" else 150
+     bundle = {
+         "generated_at": _now(),
+         "kind": kind,
+         "session_sprawl": generate_session_sprawl_report(limit=sprawl_limit),
+         "context_pressure": generate_context_pressure_report(limit=context_limit),
+         "failure_modes": generate_failure_mode_report(limit=failure_limit),
+     }
+     _write_artifact(f"reports/intelligence-{kind}-latest.json", bundle)
+     return bundle
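Review note: `_classify_failure_text` checks its phrase groups in a fixed order, so text that mentions both a fallback and a timeout lands in the fallback bucket. The sketch below restates that precedence as a table. The names `FAILURE_RULES` and `classify` are hypothetical; the phrases and bucket names are copied from the diff.

```python
# Ordered rules: the first bucket with a matching phrase wins.
FAILURE_RULES = [
    ("model_failover_or_fallback", ("failover", "fallback")),
    ("approval_or_permission_stall", (
        "approval pending", "approval required", "approve this",
        "permission denied", "requires permission",
        "insufficient permission", "not permitted",
    )),
    ("timeout", ("timed out", "timeout", "deadline exceeded", "context deadline")),
    ("stale_session_reference", ("no session found", "unknown session")),
    ("context_limit", ("context limit", "context length")),
]


def classify(text):
    """Return the first matching bucket name, or None when nothing matches."""
    lowered = text.lower()
    for bucket, phrases in FAILURE_RULES:
        if any(phrase in lowered for phrase in phrases):
            return bucket
    return None
```

Because the SQL pre-filter still selects any row containing `permission` or `approve`, rows such as "please approve" are fetched but classified as `None` and dropped from the buckets.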
reviewer/pattern_detector.py ADDED
@@ -0,0 +1,71 @@
+ from collections import Counter, defaultdict
+
+
+ def detect_failure_patterns(session_scores: list[dict], spooled_entries: list[dict]) -> list[dict]:
+     """
+     Scan session data for recurring failure patterns.
+     Returns a list of pattern objects with count + affected sessions.
+     """
+     patterns = []
+
+     tool_empty_count = defaultdict(list)
+     tool_error_count = defaultdict(list)
+     for entry in spooled_entries:
+         if entry.get("role") != "toolResult":
+             continue
+         tool_name = entry.get("tool_name", "unknown") or "unknown"
+         # clean_text may be NULL in SQLite, so guard against None before strip()
+         clean_text = (entry.get("clean_text") or "").strip()
+         if clean_text == "":
+             tool_empty_count[tool_name].append(entry["session_id"])
+         if entry.get("is_error"):
+             tool_error_count[tool_name].append(entry["session_id"])
+
+     for tool, sessions in tool_empty_count.items():
+         uniq = sorted(set(sessions))
+         if len(uniq) >= 3:
+             patterns.append({
+                 "pattern": f"empty_tool_result:{tool}",
+                 "count": len(uniq),
+                 "sessions": uniq[:5],
+                 "description": f"Tool '{tool}' returned empty output across {len(uniq)} sessions",
+             })
+
+     for tool, sessions in tool_error_count.items():
+         uniq = sorted(set(sessions))
+         if len(uniq) >= 3:
+             patterns.append({
+                 "pattern": f"tool_errors:{tool}",
+                 "count": len(uniq),
+                 "sessions": uniq[:5],
+                 "description": f"Tool '{tool}' showed error-like output across {len(uniq)} sessions",
+             })
+
+     flag_counts = Counter()
+     flag_sessions = defaultdict(list)
+     for sess in session_scores:
+         for flag in sess.get("flags", []):
+             flag_counts[flag] += 1
+             flag_sessions[flag].append(sess["session_id"])
+
+     for flag, count in flag_counts.items():
+         if count >= 5:
+             patterns.append({
+                 "pattern": f"session_flag:{flag}",
+                 "count": count,
+                 "sessions": sorted(set(flag_sessions[flag]))[:5],
+                 "description": f"Flag '{flag}' appeared in {count} sessions",
+             })
+
+     multi_flag_sessions = [
+         sess for sess in session_scores
+         if len(sess.get("flags", [])) >= 2
+     ]
+     if multi_flag_sessions:
+         patterns.append({
+             "pattern": "session_multiple_flags",
+             "count": len(multi_flag_sessions),
+             "sessions": [sess["session_id"] for sess in multi_flag_sessions[:5]],
+             "description": f"{len(multi_flag_sessions)} sessions triggered multiple review flags",
+         })
+
+     return patterns
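Review note: the tool-level patterns in `detect_failure_patterns` count distinct sessions, not raw occurrences, and only fire once a tool misbehaves in at least three sessions. A minimal sketch of that thresholding, assuming the same row keys as the diff (`role`, `tool_name`, `is_error`, `session_id`); the function name `tools_failing_across_sessions` and the `min_sessions` parameter are hypothetical.

```python
from collections import defaultdict


def tools_failing_across_sessions(entries, min_sessions=3):
    """Map tool name -> sorted session ids, for tools erroring in enough sessions."""
    sessions_by_tool = defaultdict(set)
    for entry in entries:
        if entry.get("role") == "toolResult" and entry.get("is_error"):
            tool = entry.get("tool_name") or "unknown"
            # A set deduplicates repeated errors within one session.
            sessions_by_tool[tool].add(entry["session_id"])
    return {
        tool: sorted(sessions)
        for tool, sessions in sessions_by_tool.items()
        if len(sessions) >= min_sessions
    }
```

Under this rule, a tool that fails a hundred times inside a single session is not reported, while one that fails once in each of three sessions is.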
reviewer/report.py ADDED
@@ -0,0 +1,133 @@
+ import json
+ import uuid
+ from datetime import datetime, timezone
+ from pathlib import Path
+ from config import settings
+ from reviewer.scorer import score_session, get_all_spooled_entries
+ from reviewer.pattern_detector import detect_failure_patterns
+ from reviewer.skill_analyzer import analyze_skill_coverage
+
+
+ def _group_by_session(rows: list[dict]) -> dict[str, list[dict]]:
+     grouped = {}
+     for row in rows:
+         grouped.setdefault(row["session_id"], []).append(row)
+     return grouped
+
+
+ def generate_report(since: str | None = None) -> dict:
+     from spooler.store import get_conn
+
+     conn = get_conn()
+     rows = get_all_spooled_entries(conn, since)
+     conn.close()
+
+     # Group and score
+     grouped = _group_by_session(rows)
+     session_scores = []
+     for session_id, sess_rows in grouped.items():
+         agent_id = sess_rows[0].get("agent_id", "unknown")
+         scored = score_session(sess_rows)
+         scored["session_id"] = session_id
+         scored["agent_id"] = agent_id
+         session_scores.append(scored)
+
+     # Detect patterns
+     patterns = detect_failure_patterns(session_scores, rows)
+
+     # Skill coverage
+     skill_coverage = analyze_skill_coverage()
+
+     # Optimization recommendations (stub, deterministic only)
+     recommendations = []
+     for mcp in skill_coverage.get("mcps_missing_skill_surface", []):
+         recommendations.append({
+             "type": "skill_surface_missing",
+             "description": f"MCP '{mcp}' is registered but has no skill surface",
+             "target": mcp,
+             "confidence": 0.8,
+         })
+
+     # Unused skills (skills dir exists, no tool_name match in spool)
+     skill_names = set(skill_coverage.get("mcps_with_skill_surface", []))
+     used_tools = {r.get("tool_name") for r in rows if r.get("tool_name")}
+     unused_skills = [s for s in skill_names if s not in used_tools and s not in skill_coverage.get("mcps_missing_skill_surface", [])]
+     for skill in unused_skills:
+         recommendations.append({
+             "type": "skill_never_used",
+             "description": f"Skill '{skill}' exists but was not used in this period",
+             "target": skill,
+             "confidence": 0.7,
+         })
+
+     # Filter by confidence threshold
+     recommendations = [r for r in recommendations if r["confidence"] >= settings.review_confidence_threshold]
+
+     # Drift signals (stub, compare to previous report if available)
+     drift_signals = []
+
+     report = {
+         "review_id": str(uuid.uuid4()),
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "period": {
+             "from": since or "all",
+             "to": datetime.now(timezone.utc).isoformat(),
+         },
+         "sessions_reviewed": len(session_scores),
+         "session_quality_scores": session_scores,
+         "failure_patterns": patterns,
+         "unused_tools": [
+             {"tool": mcp, "registered": True, "used_this_week": False}
+             for mcp in skill_coverage.get("mcps_missing_skill_surface", [])
+         ],
+         "optimization_recommendations": recommendations,
+         "drift_signals": drift_signals,
+         "skill_coverage": skill_coverage,
+     }
+
+     # Persist to DB
+     _persist_report(report)
+
+     return report
+
+
+ def generate_skills_report() -> dict:
+     coverage = analyze_skill_coverage()
+     report = {
+         "generated_at": datetime.now(timezone.utc).isoformat(),
+         "period": "weekly",
+         **coverage,
+     }
+     _write_json_artifact("skills-latest.json", report)
+     return report
+
+
+ def _persist_report(report: dict):
+     from spooler.store import get_conn
+
+     conn = get_conn()
+     conn.execute(
+         """
+         INSERT OR REPLACE INTO review_reports
+         (review_id, generated_at, period_from, period_to, report_json)
+         VALUES (?, ?, ?, ?, ?)
+         """,
+         (
+             report["review_id"],
+             report["generated_at"],
+             report["period"]["from"],
+             report["period"]["to"],
+             json.dumps(report),
+         ),
+     )
+     conn.commit()
+     conn.close()
+     _write_json_artifact("review-latest.json", report)
+     # Rewrite the UTC offset before stripping colons; stripping first would turn
+     # "+00:00" into "+0000" and the "Z" substitution would never match.
+     _write_json_artifact(f"history/review-{report['generated_at'].replace('+00:00', 'Z').replace(':', '')}.json", report)
+
+
+ def _write_json_artifact(relative_path: str, payload: dict):
+     base = settings.openclaw_state_dir / "session_amplifier"
+     target = base / relative_path
+     target.parent.mkdir(parents=True, exist_ok=True)
+     target.write_text(json.dumps(payload, indent=2, sort_keys=True))
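Review note: `_persist_report` derives the history artifact name from the report's ISO `generated_at` value. A minimal sketch of one way to do that derivation follows; the helper name `history_name` is hypothetical. The point it illustrates is ordering: the UTC offset has to be rewritten before colons are stripped, because stripping first turns `+00:00` into `+0000`.

```python
def history_name(generated_at: str) -> str:
    """Build a filesystem-friendly history path from an ISO-8601 UTC timestamp."""
    # Rewrite the offset first, then drop the colons that some filesystems reject.
    stamp = generated_at.replace("+00:00", "Z").replace(":", "")
    return f"history/review-{stamp}.json"
```

For example, `history_name("2026-01-02T03:04:05.123456+00:00")` yields `history/review-2026-01-02T030405.123456Z.json`.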
reviewer/scorer.py ADDED
@@ -0,0 +1,93 @@
+ import sqlite3
+ from config import settings
+
+
+ META_MARKERS = (
+     "using the",
+     "i'll use",
+     "i will use",
+     "tool call",
+     "internal process",
+     "routing this",
+ )
+
+
+ def score_session(rows: list[dict]) -> dict:
+     """
+     Compute a quality score 0.0-1.0 for a session based on spooled entries.
+     Flags are list of string tags.
+     """
+     if not rows:
+         return {"score": 0.0, "flags": ["no_data"]}
+
+     flags = []
+     tool_results = [r for r in rows if r.get("role") == "toolResult"]
+     assistant_msgs = [r for r in rows if r.get("role") == "assistant"]
+     user_msgs = [r for r in rows if r.get("role") == "user"]
+
+     score = 1.0
+
+     # Columns may be NULL in SQLite, so fall back to 0 / "" before comparing.
+     long_results = [
+         r for r in tool_results if (r.get("original_length") or 0) > settings.max_toolresult_chars * 2
+     ]
+     if long_results:
+         flags.append("excessively_long_tool_output")
+         score -= 0.1 * min(1.0, len(long_results) / max(1, len(tool_results)))
+
+     if assistant_msgs and len(tool_results) > len(assistant_msgs) * 5:
+         flags.append("high_tool_call_ratio")
+         score -= 0.05
+
+     empty_results = [r for r in tool_results if not r.get("clean_text") or len(r.get("clean_text", "")) < 5]
+     if empty_results:
+         flags.append("empty_tool_results")
+         score -= 0.05
+
+     if len(rows) < 3 and not tool_results:
+         flags.append("minimal_session")
+
+     assistant_text_total = sum(len(r.get("clean_text") or "") for r in assistant_msgs)
+     tool_text_total = sum(len(r.get("clean_text") or "") for r in tool_results)
+     if assistant_text_total and tool_text_total > assistant_text_total * 3:
+         flags.append("tool_output_dominates_context")
+         score -= 0.08
+
+     meta_msgs = [
+         r for r in assistant_msgs
+         if any(marker in (r.get("clean_text") or "").lower() for marker in META_MARKERS)
+     ]
+     if meta_msgs:
+         flags.append("meta_process_narration")
+         score -= 0.05
+
+     fts_empty_queries = [
+         r for r in tool_results
+         if r.get("tool_name", "") == "session_search"
+         and ("0 results" in (r.get("clean_text") or "").lower() or "no results found" in (r.get("clean_text") or "").lower())
+     ]
+     if len(fts_empty_queries) > 2:
+         flags.append("fts_query_noise")
+         score -= 0.15
+
+     score = max(0.0, min(1.0, score))
+
+     return {
+         "score": round(score, 3),
+         "flags": sorted(set(flags)),
+         "tool_result_count": len(tool_results),
+         "assistant_msg_count": len(assistant_msgs),
+         "user_msg_count": len(user_msgs),
+         "assistant_text_total": assistant_text_total,
+         "tool_text_total": tool_text_total,
+     }
+
+
+ def get_all_spooled_entries(conn: sqlite3.Connection, since: str | None = None) -> list[dict]:
+     if since:
+         rows = conn.execute(
+             "SELECT * FROM spooled_entries WHERE indexed_at > ? ORDER BY indexed_at",
+             (since,),
+         ).fetchall()
+     else:
+         rows = conn.execute("SELECT * FROM spooled_entries ORDER BY indexed_at").fetchall()
+     return [dict(r) for r in rows]
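Review note: `score_session` starts at 1.0 and subtracts a penalty per flag, with the long-output penalty scaled by the share of oversized tool results. The sketch below reproduces only that arithmetic so the resulting scores are easy to check by hand. `FIXED_PENALTIES` and `expected_score` are hypothetical names; the penalty values are copied from the diff.

```python
# Flat penalties per flag, as used in score_session.
FIXED_PENALTIES = {
    "high_tool_call_ratio": 0.05,
    "empty_tool_results": 0.05,
    "tool_output_dominates_context": 0.08,
    "meta_process_narration": 0.05,
    "fts_query_noise": 0.15,
}


def expected_score(flags, long_results=0, tool_results=0):
    """Recompute the score from a list of flags and the long-result counts."""
    score = 1.0
    if long_results:
        # Proportional penalty: at most 0.1 when every tool result is oversized.
        score -= 0.1 * min(1.0, long_results / max(1, tool_results))
    for flag in flags:
        score -= FIXED_PENALTIES.get(flag, 0.0)
    return round(max(0.0, min(1.0, score)), 3)
```

The penalties sum to 0.48, so a session with rows never scores below 0.52; only the `no_data` early return produces 0.0, and `minimal_session` is a flag with no penalty attached.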
reviewer/session_snapshot.py ADDED
@@ -0,0 +1,178 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ from datetime import datetime, timezone
5
+ from typing import Iterable
6
+
7
+ from spooler.store import get_recent_sessions, get_session_activity
8
+
9
+
10
+ SNAPSHOT_SCHEMA = "openclaw.session.v1"
11
+ log = logging.getLogger("session_snapshot")
12
+
13
+
14
+ def _now() -> str:
15
+ return datetime.now(timezone.utc).isoformat()
16
+
17
+
18
+ def _parse_ts(value: str | None) -> datetime | None:
19
+ if not value:
20
+ return None
21
+ try:
22
+ return datetime.fromisoformat(value.replace("Z", "+00:00"))
23
+ except (TypeError, ValueError):
24
+ log.debug("invalid session timestamp", extra={"timestamp_value": value})
25
+ return None
26
+
27
+
28
+ def _age_seconds(value: str | None) -> int | None:
29
+ ts = _parse_ts(value)
30
+ if not ts:
31
+ return None
32
+ if ts.tzinfo is None:
33
+ ts = ts.replace(tzinfo=timezone.utc)
34
+ return max(0, int((datetime.now(timezone.utc) - ts).total_seconds()))
35
+
36
+
37
+ def _classify_activity(row: dict) -> dict:
38
+ role = row.get("role") or ""
39
+ tool_name = row.get("tool_name") or ""
40
+ clean_text = row.get("clean_text") or ""
41
+ preview = row.get("preview") or clean_text[:300]
42
+ is_error = bool(row.get("is_error"))
43
+
44
+ if role == "toolResult":
45
+ event_type = "tool_error" if is_error else "tool_result"
46
+ summary = f"✗ {tool_name or 'tool'}" if is_error else f"✓ {tool_name or 'tool'}"
47
+ elif role == "assistant":
48
+ event_type = "assistant_text"
49
+ summary = preview[:140] or "assistant"
50
+ elif role == "user":
51
+ event_type = "user_message"
52
+ summary = preview[:140] or "user"
53
+ elif role in ("system", "developer"):
54
+ event_type = role
55
+ summary = preview[:120] or role
56
+ else:
57
+ event_type = "event"
58
+ summary = preview[:120] or role or "event"
59
+
60
+ return {
61
+ "timestamp": row.get("timestamp") or row.get("indexed_at"),
62
+ "entry_idx": row.get("entry_idx"),
63
+ "event_type": event_type,
64
+ "role": role,
65
+ "tool_name": tool_name,
66
+ "summary": summary,
67
+ "is_error": is_error,
68
+ }
69
+
70
+
71
+ def _risk_flags(activity: Iterable[dict]) -> list[str]:
72
+ flags: set[str] = set()
73
+ for row in activity:
74
+ text = f"{row.get('clean_text') or ''}\n{row.get('preview') or ''}".lower()
75
+ if any(token in text for token in ("approve", "approval", "permission")):
76
+ flags.add("approval_or_permission")
77
+ if any(token in text for token in ("delete", "rm -rf", "drop table", "destroy", "rollback")):
78
+ flags.add("destructive_or_rollback_language")
79
+ if any(token in text for token in ("public deploy", "dns", "domain", "cloud run", "gcloud")):
80
+ flags.add("external_infra")
81
+ if any(token in text for token in ("api key", "secret", "password", "token")):
82
+ flags.add("secret_sensitive")
83
+ return sorted(flags)
84
+
85
+
86
+ def _health_from_summary(summary: dict, last_event_age_seconds: int | None) -> dict:
87
+ error_count = int(summary.get("error_count") or 0)
88
+ event_count = int(summary.get("event_count") or 0)
89
+ noisy = int(summary.get("noisy_tool_results") or 0)
90
+ state = "active"
91
+ reasons: list[str] = []
92
+
93
+ if last_event_age_seconds is not None and last_event_age_seconds > 30 * 24 * 60 * 60:
94
+ state = "stale"
95
+ reasons.append("no_recent_activity_30d")
96
+ elif error_count > 0:
97
+ state = "warning"
98
+ reasons.append("errors_present")
99
+ elif noisy > 2:
100
+ state = "warning"
101
+ reasons.append("noisy_tool_outputs")
102
+ elif event_count == 0:
103
+ state = "unknown"
104
+ reasons.append("no_spooled_events")
105
+
106
+ return {
107
+ "state": state,
108
+ "reasons": reasons,
109
+ "last_event_age_seconds": last_event_age_seconds,
110
+ }
111
+
112
+
113
+ def build_session_snapshot(session_summary: dict, activity_limit: int = 80) -> dict:
114
+ """Build a canonical, backend-neutral OpenClaw session snapshot.
115
+
116
+ This is intentionally sidecar-local and read-only. It gives dashboards and
117
+ reviewers one stable shape without requiring gateway core changes.
118
+ """
119
+ session_id = session_summary.get("session_id")
120
+     activity = get_session_activity(session_id, activity_limit) if session_id else []
+     recent_events = [_classify_activity(row) for row in activity]
+     last_event_at = session_summary.get("last_event_at")
+     age = _age_seconds(last_event_at)
+     tool_result_count = int(session_summary.get("tool_result_count") or 0)
+     event_count = int(session_summary.get("event_count") or 0)
+     error_count = int(session_summary.get("error_count") or 0)
+     tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0
+
+     pressure_flags = []
+     if event_count > 1000:
+         pressure_flags.append("long_running_session")
+     if tool_ratio > 0.7:
+         pressure_flags.append("tool_heavy")
+     if int(session_summary.get("noisy_tool_results") or 0) > 2:
+         pressure_flags.append("noisy_tool_outputs")
+
+     return {
+         "schema": SNAPSHOT_SCHEMA,
+         "generated_at": _now(),
+         "id": session_id,
+         "kind": "transcript_session",
+         "owner": {"agent_id": session_summary.get("agent_id")},
+         "state": _health_from_summary(session_summary, age),
+         "runtime": {
+             "adapter": "session-amplifier-spooler",
+             "source": "spooled_transcript",
+         },
+         "health": {
+             "event_count": event_count,
+             "tool_result_count": tool_result_count,
+             "error_count": error_count,
+             "tool_ratio": tool_ratio,
+             "pressure_flags": pressure_flags,
+             "hints": session_summary.get("hints") or [],
+         },
+         "risk": {
+             "flags": _risk_flags(activity),
+             "policy": "signal_only_non_destructive",
+         },
+         "outputs": {
+             "last_event_at": last_event_at,
+             "last_entry_idx": session_summary.get("last_entry_idx"),
+             "recent_events": recent_events,
+         },
+         "rollback": {
+             "note": "Snapshot generation is read-only; remove session_snapshot.py and route imports to roll back this sidecar feature.",
+         },
+     }
+
+
+ def build_recent_session_snapshots(limit: int = 40, activity_limit: int = 80) -> dict:
+     sessions = get_recent_sessions(limit)
+     return {
+         "schema": f"{SNAPSHOT_SCHEMA}.collection",
+         "generated_at": _now(),
+         "count": len(sessions),
+         "snapshots": [build_session_snapshot(row, activity_limit=activity_limit) for row in sessions],
+     }
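The pressure-flag logic in the snapshot builder can be tried in isolation. The following is a minimal sketch, assuming only the thresholds visible in the diff (more than 1000 events, tool ratio above 0.7, more than 2 noisy tool results); the helper name `pressure_flags` is hypothetical and not part of the commit.

```python
def pressure_flags(summary: dict) -> list[str]:
    """Derive pressure flags from a session summary dict (thresholds copied from the diff)."""
    event_count = int(summary.get("event_count") or 0)
    tool_result_count = int(summary.get("tool_result_count") or 0)
    noisy = int(summary.get("noisy_tool_results") or 0)
    # Guard against division by zero for sessions with no events.
    tool_ratio = round(tool_result_count / event_count, 3) if event_count else 0.0

    flags = []
    if event_count > 1000:
        flags.append("long_running_session")
    if tool_ratio > 0.7:
        flags.append("tool_heavy")
    if noisy > 2:
        flags.append("noisy_tool_outputs")
    return flags


if __name__ == "__main__":
    print(pressure_flags({"event_count": 1200, "tool_result_count": 900, "noisy_tool_results": 3}))
    print(pressure_flags({"event_count": 4, "tool_result_count": 2}))
```

Missing or `None` counters are treated as zero, which mirrors the `or 0` pattern used in the snapshot code.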
reviewer/skill_analyzer.py ADDED
@@ -0,0 +1,228 @@
+ import json
+ import logging
+ import os
+ import re
+ from pathlib import Path
+ from config import settings
+
+ log = logging.getLogger("skill_analyzer")
+
+ SKILLS_DIR = Path(os.environ.get("OPENCLAW_SKILLS_DIR", "/home/node/.openclaw/skills")).expanduser()
+ MCPORTER_CONFIG = Path("/home/node/.openclaw/workspace/_shared/mcporter/config/mcporter.json")
+ OPENCLAW_CONFIG = Path(os.environ.get("OPENCLAW_CONFIG", "/home/node/.openclaw/openclaw.json")).expanduser()
+ STOP_WORDS = {
+     "the", "and", "for", "with", "from", "that", "this", "when", "into", "your", "you",
+     "are", "can", "use", "using", "skill", "skills", "agent", "agents", "tool", "tools",
+     "task", "tasks", "workflow", "workflows", "process", "openclaw",
+     "before", "after", "existing", "new", "review", "checks", "first", "instead", "only",
+     "improve", "simple", "appropriate", "repo", "updates", "adding", "draft", "recommend",
+     "requested", "evaluate",
+ }
+ REUSE_SCORE_THRESHOLD = 6
+
+
+ def _list_mcp_skills() -> list[str]:
+     """Return skill names from the skills dir."""
+     if not SKILLS_DIR.exists():
+         return []
+     return sorted(d.name for d in SKILLS_DIR.iterdir() if d.is_dir() and (d / "SKILL.md").exists())
+
+
+ def _tokens(text: str | None) -> set[str]:
+     if not text:
+         return set()
+     return {
+         tok
+         for tok in re.findall(r"[a-z][a-z0-9_-]{2,}", text.lower())
+         if tok not in STOP_WORDS
+     }
+
+
+ def _frontmatter(text: str) -> dict[str, str]:
+     if not text.startswith("---"):
+         return {}
+     parts = text.split("---", 2)
+     if len(parts) < 3:
+         return {}
+     out: dict[str, str] = {}
+     for line in parts[1].splitlines():
+         if ":" not in line or line.startswith((" ", "\t")):
+             continue
+         key, value = line.split(":", 1)
+         out[key.strip()] = value.strip().strip('"\'')
+     return out
+
+
+ def _skill_catalog() -> list[dict]:
+     if not SKILLS_DIR.exists():
+         return []
+     rows: list[dict] = []
+     for skill_md in sorted(SKILLS_DIR.glob("**/SKILL.md")):
+         if any(part in {"_archive", ".git", "node_modules", ".cache"} for part in skill_md.parts):
+             continue
+         try:
+             text = skill_md.read_text(encoding="utf-8", errors="replace")
+         except Exception:
+             continue
+         fm = _frontmatter(text)
+         rel = str(skill_md.parent.relative_to(SKILLS_DIR))
+         name = fm.get("name") or skill_md.parent.name
+         description = fm.get("description") or ""
+         body = text.split("---", 2)[2] if text.startswith("---") and len(text.split("---", 2)) > 2 else text
+         rows.append({
+             "name": name,
+             "path": str(skill_md),
+             "relativePath": rel,
+             "description": description[:500],
+             "tokens": sorted(_tokens(" ".join([name, rel, description, body[:4000]]))),
+         })
+     return rows
+
+
+ def find_skill_candidates(query: str, agent_id: str | None = None, limit: int = 8) -> dict:
+     query_tokens = _tokens(query)
+     matches = []
+     for skill in _skill_catalog():
+         skill_tokens = set(skill["tokens"])
+         overlap = sorted(query_tokens & skill_tokens)
+         if not overlap:
+             continue
+         score = len(overlap)
+         score += 3 * len(query_tokens & _tokens(skill["name"]))
+         score += 2 * len(query_tokens & _tokens(skill["description"]))
+         matches.append({
+             "name": skill["name"],
+             "path": skill["path"],
+             "relativePath": skill["relativePath"],
+             "description": skill["description"],
+             "score": score,
+             "matchedTerms": overlap[:12],
+         })
+     matches = sorted(matches, key=lambda row: (-row["score"], row["name"]))[:limit]
+     top = matches[0] if matches and matches[0]["score"] >= REUSE_SCORE_THRESHOLD else None
+     agent_filter = load_agent_skill_filter(agent_id)
+     if top and isinstance(agent_filter.get("skills"), list):
+         present = top["name"] in agent_filter["skills"] or top["relativePath"] in agent_filter["skills"]
+         decision = "reuse-existing-skill"
+         allowlist_action = "none" if present else "add-existing-skill-to-agent-allowlist"
+     elif top:
+         decision = "reuse-existing-skill"
+         allowlist_action = "none-unrestricted-or-unknown"
+     else:
+         decision = "draft-new-skill-for-review"
+         allowlist_action = "n/a"
+     return {
+         "query": query,
+         "agentId": agent_id,
+         "decision": decision,
+         "allowlistAction": allowlist_action,
+         "allowlistValue": top["name"] if top else None,
+         "agentSkillFilter": agent_filter,
+         "matches": matches,
+     }
+
+
+ def load_agent_skill_filter(agent_id: str | None) -> dict:
+     if not agent_id:
+         return {"agentId": None, "mode": "not-evaluated", "skills": None}
+     try:
+         cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
+     except Exception as exc:
+         return {"agentId": agent_id, "mode": "config-unavailable", "error": str(exc), "skills": None}
+     defaults = (cfg.get("agents") or {}).get("defaults") or {}
+     for agent in (cfg.get("agents") or {}).get("list") or []:
+         if agent.get("id") == agent_id:
+             if "skills" in agent:
+                 return {"agentId": agent_id, "mode": "explicit-agent-allowlist", "skills": agent.get("skills") or []}
+             if "skills" in defaults:
+                 return {"agentId": agent_id, "mode": "inherits-default-allowlist", "skills": defaults.get("skills") or []}
+             return {"agentId": agent_id, "mode": "unrestricted", "skills": None}
+     return {"agentId": agent_id, "mode": "agent-not-found", "skills": None}
+
+
+ def analyze_agent_skill_filters() -> dict:
+     try:
+         cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding="utf-8"))
+     except Exception as exc:
+         return {"error": str(exc), "agents": []}
+     defaults = (cfg.get("agents") or {}).get("defaults") or {}
+     rows = []
+     for agent in (cfg.get("agents") or {}).get("list") or []:
+         if "skills" in agent:
+             mode = "explicit-agent-allowlist"
+             skills = agent.get("skills") or []
+         elif "skills" in defaults:
+             mode = "inherits-default-allowlist"
+             skills = defaults.get("skills") or []
+         else:
+             mode = "unrestricted"
+             skills = None
+         rows.append({"agentId": agent.get("id"), "mode": mode, "skillCount": len(skills) if isinstance(skills, list) else None})
+     return {"defaultSkillsSet": "skills" in defaults, "agents": rows}
+
+
+ def _list_registered_mcps() -> list[dict]:
+     """Read registered MCP servers from shared mcporter config."""
+     if not MCPORTER_CONFIG.exists():
+         return []
+     try:
+         data = json.loads(MCPORTER_CONFIG.read_text())
+         servers = data.get("mcpServers", {})
+         if not isinstance(servers, dict):
+             return []
+         return [
+             {
+                 "name": name,
+                 "transport": "http" if isinstance(cfg, dict) and cfg.get("type") == "http" else "stdio",
+                 "enabled": True,
+             }
+             for name, cfg in servers.items()
+             if isinstance(cfg, dict)
+         ]
+     except Exception as exc:
+         log.warning("Could not read mcporter config: %s", exc)
+         return []
+
+
+ def _skill_aliases(name: str) -> set[str]:
+     aliases = {name, name.replace("_", "-"), name.replace("-", "_")}
+     if name.endswith("-mcp"):
+         aliases.add(name[:-4])
+         aliases.add(name[:-4].replace("-", "_"))
+     if name.endswith("_mcp"):
+         aliases.add(name[:-4].replace("_", "-"))
+         aliases.add(name[:-4])
+     return aliases
+
+
+ def analyze_skill_coverage() -> dict:
+     """Compare registered MCPs vs skill surfaces."""
+     registered_mcps = _list_registered_mcps()
+     skill_names = set(_list_mcp_skills())
+
+     mcps_with_skill = []
+     mcps_missing_skill = []
+     for mcp in registered_mcps:
+         name = mcp["name"]
+         aliases = _skill_aliases(name)
+         if aliases & skill_names:
+             mcps_with_skill.append(name)
+         else:
+             mcps_missing_skill.append(name)
+
+     skills_without_registered_mcp = sorted(
+         skill for skill in skill_names
+         if not any(skill in _skill_aliases(mcp["name"]) for mcp in registered_mcps)
+     )
+
+     missing_sorted = sorted(mcps_missing_skill)
+     return {
+         "registered_mcps": registered_mcps,
+         "mcps_with_skill_surface": sorted(mcps_with_skill),
+         "mcps_missing_skill_surface": missing_sorted,
+         "skills_without_registered_mcp": skills_without_registered_mcp,
+         "skill_catalog_count": len(_skill_catalog()),
+         "agent_skill_filters": analyze_agent_skill_filters(),
+         # Deprecated alias; use mcps_missing_skill_surface instead
+         "mcps_missing_skill": missing_sorted,
+     }
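The reuse decision in `find_skill_candidates` rests on a weighted token overlap: one point per overlapping token, three more per query token found in the skill name, and two more per query token found in the description, compared against `REUSE_SCORE_THRESHOLD = 6`. The sketch below reproduces that arithmetic under simplifying assumptions: the stop-word set is abbreviated, the relative path is left out of the token pool, and the function names `tokens` and `score` are illustrative rather than the module's own.

```python
import re

# Abbreviated stop-word list for illustration; the module's set is longer.
STOP_WORDS = {"the", "and", "for", "with", "skill", "tool"}


def tokens(text: str) -> set[str]:
    """Lowercase tokens of 3+ characters, minus stop words (same regex as the diff)."""
    return {t for t in re.findall(r"[a-z][a-z0-9_-]{2,}", text.lower()) if t not in STOP_WORDS}


def score(query: str, name: str, description: str, body: str = "") -> int:
    """Weighted overlap: 1 per shared token, +3 per name hit, +2 per description hit."""
    q = tokens(query)
    overlap = q & tokens(" ".join([name, description, body]))
    if not overlap:
        return 0
    return len(overlap) + 3 * len(q & tokens(name)) + 2 * len(q & tokens(description))


if __name__ == "__main__":
    print(score("search arxiv papers", "arxiv", "Search arXiv for papers"))
    print(score("search arxiv papers", "arxiv-search", "Search arXiv for papers"))
```

Because the token regex keeps hyphens and underscores inside a token, a name such as `arxiv-search` is a single token and earns no name bonus for the query word `arxiv`; the description match still carries it past the threshold in this example.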
spooler/__init__.py ADDED
@@ -0,0 +1 @@
+ # spooler package
spooler/noise_filter.py ADDED
@@ -0,0 +1,17 @@
+ from config import settings
+
+
+ # Skip empty entries: an empty pattern is a substring of every string and would drop every tool result.
+ _NOISE_PATTERNS = [p.strip().lower() for p in settings.tool_noise_patterns.split(",") if p.strip()]
+
+
+ def is_noise(clean_text: str) -> bool:
+     """Return True if this tool result should be dropped entirely."""
+     if not clean_text:
+         return True
+     lower = clean_text.lower()
+     if any(pat in lower for pat in _NOISE_PATTERNS):
+         return True
+     if len(clean_text.strip()) < 2:
+         return True
+     return False
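The noise filter is a case-insensitive substring check against a comma-separated pattern list. A small sketch follows, with the pattern list passed in explicitly instead of read from `settings`; `parse_patterns` is a hypothetical helper. It also shows why an empty entry in the list matters: the empty string is a substring of every string.

```python
def parse_patterns(raw: str) -> list[str]:
    """Split a comma-separated setting into lowercase patterns, dropping empty entries."""
    return [p.strip().lower() for p in raw.split(",") if p.strip()]


def is_noise(text: str, patterns: list[str]) -> bool:
    """Return True for empty, near-empty, or pattern-matching tool output."""
    if not text or len(text.strip()) < 2:
        return True
    lower = text.lower()
    return any(p in lower for p in patterns)


if __name__ == "__main__":
    patterns = parse_patterns("ENOENT,no output,command exited")
    print(is_noise("ENOENT: no such file or directory", patterns))
    print(is_noise("file contents: line1", patterns))
    # A plain split of an empty setting yields [""], and "" matches everything.
    print("".split(","), "" in "any tool output")
```

With the default `TOOL_NOISE_PATTERNS` from `.env.example` the distinction does not arise, but an unset or trailing-comma value would.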
spooler/processor.py ADDED
@@ -0,0 +1,177 @@
+ import json
+ import logging
+ from pathlib import Path
+
+ from config import settings
+ from spooler.redaction import redact
+ from spooler.noise_filter import is_noise
+ from spooler.store import get_file_state, set_file_state, spool_entries
+
+ log = logging.getLogger("spooler")
+
+
+ MESSAGE_TYPES_TO_SPOOL = {"message"}
+ ROLES_TO_SPOOL = {"user", "assistant", "toolResult", "system", "developer"}
+ BLOCK_TYPES_TO_KEEP = {"text", "input_text", "resource", "thinking"}
+
+
+ def _extract_text_from_content(content) -> str:
+     text_parts: list[str] = []
+     if isinstance(content, str):
+         text_parts.append(content)
+     elif isinstance(content, list):
+         for block in content:
+             if not isinstance(block, dict):
+                 text_parts.append(str(block))
+                 continue
+             block_type = block.get("type")
+             if block_type in ("text", "input_text"):
+                 text_parts.append(block.get("text", ""))
+             elif block_type == "thinking":
+                 text_parts.append(block.get("thinking", ""))
+             elif block_type == "resource":
+                 text_parts.append(str(block.get("resource", "")))
+     elif content is not None:
+         text_parts.append(str(content))
+     return "\n".join(part for part in text_parts if part)
+
+
+ def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
+     message = entry.get("message") or {}
+     role = message.get("role", "")
+     tool_name = message.get("toolName", "") or entry.get("toolName", "")
+     is_error = bool(message.get("isError") or entry.get("isError"))
+     raw_text = _extract_text_from_content(message.get("content", ""))
+     return role, tool_name, raw_text, is_error
+
+
+ def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
+     """Transform a raw transcript entry into a spooled row."""
+     entry_type = entry.get("type", "")
+     if entry_type not in MESSAGE_TYPES_TO_SPOOL:
+         return None
+
+     role, tool_name, raw_text, is_error = _extract_message_payload(entry)
+     if role not in ROLES_TO_SPOOL:
+         return None
+
+     timestamp = entry.get("timestamp", "")
+     original_length = len(raw_text)
+     clean_text = redact(raw_text)
+
+     if len(clean_text) > settings.max_toolresult_chars:
+         clean_text = clean_text[: settings.max_toolresult_chars] + "\n... [truncated]"
+
+     if role == "toolResult" and is_noise(clean_text):
+         return None
+
+     preview = clean_text[:300]
+
+     return {
+         "session_id": session_id,
+         "agent_id": agent_id,
+         "entry_idx": entry_idx,
+         "entry_type": entry_type,
+         "role": role,
+         "timestamp": timestamp,
+         "tool_name": tool_name,
+         "clean_text": clean_text,
+         "original_length": original_length,
+         "preview": preview,
+         "is_error": is_error,
+     }
+
+
+ def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
+     """Parse a transcript JSONL and return spooled rows.
+
+     Args:
+         skip_before: skip entries with idx <= this value (incremental ingestion).
+     """
+     rows = []
+     session_id = path.stem
+     try:
+         with open(path, encoding="utf-8", errors="replace") as f:
+             for idx, line in enumerate(f):
+                 line = line.strip()
+                 if not line:
+                     continue
+                 try:
+                     entry = json.loads(line)
+                 except json.JSONDecodeError:
+                     continue
+                 if entry.get("type") == "session" and entry.get("id"):
+                     session_id = entry["id"]
+                     continue
+                 if idx <= skip_before:  # checked after the session header so incremental runs keep its session id
+                     continue
+                 row = _process_entry(entry, session_id, agent_id, idx)
+                 if row:
+                     rows.append(row)
+     except Exception as exc:
+         log.warning("Failed to process %s: %s", path, exc)
+     return rows
+
+
+ def run_spool() -> tuple[int, int]:
+     """Walk agent session dirs, process transcript messages into SQLite."""
+     import fnmatch
+     agents_root = settings.openclaw_agents_root
+     if not agents_root.exists():
+         log.warning("Agents root not found: %s", agents_root)
+         return 0, 0
+
+     all_rows = []
+     sessions_updated = 0
+
+     for agent_dir in agents_root.iterdir():
+         if not agent_dir.is_dir():
+             continue
+         agent_id = agent_dir.name
+         # Skip if agent not in allowlist (allowlist empty = watch all)
+         if settings.agents_allowlist and agent_id not in settings.agents_allowlist:
+             continue
+         sessions_dir = agent_dir / "sessions"
+         if not sessions_dir.exists():
+             continue
+
+         for transcript_file in sessions_dir.glob("*.jsonl"):
+             # Skip excluded glob patterns and trajectory files
+             fname = transcript_file.name
+             if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
+                 continue
+             if ".trajectory.jsonl" in fname:
+                 continue
+             try:
+                 stat = transcript_file.stat()
+                 current_state = {"mtime": int(stat.st_mtime), "size": stat.st_size}
+             except FileNotFoundError:
+                 continue
+
+             previous_state = get_file_state(str(transcript_file))
+             if previous_state and previous_state.get("mtime") == current_state["mtime"] \
+                     and previous_state.get("size") == current_state["size"]:
+                 continue
+
+             skip_before = previous_state.get("last_entry_idx", -1) if previous_state else -1
+             rows = _process_transcript_file(transcript_file, agent_id, skip_before=skip_before)
+             max_idx = skip_before
+             if rows:
+                 all_rows.extend(rows)
+                 sessions_updated += 1
+                 max_idx = max(r["entry_idx"] for r in rows)
+             set_file_state(str(transcript_file),
+                            mtime=current_state["mtime"],
+                            size=current_state["size"],
+                            last_entry_idx=max(max_idx, skip_before))
+
+     if not all_rows:
+         return 0, 0
+
+     batch_size = settings.spooler_batch_size
+     inserted = 0
+     for i in range(0, len(all_rows), batch_size):
+         batch = all_rows[i : i + batch_size]
+         inserted += spool_entries(batch)
+     return inserted, sessions_updated
+
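Incremental ingestion in the spooler depends on remembering the last spooled line index per file and skipping everything at or below it on the next pass. The sketch below illustrates that idea on a temporary JSONL file. It is a simplified stand-in, not the module's code: `read_new_entries` and `demo` are hypothetical names, and this version always reads the session header before applying the skip so that later passes report the same session id as the first.

```python
import json
import tempfile
from pathlib import Path


def read_new_entries(path: Path, skip_before: int = -1) -> tuple[str, list[tuple[int, dict]]]:
    """Return (session_id, [(line_idx, entry), ...]) for lines with idx > skip_before."""
    session_id = path.stem  # fallback when the file has no session header
    fresh = []
    with open(path, encoding="utf-8") as f:
        for idx, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue  # tolerate partially written lines
            if entry.get("type") == "session" and entry.get("id"):
                session_id = entry["id"]
                continue
            if idx > skip_before:
                fresh.append((idx, entry))
    return session_id, fresh


def demo():
    """Run a full pass, append one line, then run an incremental pass."""
    with tempfile.TemporaryDirectory() as td:
        path = Path(td) / "abc.jsonl"
        lines = [
            {"type": "session", "id": "session-001"},
            {"type": "message", "message": {"role": "user", "content": "hi"}},
            {"type": "message", "message": {"role": "assistant", "content": "hello"}},
        ]
        path.write_text("\n".join(json.dumps(entry) for entry in lines) + "\n", encoding="utf-8")
        first = read_new_entries(path)
        last_idx = max(i for i, _ in first[1])
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps({"type": "message", "message": {"role": "user", "content": "more"}}) + "\n")
        second = read_new_entries(path, skip_before=last_idx)
        return first, second


if __name__ == "__main__":
    first, second = demo()
    print(first[0], [i for i, _ in first[1]])
    print(second[0], [i for i, _ in second[1]])
```

The trade-off in this sketch is that already-ingested lines are still parsed on every pass; only their conversion into rows is skipped.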
spooler/redaction.py ADDED
@@ -0,0 +1,40 @@
+ import re
+ import os
+ from pathlib import Path
+ from config import settings
+
+
+ # Patterns resolved from env
+ _REDACT_PATTERNS = settings.spooler_redact_patterns.split(",")
+
+ _REDACTORS = []
+
+
+ def _build_redactors():
+     global _REDACTORS
+     for pat in _REDACT_PATTERNS:
+         pat = pat.strip()
+         if pat == "api_key":
+             # Matches common key formats
+             _REDACTORS.append(
+                 (re.compile(r"(?i)(api[_-]?key|token|secret|password)\s*[:=]\s*['\"]?[\w\-]{8,}['\"]?"), "[REDACTED]")
+             )
+         elif pat == "path":
+             # Redact home dirs and usernames in paths; bounded so a bare path cannot swallow the text after it
+             _REDACTORS.append(
+                 (re.compile(r"/home/[^/\s]+|C:\\Users\\[^\\\r\n]+"), "[REDACTED_PATH]")
+             )
+         elif pat == "base64":
+             # Long base64-ish strings
+             _REDACTORS.append(
+                 (re.compile(r"[A-Za-z0-9+/]{60,}={0,2}"), "[BASE64_REDACTED]")
+             )
+
+
+ _build_redactors()
+
+
+ def redact(text: str) -> str:
+     for pattern, replacement in _REDACTORS:
+         text = pattern.sub(replacement, text)
+     return text
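The redactors are plain regex substitutions applied in order. The sketch below shows what each of the three pattern families does to sample input. The patterns are modeled on the module, with the POSIX path pattern bounded at whitespace (an assumption of this sketch) and the Windows branch omitted; `REDACTORS` here is a local list, not the module's `_REDACTORS`.

```python
import re

# (compiled pattern, replacement) pairs, applied in order.
REDACTORS = [
    (re.compile(r"(?i)(api[_-]?key|token|secret|password)\s*[:=]\s*['\"]?[\w\-]{8,}['\"]?"), "[REDACTED]"),
    (re.compile(r"/home/[^/\s]+"), "[REDACTED_PATH]"),
    (re.compile(r"[A-Za-z0-9+/]{60,}={0,2}"), "[BASE64_REDACTED]"),
]


def redact(text: str) -> str:
    """Apply every redactor in sequence and return the cleaned text."""
    for pattern, replacement in REDACTORS:
        text = pattern.sub(replacement, text)
    return text


if __name__ == "__main__":
    print(redact("api_key = abcd1234efgh"))
    print(redact("open /home/alice/notes.txt"))
    print(redact("blob: " + "A" * 64))
```

Note that the key pattern replaces the whole `name = value` match, so the key name itself is not preserved in the output, and that the base64 character class includes `/`, so a long slash-separated run of alphanumerics is also treated as base64-like.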
spooler/store.py ADDED
@@ -0,0 +1,204 @@
+ import sqlite3
+ from config import settings
+
+
+ def _compact_title(text: str, *, fallback: str = "") -> str:
+     title = " ".join((text or "").strip().split())
+     if not title:
+         return fallback
+     prefixes = (
+         "Conversation info",
+         "Current user request:",
+         "OpenClaw assembled context",
+         "Treat the conversation context",
+     )
+     for prefix in prefixes:
+         if title.startswith(prefix):
+             return fallback
+     return title[:96]
+
+
+ def _derive_display_title(session_id: str, activity: list[dict]) -> str:
+     for row in activity:
+         role = row.get("role") or ""
+         if role not in {"user", "assistant"}:
+             continue
+         title = _compact_title(row.get("clean_text") or row.get("preview") or "")
+         if title:
+             return title
+     return session_id[:32]
+
+
+ def get_conn() -> sqlite3.Connection:
+     conn = sqlite3.connect(settings.db_path)
+     conn.row_factory = sqlite3.Row
+     return conn
+
+
+ def spool_entries(rows: list[dict]):
+     """Bulk-insert spooled entries. Skips duplicates via UNIQUE constraint."""
+     if not rows:
+         return 0
+     conn = get_conn()
+     inserted = 0
+     for row in rows:
+         try:
+             cur = conn.execute(
+                 """
+                 INSERT OR IGNORE INTO spooled_entries
+                 (session_id, agent_id, entry_idx, entry_type, role, timestamp,
+                 tool_name, clean_text, original_length, preview, is_error)
+                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                 """,
+                 (
+                     row["session_id"],
+                     row["agent_id"],
+                     row.get("entry_idx", 0),
+                     row.get("entry_type"),
+                     row.get("role"),
+                     row.get("timestamp"),
+                     row.get("tool_name"),
+                     row["clean_text"],
+                     row.get("original_length"),
+                     row["preview"],
+                     int(bool(row.get("is_error", False))),
+                 ),
+             )
+             inserted += cur.rowcount > 0
+         except Exception:
+             pass
+     conn.commit()
+     conn.close()
+     return inserted
+
+
+ def get_file_state(file_path: str) -> dict | None:
+     conn = get_conn()
+     row = conn.execute(
+         "SELECT value FROM spool_state WHERE key = ?",
+         (f"file::{file_path}",),
+     ).fetchone()
+     conn.close()
+     if not row:
+         return None
+     value = row["value"]
+     try:
+         parts = value.split(":")
+         state = {"mtime": int(parts[0]), "size": int(parts[1])}
+         if len(parts) > 2:
+             state["last_entry_idx"] = int(parts[2])
+         return state
+     except Exception:
+         return None
+
+
+ def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1):
+     conn = get_conn()
+     conn.execute(
+         "INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)",
+         (f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"),
+     )
+     conn.commit()
+     conn.close()
+
+
+ def get_recent_sessions(limit: int = 25) -> list[dict]:
+     conn = get_conn()
+     rows = conn.execute(
+         """
+         SELECT session_id, agent_id,
+                MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
+                COUNT(*) AS event_count,
+                SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
+                SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
+                SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
+                MAX(entry_idx) AS last_entry_idx
+         FROM spooled_entries
+         GROUP BY session_id, agent_id
+         ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
+         LIMIT ?
+         """,
+         (limit,),
+     ).fetchall()
+     conn.close()
+
+     # Attach health hints, a health label, and a display title to each session
+     result = []
+     for r in rows:
+         rd = dict(r)
+         # Add health hints
+         tool_count = rd.get("tool_result_count", 0) or 0
+         error_count = rd.get("error_count", 0) or 0
+         noisy = rd.get("noisy_tool_results", 0) or 0
+         event_count = rd.get("event_count", 0) or 0
+         hints = []
+         if error_count > 0:
+             hints.append(f"{error_count} error(s)")
+         if noisy > 2:
+             hints.append(f"{noisy} noisy tool outputs")
+         if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
+             hints.append("tool-heavy")
+         rd["hints"] = hints
+         rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
+         rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
+         result.append(rd)
+     return result
+
+
+ def get_session_summary(session_id: str) -> dict | None:
+     """Return the same summary shape as get_recent_sessions for one session."""
+     conn = get_conn()
+     row = conn.execute(
+         """
+         SELECT session_id, agent_id,
+                MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
+                COUNT(*) AS event_count,
+                SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
+                SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
+                SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
+                MAX(entry_idx) AS last_entry_idx
+         FROM spooled_entries
+         WHERE session_id = ?
+         GROUP BY session_id, agent_id
+         LIMIT 1
+         """,
+         (session_id,),
+     ).fetchone()
+     conn.close()
+     if not row:
+         return None
+
+     rd = dict(row)
+     tool_count = rd.get("tool_result_count", 0) or 0
+     error_count = rd.get("error_count", 0) or 0
+     noisy = rd.get("noisy_tool_results", 0) or 0
+     event_count = rd.get("event_count", 0) or 0
+     hints = []
+     if error_count > 0:
+         hints.append(f"{error_count} error(s)")
+     if noisy > 2:
+         hints.append(f"{noisy} noisy tool outputs")
+     if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
+         hints.append("tool-heavy")
+     rd["hints"] = hints
+     rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
+     rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
+     return rd
+
+
+
+ def get_session_activity(session_id: str, limit: int = 200) -> list[dict]:
+     conn = get_conn()
+     rows = conn.execute(
+         """
+         SELECT session_id, agent_id, entry_idx, role, entry_type, timestamp,
+                tool_name, clean_text, preview, original_length, is_error, indexed_at
+         FROM spooled_entries
+         WHERE session_id = ?
+         ORDER BY entry_idx DESC
+         LIMIT ?
+         """,
+         (session_id, limit),
+     ).fetchall()
+     conn.close()
+     return [dict(r) for r in reversed(rows)]
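Two storage conventions in this file are easy to check in isolation: duplicate suppression through `INSERT OR IGNORE` against a `UNIQUE(session_id, entry_idx)` constraint, and the `mtime:size:last_entry_idx` string stored per file in `spool_state`. The sketch below uses an in-memory database with a cut-down table (only three columns) and hypothetical helper names `dedup_demo`, `encode_state`, and `decode_state`.

```python
import sqlite3


def dedup_demo() -> tuple[int, int]:
    """Insert three rows, one of them a duplicate key; return (inserted, total rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE spooled_entries (
            id INTEGER PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_idx INTEGER NOT NULL,
            clean_text TEXT,
            UNIQUE(session_id, entry_idx)
        )
        """
    )
    rows = [("s1", 1, "hello"), ("s1", 2, "world"), ("s1", 1, "hello again")]
    inserted = 0
    for row in rows:
        cur = conn.execute(
            "INSERT OR IGNORE INTO spooled_entries (session_id, entry_idx, clean_text) VALUES (?, ?, ?)",
            row,
        )
        # rowcount is 0 when the UNIQUE constraint caused the row to be ignored.
        inserted += int(cur.rowcount > 0)
    total = conn.execute("SELECT COUNT(*) FROM spooled_entries").fetchone()[0]
    conn.close()
    return inserted, total


def encode_state(mtime: int, size: int, last_entry_idx: int = -1) -> str:
    """Serialize file state in the colon-separated form used by spool_state."""
    return f"{mtime}:{size}:{last_entry_idx}"


def decode_state(value: str) -> dict:
    """Parse the colon-separated form; the third field is optional."""
    parts = value.split(":")
    state = {"mtime": int(parts[0]), "size": int(parts[1])}
    if len(parts) > 2:
        state["last_entry_idx"] = int(parts[2])
    return state


if __name__ == "__main__":
    print(dedup_demo())
    print(decode_state(encode_state(1700000000, 2048)))
```

The duplicate row is silently dropped rather than updated, so the first text stored for a given `(session_id, entry_idx)` pair wins.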
tests/__init__.py ADDED
@@ -0,0 +1 @@
+ # tests package
tests/conftest.py ADDED
@@ -0,0 +1,163 @@
+ """Shared test fixtures for Session Amplifier tests."""
+ import json
+ import sqlite3
+ import sys
+ import os
+ import pytest
+
+ # Add project root to path so imports work as they do in production
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+ # Override settings before importing anything that uses them
+ os.environ.setdefault("OPENCLAW_AGENTS_ROOT", os.path.join(os.path.dirname(__file__), "_fake_agents"))
+ os.environ.setdefault("OPENCLAW_STATE_DIR", os.path.join(os.path.dirname(__file__), "_fake_state"))
+ os.environ.setdefault("SESSION_AMPLIFIER_DB_PATH", ":memory:")
+
+
+ @pytest.fixture
+ def in_memory_db():
+     """Create an in-memory SQLite DB with the session amplifier schema."""
+     conn = sqlite3.connect(":memory:")
+     conn.row_factory = sqlite3.Row
+     conn.executescript("""
+         CREATE TABLE IF NOT EXISTS spooled_entries (
+             id INTEGER PRIMARY KEY,
+             session_id TEXT NOT NULL,
+             agent_id TEXT NOT NULL,
+             entry_idx INTEGER NOT NULL,
+             entry_type TEXT,
+             role TEXT,
+             timestamp TEXT,
+             tool_name TEXT,
+             clean_text TEXT,
+             original_length INTEGER,
+             preview TEXT,
+             is_error INTEGER DEFAULT 0,
+             indexed_at TEXT DEFAULT (datetime('now')),
+             UNIQUE(session_id, entry_idx)
+         );
+         CREATE TABLE IF NOT EXISTS spool_state (
+             key TEXT PRIMARY KEY,
+             value TEXT
+         );
+         CREATE TABLE IF NOT EXISTS review_reports (
+             id INTEGER PRIMARY KEY,
+             review_id TEXT UNIQUE,
+             generated_at TEXT,
+             period_from TEXT,
+             period_to TEXT,
+             report_json TEXT
+         );
+     """)
+     yield conn
+     conn.close()
+
+
+ @pytest.fixture
+ def sample_transcript_lines():
+     """Sample JSONL transcript lines for testing."""
+     return [
+         json.dumps({"type": "session", "id": "test-session-001"}),
+         json.dumps({
+             "type": "message",
+             "timestamp": "2026-04-05T10:00:00Z",
+             "message": {
+                 "role": "user",
+                 "content": [{"type": "text", "text": "Hello, can you help me?"}],
+             },
+         }),
+         json.dumps({
+             "type": "message",
+             "timestamp": "2026-04-05T10:00:05Z",
+             "message": {
+                 "role": "assistant",
+                 "content": [{"type": "text", "text": "Of course! Let me look into that for you."}],
+             },
+         }),
+         json.dumps({
+             "type": "message",
+             "timestamp": "2026-04-05T10:00:10Z",
+             "message": {
+                 "role": "toolResult",
+                 "content": [{"type": "text", "text": "file contents: line1\nline2\nline3"}],
+                 "toolName": "read_file",
+             },
+         }),
+         json.dumps({
+             "type": "message",
+             "timestamp": "2026-04-05T10:00:15Z",
+             "message": {
+                 "role": "toolResult",
+                 "content": [{"type": "text", "text": ""}],
+                 "toolName": "write_file",
+                 "isError": True,
+             },
+         }),
+         json.dumps({
+             "type": "message",
+             "timestamp": "2026-04-05T10:00:20Z",
+             "message": {
+                 "role": "assistant",
+                 "content": [{"type": "text", "text": "I'll use the read_file tool to check."}],
+             },
+         }),
+     ]
+
+
+ @pytest.fixture
+ def sample_spooled_entries():
+     """Pre-processed spooled entry dicts for reviewer tests."""
+     return [
+         {
+             "session_id": "s1",
+             "agent_id": "ops",
+             "entry_idx": 1,
+             "entry_type": "message",
+             "role": "user",
+             "timestamp": "2026-04-05T10:00:00Z",
+             "tool_name": "",
+             "clean_text": "Help me debug this issue",
+             "original_length": 25,
+             "preview": "Help me debug this issue",
+             "is_error": 0,
+         },
+         {
+             "session_id": "s1",
+             "agent_id": "ops",
+             "entry_idx": 2,
+             "entry_type": "message",
+             "role": "assistant",
+             "timestamp": "2026-04-05T10:00:05Z",
+             "tool_name": "",
+             "clean_text": "I'll use the read_file tool to inspect the code.",
+             "original_length": 48,
+             "preview": "I'll use the read_file tool to inspect the code.",
+             "is_error": 0,
+         },
+         {
+             "session_id": "s1",
+             "agent_id": "ops",
+             "entry_idx": 3,
+             "entry_type": "message",
+             "role": "toolResult",
+             "timestamp": "2026-04-05T10:00:10Z",
+             "tool_name": "read_file",
+             "clean_text": "x" * 5000,
+             "original_length": 5000,
+             "preview": "x" * 300,
+             "is_error": 0,
+         },
+         {
+             "session_id": "s1",
+             "agent_id": "ops",
+             "entry_idx": 4,
+             "entry_type": "message",
+             "role": "toolResult",
+             "timestamp": "2026-04-05T10:00:15Z",
+             "tool_name": "write_file",
+             "clean_text": "",
+             "original_length": 0,
+             "preview": "",
+             "is_error": 1,
+         },
+     ]
tests/failure_classifier_smoke.py ADDED
@@ -0,0 +1,25 @@
+ #!/usr/bin/env python3
+ """Standalone smoke test for deterministic failure-mode classification."""
+ import os
+ import sys
+
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+ from reviewer.intelligence import _classify_failure_text, _failure_recommendation
+
+
+ def main() -> int:
+     assert _classify_failure_text("approval required before running this command") == "approval_or_permission_stall"
+     assert _classify_failure_text("Permission denied while opening file") == "approval_or_permission_stall"
+     assert _classify_failure_text("permission model docs mention safe defaults") is None
+     assert _classify_failure_text("request timed out after 30s") == "timeout"
+     assert _classify_failure_text("context deadline exceeded") == "timeout"
+     assert _classify_failure_text("model fallback activated") == "model_failover_or_fallback"
+     assert _classify_failure_text("no session found for id abc") == "stale_session_reference"
+     assert "heartbeat" in _failure_recommendation("timeout")
+     print("failure_classifier_smoke: ok")
+     return 0
+
+
+ if __name__ == "__main__":
+     raise SystemExit(main())
tests/session_snapshot_smoke.py ADDED
@@ -0,0 +1,55 @@
+ #!/usr/bin/env python3
+ """Standalone smoke test for session snapshot generation.
+
+ This intentionally avoids pytest so the sidecar can be checked in constrained
+ gateway/runtime shells where pytest is not installed.
+ """
+ import os
+ import sys
+
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+ from reviewer import session_snapshot as snap
+
+
+ def main() -> int:
+     summary = {
+         "session_id": "s1",
+         "agent_id": "ops",
+         "last_event_at": "2026-05-12T12:00:00+00:00",
+         "event_count": 4,
+         "tool_result_count": 2,
+         "error_count": 1,
+         "noisy_tool_results": 0,
+         "last_entry_idx": 4,
+         "hints": ["1 error(s)"],
+     }
+     activity = [
+         {"session_id": "s1", "agent_id": "ops", "entry_idx": 1, "role": "user", "preview": "deploy to Cloud Run?", "clean_text": "deploy to Cloud Run?", "is_error": 0},
+         {"session_id": "s1", "agent_id": "ops", "entry_idx": 2, "role": "toolResult", "tool_name": "exec", "preview": "permission denied", "clean_text": "permission denied", "is_error": 1},
+     ]
+     original_activity = snap.get_session_activity
+     original_recent = snap.get_recent_sessions
+     try:
+         snap.get_session_activity = lambda session_id, limit: activity
+         result = snap.build_session_snapshot(summary, activity_limit=20)
+         assert result["schema"] == "openclaw.session.v1"
+         assert result["id"] == "s1"
+         assert result["state"]["state"] == "warning"
+         assert "approval_or_permission" in result["risk"]["flags"]
+         assert "external_infra" in result["risk"]["flags"]
+         assert result["outputs"]["recent_events"][1]["event_type"] == "tool_error"
+
+         snap.get_recent_sessions = lambda limit: [summary]
+         collection = snap.build_recent_session_snapshots(limit=1, activity_limit=1)
+         assert collection["schema"] == "openclaw.session.v1.collection"
+         assert collection["count"] == 1
+     finally:
+         snap.get_session_activity = original_activity
+         snap.get_recent_sessions = original_recent
+     print("session_snapshot_smoke: ok")
+     return 0
+
+
+ if __name__ == "__main__":
+     raise SystemExit(main())
tests/skill_analyzer_smoke.py ADDED
@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+"""Standalone smoke test for skill coverage alias/path behavior."""
+import os
+import sys
+import tempfile
+from pathlib import Path
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from reviewer import skill_analyzer
+
+
+def main() -> int:
+    assert "google-workspace" in skill_analyzer._skill_aliases("google_workspace")
+    assert "arxiv" in skill_analyzer._skill_aliases("arxiv-mcp")
+    with tempfile.TemporaryDirectory() as td:
+        root = Path(td)
+        (root / "google-workspace").mkdir()
+        (root / "google-workspace" / "SKILL.md").write_text("---\nname: google-workspace\n---\n")
+        old = skill_analyzer.SKILLS_DIR
+        try:
+            skill_analyzer.SKILLS_DIR = root
+            assert skill_analyzer._list_mcp_skills() == ["google-workspace"]
+        finally:
+            skill_analyzer.SKILLS_DIR = old
+    print("skill_analyzer_smoke: ok")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
tests/test_reviewer.py ADDED
@@ -0,0 +1,86 @@
+from reviewer.scorer import score_session
+from reviewer.pattern_detector import detect_failure_patterns
+from reviewer import skill_analyzer
+
+def test_score_session(sample_spooled_entries):
+    # Base case with provided sample entries
+    result = score_session(sample_spooled_entries)
+
+    assert "score" in result
+    assert result["tool_result_count"] == 2
+    assert result["assistant_msg_count"] == 1
+    assert result["user_msg_count"] == 1
+
+    # Check flags triggered by sample entries
+    flags = result["flags"]
+    assert "excessively_long_tool_output" in flags  # One output is 5000 characters long
+    assert "empty_tool_results" in flags  # One output is empty
+    assert "meta_process_narration" in flags  # Assistant uses "I'll use the..."
+    assert result["score"] < 1.0
+
+def test_score_session_fts_noise():
+    entries = [
+        {"session_id": "s1", "role": "toolResult", "tool_name": "session_search", "clean_text": "0 results found for query xyz", "is_error": 0},
+        {"session_id": "s1", "role": "toolResult", "tool_name": "session_search", "clean_text": "no results found. try different terms.", "is_error": 0},
+        {"session_id": "s1", "role": "toolResult", "tool_name": "session_search", "clean_text": "0 results", "is_error": 0},
+    ]
+    result = score_session(entries)
+    assert "fts_query_noise" in result["flags"]
+
+def test_score_session_empty():
+    result = score_session([])
+    assert result["score"] == 0.0
+    assert "no_data" in result["flags"]
+
+def test_detect_failure_patterns():
+    # Set up some dummy session scores and entries
+    session_scores = [
+        {"session_id": "s1", "flags": ["some_flag", "other_flag"]},
+        {"session_id": "s2", "flags": ["some_flag"]},
+        {"session_id": "s3", "flags": ["some_flag", "third_flag"]},
+        {"session_id": "s4", "flags": ["some_flag"]},
+        {"session_id": "s5", "flags": ["some_flag"]},
+    ]
+    spooled_entries = [
+        {"session_id": "s1", "role": "toolResult", "tool_name": "bad_tool", "clean_text": "", "is_error": 0},
+        {"session_id": "s2", "role": "toolResult", "tool_name": "bad_tool", "clean_text": "", "is_error": 0},
+        {"session_id": "s3", "role": "toolResult", "tool_name": "bad_tool", "clean_text": "", "is_error": 0},
+    ]
+
+    patterns = detect_failure_patterns(session_scores, spooled_entries)
+
+    # We should have recognized 'bad_tool' as having empty output in >= 3 sessions
+    assert any(p["pattern"] == "empty_tool_result:bad_tool" for p in patterns)
+
+    # We should have recognized 'some_flag' triggered in 5 sessions
+    assert any(p["pattern"] == "session_flag:some_flag" for p in patterns)
+
+
+def test_find_skill_candidates_prefers_allowlist_update(tmp_path, monkeypatch):
+    skills_dir = tmp_path / "skills"
+    skill_dir = skills_dir / "ops-framework"
+    skill_dir.mkdir(parents=True)
+    (skill_dir / "SKILL.md").write_text(
+        "---\n"
+        "name: ops-framework\n"
+        "description: Use for OpenClaw gateway troubleshooting and deterministic ops checks\n"
+        "---\n"
+        "# Ops Framework\n"
+        "Run bounded diagnostics before changing gateway configuration.\n",
+        encoding="utf-8",
+    )
+    config = tmp_path / "openclaw.json"
+    config.write_text(
+        '{"agents":{"list":[{"id":"azoth","skills":["context7"]}]}}',
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(skill_analyzer, "SKILLS_DIR", skills_dir)
+    monkeypatch.setattr(skill_analyzer, "OPENCLAW_CONFIG", config)
+
+    result = skill_analyzer.find_skill_candidates(
+        "gateway troubleshooting deterministic ops checks", agent_id="azoth"
+    )
+
+    assert result["decision"] == "reuse-existing-skill"
+    assert result["allowlistAction"] == "add-existing-skill-to-agent-allowlist"
+    assert result["allowlistValue"] == "ops-framework"
tests/test_session_snapshot.py ADDED
@@ -0,0 +1,43 @@
+from reviewer import session_snapshot as snap
+
+
+def test_build_session_snapshot(monkeypatch):
+    summary = {
+        "session_id": "s1",
+        "agent_id": "ops",
+        "last_event_at": "2026-05-12T12:00:00+00:00",
+        "event_count": 4,
+        "tool_result_count": 2,
+        "error_count": 1,
+        "noisy_tool_results": 0,
+        "last_entry_idx": 4,
+        "hints": ["1 error(s)"],
+    }
+    activity = [
+        {"session_id": "s1", "agent_id": "ops", "entry_idx": 1, "role": "user", "preview": "deploy to Cloud Run?", "clean_text": "deploy to Cloud Run?", "is_error": 0},
+        {"session_id": "s1", "agent_id": "ops", "entry_idx": 2, "role": "toolResult", "tool_name": "exec", "preview": "permission denied", "clean_text": "permission denied", "is_error": 1},
+    ]
+    monkeypatch.setattr(snap, "get_session_activity", lambda session_id, limit: activity)
+
+    result = snap.build_session_snapshot(summary, activity_limit=20)
+
+    assert result["schema"] == "openclaw.session.v1"
+    assert result["id"] == "s1"
+    assert result["owner"]["agent_id"] == "ops"
+    assert result["state"]["state"] == "warning"
+    assert "errors_present" in result["state"]["reasons"]
+    assert result["health"]["tool_ratio"] == 0.5
+    assert "approval_or_permission" in result["risk"]["flags"]
+    assert "external_infra" in result["risk"]["flags"]
+    assert result["outputs"]["recent_events"][1]["event_type"] == "tool_error"
+
+
+def test_build_recent_session_snapshots(monkeypatch):
+    monkeypatch.setattr(snap, "get_recent_sessions", lambda limit: [{"session_id": "s1", "agent_id": "ops"}])
+    monkeypatch.setattr(snap, "get_session_activity", lambda session_id, limit: [])
+
+    result = snap.build_recent_session_snapshots(limit=1, activity_limit=1)
+
+    assert result["schema"] == "openclaw.session.v1.collection"
+    assert result["count"] == 1
+    assert result["snapshots"][0]["id"] == "s1"
tests/test_spooler.py ADDED
@@ -0,0 +1,75 @@
+import json
+from spooler.processor import _extract_text_from_content, _process_entry
+from spooler.redaction import redact
+from spooler.noise_filter import is_noise
+from spooler.store import _derive_display_title
+
+def test_extract_text_from_content():
+    # String content
+    assert _extract_text_from_content("simple text") == "simple text"
+
+    # List of blocks
+    blocks = [
+        {"type": "text", "text": "block 1"},
+        {"type": "thinking", "thinking": "thinking text"},
+        {"type": "resource", "resource": "res data"},
+        {"type": "image", "source": "ignore this"}
+    ]
+    extracted = _extract_text_from_content(blocks)
+    assert "block 1" in extracted
+    assert "thinking text" in extracted
+    assert "res data" in extracted
+    assert "ignore this" not in extracted
+
+def test_process_entry(sample_transcript_lines):
+    # Parse the user message
+    user_entry = json.loads(sample_transcript_lines[1])
+    row = _process_entry(user_entry, "sess_1", "agent_1", 1)
+    assert row is not None
+    assert row["role"] == "user"
+    assert row["clean_text"] == "Hello, can you help me?"
+    assert row["entry_idx"] == 1
+
+    # Parse tool result
+    tool_entry = json.loads(sample_transcript_lines[3])
+    row2 = _process_entry(tool_entry, "sess_1", "agent_1", 3)
+    assert row2 is not None
+    assert row2["role"] == "toolResult"
+    assert row2["tool_name"] == "read_file"
+    assert "file contents" in row2["clean_text"]
+
+def test_redaction():
+    text = "Here is my key: api_key='sk-1234567890abcdef' inside a sentence."
+    redacted = redact(text)
+    assert "sk-1234567890abcdef" not in redacted
+    assert "[REDACTED]" in redacted
+
+def test_noise_filter():
+    assert is_noise("ENOENT: no such file")
+    assert is_noise("no output")
+    assert is_noise("command exited with code 1")
+    assert not is_noise("Valid output from a tool")
+    assert is_noise("x\n")  # Too short
+    assert is_noise("")
+
+
+def test_derive_display_title_prefers_conversation_text():
+    activity = [
+        {"role": "system", "clean_text": "ignore system scaffolding"},
+        {"role": "user", "clean_text": "Can you inspect the Hermes agent update and dashboard timeout?"},
+    ]
+
+    assert _derive_display_title("548670c7-e187-4960-a714-1f8e70957060", activity) == (
+        "Can you inspect the Hermes agent update and dashboard timeout?"
+    )
+
+
+def test_derive_display_title_skips_scaffold_context():
+    activity = [
+        {"role": "user", "clean_text": "Conversation info (untrusted metadata): long envelope"},
+        {"role": "assistant", "clean_text": "Dashboard harvest is complete."},
+    ]
+
+    assert _derive_display_title("548670c7-e187-4960-a714-1f8e70957060", activity) == (
+        "Dashboard harvest is complete."
+    )