| from __future__ import annotations |
|
|
| from collections import Counter, defaultdict |
| from datetime import datetime, timezone |
| from difflib import SequenceMatcher |
| from pathlib import Path |
| from typing import Iterable |
|
|
| from config import settings |
| from spooler.store import get_conn, get_recent_sessions, get_session_activity |
|
|
|
|
def _now() -> str:
    """Return the current UTC instant as an ISO-8601 string (``+00:00`` offset)."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
def _write_artifact(relative_path: str, payload: dict) -> None:
    """Write *payload* as pretty-printed JSON under the amplifier state dir.

    The destination is ``settings.openclaw_state_dir / "session_amplifier" /
    relative_path``; missing parent directories are created. *payload* must be
    serializable by ``json.dumps`` (no ``default=`` hook is supplied).

    The JSON is first written to a hidden sibling temp file and then moved over
    the destination with ``Path.replace``. Readers therefore see either the old
    report or the complete new one, never a truncated file. If anything fails,
    the temp file is removed and the exception propagates.
    """
    import json
    import os

    base = settings.openclaw_state_dir / "session_amplifier"
    target = base / relative_path
    target.parent.mkdir(parents=True, exist_ok=True)
    # Serialize before touching the filesystem so a non-serializable payload
    # leaves the previous artifact intact.
    text = json.dumps(payload, indent=2, sort_keys=True)
    # PID suffix keeps concurrent writers from different processes apart.
    tmp = target.with_name(f".{target.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(text, encoding="utf-8")
        tmp.replace(target)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise
|
|
|
|
def _session_transcript_files() -> dict[str, Path]:
    """Map each transcript file stem to its ``.jsonl`` path.

    Scans ``<settings.openclaw_agents_root>/<agent_dir>/sessions/*.jsonl``.
    Callers look files up by session id, so the stem is used as the key. If the
    same stem appears under more than one agent directory, the first path in
    sorted order wins, which keeps the result deterministic across runs and
    filesystems.

    Returns an empty dict when the agents root is missing or is not a directory.
    """
    root = settings.openclaw_agents_root
    if not root.is_dir():
        return {}
    out: dict[str, Path] = {}
    for agent_dir in sorted(root.iterdir()):
        sessions = agent_dir / "sessions"
        # Skip stray files in the agents root and agents without a sessions dir.
        if not sessions.is_dir():
            continue
        for path in sorted(sessions.glob("*.jsonl")):
            out.setdefault(path.stem, path)
    return out
|
|
|
|
def _stat_transcript(path: Path | None) -> tuple[int | None, float | None]:
    """Return ``(size_bytes, mtime)`` for *path*, or ``(None, None)`` if unavailable."""
    # A single guarded stat avoids the exists()/stat() race where a transcript
    # is rotated or deleted between checks, and halves the syscalls per row.
    if path is None:
        return None, None
    try:
        info = path.stat()
    except OSError:
        return None, None
    return info.st_size, info.st_mtime


def generate_session_sprawl_report(limit: int = 500, stale_days: int = 30) -> dict:
    """Return non-destructive session sprawl/archive candidates.

    Aggregates ``spooled_entries`` per ``(session_id, agent_id)`` and keeps the
    ``limit`` sessions with the highest event count. A session becomes a
    candidate when any of these hold:

    * more than 2000 events (``very_high_event_count``)
    * more than 1,000,000 original characters (``very_large_transcript_content``)
    * its transcript file is larger than 5,000,000 bytes (``large_file``)
    * its transcript file's mtime is more than ``stale_days`` days old
      (``stale_file``)

    Nothing is moved or deleted. The report lists at most 200 candidates, while
    ``candidate_count`` carries the full number. The report is written to
    ``reports/session-sprawl-latest.json`` and returned.

    NOTE(review): only the ``limit`` busiest sessions are scanned, so quiet but
    stale sessions outside that window are never considered.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT session_id, agent_id,
                   MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
                   COUNT(*) AS event_count,
                   SUM(COALESCE(original_length, 0)) AS original_chars,
                   SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
                   SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
                   MAX(entry_idx) AS last_entry_idx
            FROM spooled_entries
            GROUP BY session_id, agent_id
            ORDER BY event_count DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    files = _session_transcript_files()
    candidates = []
    now_ts = datetime.now(timezone.utc).timestamp()
    stale_seconds = stale_days * 24 * 60 * 60
    for row in rows:
        rd = dict(row)
        path = files.get(rd["session_id"])
        size, mtime = _stat_transcript(path)
        reasons = []
        if (rd.get("event_count") or 0) > 2000:
            reasons.append("very_high_event_count")
        if (rd.get("original_chars") or 0) > 1_000_000:
            reasons.append("very_large_transcript_content")
        if size is not None and size > 5_000_000:
            reasons.append("large_file")
        if mtime is not None and now_ts - mtime > stale_seconds:
            reasons.append("stale_file")
        if reasons:
            candidates.append({
                **rd,
                "file_path": str(path) if path else None,
                "file_size_bytes": size,
                "file_mtime": (
                    datetime.fromtimestamp(mtime, timezone.utc).isoformat()
                    if mtime is not None
                    else None
                ),
                "candidate_reasons": reasons,
                "action": "review_then_archive_or_summarize",
            })

    report = {
        "generated_at": _now(),
        "policy": "non_destructive_candidates_only",
        "stale_days": stale_days,
        "sessions_scanned": len(rows),
        "candidate_count": len(candidates),
        "candidates": candidates[:200],
    }
    _write_artifact("reports/session-sprawl-latest.json", report)
    return report
|
|
|
|
def generate_context_pressure_report(limit: int = 200) -> dict:
    """Find transcripts likely to bloat context or retrieval.

    Aggregates ``spooled_entries`` per ``(session_id, agent_id)``. It keeps
    sessions that meet any of these conditions:

    * more than 100,000 original characters
    * at least one tool result over 5000 characters
    * more than 1000 events

    Sessions are ordered by original characters, largest first, and at most
    ``limit`` are kept.

    Each returned session is annotated with ``flags``:

    * ``giant_tool_results``: at least one tool result over 5000 characters
    * ``long_running_session``: more than 1000 events
    * ``tool_output_dominates``: tool-result characters exceed three times the
      assistant characters, with the assistant side floored at 1

    Each session also gets a ``recommendation``: ``summarize_before_reuse`` when
    any flag is set, otherwise ``monitor``. The report is written to
    ``reports/context-pressure-latest.json`` and returned.
    """
    conn = get_conn()
    try:
        # NOTE(review): tool_chars sums original_length while assistant_chars
        # sums LENGTH(clean_text); confirm mixing these measures is intended.
        rows = conn.execute(
            """
            SELECT session_id, agent_id,
                   COUNT(*) AS event_count,
                   SUM(COALESCE(original_length, 0)) AS original_chars,
                   SUM(CASE WHEN role = 'toolResult' THEN COALESCE(original_length, 0) ELSE 0 END) AS tool_chars,
                   SUM(CASE WHEN role = 'assistant' THEN LENGTH(COALESCE(clean_text, '')) ELSE 0 END) AS assistant_chars,
                   SUM(CASE WHEN role = 'toolResult' AND COALESCE(original_length, 0) > 5000 THEN 1 ELSE 0 END) AS giant_tool_results,
                   SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count
            FROM spooled_entries
            GROUP BY session_id, agent_id
            HAVING original_chars > 100000 OR giant_tool_results > 0 OR event_count > 1000
            ORDER BY original_chars DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    sessions = []
    for row in rows:
        rd = dict(row)
        flags = []
        if (rd.get("giant_tool_results") or 0) > 0:
            flags.append("giant_tool_results")
        if (rd.get("event_count") or 0) > 1000:
            flags.append("long_running_session")
        if (rd.get("tool_chars") or 0) > max(1, (rd.get("assistant_chars") or 0)) * 3:
            flags.append("tool_output_dominates")
        rd["flags"] = flags
        rd["recommendation"] = "summarize_before_reuse" if flags else "monitor"
        sessions.append(rd)

    report = {
        "generated_at": _now(),
        "sessions_scanned": len(rows),
        "pressure_sessions": sessions,
    }
    _write_artifact("reports/context-pressure-latest.json", report)
    return report
|
|
|
|
# Lower-case substrings used by the SQL prefilter in
# generate_failure_mode_report. Keep this covering every phrase that
# _classify_failure_text matches: a row whose text matches none of these is
# never fetched, so the classifier never sees it.
_FAILURE_PREFILTER_PHRASES: tuple[str, ...] = (
    "permission",
    "approve",
    "approval pending",
    "approval required",
    "not permitted",
    "timeout",
    "timed out",
    "deadline exceeded",
    "context deadline",
    "failover",
    "fallback",
    "no session found",
    "unknown session",
    "context limit",
    "context length",
)


def generate_failure_mode_report(limit: int = 200) -> dict:
    """Mine repeated operational failure modes from spooled transcript rows.

    Fetches up to 5000 of the most recent ``spooled_entries`` rows that either
    have ``is_error = 1`` or whose lower-cased ``clean_text`` contains one of
    ``_FAILURE_PREFILTER_PHRASES``. Rows are then bucketed as follows:

    * error rows go into ``tool_error:<tool_name>`` (``unknown_tool`` when the
      name is missing), and each one is also counted per tool;
    * other rows are bucketed by ``_classify_failure_text`` applied to
      ``clean_text`` (falling back to ``preview``); rows it returns ``None``
      for are dropped.

    Buckets are sorted by hit count, largest first.

    Args:
        limit: maximum number of patterns included in the report.

    Returns:
        The report dict, also written to ``reports/failure-modes-latest.json``.
    """
    like_clauses = " OR ".join(
        "LOWER(COALESCE(clean_text, '')) LIKE ?" for _ in _FAILURE_PREFILTER_PHRASES
    )
    query = f"""
        SELECT session_id, agent_id, role, tool_name, clean_text, preview, is_error, timestamp, indexed_at
        FROM spooled_entries
        WHERE is_error = 1
           OR {like_clauses}
        ORDER BY COALESCE(timestamp, indexed_at) DESC
        LIMIT 5000
    """
    params = tuple(f"%{phrase}%" for phrase in _FAILURE_PREFILTER_PHRASES)

    conn = get_conn()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()

    buckets: dict[str, list[dict]] = defaultdict(list)
    tool_errors: Counter[str] = Counter()
    for r in rows:
        rd = dict(r)
        text = (rd.get("clean_text") or rd.get("preview") or "").lower()
        if rd.get("is_error"):
            tool = rd.get("tool_name") or "unknown_tool"
            key = f"tool_error:{tool}"
            tool_errors[tool] += 1
        else:
            key = _classify_failure_text(text)
        if key:
            buckets[key].append({
                "session_id": rd.get("session_id"),
                "agent_id": rd.get("agent_id"),
                "tool_name": rd.get("tool_name"),
                "timestamp": rd.get("timestamp") or rd.get("indexed_at"),
                "preview": (rd.get("preview") or rd.get("clean_text") or "")[:240],
            })

    patterns = []
    for key, hits in sorted(buckets.items(), key=lambda kv: len(kv[1]), reverse=True):
        patterns.append({
            "pattern": key,
            "count": len(hits),
            "sessions": sorted({h["session_id"] for h in hits if h.get("session_id")})[:20],
            "examples": hits[:10],
            "recommendation": _failure_recommendation(key),
        })

    report = {
        "generated_at": _now(),
        "patterns": patterns[:limit],
        "top_error_tools": [{"tool": tool, "count": count} for tool, count in tool_errors.most_common(25)],
    }
    _write_artifact("reports/failure-modes-latest.json", report)
    return report
|
|
|
|
def _failure_recommendation(pattern: str) -> str:
    """Map a failure-pattern name to a short remediation hint.

    Any ``tool_error:<tool>`` pattern shares one hint. The remaining known
    pattern names are looked up in a table. Unknown names fall back to
    ``"review clustered examples"``.
    """
    if pattern.startswith("tool_error:"):
        return "inspect repeated tool failures and add guardrails or repair wrapper"
    hints = {
        "model_failover_or_fallback": "audit model routing/fallback logs and expose failover in user-visible status",
        "approval_or_permission_stall": "improve approval prompts and stale approval recovery",
        "timeout": "identify timeout source and add bounded wait/retry or progress heartbeat",
        "stale_session_reference": "repair session lifecycle references and stale session cleanup",
        "context_limit": "summarize/archive before continuing session",
    }
    return hints.get(pattern, "review clustered examples")
|
|
|
|
| def _classify_failure_text(text: str) -> str | None: |
| """Classify failure text while avoiding overly broad buckets. |
| |
| The first intelligence pass bucketed any mention of "permission" or |
| "approve" as an approval stall. That was useful for discovery but too noisy |
| for ongoing degradation detection. Keep the public pattern names stable, but |
| require stronger textual evidence. |
| """ |
| lowered = text.lower() |
| if "failover" in lowered or "fallback" in lowered: |
| return "model_failover_or_fallback" |
| if any(phrase in lowered for phrase in ( |
| "approval pending", |
| "approval required", |
| "approve this", |
| "permission denied", |
| "requires permission", |
| "insufficient permission", |
| "not permitted", |
| )): |
| return "approval_or_permission_stall" |
| if any(phrase in lowered for phrase in ( |
| "timed out", |
| "timeout", |
| "deadline exceeded", |
| "context deadline", |
| )): |
| return "timeout" |
| if "no session found" in lowered or "unknown session" in lowered: |
| return "stale_session_reference" |
| if "context limit" in lowered or "context length" in lowered: |
| return "context_limit" |
| return None |
|
|
|
|
def generate_active_sessions_bulk(limit: int = 40, activity_limit: int = 200) -> dict:
    """Bundle recent sessions with their activity feeds for visual clients.

    Args:
        limit: how many recent sessions to request from ``get_recent_sessions``.
        activity_limit: passed to ``get_session_activity`` for each session.

    Returns:
        A dict with ``generated_at``, the ``sessions`` list as returned by the
        store, and ``activity`` keyed by each session's ``session_id``.
        Activity is fetched once per session, in session order.
    """
    recent = get_recent_sessions(limit)
    activity_by_session = {}
    for session in recent:
        sid = session["session_id"]
        activity_by_session[sid] = get_session_activity(sid, activity_limit)
    return {
        "generated_at": _now(),
        "sessions": recent,
        "activity": activity_by_session,
    }
|
|
|
|
def generate_intelligence_bundle(kind: str = "light") -> dict:
    """Run the deterministic intelligence suite and write a compact bundle.

    ``kind == "deep"`` uses larger limits (sprawl 1000, context 500, failure
    500). Any other value uses the light limits (300 / 150 / 150) but is still
    recorded verbatim in the bundle and in the output filename
    ``reports/intelligence-<kind>-latest.json``.

    The three sub-reports run in order (sprawl, context pressure, failure
    modes); each one also writes its own artifact.
    """
    if kind == "deep":
        sprawl_limit, context_limit, failure_limit = 1000, 500, 500
    else:
        sprawl_limit, context_limit, failure_limit = 300, 150, 150

    bundle = {"generated_at": _now(), "kind": kind}
    bundle["session_sprawl"] = generate_session_sprawl_report(limit=sprawl_limit)
    bundle["context_pressure"] = generate_context_pressure_report(limit=context_limit)
    bundle["failure_modes"] = generate_failure_mode_report(limit=failure_limit)

    _write_artifact(f"reports/intelligence-{kind}-latest.json", bundle)
    return bundle
|
|