"""
Session Amplifier — OpenClaw sidecar
Lightweight transcript spooler + review worker.
"""
import argparse
import os
import threading
import time
from pathlib import Path
from config import settings
def init_db():
    """Create the SQLite schema if it is not already present.

    Opens the database at ``settings.db_path`` and issues
    ``CREATE TABLE IF NOT EXISTS`` statements for ``spooled_entries``,
    ``spool_state``, ``review_reports`` and ``scheduler_jobs``, so calling
    this repeatedly leaves existing tables untouched. The connection is
    closed on every exit path, including when schema creation raises.
    """
    import sqlite3
    from contextlib import closing

    # closing() guarantees conn.close() even on error; sqlite3's own context
    # manager only manages the transaction and never closes the connection.
    with closing(sqlite3.connect(settings.db_path)) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS spooled_entries (
                id INTEGER PRIMARY KEY,
                session_id TEXT NOT NULL,
                agent_id TEXT NOT NULL,
                entry_idx INTEGER NOT NULL,
                entry_type TEXT,
                role TEXT,
                timestamp TEXT,
                tool_name TEXT,
                clean_text TEXT,
                original_length INTEGER,
                preview TEXT,
                is_error INTEGER DEFAULT 0,
                indexed_at TEXT DEFAULT (datetime('now')),
                UNIQUE(session_id, entry_idx)
            );
            CREATE TABLE IF NOT EXISTS spool_state (
                key TEXT PRIMARY KEY,
                value TEXT
            );
            CREATE TABLE IF NOT EXISTS review_reports (
                id INTEGER PRIMARY KEY,
                review_id TEXT UNIQUE,
                generated_at TEXT,
                period_from TEXT,
                period_to TEXT,
                report_json TEXT
            );
            CREATE TABLE IF NOT EXISTS scheduler_jobs (
                job_name TEXT PRIMARY KEY,
                last_run_at TEXT,
                locked_until TEXT,
                locked_by TEXT,
                status TEXT
            );
        """)
        conn.commit()
    print(f"[init] DB ready at {settings.db_path}")
def run_server():
    """Run the app from ``api.routes`` under uvicorn.

    Binds to ``0.0.0.0`` on the port given by ``settings.api_port``.
    """
    import uvicorn
    from api.routes import app as api_app

    bind_host = "0.0.0.0"
    bind_port = settings.api_port
    uvicorn.run(api_app, host=bind_host, port=bind_port)
def run_spooler():
    """Run one spool pass and print the two values ``run_spool()`` returns."""
    from spooler.processor import run_spool

    entries, session_info = run_spool()
    print(f"[spool] entries={entries} sessions={session_info}")
def run_reviewer():
    """Generate one review report and print its id and reviewed-session count.

    Missing keys fall back to ``'?'`` for the id and ``0`` for the count.
    """
    from reviewer.report import generate_report

    result = generate_report()
    review_id = result.get('review_id', '?')
    reviewed = result.get('sessions_reviewed', 0)
    print(f"[review] id={review_id} sessions={reviewed}")
def run_watcher(poll_interval: int = 5, review_every: int = 0):
    """Poll transcript files and run spooler on changes.

    Loops forever. Each pass globs ``*/sessions/*.jsonl`` under
    ``settings.openclaw_agents_root``, skips excluded or non-allowlisted
    files, and fingerprints the rest by (modification time in nanoseconds,
    size). If any fingerprint is new or differs from the previous pass,
    ``run_spool()`` is called once for that pass.

    Args:
        poll_interval: seconds between filesystem polls.
        review_every: if > 0, run reviewer every N spool runs. 0 = disabled.
    """
    # Imported once per call rather than once per scanned file.
    import fnmatch

    from spooler.processor import run_spool

    print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s"
          f" (review_every={review_every})")

    # Maps transcript path -> last observed (mtime_ns, size) fingerprint.
    seen: dict[str, tuple[int, int]] = {}
    spool_run_count = 0

    while True:
        changed = False
        for transcript_file in settings.openclaw_agents_root.glob("*/sessions/*.jsonl"):
            fname = transcript_file.name
            # Skip excluded glob patterns
            if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
                continue
            # Also keep hard exclude for trajectory files
            if ".trajectory.jsonl" in fname:
                continue
            try:
                parts = transcript_file.relative_to(settings.openclaw_agents_root).parts
                if len(parts) >= 2 and settings.agents_allowlist:
                    agent_part = parts[0]  # agent id is first segment after root
                    if agent_part not in settings.agents_allowlist:
                        continue
                stat = transcript_file.stat()
            except (FileNotFoundError, ValueError):
                # File vanished between glob and stat, or is not under the root.
                continue
            # Nanosecond mtime: truncating to whole seconds would miss a
            # same-size rewrite that lands within the same second.
            fingerprint = (stat.st_mtime_ns, stat.st_size)
            key = str(transcript_file)
            if seen.get(key) != fingerprint:
                seen[key] = fingerprint
                changed = True

        if changed:
            count, sessions = run_spool()
            print(f"[watch] spool run entries={count} sessions={sessions}")
            spool_run_count += 1
            if review_every > 0 and spool_run_count % review_every == 0:
                # A failed review is reported but must not stop the watcher.
                try:
                    from reviewer.report import generate_report
                    report = generate_report()
                    print(f"[watch] review run id={report.get('review_id','?')}"
                          f" sessions={report.get('sessions_reviewed',0)}")
                except Exception as exc:
                    print(f"[watch] review failed: {exc}")
        time.sleep(poll_interval)
def run_serve_watch(poll_interval: int = 5, review_every: int = 0):
    """Run the file watcher on a daemon thread, then start the API server.

    ``run_watcher`` receives ``poll_interval`` and ``review_every``
    unchanged; ``run_server()`` is then called on the current thread.
    """
    background = threading.Thread(
        target=run_watcher,
        kwargs={"poll_interval": poll_interval, "review_every": review_every},
        daemon=True,
    )
    background.start()
    print(f"[serve-watch] watcher thread started (interval={poll_interval}s)")
    run_server()
def main():
    """Parse the command line and run the selected sub-command.

    Every sub-command calls ``init_db()`` first; ``init`` does nothing
    beyond that. ``watch`` and ``serve-watch`` additionally accept
    ``--interval`` and ``--review-every``.
    """
    parser = argparse.ArgumentParser(prog="session-amplifier")
    commands = parser.add_subparsers(dest="command", required=True)

    # Sub-commands that take no options of their own.
    for name, summary in (
        ("init", "Initialize SQLite schema"),
        ("serve", "Run FastAPI server"),
        ("spool", "Run spooler once"),
        ("review", "Run reviewer once"),
    ):
        commands.add_parser(name, help=summary)

    # Both watcher-style sub-commands share the same two options.
    for name, summary in (
        ("watch", "Poll transcript files and run spooler on changes"),
        ("serve-watch", "Run API server + file watcher in background thread"),
    ):
        watcher_cmd = commands.add_parser(name, help=summary)
        watcher_cmd.add_argument("--interval", type=int, default=5,
                                 help="Polling interval in seconds")
        watcher_cmd.add_argument("--review-every", type=int, default=0,
                                 help="Run reviewer every N spool runs (0=disabled, default=0)")

    args = parser.parse_args()

    # Work to perform after init_db() for each command; lambdas defer reading
    # the watcher options until a command that defines them is selected.
    handlers = {
        "init": lambda: None,
        "serve": run_server,
        "spool": run_spooler,
        "review": run_reviewer,
        "watch": lambda: run_watcher(args.interval, args.review_every),
        "serve-watch": lambda: run_serve_watch(args.interval, args.review_every),
    }
    handler = handlers.get(args.command)
    if handler is None:
        return
    init_db()
    handler()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|