| import json |
| import logging |
| from pathlib import Path |
|
|
| from config import settings |
| from spooler.redaction import redact |
| from spooler.noise_filter import is_noise |
| from spooler.store import get_file_state, set_file_state, spool_entries |
|
|
# Module logger; all spooler diagnostics are emitted under the "spooler" name.
log = logging.getLogger("spooler")

# Transcript entry ``type`` values that _process_entry turns into rows;
# entries of any other type are dropped.
MESSAGE_TYPES_TO_SPOOL = {"message"}

# Message roles that are spooled; messages with any other role are dropped.
ROLES_TO_SPOOL = {
    "system",
    "developer",
    "user",
    "assistant",
    "toolResult",
}

# Content block types that carry text. These are the same types
# _extract_text_from_content reads text from.
BLOCK_TYPES_TO_KEEP = {
    "text",
    "input_text",
    "thinking",
    "resource",
}
|
|
|
|
def _extract_text_from_content(content) -> str:
    """Flatten a message ``content`` field into plain text.

    ``content`` may be:

    * ``None`` -- yields ``""``;
    * a string -- returned unchanged;
    * a list of content blocks -- text is taken from ``text``/``input_text``
      blocks (``"text"`` key), ``thinking`` blocks (``"thinking"`` key) and
      ``resource`` blocks (``"resource"`` key, stringified). Blocks of any
      other type are dropped, ``None`` items are skipped, and other non-dict
      items are stringified;
    * anything else -- stringified.

    Non-empty pieces are joined with newlines.
    """

    def as_text(value) -> str:
        # Values come straight from transcript JSON and may be null or
        # non-strings (numbers, dicts, ...). Coerce them so the final join
        # cannot raise TypeError and abort the whole file.
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    # Key that holds the text for each supported block type.
    field_by_type = {
        "text": "text",
        "input_text": "text",
        "thinking": "thinking",
        "resource": "resource",
    }
    parts: list[str] = []
    for block in content:
        if block is None:
            continue
        if not isinstance(block, dict):
            parts.append(as_text(block))
            continue
        block_type = block.get("type")
        # Guard the lookup: a non-string (possibly unhashable) "type" is not a
        # supported block type.
        field = field_by_type.get(block_type) if isinstance(block_type, str) else None
        if field is not None:
            parts.append(as_text(block.get(field)))
    return "\n".join(part for part in parts if part)
|
|
|
|
def _extract_message_payload(entry: dict) -> tuple[str, str, str, bool]:
    """Pull the fields the spooler needs out of a transcript entry.

    Returns ``(role, tool_name, raw_text, is_error)``:

    * ``role`` -- ``message["role"]``, or ``""`` if absent or not a string;
    * ``tool_name`` -- ``message["toolName"]``, falling back to the entry-level
      ``toolName``; ``""`` if neither is set, stringified if not a string;
    * ``raw_text`` -- ``message["content"]`` flattened by
      ``_extract_text_from_content`` (not yet redacted);
    * ``is_error`` -- true if either the message or the entry sets ``isError``.

    A ``message`` field that is missing, null or not a JSON object is treated
    as an empty message instead of raising.
    """
    message = entry.get("message")
    if not isinstance(message, dict):
        # Malformed entry: avoid AttributeError on the .get() calls below.
        message = {}

    role = message.get("role")
    if not isinstance(role, str):
        # A non-string role (e.g. a list) would raise TypeError in the
        # caller's set-membership test; treat it as "no role".
        role = ""

    tool_name = message.get("toolName") or entry.get("toolName") or ""
    if not isinstance(tool_name, str):
        tool_name = str(tool_name)

    is_error = bool(message.get("isError") or entry.get("isError"))
    raw_text = _extract_text_from_content(message.get("content", ""))
    return role, tool_name, raw_text, is_error
|
|
|
|
def _process_entry(entry: dict, session_id: str, agent_id: str, entry_idx: int) -> dict | None:
    """Convert one decoded transcript entry into a spool row, or ``None`` to drop it.

    An entry is dropped when:

    * its ``type`` is not in ``MESSAGE_TYPES_TO_SPOOL``;
    * its message role is not in ``ROLES_TO_SPOOL``; or
    * it is a ``toolResult`` whose processed text ``is_noise`` reports as noise.

    Text handling:

    * The raw message text is passed through ``redact``.
    * If the result is longer than ``settings.max_toolresult_chars``, it is cut
      to that many characters and a ``... [truncated]`` marker is appended on a
      new line. This cap applies to every role, not only tool results.
    * The noise check runs on the truncated text.
    * ``original_length`` is the length of the raw text before redaction.
    * ``preview`` is the first 300 characters of the processed text.
    """
    entry_kind = entry.get("type", "")
    if entry_kind not in MESSAGE_TYPES_TO_SPOOL:
        return None

    role, tool_name, raw_text, is_error = _extract_message_payload(entry)
    if role not in ROLES_TO_SPOOL:
        return None

    text = redact(raw_text)
    cap = settings.max_toolresult_chars
    if len(text) > cap:
        text = text[:cap] + "\n... [truncated]"

    # Noise filtering only applies to tool output.
    if role == "toolResult" and is_noise(text):
        return None

    return {
        "session_id": session_id,
        "agent_id": agent_id,
        "entry_idx": entry_idx,
        "entry_type": entry_kind,
        "role": role,
        "timestamp": entry.get("timestamp", ""),
        "tool_name": tool_name,
        "clean_text": text,
        "original_length": len(raw_text),
        "preview": text[:300],
        "is_error": is_error,
    }
