File size: 6,908 Bytes
63c75d5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | import sqlite3
from config import settings
def _compact_title(text: str, *, fallback: str = "") -> str:
title = " ".join((text or "").strip().split())
if not title:
return fallback
prefixes = (
"Conversation info",
"Current user request:",
"OpenClaw assembled context",
"Treat the conversation context",
)
for prefix in prefixes:
if title.startswith(prefix):
return fallback
return title[:96]
def _derive_display_title(session_id: str, activity: list[dict]) -> str:
for row in activity:
role = row.get("role") or ""
if role not in {"user", "assistant"}:
continue
title = _compact_title(row.get("clean_text") or row.get("preview") or "")
if title:
return title
return session_id[:32]
def get_conn() -> sqlite3.Connection:
conn = sqlite3.connect(settings.db_path)
conn.row_factory = sqlite3.Row
return conn
def spool_entries(rows: list[dict]):
"""Bulk-insert spooled entries. Skips duplicates via UNIQUE constraint."""
if not rows:
return 0
conn = get_conn()
inserted = 0
for row in rows:
try:
cur = conn.execute(
"""
INSERT OR IGNORE INTO spooled_entries
(session_id, agent_id, entry_idx, entry_type, role, timestamp,
tool_name, clean_text, original_length, preview, is_error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
row["session_id"],
row["agent_id"],
row.get("entry_idx", 0),
row.get("entry_type"),
row.get("role"),
row.get("timestamp"),
row.get("tool_name"),
row["clean_text"],
row.get("original_length"),
row["preview"],
int(bool(row.get("is_error", False))),
),
)
inserted += cur.rowcount > 0
except Exception:
pass
conn.commit()
conn.close()
return inserted
def get_file_state(file_path: str) -> dict | None:
conn = get_conn()
row = conn.execute(
"SELECT value FROM spool_state WHERE key = ?",
(f"file::{file_path}",),
).fetchone()
conn.close()
if not row:
return None
value = row["value"]
try:
parts = value.split(":")
state = {"mtime": int(parts[0]), "size": int(parts[1])}
if len(parts) > 2:
state["last_entry_idx"] = int(parts[2])
return state
except Exception:
return None
def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1):
conn = get_conn()
conn.execute(
"INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)",
(f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"),
)
conn.commit()
conn.close()
def get_recent_sessions(limit: int = 25) -> list[dict]:
conn = get_conn()
rows = conn.execute(
"""
SELECT session_id, agent_id,
MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
COUNT(*) AS event_count,
SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
MAX(entry_idx) AS last_entry_idx
FROM spooled_entries
GROUP BY session_id, agent_id
ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
LIMIT ?
""",
(limit,),
).fetchall()
conn.close()
# Attach last summary text to each session
result = []
for r in rows:
rd = dict(r)
# Add health hints
tool_count = rd.get("tool_result_count", 0) or 0
error_count = rd.get("error_count", 0) or 0
noisy = rd.get("noisy_tool_results", 0) or 0
event_count = rd.get("event_count", 0) or 0
hints = []
if error_count > 0:
hints.append(f"{error_count} error(s)")
if noisy > 2:
hints.append(f"{noisy} noisy tool outputs")
if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
hints.append("tool-heavy")
rd["hints"] = hints
rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
result.append(rd)
return result
def get_session_summary(session_id: str) -> dict | None:
"""Return the same summary shape as get_recent_sessions for one session."""
conn = get_conn()
row = conn.execute(
"""
SELECT session_id, agent_id,
MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
COUNT(*) AS event_count,
SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
MAX(entry_idx) AS last_entry_idx
FROM spooled_entries
WHERE session_id = ?
GROUP BY session_id, agent_id
LIMIT 1
""",
(session_id,),
).fetchone()
conn.close()
if not row:
return None
rd = dict(row)
tool_count = rd.get("tool_result_count", 0) or 0
error_count = rd.get("error_count", 0) or 0
noisy = rd.get("noisy_tool_results", 0) or 0
event_count = rd.get("event_count", 0) or 0
hints = []
if error_count > 0:
hints.append(f"{error_count} error(s)")
if noisy > 2:
hints.append(f"{noisy} noisy tool outputs")
if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
hints.append("tool-heavy")
rd["hints"] = hints
rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
return rd
def get_session_activity(session_id: str, limit: int = 200) -> list[dict]:
conn = get_conn()
rows = conn.execute(
"""
SELECT session_id, agent_id, entry_idx, role, entry_type, timestamp,
tool_name, clean_text, preview, original_length, is_error, indexed_at
FROM spooled_entries
WHERE session_id = ?
ORDER BY entry_idx DESC
LIMIT ?
""",
(session_id, limit),
).fetchall()
conn.close()
return [dict(r) for r in reversed(rows)]
|