File size: 6,578 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
Session Amplifier — OpenClaw sidecar
Lightweight transcript spooler + review worker.
"""
import argparse
import os
import threading
import time
from pathlib import Path

from config import settings


def init_db(db_path=None):
    """Create the SQLite schema if it is not already present.

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so running this more
    than once against the same database leaves existing tables untouched.

    Args:
        db_path: Database location passed to ``sqlite3.connect``. When omitted
            (the way existing callers invoke this), ``settings.db_path`` is used.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema script
            fails. The connection is closed before the error propagates.
    """
    import sqlite3

    target = settings.db_path if db_path is None else db_path
    schema_sql = """
        CREATE TABLE IF NOT EXISTS spooled_entries (
            id INTEGER PRIMARY KEY,
            session_id TEXT NOT NULL,
            agent_id TEXT NOT NULL,
            entry_idx INTEGER NOT NULL,
            entry_type TEXT,
            role TEXT,
            timestamp TEXT,
            tool_name TEXT,
            clean_text TEXT,
            original_length INTEGER,
            preview TEXT,
            is_error INTEGER DEFAULT 0,
            indexed_at TEXT DEFAULT (datetime('now')),
            UNIQUE(session_id, entry_idx)
        );

        CREATE TABLE IF NOT EXISTS spool_state (
            key TEXT PRIMARY KEY,
            value TEXT
        );

        CREATE TABLE IF NOT EXISTS review_reports (
            id INTEGER PRIMARY KEY,
            review_id TEXT UNIQUE,
            generated_at TEXT,
            period_from TEXT,
            period_to TEXT,
            report_json TEXT
        );

        CREATE TABLE IF NOT EXISTS scheduler_jobs (
            job_name TEXT PRIMARY KEY,
            last_run_at TEXT,
            locked_until TEXT,
            locked_by TEXT,
            status TEXT
        );
    """

    conn = sqlite3.connect(target)
    try:
        conn.executescript(schema_sql)
        conn.commit()
    finally:
        # Close the connection even when the schema script fails, so a failed
        # init does not leave the database file handle open.
        conn.close()
    print(f"[init] DB ready at {target}")


def run_server():
    """Serve the API app with uvicorn on all interfaces at ``settings.api_port``.

    This call blocks until uvicorn returns.
    """
    import uvicorn
    from api.routes import app as api_app

    # "0.0.0.0" binds every network interface, not just loopback.
    bind_host = "0.0.0.0"
    uvicorn.run(api_app, host=bind_host, port=settings.api_port)


def run_spooler():
    """Run a single spool pass and print the two totals it returns."""
    from spooler.processor import run_spool as spool_once

    # run_spool() yields exactly two values, reported as entries and sessions.
    entry_total, session_total = spool_once()
    print(f"[spool] entries={entry_total} sessions={session_total}")


def run_reviewer():
    """Generate one review report and print its id and reviewed-session count.

    Missing keys in the report fall back to ``'?'`` for the id and ``0`` for
    the session count.
    """
    from reviewer.report import generate_report as build_report

    summary = build_report()
    review_id = summary.get('review_id', '?')
    reviewed = summary.get('sessions_reviewed', 0)
    print(f"[review] id={review_id} sessions={reviewed}")


def _transcript_fingerprints(root, exclude_patterns, allowlist):
    """Return ``{path: (mtime_ns, size)}`` for each watched transcript under *root*."""
    import fnmatch

    fingerprints = {}
    for transcript_file in root.glob("*/sessions/*.jsonl"):
        fname = transcript_file.name
        # Skip excluded glob patterns
        if any(fnmatch.fnmatch(fname, pat) for pat in exclude_patterns):
            continue
        # Also keep hard exclude for trajectory files
        if ".trajectory.jsonl" in fname:
            continue
        try:
            parts = transcript_file.relative_to(root).parts
            # agent id is first segment after root; an empty allowlist disables the filter
            if allowlist and len(parts) >= 2 and parts[0] not in allowlist:
                continue
            stat = transcript_file.stat()
        except (OSError, ValueError):
            # The file vanished or could not be stat'ed (e.g. permissions):
            # skip it for this poll rather than stopping the whole watcher.
            continue
        # Nanosecond mtime: a whole-second mtime would miss a same-size rewrite
        # that lands within the same second as the previous poll's observation.
        fingerprints[str(transcript_file)] = (stat.st_mtime_ns, stat.st_size)
    return fingerprints


def run_watcher(poll_interval: int = 5, review_every: int = 0):
    """Poll transcript files and run spooler on changes.

    Each poll fingerprints every ``*/sessions/*.jsonl`` file under
    ``settings.openclaw_agents_root`` by (mtime in ns, size). If any file is
    new or its fingerprint differs from the previous poll, the spooler runs
    once. This function loops forever and never returns normally.

    Exceptions raised by ``run_spool`` are not caught here and end the loop;
    reviewer failures are printed and the loop continues.

    Args:
        poll_interval: seconds between filesystem polls.
        review_every: if > 0, run reviewer every N spool runs. 0 = disabled.
    """
    from spooler.processor import run_spool

    print(f"[watch] watching {settings.openclaw_agents_root} every {poll_interval}s"
          f" (review_every={review_every})")
    seen: dict[str, tuple[int, int]] = {}
    spool_run_count = 0

    while True:
        # Settings are re-read on every poll, as the original loop did.
        current = _transcript_fingerprints(
            settings.openclaw_agents_root,
            settings.session_glob_exclude,
            settings.agents_allowlist,
        )
        changed = any(
            seen.get(key) != fingerprint for key, fingerprint in current.items()
        )
        seen.update(current)

        if changed:
            count, sessions = run_spool()
            print(f"[watch] spool run entries={count} sessions={sessions}")
            spool_run_count += 1

            if review_every > 0 and spool_run_count % review_every == 0:
                try:
                    # Imported inside the try so an import failure is reported
                    # as a failed review instead of stopping the watcher.
                    from reviewer.report import generate_report
                    report = generate_report()
                    print(f"[watch] review run id={report.get('review_id','?')}"
                          f" sessions={report.get('sessions_reviewed',0)}")
                except Exception as exc:
                    print(f"[watch] review failed: {exc}")

        time.sleep(poll_interval)


def run_serve_watch(poll_interval: int = 5, review_every: int = 0):
    """Run the file watcher on a daemon thread, then serve the API in the foreground.

    Args:
        poll_interval: forwarded to ``run_watcher``.
        review_every: forwarded to ``run_watcher``.
    """
    # daemon=True: the watcher thread does not keep the process alive once
    # the server call below returns.
    background = threading.Thread(
        target=run_watcher,
        kwargs={"poll_interval": poll_interval, "review_every": review_every},
        daemon=True,
    )
    background.start()
    print(f"[serve-watch] watcher thread started (interval={poll_interval}s)")
    run_server()


def main():
    """Parse the command line and run the selected subcommand.

    Subcommands: ``init``, ``serve``, ``spool``, ``review``, ``watch`` and
    ``serve-watch``. Every one of them initializes the database first;
    ``init`` does nothing else. ``watch`` and ``serve-watch`` additionally
    accept ``--interval`` and ``--review-every``.
    """
    parser = argparse.ArgumentParser(prog="session-amplifier")
    sub = parser.add_subparsers(dest="command", required=True)

    # Subcommands that take no options of their own.
    for name, summary in (
        ("init", "Initialize SQLite schema"),
        ("serve", "Run FastAPI server"),
        ("spool", "Run spooler once"),
        ("review", "Run reviewer once"),
    ):
        sub.add_parser(name, help=summary)

    # The two polling subcommands share the same pair of options.
    polling_commands = (
        ("watch", "Poll transcript files and run spooler on changes"),
        ("serve-watch", "Run API server + file watcher in background thread"),
    )
    for name, summary in polling_commands:
        polling_parser = sub.add_parser(name, help=summary)
        polling_parser.add_argument(
            "--interval", type=int, default=5, help="Polling interval in seconds"
        )
        polling_parser.add_argument(
            "--review-every", type=int, default=0,
            help="Run reviewer every N spool runs (0=disabled, default=0)",
        )

    args = parser.parse_args()
    command = args.command

    # Every subcommand starts by ensuring the schema exists; "init" stops there.
    init_db()
    if command == "init":
        return

    if command == "watch":
        run_watcher(args.interval, args.review_every)
        return
    if command == "serve-watch":
        run_serve_watch(args.interval, args.review_every)
        return

    one_shot = {
        "serve": run_server,
        "spool": run_spooler,
        "review": run_reviewer,
    }
    one_shot[command]()


# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()