import logging
import sqlite3
from contextlib import closing

from config import settings

logger = logging.getLogger(__name__)

# Titles starting with any of these are prompt scaffolding, not something a
# person typed, so they are rejected as display titles.
_BOILERPLATE_TITLE_PREFIXES = (
    "Conversation info",
    "Current user request:",
    "OpenClaw assembled context",
    "Treat the conversation context",
)
_TITLE_MAX_LEN = 96
_SESSION_ID_TITLE_LEN = 32

# Number of most-recent activity rows inspected when deriving a display title.
_TITLE_ACTIVITY_LIMIT = 50
# A session is flagged "warning" once it has more than this many noisy results.
_NOISY_WARNING_THRESHOLD = 2
# Share of events that must be tool results for the "tool-heavy" hint.
_TOOL_HEAVY_RATIO = 0.7

_INSERT_ENTRY_SQL = """
    INSERT OR IGNORE INTO spooled_entries
        (session_id, agent_id, entry_idx, entry_type, role,
         timestamp, tool_name, clean_text, original_length,
         preview, is_error)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# Shared aggregate projection so the list view and the single-session view
# cannot drift apart.
_SUMMARY_SELECT_SQL = """
    SELECT
        session_id,
        agent_id,
        MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
        COUNT(*) AS event_count,
        SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
        SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
        SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
        MAX(entry_idx) AS last_entry_idx
    FROM spooled_entries
"""

_RECENT_SESSIONS_SQL = _SUMMARY_SELECT_SQL + """
    GROUP BY session_id, agent_id
    ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
    LIMIT ?
"""

# ORDER BY makes the LIMIT 1 pick deterministic (the most recently active
# group) when one session_id appears under several agent_ids.
_SESSION_SUMMARY_SQL = _SUMMARY_SELECT_SQL + """
    WHERE session_id = ?
    GROUP BY session_id, agent_id
    ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
    LIMIT 1
"""

_SESSION_ACTIVITY_SQL = """
    SELECT
        session_id, agent_id, entry_idx, role, entry_type, timestamp,
        tool_name, clean_text, preview, original_length, is_error, indexed_at
    FROM spooled_entries
    WHERE session_id = ?
    ORDER BY entry_idx DESC
    LIMIT ?
"""


def _compact_title(text: str, *, fallback: str = "") -> str:
    """Collapse whitespace in *text* and trim it to a short title.

    Returns *fallback* when the text is empty/whitespace-only or starts with
    one of the known boilerplate prefixes; otherwise returns at most the
    first 96 characters of the whitespace-normalised text.
    """
    title = " ".join((text or "").split())
    if not title or title.startswith(_BOILERPLATE_TITLE_PREFIXES):
        return fallback
    return title[:_TITLE_MAX_LEN]


def _derive_display_title(session_id: str, activity: list[dict]) -> str:
    """Pick a human-readable title for a session.

    Scans *activity* in order and returns the first usable compact title
    taken from a ``user`` or ``assistant`` row (``clean_text`` preferred,
    ``preview`` as backup). Falls back to the first 32 characters of
    *session_id* when no row yields a title.
    """
    # NOTE(review): callers in this module pass only the most recent rows of
    # a session, so the title may change as a session grows -- confirm that
    # is intended.
    for row in activity:
        if (row.get("role") or "") not in {"user", "assistant"}:
            continue
        title = _compact_title(row.get("clean_text") or row.get("preview") or "")
        if title:
            return title
    return session_id[:_SESSION_ID_TITLE_LEN]


def get_conn() -> sqlite3.Connection:
    """Open a new connection to ``settings.db_path`` with ``sqlite3.Row`` rows.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(settings.db_path)
    conn.row_factory = sqlite3.Row
    return conn


def spool_entries(rows: list[dict]) -> int:
    """Bulk-insert spooled entries. Skips duplicates via UNIQUE constraint.

    Each row must provide ``session_id``, ``agent_id``, ``clean_text`` and
    ``preview``; the remaining columns are optional. Insertion is best-effort
    per row: a row that cannot be inserted is logged and skipped so that one
    bad entry does not abort the batch.

    Returns the number of rows actually inserted (ignored duplicates and
    skipped rows are not counted).
    """
    if not rows:
        return 0

    inserted = 0
    with closing(get_conn()) as conn:
        for row in rows:
            try:
                cur = conn.execute(
                    _INSERT_ENTRY_SQL,
                    (
                        row["session_id"],
                        row["agent_id"],
                        row.get("entry_idx", 0),
                        row.get("entry_type"),
                        row.get("role"),
                        row.get("timestamp"),
                        row.get("tool_name"),
                        row["clean_text"],
                        row.get("original_length"),
                        row["preview"],
                        int(bool(row.get("is_error", False))),
                    ),
                )
            except Exception:
                # Deliberately broad: the batch stays best-effort, but the
                # failure is now recorded instead of vanishing silently.
                logger.warning(
                    "Skipping spooled entry that could not be inserted",
                    exc_info=True,
                )
                continue
            if cur.rowcount > 0:
                inserted += 1
        conn.commit()
    return inserted


