""" Session Amplifier — OpenClaw sidecar Lightweight transcript spooler + review worker. """ import argparse import os import threading import time from pathlib import Path from config import settings def init_db(): """Create SQLite schema if not present.""" import sqlite3 conn = sqlite3.connect(settings.db_path) conn.executescript(""" CREATE TABLE IF NOT EXISTS spooled_entries ( id INTEGER PRIMARY KEY, session_id TEXT NOT NULL, agent_id TEXT NOT NULL, entry_idx INTEGER NOT NULL, entry_type TEXT, role TEXT, timestamp TEXT, tool_name TEXT, clean_text TEXT, original_length INTEGER, preview TEXT, is_error INTEGER DEFAULT 0, indexed_at TEXT DEFAULT (datetime('now')), UNIQUE(session_id, entry_idx) ); CREATE TABLE IF NOT EXISTS spool_state ( key TEXT PRIMARY KEY, value TEXT ); CREATE TABLE IF NOT EXISTS review_reports ( id INTEGER PRIMARY KEY, review_id TEXT UNIQUE, generated_at TEXT, period_from TEXT, period_to TEXT, report_json TEXT ); CREATE TABLE IF NOT EXISTS scheduler_jobs ( job_name TEXT PRIMARY KEY, last_run_at TEXT, locked_until TEXT, locked_by TEXT, status TEXT ); """) conn.commit() conn.close() print(f"[init] DB ready at {settings.db_path}") def run_server(): import uvicorn from api.routes import app uvicorn.run(app, host="0.0.0.0", port=settings.api_port) def run_spooler(): from spooler.processor import run_spool count, sessions = run_spool() print(f"[spool] entries={count} sessions={sessions}") def run_reviewer(): from reviewer.report import generate_report report = generate_report() print(f"[review] id={report.get('review_id','?')} sessions={report.get('sessions_reviewed',0)}") def run_watcher(poll_interval: int = 5, review_every: int = 0): """Poll transcript files and run spooler on changes. Args: poll_interval: seconds between filesystem polls. review_every: if > 0, run reviewer every N spool runs. 0 = disabled. """ from spooler.processor import run_spool print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s" f" (review_every={review_every})") seen: dict[str, tuple[int, int]] = {} spool_run_count = 0 while True: changed = False for transcript_file in settings.openclaw_agents_root.glob("*/sessions/*.jsonl"): try: # Skip excluded glob patterns import fnmatch fname = transcript_file.name if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude): continue # Also keep hard exclude for trajectory files if ".trajectory.jsonl" in fname: continue parts = transcript_file.relative_to(settings.openclaw_agents_root).parts if len(parts) >= 2 and settings.agents_allowlist: agent_part = parts[0] # agent id is first segment after root if agent_part not in settings.agents_allowlist: continue stat = transcript_file.stat() fingerprint = (int(stat.st_mtime), stat.st_size) except (FileNotFoundError, ValueError): continue key = str(transcript_file) if seen.get(key) != fingerprint: seen[key] = fingerprint changed = True if changed: count, sessions = run_spool() print(f"[watch] spool run entries={count} sessions={sessions}") spool_run_count += 1 if review_every > 0 and spool_run_count % review_every == 0: try: from reviewer.report import generate_report report = generate_report() print(f"[watch] review run id={report.get('review_id','?')}" f" sessions={report.get('sessions_reviewed',0)}") except Exception as exc: print(f"[watch] review failed: {exc}") time.sleep(poll_interval) def run_serve_watch(poll_interval: int = 5, review_every: int = 0): """Start API server and run file watcher in a background thread.""" watcher_thread = threading.Thread( target=run_watcher, args=(poll_interval, review_every), daemon=True, ) watcher_thread.start() print(f"[serve-watch] watcher thread started (interval={poll_interval}s)") run_server() def main(): parser = argparse.ArgumentParser(prog="session-amplifier") sub = parser.add_subparsers(dest="command", required=True) sub.add_parser("init", help="Initialize SQLite schema") sub.add_parser("serve", help="Run FastAPI server") sub.add_parser("spool", help="Run spooler once") sub.add_parser("review", help="Run reviewer once") watch_parser = sub.add_parser("watch", help="Poll transcript files and run spooler on changes") watch_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") watch_parser.add_argument("--review-every", type=int, default=0, help="Run reviewer every N spool runs (0=disabled, default=0)") sw_parser = sub.add_parser("serve-watch", help="Run API server + file watcher in background thread") sw_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") sw_parser.add_argument("--review-every", type=int, default=0, help="Run reviewer every N spool runs (0=disabled, default=0)") args = parser.parse_args() if args.command == "init": init_db() elif args.command == "serve": init_db() run_server() elif args.command == "spool": init_db() run_spooler() elif args.command == "review": init_db() run_reviewer() elif args.command == "watch": init_db() run_watcher(args.interval, args.review_every) elif args.command == "serve-watch": init_db() run_serve_watch(args.interval, args.review_every) if __name__ == "__main__": main()