File size: 6,908 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import sqlite3
from config import settings


def _compact_title(text: str, *, fallback: str = "") -> str:
    title = " ".join((text or "").strip().split())
    if not title:
        return fallback
    prefixes = (
        "Conversation info",
        "Current user request:",
        "OpenClaw assembled context",
        "Treat the conversation context",
    )
    for prefix in prefixes:
        if title.startswith(prefix):
            return fallback
    return title[:96]


def _derive_display_title(session_id: str, activity: list[dict]) -> str:
    for row in activity:
        role = row.get("role") or ""
        if role not in {"user", "assistant"}:
            continue
        title = _compact_title(row.get("clean_text") or row.get("preview") or "")
        if title:
            return title
    return session_id[:32]


def get_conn() -> sqlite3.Connection:
    conn = sqlite3.connect(settings.db_path)
    conn.row_factory = sqlite3.Row
    return conn


def spool_entries(rows: list[dict]):
    """Bulk-insert spooled entries. Skips duplicates via UNIQUE constraint."""
    if not rows:
        return 0
    conn = get_conn()
    inserted = 0
    for row in rows:
        try:
            cur = conn.execute(
                """
                INSERT OR IGNORE INTO spooled_entries
                (session_id, agent_id, entry_idx, entry_type, role, timestamp,
                 tool_name, clean_text, original_length, preview, is_error)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    row["session_id"],
                    row["agent_id"],
                    row.get("entry_idx", 0),
                    row.get("entry_type"),
                    row.get("role"),
                    row.get("timestamp"),
                    row.get("tool_name"),
                    row["clean_text"],
                    row.get("original_length"),
                    row["preview"],
                    int(bool(row.get("is_error", False))),
                ),
            )
            inserted += cur.rowcount > 0
        except Exception:
            pass
    conn.commit()
    conn.close()
    return inserted


def get_file_state(file_path: str) -> dict | None:
    conn = get_conn()
    row = conn.execute(
        "SELECT value FROM spool_state WHERE key = ?",
        (f"file::{file_path}",),
    ).fetchone()
    conn.close()
    if not row:
        return None
    value = row["value"]
    try:
        parts = value.split(":")
        state = {"mtime": int(parts[0]), "size": int(parts[1])}
        if len(parts) > 2:
            state["last_entry_idx"] = int(parts[2])
        return state
    except Exception:
        return None


def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1):
    conn = get_conn()
    conn.execute(
        "INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)",
        (f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"),
    )
    conn.commit()
    conn.close()


def get_recent_sessions(limit: int = 25) -> list[dict]:
    conn = get_conn()
    rows = conn.execute(
        """
        SELECT session_id, agent_id,
               MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
               COUNT(*) AS event_count,
               SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
               SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
               SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
               MAX(entry_idx) AS last_entry_idx
        FROM spooled_entries
        GROUP BY session_id, agent_id
        ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    conn.close()

    # Attach last summary text to each session
    result = []
    for r in rows:
        rd = dict(r)
        # Add health hints
        tool_count = rd.get("tool_result_count", 0) or 0
        error_count = rd.get("error_count", 0) or 0
        noisy = rd.get("noisy_tool_results", 0) or 0
        event_count = rd.get("event_count", 0) or 0
        hints = []
        if error_count > 0:
            hints.append(f"{error_count} error(s)")
        if noisy > 2:
            hints.append(f"{noisy} noisy tool outputs")
        if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
            hints.append("tool-heavy")
        rd["hints"] = hints
        rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
        rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
        result.append(rd)
    return result


def get_session_summary(session_id: str) -> dict | None:
    """Return the same summary shape as get_recent_sessions for one session."""
    conn = get_conn()
    row = conn.execute(
        """
        SELECT session_id, agent_id,
               MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
               COUNT(*) AS event_count,
               SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
               SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
               SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
               MAX(entry_idx) AS last_entry_idx
        FROM spooled_entries
        WHERE session_id = ?
        GROUP BY session_id, agent_id
        LIMIT 1
        """,
        (session_id,),
    ).fetchone()
    conn.close()
    if not row:
        return None

    rd = dict(row)
    tool_count = rd.get("tool_result_count", 0) or 0
    error_count = rd.get("error_count", 0) or 0
    noisy = rd.get("noisy_tool_results", 0) or 0
    event_count = rd.get("event_count", 0) or 0
    hints = []
    if error_count > 0:
        hints.append(f"{error_count} error(s)")
    if noisy > 2:
        hints.append(f"{noisy} noisy tool outputs")
    if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
        hints.append("tool-heavy")
    rd["hints"] = hints
    rd["health"] = "error" if error_count > 0 else "warning" if noisy > 2 else "ok"
    rd["display_title"] = _derive_display_title(rd["session_id"], get_session_activity(rd["session_id"], limit=50))
    return rd



def get_session_activity(session_id: str, limit: int = 200) -> list[dict]:
    conn = get_conn()
    rows = conn.execute(
        """
        SELECT session_id, agent_id, entry_idx, role, entry_type, timestamp,
               tool_name, clean_text, preview, original_length, is_error, indexed_at
        FROM spooled_entries
        WHERE session_id = ?
        ORDER BY entry_idx DESC
        LIMIT ?
        """,
        (session_id, limit),
    ).fetchall()
    conn.close()
    return [dict(r) for r in reversed(rows)]