def get_file_state(file_path: str) -> dict | None:
    """Return the stored spool state for *file_path*, if any.

    The state is persisted under the key ``file::<file_path>`` as
    ``"mtime:size[:last_entry_idx]"``. Returns a dict with integer ``mtime``
    and ``size`` (plus ``last_entry_idx`` when present), or ``None`` when no
    state is stored or the stored value cannot be parsed.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT value FROM spool_state WHERE key = ?",
            (f"file::{file_path}",),
        ).fetchone()
    if not row:
        return None

    try:
        parts = row["value"].split(":")
        state = {"mtime": int(parts[0]), "size": int(parts[1])}
        if len(parts) > 2:
            state["last_entry_idx"] = int(parts[2])
    except (AttributeError, IndexError, TypeError, ValueError):
        # Non-string, truncated, or non-numeric value: treat as "no state".
        return None
    return state


def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1):
    """Persist the spool state for *file_path*, replacing any previous value.

    The value is stored as ``"mtime:size:last_entry_idx"`` under the key
    ``file::<file_path>``, the format that ``get_file_state`` parses.
    """
    with closing(get_conn()) as conn:
        conn.execute(
            "INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)",
            (f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"),
        )
        conn.commit()


def _fetch_session_activity(
    conn: sqlite3.Connection, session_id: str, limit: int
) -> list[dict]:
    """Return the last *limit* entries of a session in ascending entry order."""
    rows = conn.execute(_SESSION_ACTIVITY_SQL, (session_id, limit)).fetchall()
    # The query selects newest-first so LIMIT keeps the tail; flip it back.
    rows.reverse()
    return [dict(r) for r in rows]


def _build_session_summary(conn: sqlite3.Connection, row: sqlite3.Row) -> dict:
    """Turn an aggregate row into a summary dict with hints, health and title."""
    summary = dict(row)
    tool_count = summary.get("tool_result_count") or 0
    error_count = summary.get("error_count") or 0
    noisy = summary.get("noisy_tool_results") or 0
    event_count = summary.get("event_count") or 0

    hints = []
    if error_count > 0:
        hints.append(f"{error_count} error(s)")
    if noisy > _NOISY_WARNING_THRESHOLD:
        hints.append(f"{noisy} noisy tool outputs")
    if tool_count > 0 and event_count > 0 and tool_count / event_count > _TOOL_HEAVY_RATIO:
        hints.append("tool-heavy")
    summary["hints"] = hints

    # Errors outrank noise when choosing the single health label.
    if error_count > 0:
        summary["health"] = "error"
    elif noisy > _NOISY_WARNING_THRESHOLD:
        summary["health"] = "warning"
    else:
        summary["health"] = "ok"

    summary["display_title"] = _derive_display_title(
        summary["session_id"],
        _fetch_session_activity(conn, summary["session_id"], _TITLE_ACTIVITY_LIMIT),
    )
    return summary


def get_recent_sessions(limit: int = 25) -> list[dict]:
    """Return up to *limit* session summaries, most recently active first.

    Sessions are grouped by ``(session_id, agent_id)``. Each summary carries
    the aggregate counters from the query plus ``hints`` (list of strings),
    ``health`` (``"error"``, ``"warning"`` or ``"ok"``) and ``display_title``.
    """
    # One connection serves both the aggregate query and the per-session
    # title lookups, rather than opening a new one for every session.
    with closing(get_conn()) as conn:
        rows = conn.execute(_RECENT_SESSIONS_SQL, (limit,)).fetchall()
        return [_build_session_summary(conn, row) for row in rows]


def get_session_summary(session_id: str) -> dict | None:
    """Return the same summary shape as get_recent_sessions for one session.

    Returns ``None`` when the session has no spooled entries. If the session
    appears under several ``agent_id`` values, the most recently active
    group is summarised.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(_SESSION_SUMMARY_SQL, (session_id,)).fetchone()
        if not row:
            return None
        return _build_session_summary(conn, row)


def get_session_activity(session_id: str, limit: int = 200) -> list[dict]:
    """Return the last *limit* entries of *session_id*, oldest of those first."""
    with closing(get_conn()) as conn:
        return _fetch_session_activity(conn, session_id, limit)