Ordo
Initial public release
63c75d5
import json
import logging
from pathlib import Path
from config import settings
from spooler.redaction import redact
from spooler.noise_filter import is_noise
from spooler.store import get_file_state, set_file_state, spool_entries
log = logging.getLogger("spooler")

# Transcript entry ``type`` values that are turned into spooled rows; any
# other entry type is ignored.
MESSAGE_TYPES_TO_SPOOL = {"message"}

# Message roles that are spooled; entries carrying any other role are dropped.
ROLES_TO_SPOOL = {
    "user",
    "assistant",
    "toolResult",
    "system",
    "developer",
}

# Content block types whose text is kept when flattening message content.
# NOTE(review): not referenced by the code visible here (text extraction
# matches these types explicitly) — confirm whether anything else uses it.
BLOCK_TYPES_TO_KEEP = {
    "text",
    "input_text",
    "resource",
    "thinking",
}
def _extract_text_from_content(content) -> str:
text_parts: list[str] = []
if isinstance(content, str):
text_parts.append(content)
elif isinstance(content, list):
for block in content:
if not isinstance(block, dict):
text_parts.append(str(block))
continue
block_type = block.get("type")
if block_type in ("text", "input_text"):
text_parts.append(block.get("text", ""))
elif block_type == "thinking":
text_parts.append(block.get("thinking", ""))
elif block_type == "resource":
text_parts.append(str(block.get("resource", "")))
elif content is not None:
text_parts.append(str(content))
return "\n".join(part for part in text_parts if part)
def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
    """Pull ``(role, tool_name, raw_text, is_error)`` out of a transcript entry.

    ``role`` and ``content`` are read from ``entry["message"]``. ``toolName``
    and ``isError`` are read from the message first, then from the entry
    itself. A missing or non-dict ``message`` is treated as empty rather than
    raising ``AttributeError``. ``role`` and ``tool_name`` are always strings
    (``""`` when absent, null, or of an unexpected type), matching the
    annotated return type.
    """
    message = entry.get("message")
    if not isinstance(message, dict):
        message = {}

    role = message.get("role")
    if not isinstance(role, str):
        # e.g. a list role would otherwise fail the caller's set-membership test
        role = ""

    tool_name = message.get("toolName") or entry.get("toolName") or ""
    if not isinstance(tool_name, str):
        tool_name = str(tool_name)

    is_error = bool(message.get("isError") or entry.get("isError"))
    raw_text = _extract_text_from_content(message.get("content", ""))
    return role, tool_name, raw_text, is_error
def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
    """Turn one decoded transcript entry into a row for the spool table.

    Returns ``None`` (nothing to spool) when:
      * the entry ``type`` is not in ``MESSAGE_TYPES_TO_SPOOL``;
      * the message role is not in ``ROLES_TO_SPOOL``; or
      * the role is ``toolResult`` and ``is_noise`` flags the processed text.

    Text handling: the extracted text is passed through ``redact`` first and
    only then capped at ``settings.max_toolresult_chars`` characters, with a
    truncation marker appended. ``original_length`` is the length before
    redaction. ``preview`` is the first 300 characters of the final text.
    """
    entry_type = entry.get("type", "")
    if entry_type not in MESSAGE_TYPES_TO_SPOOL:
        return None

    role, tool_name, raw_text, is_error = _extract_message_payload(entry)
    if role not in ROLES_TO_SPOOL:
        return None

    text = redact(raw_text)
    limit = settings.max_toolresult_chars
    if len(text) > limit:
        head = text[:limit]
        text = head + "\n... [truncated]"

    if role == "toolResult" and is_noise(text):
        return None

    return dict(
        session_id=session_id,
        agent_id=agent_id,
        entry_idx=entry_idx,
        entry_type=entry_type,
        role=role,
        timestamp=entry.get("timestamp", ""),
        tool_name=tool_name,
        clean_text=text,
        original_length=len(raw_text),
        preview=text[:300],
        is_error=is_error,
    )
