File size: 6,359 Bytes
63c75d5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | import json
import logging
from pathlib import Path
from config import settings
from spooler.redaction import redact
from spooler.noise_filter import is_noise
from spooler.store import get_file_state, set_file_state, spool_entries
log = logging.getLogger("spooler")

# Transcript entry ``type`` values that are turned into spooled rows.
MESSAGE_TYPES_TO_SPOOL = {"message"}

# Message roles that are spooled; entries carrying any other role are dropped.
ROLES_TO_SPOOL = {
    "user",
    "assistant",
    "toolResult",
    "system",
    "developer",
}

# Content block types that carry text.
# NOTE(review): appears unused in this module; _extract_text_from_content
# hardcodes the same four block types instead of consulting this set.
BLOCK_TYPES_TO_KEEP = {
    "text",
    "input_text",
    "resource",
    "thinking",
}
def _extract_text_from_content(content) -> str:
text_parts: list[str] = []
if isinstance(content, str):
text_parts.append(content)
elif isinstance(content, list):
for block in content:
if not isinstance(block, dict):
text_parts.append(str(block))
continue
block_type = block.get("type")
if block_type in ("text", "input_text"):
text_parts.append(block.get("text", ""))
elif block_type == "thinking":
text_parts.append(block.get("thinking", ""))
elif block_type == "resource":
text_parts.append(str(block.get("resource", "")))
elif content is not None:
text_parts.append(str(content))
return "\n".join(part for part in text_parts if part)
def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
    """Pull ``(role, tool_name, text, is_error)`` out of a transcript entry.

    ``toolName`` and ``isError`` are read from the nested ``message`` first
    and fall back to the top-level entry. The text is the flattened
    ``message["content"]``.

    A missing, empty, or non-dict ``message`` is treated as an empty message
    rather than raising ``AttributeError`` on ``.get``. ``tool_name`` is always
    a string (``""`` when neither level provides a truthy value).
    """
    message = entry.get("message")
    if not isinstance(message, dict):
        # Malformed entries (e.g. "message": "oops") must not crash the caller.
        message = {}
    role = message.get("role", "")
    tool_name = message.get("toolName") or entry.get("toolName") or ""
    is_error = bool(message.get("isError") or entry.get("isError"))
    raw_text = _extract_text_from_content(message.get("content", ""))
    return role, tool_name, raw_text, is_error
def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
    """Turn one raw transcript entry into a spool row, or return ``None``.

    The entry is dropped when its ``type`` is not in
    ``MESSAGE_TYPES_TO_SPOOL``, when its message role is not in
    ``ROLES_TO_SPOOL``, or when it is a ``toolResult`` whose cleaned text
    ``is_noise`` flags.

    The message text goes through ``redact`` and is then capped at
    ``settings.max_toolresult_chars`` characters, for every role, with a
    ``"\\n... [truncated]"`` marker appended when cut. ``original_length`` is
    the length before redaction/truncation; ``preview`` is the first 300
    characters of the cleaned text.
    """
    kind = entry.get("type", "")
    if kind not in MESSAGE_TYPES_TO_SPOOL:
        return None

    role, tool_name, raw_text, is_error = _extract_message_payload(entry)
    if role not in ROLES_TO_SPOOL:
        return None

    cleaned = redact(raw_text)
    cap = settings.max_toolresult_chars
    if len(cleaned) > cap:
        cleaned = cleaned[:cap] + "\n... [truncated]"

    # Noise filtering only applies to tool output.
    if role == "toolResult" and is_noise(cleaned):
        return None

    return dict(
        session_id=session_id,
        agent_id=agent_id,
        entry_idx=entry_idx,
        entry_type=kind,
        role=role,
        timestamp=entry.get("timestamp", ""),
        tool_name=tool_name,
        clean_text=cleaned,
        original_length=len(raw_text),
        preview=cleaned[:300],
        is_error=is_error,
    )
