# Ordo
# Initial public release
# 63c75d5
"""
Session Amplifier — OpenClaw sidecar
Lightweight transcript spooler + review worker.
"""
import argparse
import os
import threading
import time
from pathlib import Path
from config import settings
# DDL for every table the sidecar uses. Each statement is CREATE TABLE IF NOT
# EXISTS, so running the script against an existing database is a no-op.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS spooled_entries (
id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
entry_idx INTEGER NOT NULL,
entry_type TEXT,
role TEXT,
timestamp TEXT,
tool_name TEXT,
clean_text TEXT,
original_length INTEGER,
preview TEXT,
is_error INTEGER DEFAULT 0,
indexed_at TEXT DEFAULT (datetime('now')),
UNIQUE(session_id, entry_idx)
);
CREATE TABLE IF NOT EXISTS spool_state (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS review_reports (
id INTEGER PRIMARY KEY,
review_id TEXT UNIQUE,
generated_at TEXT,
period_from TEXT,
period_to TEXT,
report_json TEXT
);
CREATE TABLE IF NOT EXISTS scheduler_jobs (
job_name TEXT PRIMARY KEY,
last_run_at TEXT,
locked_until TEXT,
locked_by TEXT,
status TEXT
);
"""


def init_db(db_path=None):
    """Create the SQLite schema if it is not already present.

    Args:
        db_path: database file to initialise. Defaults to ``settings.db_path``
            when omitted, which is the original behaviour.
    """
    import sqlite3

    path = settings.db_path if db_path is None else db_path
    conn = sqlite3.connect(path)
    try:
        conn.executescript(_SCHEMA_SQL)
        conn.commit()
    finally:
        # Close even when the script fails, so a schema error does not leak
        # the connection (sqlite3's own ``with conn:`` would not close it).
        conn.close()
    print(f"[init] DB ready at {path}")
def run_server(host: str = "0.0.0.0"):
    """Serve the API app with uvicorn on ``settings.api_port``.

    Args:
        host: interface to bind. The default ``"0.0.0.0"`` listens on every
            interface (the original hard-coded value); pass ``"127.0.0.1"``
            to restrict the server to local connections.
    """
    # Imported here rather than at module top, matching the other run_* helpers.
    import uvicorn
    from api.routes import app

    uvicorn.run(app, host=host, port=settings.api_port)
def run_spooler():
    """Run the spooler once and print the two values it returns."""
    from spooler.processor import run_spool

    # run_spool() is unpacked as a pair; the print labels call the two values
    # "entries" and "sessions".
    result = run_spool()
    count, sessions = result
    print(f"[spool] entries={count} sessions={sessions}")
def run_reviewer():
    """Generate one review report and print its id and reviewed-session count."""
    from reviewer import report as report_module

    report = report_module.generate_report()
    # Missing keys fall back to '?' and 0 rather than raising.
    print(f"[review] id={report.get('review_id','?')} sessions={report.get('sessions_reviewed',0)}")
def run_watcher(poll_interval: int = 5, review_every: int = 0):
    """Poll transcript files and run spooler on changes.

    Each pass globs ``*/sessions/*.jsonl`` under
    ``settings.openclaw_agents_root``, skips excluded files, and compares each
    remaining file's ``(mtime_ns, size)`` fingerprint with the one recorded on
    the previous pass. If any fingerprint differs, the spooler runs once for
    the whole pass. This function loops forever and never returns.

    Args:
        poll_interval: seconds between filesystem polls.
        review_every: if > 0, run reviewer every N spool runs. 0 = disabled.
    """
    import fnmatch  # once per call, not once per file per poll

    from spooler.processor import run_spool

    print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s"
          f" (review_every={review_every})")

    # Absolute path string -> (st_mtime_ns, st_size) seen on the last pass.
    seen: dict[str, tuple[int, int]] = {}
    spool_run_count = 0

    while True:
        changed = False
        for transcript_file in settings.openclaw_agents_root.glob("*/sessions/*.jsonl"):
            fname = transcript_file.name
            # Skip excluded glob patterns
            if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
                continue
            # Also keep hard exclude for trajectory files
            if ".trajectory.jsonl" in fname:
                continue
            try:
                parts = transcript_file.relative_to(settings.openclaw_agents_root).parts
                if len(parts) >= 2 and settings.agents_allowlist:
                    # agent id is first segment after root
                    if parts[0] not in settings.agents_allowlist:
                        continue
                stat = transcript_file.stat()
            except (OSError, ValueError):
                # The file vanished or is unreadable (e.g. permission denied)
                # between glob and stat: skip it instead of killing the watcher.
                continue

            # Nanosecond mtime: whole-second truncation would miss a same-size
            # rewrite that lands within the same second as the previous one.
            fingerprint = (stat.st_mtime_ns, stat.st_size)
            key = str(transcript_file)
            if seen.get(key) != fingerprint:
                seen[key] = fingerprint
                changed = True

        if changed:
            try:
                count, sessions = run_spool()
            except Exception as exc:
                # Same policy as the reviewer below: log and keep watching.
                print(f"[watch] spool failed: {exc}")
                # Forget the fingerprints so the next poll sees the files as
                # changed again and retries the spool run.
                seen.clear()
            else:
                print(f"[watch] spool run entries={count} sessions={sessions}")
                spool_run_count += 1
                if review_every > 0 and spool_run_count % review_every == 0:
                    try:
                        from reviewer.report import generate_report
                        report = generate_report()
                        print(f"[watch] review run id={report.get('review_id','?')}"
                              f" sessions={report.get('sessions_reviewed',0)}")
                    except Exception as exc:
                        print(f"[watch] review failed: {exc}")

        time.sleep(poll_interval)
def run_serve_watch(poll_interval: int = 5, review_every: int = 0):
    """Run the file watcher on a daemon thread, then serve the API in this thread.

    Args:
        poll_interval: forwarded to ``run_watcher``.
        review_every: forwarded to ``run_watcher``.
    """
    background = threading.Thread(target=run_watcher, args=(poll_interval, review_every))
    # Daemon thread: it will not keep the process alive once run_server() returns.
    background.daemon = True
    background.start()
    print(f"[serve-watch] watcher thread started (interval={poll_interval}s)")

    run_server()
def _positive_int(raw):
    """argparse ``type=`` callable that accepts only integers >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main(argv=None):
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``init``, ``serve``, ``spool``, ``review``, ``watch`` and
    ``serve-watch``. Every one of them initialises the SQLite schema first.

    Args:
        argv: argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the original behaviour of reading ``sys.argv``.

    Raises:
        SystemExit: raised by argparse on invalid arguments, including a
            ``--interval`` below 1.
    """
    parser = argparse.ArgumentParser(prog="session-amplifier")
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("init", help="Initialize SQLite schema")
    sub.add_parser("serve", help="Run FastAPI server")
    sub.add_parser("spool", help="Run spooler once")
    sub.add_parser("review", help="Run reviewer once")

    # "watch" and "serve-watch" take the same polling options.
    polling_commands = (
        ("watch", "Poll transcript files and run spooler on changes"),
        ("serve-watch", "Run API server + file watcher in background thread"),
    )
    for command_name, command_help in polling_commands:
        polling_parser = sub.add_parser(command_name, help=command_help)
        # A zero interval would busy-loop and a negative one makes
        # time.sleep() raise, so reject both at parse time.
        polling_parser.add_argument("--interval", type=_positive_int, default=5,
                                    help="Polling interval in seconds")
        polling_parser.add_argument("--review-every", type=int, default=0,
                                    help="Run reviewer every N spool runs (0=disabled, default=0)")

    args = parser.parse_args(argv)

    # Every sub-command, "init" included, starts from a ready schema.
    init_db()

    if args.command == "serve":
        run_server()
    elif args.command == "spool":
        run_spooler()
    elif args.command == "review":
        run_reviewer()
    elif args.command == "watch":
        run_watcher(args.interval, args.review_every)
    elif args.command == "serve-watch":
        run_serve_watch(args.interval, args.review_every)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()