def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
"""Parse a transcript JSONL and return spooled rows.
Args:
skip_before: skip entries with idx <= this value (incremental ingestion).
"""
rows = []
session_id = path.stem
try:
with open(path) as f:
for idx, line in enumerate(f):
if idx <= skip_before:
continue
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("type") == "session" and entry.get("id"):
session_id = entry["id"]
continue
row = _process_entry(entry, session_id, agent_id, idx)
if row:
rows.append(row)
except Exception as exc:
log.warning("Failed to process %s: %s", path, exc)
return rows
def _is_excluded_transcript(fname: str) -> bool:
    """Return True if a transcript file name must not be spooled."""
    import fnmatch

    if ".trajectory.jsonl" in fname:
        return True
    return any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude)


def _spool_in_batches(rows: list[dict]) -> int:
    """Insert ``rows`` via ``spool_entries`` in chunks; return the summed insert count."""
    # Clamp so a misconfigured batch size of 0 cannot make range() raise.
    batch_size = max(1, settings.spooler_batch_size)
    return sum(
        spool_entries(rows[start : start + batch_size])
        for start in range(0, len(rows), batch_size)
    )


def run_spool() -> tuple[int, int]:
    """Walk agent session dirs, process transcript messages into SQLite.

    For every agent directory under ``settings.openclaw_agents_root`` (limited
    to ``settings.agents_allowlist`` when that is non-empty), every
    ``sessions/*.jsonl`` file is examined, except trajectory files and names
    matching ``settings.session_glob_exclude``. Files whose recorded mtime and
    size are unchanged are skipped. Otherwise only entries after the stored
    ``last_entry_idx`` are processed.

    Each file's rows are inserted *before* its state (mtime/size/last index)
    is saved. If an insert raises, the exception propagates and that file's
    state is left unchanged, so its rows are retried on the next run instead
    of being lost.

    Returns:
        ``(inserted, sessions_updated)``: the total reported by
        ``spool_entries``, and the number of transcript files that produced at
        least one row. Returns ``(0, 0)`` when the agents root does not exist.
    """
    agents_root = settings.openclaw_agents_root
    if not agents_root.exists():
        log.warning("Agents root not found: %s", agents_root)
        return 0, 0

    inserted = 0
    sessions_updated = 0
    for agent_dir in agents_root.iterdir():
        if not agent_dir.is_dir():
            continue
        agent_id = agent_dir.name
        # Skip if agent not in allowlist (allowlist empty = watch all)
        if settings.agents_allowlist and agent_id not in settings.agents_allowlist:
            continue
        sessions_dir = agent_dir / "sessions"
        if not sessions_dir.exists():
            continue

        for transcript_file in sessions_dir.glob("*.jsonl"):
            if _is_excluded_transcript(transcript_file.name):
                continue
            try:
                stat = transcript_file.stat()
            except FileNotFoundError:
                continue  # removed between glob() and stat()
            file_key = str(transcript_file)
            mtime, size = int(stat.st_mtime), stat.st_size

            previous_state = get_file_state(file_key)
            if (
                previous_state
                and previous_state.get("mtime") == mtime
                and previous_state.get("size") == size
            ):
                continue
            skip_before = -1
            if previous_state and previous_state.get("last_entry_idx") is not None:
                skip_before = previous_state["last_entry_idx"]

            rows = _process_transcript_file(transcript_file, agent_id, skip_before=skip_before)
            last_idx = skip_before
            if rows:
                # Insert first; only then advance the watermark (see docstring).
                inserted += _spool_in_batches(rows)
                sessions_updated += 1
                last_idx = max(last_idx, max(r["entry_idx"] for r in rows))
            set_file_state(file_key, mtime=mtime, size=size, last_entry_idx=last_idx)

    return inserted, sessions_updated