"""Incrementally spool agent session transcripts (JSONL) into the spool store."""

import fnmatch
import json
import logging
from collections.abc import Iterator
from pathlib import Path

from config import settings
from spooler.redaction import redact
from spooler.noise_filter import is_noise
from spooler.store import get_file_state, set_file_state, spool_entries

log = logging.getLogger("spooler")

MESSAGE_TYPES_TO_SPOOL = {"message"}
ROLES_TO_SPOOL = {"user", "assistant", "toolResult", "system", "developer"}
# Content block types whose text is extracted; mirrors the branches in
# _extract_text_from_content. NOTE(review): not referenced by the code in this
# module — possibly imported elsewhere; confirm before removing.
BLOCK_TYPES_TO_KEEP = {"text", "input_text", "resource", "thinking"}


def _extract_text_from_content(content) -> str:
    """Flatten a message ``content`` value into newline-joined text.

    ``content`` may be a plain string, a list of content blocks, or any other
    non-None value (which is stringified). Inside a list, dict blocks of type
    ``text``/``input_text`` contribute their ``text`` field, ``thinking``
    blocks their ``thinking`` field and ``resource`` blocks their stringified
    ``resource`` field; dict blocks of any other type are dropped and non-dict
    items are stringified. Empty parts are omitted from the result.
    """
    text_parts: list[str] = []
    if isinstance(content, str):
        text_parts.append(content)
    elif isinstance(content, list):
        for block in content:
            if not isinstance(block, dict):
                text_parts.append(str(block))
                continue
            block_type = block.get("type")
            if block_type in ("text", "input_text"):
                text_parts.append(block.get("text", ""))
            elif block_type == "thinking":
                text_parts.append(block.get("thinking", ""))
            elif block_type == "resource":
                text_parts.append(str(block.get("resource", "")))
    elif content is not None:
        text_parts.append(str(content))
    return "\n".join(part for part in text_parts if part)


def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
    """Return ``(role, tool_name, text, is_error)`` for a transcript entry.

    ``toolName`` and ``isError`` are taken from the nested ``message`` object
    when truthy there, otherwise from the top-level entry.
    """
    message = entry.get("message") or {}
    if not isinstance(message, dict):
        # A malformed (non-object) message yields role "", which
        # _process_entry rejects, instead of raising AttributeError below.
        message = {}
    role = message.get("role", "")
    tool_name = message.get("toolName", "") or entry.get("toolName", "")
    is_error = bool(message.get("isError") or entry.get("isError"))
    raw_text = _extract_text_from_content(message.get("content", ""))
    return role, tool_name, raw_text, is_error


def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
    """Transform a raw transcript entry into a spooled row.

    Returns None for entries that are not spooled: types outside
    MESSAGE_TYPES_TO_SPOOL, roles outside ROLES_TO_SPOOL, and ``toolResult``
    entries that ``is_noise`` flags. The text is passed through ``redact`` and
    then cut to ``settings.max_toolresult_chars`` (plus a truncation marker).
    ``original_length`` is the length of the raw, pre-redaction text.
    """
    entry_type = entry.get("type", "")
    if entry_type not in MESSAGE_TYPES_TO_SPOOL:
        return None

    role, tool_name, raw_text, is_error = _extract_message_payload(entry)
    if role not in ROLES_TO_SPOOL:
        return None

    timestamp = entry.get("timestamp", "")
    original_length = len(raw_text)
    clean_text = redact(raw_text)
    # NOTE(review): this cap applies to every role, not only toolResult as the
    # setting name suggests — confirm that is intended.
    if len(clean_text) > settings.max_toolresult_chars:
        clean_text = clean_text[: settings.max_toolresult_chars] + "\n... [truncated]"

    if role == "toolResult" and is_noise(clean_text):
        return None

    preview = clean_text[:300]
    return {
        "session_id": session_id,
        "agent_id": agent_id,
        "entry_idx": entry_idx,
        "entry_type": entry_type,
        "role": role,
        "timestamp": timestamp,
        "tool_name": tool_name,
        "clean_text": clean_text,
        "original_length": original_length,
        "preview": preview,
        "is_error": is_error,
    }


def _parse_entry_line(line: str) -> dict | None:
    """Decode one JSONL line; return None unless it is a JSON object."""
    try:
        entry = json.loads(line)
    except (ValueError, RecursionError):  # JSONDecodeError is a ValueError
        return None
    return entry if isinstance(entry, dict) else None


