| """ |
| Session Amplifier — OpenClaw sidecar |
| Lightweight transcript spooler + review worker. |
| """ |
| import argparse |
| import os |
| import threading |
| import time |
| from pathlib import Path |
|
|
| from config import settings |
|
|
|
|
| def init_db(): |
| """Create SQLite schema if not present.""" |
| import sqlite3 |
|
|
| conn = sqlite3.connect(settings.db_path) |
| conn.executescript(""" |
| CREATE TABLE IF NOT EXISTS spooled_entries ( |
| id INTEGER PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| agent_id TEXT NOT NULL, |
| entry_idx INTEGER NOT NULL, |
| entry_type TEXT, |
| role TEXT, |
| timestamp TEXT, |
| tool_name TEXT, |
| clean_text TEXT, |
| original_length INTEGER, |
| preview TEXT, |
| is_error INTEGER DEFAULT 0, |
| indexed_at TEXT DEFAULT (datetime('now')), |
| UNIQUE(session_id, entry_idx) |
| ); |
| |
| CREATE TABLE IF NOT EXISTS spool_state ( |
| key TEXT PRIMARY KEY, |
| value TEXT |
| ); |
| |
| CREATE TABLE IF NOT EXISTS review_reports ( |
| id INTEGER PRIMARY KEY, |
| review_id TEXT UNIQUE, |
| generated_at TEXT, |
| period_from TEXT, |
| period_to TEXT, |
| report_json TEXT |
| ); |
| |
| CREATE TABLE IF NOT EXISTS scheduler_jobs ( |
| job_name TEXT PRIMARY KEY, |
| last_run_at TEXT, |
| locked_until TEXT, |
| locked_by TEXT, |
| status TEXT |
| ); |
| """) |
| conn.commit() |
| conn.close() |
| print(f"[init] DB ready at {settings.db_path}") |
|
|
|
|
| def run_server(): |
| import uvicorn |
| from api.routes import app |
|
|
| uvicorn.run(app, host="0.0.0.0", port=settings.api_port) |
|
|
|
|
| def run_spooler(): |
| from spooler.processor import run_spool |
|
|
| count, sessions = run_spool() |
| print(f"[spool] entries={count} sessions={sessions}") |
|
|
|
|
| def run_reviewer(): |
| from reviewer.report import generate_report |
|
|
| report = generate_report() |
| print(f"[review] id={report.get('review_id','?')} sessions={report.get('sessions_reviewed',0)}") |
|
|
|
|
| def run_watcher(poll_interval: int = 5, review_every: int = 0): |
| """Poll transcript files and run spooler on changes. |
| |
| Args: |
| poll_interval: seconds between filesystem polls. |
| review_every: if > 0, run reviewer every N spool runs. 0 = disabled. |
| """ |
| from spooler.processor import run_spool |
|
|
| print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s" |
| f" (review_every={review_every})") |
| seen: dict[str, tuple[int, int]] = {} |
| spool_run_count = 0 |
|
|
| while True: |
| changed = False |
| for transcript_file in settings.openclaw_agents_root.glob("*/sessions/*.jsonl"): |
| try: |
| |
| import fnmatch |
| fname = transcript_file.name |
| if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude): |
| continue |
| |
| if ".trajectory.jsonl" in fname: |
| continue |
| parts = transcript_file.relative_to(settings.openclaw_agents_root).parts |
| if len(parts) >= 2 and settings.agents_allowlist: |
| agent_part = parts[0] |
| if agent_part not in settings.agents_allowlist: |
| continue |
| stat = transcript_file.stat() |
| fingerprint = (int(stat.st_mtime), stat.st_size) |
| except (FileNotFoundError, ValueError): |
| continue |
| key = str(transcript_file) |
| if seen.get(key) != fingerprint: |
| seen[key] = fingerprint |
| changed = True |
|
|
| if changed: |
| count, sessions = run_spool() |
| print(f"[watch] spool run entries={count} sessions={sessions}") |
| spool_run_count += 1 |
|
|
| if review_every > 0 and spool_run_count % review_every == 0: |
| try: |
| from reviewer.report import generate_report |
| report = generate_report() |
| print(f"[watch] review run id={report.get('review_id','?')}" |
| f" sessions={report.get('sessions_reviewed',0)}") |
| except Exception as exc: |
| print(f"[watch] review failed: {exc}") |
|
|
| time.sleep(poll_interval) |
|
|
|
|
| def run_serve_watch(poll_interval: int = 5, review_every: int = 0): |
| """Start API server and run file watcher in a background thread.""" |
| watcher_thread = threading.Thread( |
| target=run_watcher, |
| args=(poll_interval, review_every), |
| daemon=True, |
| ) |
| watcher_thread.start() |
| print(f"[serve-watch] watcher thread started (interval={poll_interval}s)") |
| run_server() |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser(prog="session-amplifier") |
| sub = parser.add_subparsers(dest="command", required=True) |
|
|
| sub.add_parser("init", help="Initialize SQLite schema") |
| sub.add_parser("serve", help="Run FastAPI server") |
| sub.add_parser("spool", help="Run spooler once") |
| sub.add_parser("review", help="Run reviewer once") |
|
|
| watch_parser = sub.add_parser("watch", help="Poll transcript files and run spooler on changes") |
| watch_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") |
| watch_parser.add_argument("--review-every", type=int, default=0, |
| help="Run reviewer every N spool runs (0=disabled, default=0)") |
|
|
| sw_parser = sub.add_parser("serve-watch", |
| help="Run API server + file watcher in background thread") |
| sw_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds") |
| sw_parser.add_argument("--review-every", type=int, default=0, |
| help="Run reviewer every N spool runs (0=disabled, default=0)") |
|
|
| args = parser.parse_args() |
|
|
| if args.command == "init": |
| init_db() |
| elif args.command == "serve": |
| init_db() |
| run_server() |
| elif args.command == "spool": |
| init_db() |
| run_spooler() |
| elif args.command == "review": |
| init_db() |
| run_reviewer() |
| elif args.command == "watch": |
| init_db() |
| run_watcher(args.interval, args.review_every) |
| elif args.command == "serve-watch": |
| init_db() |
| run_serve_watch(args.interval, args.review_every) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|