File size: 6,359 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import json
import logging
from pathlib import Path

from config import settings
from spooler.redaction import redact
from spooler.noise_filter import is_noise
from spooler.store import get_file_state, set_file_state, spool_entries

# Module-level logger, registered under the fixed name "spooler".
log = logging.getLogger("spooler")


# Entry "type" values that _process_entry turns into rows; any other type is dropped.
MESSAGE_TYPES_TO_SPOOL = {"message"}
# Message roles that _process_entry keeps; entries with any other role are dropped.
ROLES_TO_SPOOL = {"user", "assistant", "toolResult", "system", "developer"}
# Content block types worth keeping.
# NOTE(review): not referenced in the code visible here; _extract_text_from_content
# hard-codes the same four type names instead of reading this set.
BLOCK_TYPES_TO_KEEP = {"text", "input_text", "resource", "thinking"}


def _extract_text_from_content(content) -> str:
    """Flatten a message ``content`` value into one newline-joined string.

    ``content`` may be a plain string, a list of blocks, ``None`` or any other
    value. In a list, dict blocks contribute the field matching their type
    (``text`` for "text"/"input_text", ``thinking`` for "thinking",
    ``resource`` for "resource"); dict blocks of any other type are ignored,
    and non-dict items are included as their string form.

    Every contributed value is coerced to ``str`` so that a non-string field
    (number, dict, ...) cannot make the final join raise ``TypeError``.
    ``None`` values and empty strings contribute nothing.

    Returns:
        The non-empty pieces joined with "\\n", or "" when there are none.
    """

    def _as_text(value) -> str:
        # None means "no text"; it must not become the literal string "None".
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    if not isinstance(content, list):
        # Scalars (str, None, anything else) are a single piece of text.
        return _as_text(content)

    text_parts: list[str] = []
    for block in content:
        if not isinstance(block, dict):
            text_parts.append(_as_text(block))
            continue
        block_type = block.get("type")
        if block_type in ("text", "input_text"):
            text_parts.append(_as_text(block.get("text")))
        elif block_type == "thinking":
            text_parts.append(_as_text(block.get("thinking")))
        elif block_type == "resource":
            text_parts.append(_as_text(block.get("resource")))
    return "\n".join(part for part in text_parts if part)


def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
    """Return ``(role, tool_name, raw_text, is_error)`` for a transcript entry.

    The role and text come from ``entry["message"]``; the tool name and error
    flag are read from the message first and fall back to the entry itself.
    A missing or falsy ``message`` is treated as an empty dict.
    """
    msg = entry.get("message") or {}
    flattened = _extract_text_from_content(msg.get("content", ""))
    failed = bool(msg.get("isError") or entry.get("isError"))
    tool = msg.get("toolName", "") or entry.get("toolName", "")
    return msg.get("role", ""), tool, flattened, failed


def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
    """Build the spool row for one transcript entry, or return None to drop it.

    An entry is dropped when its type is not in MESSAGE_TYPES_TO_SPOOL, when
    its role is not in ROLES_TO_SPOOL, or when it is a toolResult whose
    redacted (and possibly truncated) text is classified as noise.
    """
    kind = entry.get("type", "")
    if kind not in MESSAGE_TYPES_TO_SPOOL:
        return None

    role, tool_name, raw_text, is_error = _extract_message_payload(entry)
    if role not in ROLES_TO_SPOOL:
        return None

    # Redact first; the length cap below applies to the redacted text, for every role.
    body = redact(raw_text)
    if len(body) > settings.max_toolresult_chars:
        body = body[: settings.max_toolresult_chars] + "\n... [truncated]"

    # Only tool output goes through the noise filter.
    if role == "toolResult" and is_noise(body):
        return None

    return dict(
        session_id=session_id,
        agent_id=agent_id,
        entry_idx=entry_idx,
        entry_type=kind,
        role=role,
        timestamp=entry.get("timestamp", ""),
        tool_name=tool_name,
        clean_text=body,
        # Length of the text before redaction and truncation.
        original_length=len(raw_text),
        preview=body[:300],
        is_error=is_error,
    )