def _scan_transcript(path: Path, agent_id: str, skip_before: int) -> tuple[list[dict], bool]:
    """Parse a transcript JSONL file and return ``(rows, ok)``.

    Entries with line index ``<= skip_before`` are not spooled again, but any
    ``session`` header among them still sets ``session_id`` so incremental
    passes label rows the same way a full pass does. ``ok`` is False when the
    file could not be read to the end; ``rows`` then holds what was collected
    before the failure.
    """
    rows: list[dict] = []
    session_id = path.stem
    try:
        # JSONL is UTF-8; don't depend on the locale encoding, and don't let a
        # single undecodable byte abort the remainder of the file.
        with open(path, encoding="utf-8", errors="replace") as f:
            for idx, raw_line in enumerate(f):
                line = raw_line.strip()
                if not line:
                    continue

                if idx <= skip_before:
                    # Already ingested: only look for session headers. The
                    # substring test avoids JSON-decoding every old line.
                    if '"session"' in line:
                        header = _parse_entry_line(line)
                        if header is not None and header.get("type") == "session" and header.get("id"):
                            session_id = header["id"]
                    continue

                entry = _parse_entry_line(line)
                if entry is None:
                    continue
                if entry.get("type") == "session" and entry.get("id"):
                    session_id = entry["id"]
                    continue

                # Isolate per-entry failures: one malformed entry must not stop
                # every later entry of this file from being spooled.
                try:
                    row = _process_entry(entry, session_id, agent_id, idx)
                except Exception as exc:
                    log.warning("Skipping entry %d of %s: %s", idx, path, exc)
                    continue
                if row:
                    rows.append(row)
    except Exception as exc:
        log.warning("Failed to process %s: %s", path, exc)
        return rows, False
    return rows, True


def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
    """Parse a transcript JSONL and return spooled rows.

    Args:
        skip_before: skip entries with idx <= this value (incremental ingestion).

    Read failures are logged and whatever rows were collected before the
    failure are returned.
    """
    rows, _ok = _scan_transcript(path, agent_id, skip_before)
    return rows


def _is_excluded_transcript(fname: str) -> bool:
    """True for trajectory files and names matching settings.session_glob_exclude."""
    if ".trajectory.jsonl" in fname:
        return True
    return any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude)


def _iter_transcript_files(agents_root: Path) -> Iterator[tuple[str, Path]]:
    """Yield ``(agent_id, transcript_path)`` for every transcript to consider."""
    allowlist = settings.agents_allowlist
    for agent_dir in agents_root.iterdir():
        if not agent_dir.is_dir():
            continue
        agent_id = agent_dir.name
        # An empty allowlist means "watch all agents".
        if allowlist and agent_id not in allowlist:
            continue
        sessions_dir = agent_dir / "sessions"
        if not sessions_dir.exists():
            continue
        for transcript_file in sessions_dir.glob("*.jsonl"):
            if not _is_excluded_transcript(transcript_file.name):
                yield agent_id, transcript_file


def _resume_index(previous_state: dict | None, size: int, path: Path) -> int:
    """Return the last already-spooled entry index for ``path`` (-1 for none)."""
    if not previous_state:
        return -1
    if size < (previous_state.get("size") or 0):
        # A file that got smaller was truncated or replaced, not appended to,
        # so its stored entry index no longer describes its contents.
        log.info("Transcript %s shrank; re-reading from the start", path)
        return -1
    return previous_state.get("last_entry_idx", -1)


def run_spool() -> tuple[int, int]:
    """Walk agent session dirs, process transcript messages into SQLite.

    Returns ``(rows inserted, transcript files that produced rows)``. Per-file
    ingestion state is written only after all rows have been handed to
    ``spool_entries``, so a spooling failure leaves the state untouched and
    the entries are retried on the next run (which may re-send rows that were
    already stored — presumably deduplicated by the store; TODO confirm).
    """
    agents_root = settings.openclaw_agents_root
    if not agents_root.exists():
        log.warning("Agents root not found: %s", agents_root)
        return 0, 0

    all_rows: list[dict] = []
    # (file key, mtime, size, last_entry_idx), persisted after spooling succeeds.
    pending_states: list[tuple[str, int, int, int]] = []
    sessions_updated = 0

    for agent_id, transcript_file in _iter_transcript_files(agents_root):
        file_key = str(transcript_file)
        try:
            stat = transcript_file.stat()
        except FileNotFoundError:
            continue  # removed since the directory was listed
        mtime, size = int(stat.st_mtime), stat.st_size

        previous_state = get_file_state(file_key)
        if previous_state and previous_state.get("mtime") == mtime and previous_state.get("size") == size:
            continue  # unchanged since the last run

        skip_before = _resume_index(previous_state, size, transcript_file)
        rows, ok = _scan_transcript(transcript_file, agent_id, skip_before)
        if not ok:
            # Keep the stored state so the unread range is retried next run
            # instead of being silently recorded as done.
            continue

        last_entry_idx = skip_before
        if rows:
            all_rows.extend(rows)
            sessions_updated += 1
            last_entry_idx = max(row["entry_idx"] for row in rows)
        pending_states.append((file_key, mtime, size, last_entry_idx))

    inserted = 0
    if all_rows:
        batch_size = settings.spooler_batch_size
        for start in range(0, len(all_rows), batch_size):
            inserted += spool_entries(all_rows[start : start + batch_size])

    for file_key, mtime, size, last_entry_idx in pending_states:
        set_file_state(file_key, mtime=mtime, size=size, last_entry_idx=last_entry_idx)

    if not all_rows:
        return 0, 0
    return inserted, sessions_updated