|
|
|
|
def _process_transcript_file(path: Path, agent_id: str, *, skip_before: int = -1) -> list[dict]:
    """Parse a transcript JSONL file and return its spooled rows.

    Each non-blank line is decoded as JSON.

    * A ``{"type": "session", "id": ...}`` line sets the session id used for
      the rows that follow it; the default is the file stem.
    * Every other JSON-object line is passed to ``_process_entry`` and kept if
      it yields a row.
    * Undecodable lines and JSON values that are not objects are skipped.

    Args:
        path: transcript file to read. It is decoded as UTF-8, and undecodable
            bytes are replaced rather than aborting the file.
        agent_id: agent the transcript belongs to; copied onto every row.
        skip_before: skip entries with idx <= this value (incremental
            ingestion). Session header lines inside the skipped prefix are
            still honoured, so incremental runs tag rows with the same session
            id as the initial full run.

    Returns:
        Rows in file order. If one entry fails to process, the failure is
        logged and that entry is skipped; the rest of the file is still read.
        Any other failure while reading the file is logged and the rows
        collected so far are returned.
    """
    rows: list[dict] = []
    session_id = path.stem
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for idx, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                already_ingested = idx <= skip_before
                # In the already-ingested prefix only a session header matters,
                # so skip decoding lines that cannot be one.
                if already_ingested and '"session"' not in line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(entry, dict):
                    # e.g. a bare list/number line; .get() would raise below.
                    continue
                if entry.get("type") == "session" and entry.get("id"):
                    session_id = entry["id"]
                    continue
                if already_ingested:
                    continue
                try:
                    row = _process_entry(entry, session_id, agent_id, idx)
                except Exception as exc:
                    # Isolate one bad entry. Otherwise it would abort the file,
                    # and because the caller records file state, every later
                    # entry would never be ingested.
                    log.warning("Failed to process entry %d of %s: %s", idx, path, exc)
                    continue
                if row:
                    rows.append(row)
    except Exception as exc:
        # Best-effort per file: log and keep whatever was collected.
        log.warning("Failed to process %s: %s", path, exc)
    return rows
|
|
|
|
def _iter_transcript_files(agents_root: Path):
    """Yield ``(agent_id, transcript_path)`` for every transcript to consider.

    Skips:

    * non-directories under ``agents_root``;
    * agents not in ``settings.agents_allowlist`` (when that list is non-empty);
    * agents without a ``sessions`` directory;
    * files matching any pattern in ``settings.session_glob_exclude``;
    * files whose name contains ``.trajectory.jsonl``.
    """
    import fnmatch

    for agent_dir in agents_root.iterdir():
        if not agent_dir.is_dir():
            continue
        agent_id = agent_dir.name
        if settings.agents_allowlist and agent_id not in settings.agents_allowlist:
            continue
        sessions_dir = agent_dir / "sessions"
        if not sessions_dir.exists():
            continue
        for transcript_file in sessions_dir.glob("*.jsonl"):
            fname = transcript_file.name
            if any(fnmatch.fnmatch(fname, pat) for pat in settings.session_glob_exclude):
                continue
            if ".trajectory.jsonl" in fname:
                continue
            yield agent_id, transcript_file


def run_spool() -> tuple[int, int]:
    """Walk agent session dirs and spool new transcript messages into SQLite.

    Transcripts are found under ``<agents_root>/<agent_id>/sessions/*.jsonl``,
    filtered by ``_iter_transcript_files``. A transcript is processed only if
    its (mtime, size) differs from the recorded file state. Entries after the
    recorded ``last_entry_idx`` are converted to rows, and the rows are written
    with ``spool_entries`` in batches of ``settings.spooler_batch_size``.

    File states are written only after every row has been passed to
    ``spool_entries``. If spooling raises, the affected files are re-read on
    the next run instead of being marked as ingested with their rows lost.

    Returns:
        ``(inserted, sessions_updated)``: the sum of ``spool_entries`` return
        values, and the number of transcript files that produced at least one
        row. Returns ``(0, 0)`` when the agents root does not exist.
    """
    agents_root = settings.openclaw_agents_root
    if not agents_root.exists():
        log.warning("Agents root not found: %s", agents_root)
        return 0, 0

    all_rows: list[dict] = []
    # (path, state) pairs to persist once the rows have been spooled.
    pending_states: list[tuple[str, dict]] = []
    sessions_updated = 0

    for agent_id, transcript_file in _iter_transcript_files(agents_root):
        try:
            stat = transcript_file.stat()
        except FileNotFoundError:
            # Removed between glob() and stat().
            continue
        mtime, size = int(stat.st_mtime), stat.st_size

        path_key = str(transcript_file)
        previous_state = get_file_state(path_key)
        if (
            previous_state
            and previous_state.get("mtime") == mtime
            and previous_state.get("size") == size
        ):
            continue  # unchanged since last run

        skip_before = previous_state.get("last_entry_idx", -1) if previous_state else -1
        rows = _process_transcript_file(transcript_file, agent_id, skip_before=skip_before)
        last_entry_idx = skip_before
        if rows:
            all_rows.extend(rows)
            sessions_updated += 1
            # Rows only contain entries with idx > skip_before.
            last_entry_idx = max(r["entry_idx"] for r in rows)
        pending_states.append(
            (path_key, {"mtime": mtime, "size": size, "last_entry_idx": last_entry_idx})
        )

    inserted = 0
    if all_rows:
        batch_size = settings.spooler_batch_size
        for start in range(0, len(all_rows), batch_size):
            inserted += spool_entries(all_rows[start : start + batch_size])

    # Only reached if spooling succeeded (or there was nothing to spool).
    for path_key, state in pending_states:
        set_file_state(
            path_key,
            mtime=state["mtime"],
            size=state["size"],
            last_entry_idx=state["last_entry_idx"],
        )
    return inserted, sessions_updated
|
|
|
|