def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
    """Parse a transcript JSONL file and return the spooled rows it yields.

    Lines are numbered from 0 in file order and that number becomes each
    row's ``entry_idx``. Blank lines, lines that are not valid JSON, and JSON
    values that are not objects are skipped. An entry of type "session" with
    a truthy ``id`` is never spooled; it replaces the session id (initially
    the file stem) used for the rows that follow it.

    Args:
        path: transcript file to read; decoded as UTF-8.
        agent_id: agent identifier stored on every row.
        skip_before: skip entries with idx <= this value (incremental
            ingestion). Skipped lines are still checked for a session header
            so resumed rows get the same session id as a full read would give.

    Returns:
        Rows produced by ``_process_entry`` for lines after ``skip_before``.
        If reading or processing fails, a warning is logged and the rows
        collected up to that point are returned.
    """
    rows = []
    session_id = path.stem
    try:
        # Explicit encoding: do not let the platform locale decide how JSON is decoded.
        with open(path, encoding="utf-8") as f:
            for idx, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                already_spooled = idx <= skip_before
                # Cheap pre-filter: an already-ingested line is only worth
                # parsing if it could be the session header.
                if already_spooled and '"session"' not in line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A JSON array/string/number has no .get(); skip it instead of
                # letting the AttributeError abort the rest of the file.
                if not isinstance(entry, dict):
                    continue
                if entry.get("type") == "session" and entry.get("id"):
                    session_id = entry["id"]
                    continue
                if already_spooled:
                    continue
                row = _process_entry(entry, session_id, agent_id, idx)
                if row:
                    rows.append(row)
    except Exception as exc:
        # Best effort: one unreadable transcript must not stop the whole spool run.
        log.warning("Failed to process %s: %s", path, exc)
    return rows


def run_spool() -> tuple[int, int]:
    """Walk agent session dirs and spool new transcript messages.

    For every ``<agents_root>/<agent>/sessions/*.jsonl`` file that is not
    excluded (allowlist, ``settings.session_glob_exclude``, trajectory files)
    and whose mtime/size differ from the stored file state, the entries after
    the stored ``last_entry_idx`` are processed and passed to
    ``spool_entries`` in batches of ``settings.spooler_batch_size``.

    File state is written only after all batches have been handed to
    ``spool_entries``, so an exception during the walk or the insert leaves
    the stored state untouched and the same entries are picked up again on
    the next run instead of being silently marked as ingested.

    Returns:
        ``(inserted, sessions_updated)``: the sum of the ``spool_entries``
        return values and the number of transcript files that produced at
        least one row. ``(0, 0)`` when the agents root is missing or nothing
        new was found.
    """
    # Imported once per call instead of once per transcript file.
    import fnmatch

    agents_root = settings.openclaw_agents_root
    if not agents_root.exists():
        log.warning("Agents root not found: %s", agents_root)
        return 0, 0

    all_rows = []
    # (state key, mtime, size, last_entry_idx) tuples, committed after the insert.
    pending_states = []
    sessions_updated = 0

    for agent_dir in agents_root.iterdir():
        if not agent_dir.is_dir():
            continue
        agent_id = agent_dir.name
        # Skip if agent not in allowlist (allowlist empty = watch all)
        if settings.agents_allowlist and agent_id not in settings.agents_allowlist:
            continue
        sessions_dir = agent_dir / "sessions"
        if not sessions_dir.exists():
            continue

        for transcript_file in sessions_dir.glob("*.jsonl"):
            # Skip excluded glob patterns and trajectory files
            fname = transcript_file.name
            if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
                continue
            if ".trajectory.jsonl" in fname:
                continue
            try:
                stat = transcript_file.stat()
            except FileNotFoundError:
                # File vanished between the glob and the stat call.
                continue
            mtime, size = int(stat.st_mtime), stat.st_size

            state_key = str(transcript_file)
            previous_state = get_file_state(state_key)
            if (
                previous_state
                and previous_state.get("mtime") == mtime
                and previous_state.get("size") == size
            ):
                # Unchanged since the last run.
                continue

            skip_before = previous_state.get("last_entry_idx", -1) if previous_state else -1
            rows = _process_transcript_file(transcript_file, agent_id, skip_before=skip_before)
            last_entry_idx = skip_before
            if rows:
                all_rows.extend(rows)
                sessions_updated += 1
                last_entry_idx = max(skip_before, max(r["entry_idx"] for r in rows))
            pending_states.append((state_key, mtime, size, last_entry_idx))

    inserted = 0
    if all_rows:
        batch_size = settings.spooler_batch_size
        for start in range(0, len(all_rows), batch_size):
            inserted += spool_entries(all_rows[start : start + batch_size])

    # NOTE(review): if a later batch raises, rows from earlier batches are
    # re-submitted on the next run; confirm spool_entries tolerates duplicates.
    for state_key, mtime, size, last_entry_idx in pending_states:
        set_file_state(state_key, mtime=mtime, size=size, last_entry_idx=last_entry_idx)

    return inserted, sessions_updated