| import sqlite3 |
| from config import settings |
|
|
|
|
| def _compact_title(text: str, *, fallback: str = "") -> str: |
| title = " ".join((text or "").strip().split()) |
| if not title: |
| return fallback |
| prefixes = ( |
| "Conversation info", |
| "Current user request:", |
| "OpenClaw assembled context", |
| "Treat the conversation context", |
| ) |
| for prefix in prefixes: |
| if title.startswith(prefix): |
| return fallback |
| return title[:96] |
|
|
|
|
| def _derive_display_title(session_id: str, activity: list[dict]) -> str: |
| for row in activity: |
| role = row.get("role") or "" |
| if role not in {"user", "assistant"}: |
| continue |
| title = _compact_title(row.get("clean_text") or row.get("preview") or "") |
| if title: |
| return title |
| return session_id[:32] |
|
|
|
|
| def get_conn() -> sqlite3.Connection: |
| conn = sqlite3.connect(settings.db_path) |
| conn.row_factory = sqlite3.Row |
| return conn |
|
|
|
|
| def spool_entries(rows: list[dict]): |
| """Bulk-insert spooled entries. Skips duplicates via UNIQUE constraint.""" |
| if not rows: |
| return 0 |
| conn = get_conn() |
| inserted = 0 |
| for row in rows: |
| try: |
| cur = conn.execute( |
| """ |
| INSERT OR IGNORE INTO spooled_entries |
| (session_id, agent_id, entry_idx, entry_type, role, timestamp, |
| tool_name, clean_text, original_length, preview, is_error) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
| """, |
| ( |
| row["session_id"], |
| row["agent_id"], |
| row.get("entry_idx", 0), |
| row.get("entry_type"), |
| row.get("role"), |
| row.get("timestamp"), |
| row.get("tool_name"), |
| row["clean_text"], |
| row.get("original_length"), |
| row["preview"], |
| int(bool(row.get("is_error", False))), |
| ), |
| ) |
| inserted += cur.rowcount > 0 |
| except Exception: |
| pass |
| conn.commit() |
| conn.close() |
| return inserted |
|
|
|
|
| def get_file_state(file_path: str) -> dict | None: |
| conn = get_conn() |
| row = conn.execute( |
| "SELECT value FROM spool_state WHERE key = ?", |
| (f"file::{file_path}",), |
| ).fetchone() |
| conn.close() |
| if not row: |
| return None |
| value = row["value"] |
| try: |
| parts = value.split(":") |
| state = {"mtime": int(parts[0]), "size": int(parts[1])} |
| if len(parts) > 2: |
| state["last_entry_idx"] = int(parts[2]) |
| return state |
| except Exception: |
| return None |
|
|
|
|
| def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1): |
| conn = get_conn() |
| conn.execute( |
| "INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)", |
| (f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"), |
| ) |
| conn.commit() |
| conn.close() |
|
|
|
|
| def get_recent_sessions(limit: int = 25) -> list[dict]: |
| conn = get_conn() |
| rows = conn.execute( |
| """ |
| SELECT session_id, agent_id, |
| MAX(COALESCE(timestamp, indexed_at)) AS last_event_at, |
| COUNT(*) AS event_count, |
| SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count, |
| SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count, |
| SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results, |
| MAX(entry_idx) AS last_entry_idx |
| FROM spooled_entries |
| GROUP BY session_id, agent_id |
| ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC |
| LIMIT ? |
| """, |
| (limit,), |
| ).fetchall() |
| conn.close() |
|
|
| |
| result = [] |
| for r in rows: |
| rd = dict(r) |
| |
| tool_count = rd.get("tool_result_count", 0) or 0 |
| error_count = rd.get("error_count", 0) or 0 |
| noisy = rd.get("noisy_tool_results", 0) or 0 |
| event_count = rd.get("event_count", 0) or 0 |
| hints = [] |
| if error_count > 0: |
| hints.append(f"{error_count} error(s)") |
| if noisy > 2: |
| hints.append(f"{noisy} noisy tool outputs") |
| if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7: |
| hints.append("tool-heavy") |
| rd["hints"] = hints |
| rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok" |
| rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50)) |
| result.append(rd) |
| return result |
|
|
|
|
| def get_session_summary(session_id: str) -> dict | None: |
| """Return the same summary shape as get_recent_sessions for one session.""" |
| conn = get_conn() |
| row = conn.execute( |
| """ |
| SELECT session_id, agent_id, |
| MAX(COALESCE(timestamp, indexed_at)) AS last_event_at, |
| COUNT(*) AS event_count, |
| SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count, |
| SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count, |
| SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results, |
| MAX(entry_idx) AS last_entry_idx |
| FROM spooled_entries |
| WHERE session_id = ? |
| GROUP BY session_id, agent_id |
| LIMIT 1 |
| """, |
| (session_id,), |
| ).fetchone() |
| conn.close() |
| if not row: |
| return None |
|
|
| rd = dict(row) |
| tool_count = rd.get("tool_result_count", 0) or 0 |
| error_count = rd.get("error_count", 0) or 0 |
| noisy = rd.get("noisy_tool_results", 0) or 0 |
| event_count = rd.get("event_count", 0) or 0 |
| hints = [] |
| if error_count > 0: |
| hints.append(f"{error_count} error(s)") |
| if noisy > 2: |
| hints.append(f"{noisy} noisy tool outputs") |
| if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7: |
| hints.append("tool-heavy") |
| rd["hints"] = hints |
| rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok" |
| rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50)) |
| return rd |
|
|
|
|
|
|
| def get_session_activity(session_id: str, limit: int = 200) -> list[dict]: |
| conn = get_conn() |
| rows = conn.execute( |
| """ |
| SELECT session_id, agent_id, entry_idx, role, entry_type, timestamp, |
| tool_name, clean_text, preview, original_length, is_error, indexed_at |
| FROM spooled_entries |
| WHERE session_id = ? |
| ORDER BY entry_idx DESC |
| LIMIT ? |
| """, |
| (session_id, limit), |
| ).fetchall() |
| conn.close() |
| return [dict(r) for r in reversed(rows)] |
|
|