def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
    """Parse a transcript JSONL file and return the rows to spool.

    Each non-blank line is decoded as JSON (as UTF-8, with undecodable bytes
    replaced). Lines that are not valid JSON, or that do not decode to a JSON
    object, are ignored. A ``{"type": "session", "id": ...}`` entry switches
    the session id used for later rows; the file stem is the default.

    Args:
        path: transcript file; its stem is the fallback session id.
        agent_id: stored on every produced row.
        skip_before: skip entries with line index <= this value (incremental
            ingestion). Session header lines inside the skipped range are
            still honoured, so an incremental run attributes rows to the same
            session id as a full pass would.

    Returns:
        Rows produced by ``_process_entry``, in file order. An exception while
        processing a single entry is logged and only that entry is skipped. If
        the file itself cannot be read, a warning is logged and the rows
        collected so far are returned.
    """
    rows: list[dict] = []
    session_id = path.stem
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for idx, raw_line in enumerate(f):
                line = raw_line.strip()
                if not line:
                    continue
                already_ingested = idx <= skip_before
                # Already-ingested lines only matter if they are session
                # headers; a cheap substring test avoids re-parsing the rest.
                if already_ingested and '"session"' not in line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(entry, dict):
                    # e.g. a bare list/number line; .get() would raise.
                    continue
                if entry.get("type") == "session" and entry.get("id"):
                    session_id = entry["id"]
                    continue
                if already_ingested:
                    continue
                try:
                    row = _process_entry(entry, session_id, agent_id, idx)
                except Exception as exc:
                    # Isolate failures so one bad entry does not hide the
                    # rest of the file.
                    log.warning("Failed to process entry %d of %s: %s", idx, path, exc)
                    continue
                if row is not None:
                    rows.append(row)
    except Exception as exc:
        # Best-effort: unreadable file -> log and keep what was collected.
        log.warning("Failed to process %s: %s", path, exc)
    return rows
def run_spool() -> tuple[int, int]:
    """Walk agent session dirs and spool new transcript messages into SQLite.

    For every ``<agents_root>/<agent_id>/sessions/*.jsonl`` transcript (agent
    allowlist and ``settings.session_glob_exclude`` applied, trajectory files
    skipped) whose mtime or size changed since the last run, parse the entries
    after the recorded ``last_entry_idx``. The resulting rows go to
    ``spool_entries`` in batches of ``settings.spooler_batch_size``.

    File state is persisted only after all rows have been spooled. If spooling
    raises, no state is saved and the files are re-read on the next run,
    rather than their rows being silently lost.

    Returns:
        ``(inserted, sessions_updated)``: the sum of ``spool_entries`` return
        values and the number of transcript files that produced rows.
    """
    import fnmatch

    agents_root = settings.openclaw_agents_root
    if not agents_root.exists():
        log.warning("Agents root not found: %s", agents_root)
        return 0, 0

    all_rows: list[dict] = []
    # (path, mtime, size, last_entry_idx) to persist once spooling succeeded.
    pending_states: list[tuple[str, int, int, int]] = []
    sessions_updated = 0

    for agent_dir in agents_root.iterdir():
        if not agent_dir.is_dir():
            continue
        agent_id = agent_dir.name
        # Skip if agent not in allowlist (allowlist empty = watch all)
        if settings.agents_allowlist and agent_id not in settings.agents_allowlist:
            continue
        sessions_dir = agent_dir / "sessions"
        if not sessions_dir.exists():
            continue

        for transcript_file in sessions_dir.glob("*.jsonl"):
            fname = transcript_file.name
            if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
                continue
            if ".trajectory.jsonl" in fname:
                continue
            try:
                stat = transcript_file.stat()
            except FileNotFoundError:
                # File vanished between glob() and stat().
                continue
            mtime, size = int(stat.st_mtime), stat.st_size

            path_key = str(transcript_file)
            previous_state = get_file_state(path_key)
            if (
                previous_state
                and previous_state.get("mtime") == mtime
                and previous_state.get("size") == size
            ):
                continue  # unchanged since last run

            skip_before = previous_state.get("last_entry_idx", -1) if previous_state else -1
            rows = _process_transcript_file(transcript_file, agent_id, skip_before=skip_before)
            last_idx = skip_before
            if rows:
                all_rows.extend(rows)
                sessions_updated += 1
                last_idx = max(skip_before, max(r["entry_idx"] for r in rows))
            pending_states.append((path_key, mtime, size, last_idx))

    inserted = 0
    if all_rows:
        batch_size = settings.spooler_batch_size
        for start in range(0, len(all_rows), batch_size):
            inserted += spool_entries(all_rows[start : start + batch_size])

    # NOTE(review): if spool_entries raised partway above, earlier batches are
    # already stored and will be re-read next run; this assumes spool_entries
    # tolerates re-inserting the same (session_id, entry_idx) rows — confirm.
    for path_key, mtime, size, last_idx in pending_states:
        set_file_state(path_key, mtime=mtime, size=size, last_entry_idx=last_idx)

    return inserted, sessions_updated
|