File size: 10,946 Bytes
63c75d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
from __future__ import annotations

from collections import Counter, defaultdict
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from typing import Iterable

from config import settings
from spooler.store import get_conn, get_recent_sessions, get_session_activity


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _write_artifact(relative_path: str, payload: dict) -> None:
    base = settings.openclaw_state_dir / "session_amplifier"
    target = base / relative_path
    target.parent.mkdir(parents=True, exist_ok=True)
    import json

    target.write_text(json.dumps(payload, indent=2, sort_keys=True))


def _session_transcript_files() -> dict[str, Path]:
    root = settings.openclaw_agents_root
    if not root.exists():
        return {}
    out: dict[str, Path] = {}
    for agent_dir in root.iterdir():
        sessions = agent_dir / "sessions"
        if not sessions.exists():
            continue
        for path in sessions.glob("*.jsonl"):
            out.setdefault(path.stem, path)
    return out


def generate_session_sprawl_report(limit: int = 500, stale_days: int = 30) -> dict:
    """Return non-destructive session sprawl/archive candidates."""
    conn = get_conn()
    rows = conn.execute(
        """
        SELECT session_id, agent_id,
               MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
               COUNT(*) AS event_count,
               SUM(COALESCE(original_length, 0)) AS original_chars,
               SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
               SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
               MAX(entry_idx) AS last_entry_idx
        FROM spooled_entries
        GROUP BY session_id, agent_id
        ORDER BY event_count DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    conn.close()

    files = _session_transcript_files()
    candidates = []
    now_ts = datetime.now(timezone.utc).timestamp()
    stale_seconds = stale_days * 24 * 60 * 60
    for row in rows:
        rd = dict(row)
        path = files.get(rd["session_id"])
        size = path.stat().st_size if path and path.exists() else None
        mtime = path.stat().st_mtime if path and path.exists() else None
        reasons = []
        if (rd.get("event_count") or 0) > 2000:
            reasons.append("very_high_event_count")
        if (rd.get("original_chars") or 0) > 1_000_000:
            reasons.append("very_large_transcript_content")
        if size and size > 5_000_000:
            reasons.append("large_file")
        if mtime and now_ts - mtime > stale_seconds:
            reasons.append("stale_file")
        if reasons:
            candidates.append({
                **rd,
                "file_path": str(path) if path else None,
                "file_size_bytes": size,
                "file_mtime": datetime.fromtimestamp(mtime, timezone.utc).isoformat() if mtime else None,
                "candidate_reasons": reasons,
                "action": "review_then_archive_or_summarize",
            })

    report = {
        "generated_at": _now(),
        "policy": "non_destructive_candidates_only",
        "stale_days": stale_days,
        "sessions_scanned": len(rows),
        "candidate_count": len(candidates),
        "candidates": candidates[:200],
    }
    _write_artifact("reports/session-sprawl-latest.json", report)
    return report


def generate_context_pressure_report(limit: int = 200) -> dict:
    """Find transcripts likely to bloat context or retrieval."""
    conn = get_conn()
    rows = conn.execute(
        """
        SELECT session_id, agent_id,
               COUNT(*) AS event_count,
               SUM(COALESCE(original_length, 0)) AS original_chars,
               SUM(CASE WHEN role = 'toolResult' THEN COALESCE(original_length, 0) ELSE 0 END) AS tool_chars,
               SUM(CASE WHEN role = 'assistant' THEN LENGTH(COALESCE(clean_text, '')) ELSE 0 END) AS assistant_chars,
               SUM(CASE WHEN role = 'toolResult' AND COALESCE(original_length, 0) > 5000 THEN 1 ELSE 0 END) AS giant_tool_results,
               SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count
        FROM spooled_entries
        GROUP BY session_id, agent_id
        HAVING original_chars > 100000 OR giant_tool_results > 0 OR event_count > 1000
        ORDER BY original_chars DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    conn.close()

    sessions = []
    for row in rows:
        rd = dict(row)
        flags = []
        if (rd.get("giant_tool_results") or 0) > 0:
            flags.append("giant_tool_results")
        if (rd.get("event_count") or 0) > 1000:
            flags.append("long_running_session")
        if (rd.get("tool_chars") or 0) > max(1, (rd.get("assistant_chars") or 0)) * 3:
            flags.append("tool_output_dominates")
        rd["flags"] = flags
        rd["recommendation"] = "summarize_before_reuse" if flags else "monitor"
        sessions.append(rd)

    report = {
        "generated_at": _now(),
        "sessions_scanned": len(rows),
        "pressure_sessions": sessions,
    }
    _write_artifact("reports/context-pressure-latest.json", report)
    return report


def generate_failure_mode_report(limit: int = 200) -> dict:
    """Mine repeated operational failure modes from spooled transcript rows."""
    conn = get_conn()
    rows = conn.execute(
        """
        SELECT session_id, agent_id, role, tool_name, clean_text, preview, is_error, timestamp, indexed_at
        FROM spooled_entries
        WHERE is_error = 1
           OR LOWER(COALESCE(clean_text, '')) LIKE '%permission%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%approve%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%timeout%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%failover%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%fallback%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%no session found%'
           OR LOWER(COALESCE(clean_text, '')) LIKE '%context limit%'
        ORDER BY COALESCE(timestamp, indexed_at) DESC
        LIMIT 5000
        """
    ).fetchall()
    conn.close()

    buckets: dict[str, list[dict]] = defaultdict(list)
    tool_errors: Counter[str] = Counter()
    for r in rows:
        rd = dict(r)
        text = (rd.get("clean_text") or rd.get("preview") or "").lower()
        key = None
        if rd.get("is_error"):
            tool = rd.get("tool_name") or "unknown_tool"
            key = f"tool_error:{tool}"
            tool_errors[tool] += 1
        else:
            key = _classify_failure_text(text)
        if key:
            buckets[key].append({
                "session_id": rd.get("session_id"),
                "agent_id": rd.get("agent_id"),
                "tool_name": rd.get("tool_name"),
                "timestamp": rd.get("timestamp") or rd.get("indexed_at"),
                "preview": (rd.get("preview") or rd.get("clean_text") or "")[:240],
            })

    patterns = []
    for key, hits in sorted(buckets.items(), key=lambda kv: len(kv[1]), reverse=True):
        patterns.append({
            "pattern": key,
            "count": len(hits),
            "sessions": sorted({h["session_id"] for h in hits if h.get("session_id")})[:20],
            "examples": hits[:10],
            "recommendation": _failure_recommendation(key),
        })

    report = {
        "generated_at": _now(),
        "patterns": patterns[:limit],
        "top_error_tools": [{"tool": tool, "count": count} for tool, count in tool_errors.most_common(25)],
    }
    _write_artifact("reports/failure-modes-latest.json", report)
    return report


def _failure_recommendation(pattern: str) -> str:
    if pattern.startswith("tool_error:"):
        return "inspect repeated tool failures and add guardrails or repair wrapper"
    if pattern == "model_failover_or_fallback":
        return "audit model routing/fallback logs and expose failover in user-visible status"
    if pattern == "approval_or_permission_stall":
        return "improve approval prompts and stale approval recovery"
    if pattern == "timeout":
        return "identify timeout source and add bounded wait/retry or progress heartbeat"
    if pattern == "stale_session_reference":
        return "repair session lifecycle references and stale session cleanup"
    if pattern == "context_limit":
        return "summarize/archive before continuing session"
    return "review clustered examples"


def _classify_failure_text(text: str) -> str | None:
    """Classify failure text while avoiding overly broad buckets.

    The first intelligence pass bucketed any mention of "permission" or
    "approve" as an approval stall. That was useful for discovery but too noisy
    for ongoing degradation detection. Keep the public pattern names stable, but
    require stronger textual evidence.
    """
    lowered = text.lower()
    if "failover" in lowered or "fallback" in lowered:
        return "model_failover_or_fallback"
    if any(phrase in lowered for phrase in (
        "approval pending",
        "approval required",
        "approve this",
        "permission denied",
        "requires permission",
        "insufficient permission",
        "not permitted",
    )):
        return "approval_or_permission_stall"
    if any(phrase in lowered for phrase in (
        "timed out",
        "timeout",
        "deadline exceeded",
        "context deadline",
    )):
        return "timeout"
    if "no session found" in lowered or "unknown session" in lowered:
        return "stale_session_reference"
    if "context limit" in lowered or "context length" in lowered:
        return "context_limit"
    return None


def generate_active_sessions_bulk(limit: int = 40, activity_limit: int = 200) -> dict:
    """Bulk endpoint for visual clients: recent sessions plus normalized activity."""
    sessions = get_recent_sessions(limit)
    activity = {
        row["session_id"]: get_session_activity(row["session_id"], activity_limit)
        for row in sessions
    }
    return {
        "generated_at": _now(),
        "sessions": sessions,
        "activity": activity,
    }


def generate_intelligence_bundle(kind: str = "light") -> dict:
    """Run the deterministic intelligence suite and write a compact bundle."""
    sprawl_limit = 1000 if kind == "deep" else 300
    context_limit = 500 if kind == "deep" else 150
    failure_limit = 500 if kind == "deep" else 150
    bundle = {
        "generated_at": _now(),
        "kind": kind,
        "session_sprawl": generate_session_sprawl_report(limit=sprawl_limit),
        "context_pressure": generate_context_pressure_report(limit=context_limit),
        "failure_modes": generate_failure_mode_report(limit=failure_limit),
    }
    _write_artifact(f"reports/intelligence-{kind}-latest.json", bundle)
    return bundle