Ordo
Initial public release
63c75d5
import logging
import sqlite3

from config import settings
def _compact_title(text: str, *, fallback: str = "") -> str:
title = " ".join((text or "").strip().split())
if not title:
return fallback
prefixes = (
"Conversation info",
"Current user request:",
"OpenClaw assembled context",
"Treat the conversation context",
)
for prefix in prefixes:
if title.startswith(prefix):
return fallback
return title[:96]
def _derive_display_title(session_id: str, activity: list[dict]) -> str:
    """Pick a display title for a session from its activity rows.

    Rows are scanned in the order given. Only rows whose ``role`` is
    ``"user"`` or ``"assistant"`` are considered; the first one whose
    ``clean_text`` (or, failing that, ``preview``) compacts to a non-empty
    title wins. When no row qualifies, the first 32 characters of
    *session_id* are returned.
    """
    speaker_roles = {"user", "assistant"}
    # Lazy pipeline: titles are computed one row at a time, stopping at the first hit.
    candidate_titles = (
        _compact_title(entry.get("clean_text") or entry.get("preview") or "")
        for entry in activity
        if (entry.get("role") or "") in speaker_roles
    )
    for candidate in candidate_titles:
        if candidate:
            return candidate
    return session_id[:32]
def get_conn() -> sqlite3.Connection:
    """Open a new SQLite connection to ``settings.db_path``.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so result
    rows can be indexed by column name. The caller is responsible for
    closing the returned connection.
    """
    connection = sqlite3.connect(settings.db_path)
    connection.row_factory = sqlite3.Row
    return connection
def spool_entries(rows: list[dict]) -> int:
    """Insert spooled entries one at a time and return how many were stored.

    Rows are written with ``INSERT OR IGNORE``, so a row that conflicts with
    an existing one is skipped and not counted (the original docstring
    attributes this to a UNIQUE constraint on ``spooled_entries``; the schema
    is not visible here).

    Insertion is best-effort: a row that cannot be written is logged and
    skipped without aborting the rest of the batch.

    Args:
        rows: Entry dicts. ``session_id``, ``agent_id``, ``clean_text`` and
            ``preview`` are required keys; ``entry_idx`` defaults to ``0``,
            ``is_error`` defaults to false and is stored as ``0``/``1``, and
            the remaining columns default to ``None``.

    Returns:
        The number of rows actually inserted; ``0`` for empty input.
    """
    if not rows:
        return 0

    log = logging.getLogger(__name__)
    insert_sql = """
        INSERT OR IGNORE INTO spooled_entries
            (session_id, agent_id, entry_idx, entry_type, role, timestamp,
             tool_name, clean_text, original_length, preview, is_error)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    inserted = 0
    conn = get_conn()
    try:
        for row in rows:
            try:
                params = (
                    row["session_id"],
                    row["agent_id"],
                    row.get("entry_idx", 0),
                    row.get("entry_type"),
                    row.get("role"),
                    row.get("timestamp"),
                    row.get("tool_name"),
                    row["clean_text"],
                    row.get("original_length"),
                    row["preview"],
                    int(bool(row.get("is_error", False))),
                )
                cur = conn.execute(insert_sql, params)
            except Exception:
                # Deliberately broad: one malformed row must not sink the
                # batch, but the failure is recorded instead of vanishing.
                log.warning(
                    "Skipping spooled entry that could not be inserted",
                    exc_info=True,
                )
                continue
            # rowcount is 0 when OR IGNORE suppressed the insert.
            if cur.rowcount > 0:
                inserted += 1
        conn.commit()
    finally:
        # Release the connection even if commit() raises.
        conn.close()
    return inserted
def get_file_state(file_path: str) -> dict | None:
    """Load the recorded spool state for *file_path*.

    The state is stored in ``spool_state`` under the key
    ``file::<file_path>`` as ``"<mtime>:<size>[:<last_entry_idx>]"``.

    Args:
        file_path: Path used to build the lookup key.

    Returns:
        A dict with integer ``mtime`` and ``size`` (plus ``last_entry_idx``
        when the stored value has a third field), or ``None`` when no state
        is recorded or the stored value cannot be parsed.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT value FROM spool_state WHERE key = ?",
            (f"file::{file_path}",),
        ).fetchone()
    finally:
        # Close the connection even when the query itself fails.
        conn.close()
    if not row:
        return None

    value = row["value"]
    try:
        parts = value.split(":")
        state = {"mtime": int(parts[0]), "size": int(parts[1])}
        if len(parts) > 2:
            state["last_entry_idx"] = int(parts[2])
    except (AttributeError, IndexError, TypeError, ValueError):
        # Malformed value: NULL/non-text column (AttributeError/TypeError),
        # too few fields (IndexError) or non-numeric fields (ValueError).
        return None
    return state
def set_file_state(file_path: str, *, mtime: int, size: int, last_entry_idx: int = -1) -> None:
    """Record the spool state for *file_path*, replacing any previous value.

    The state is stored in ``spool_state`` under the key
    ``file::<file_path>`` as ``"<mtime>:<size>:<last_entry_idx>"``.

    Args:
        file_path: Path used to build the storage key.
        mtime: Modification time to record.
        size: File size to record.
        last_entry_idx: Index of the last entry to record; defaults to ``-1``.
    """
    conn = get_conn()
    try:
        conn.execute(
            "INSERT OR REPLACE INTO spool_state (key, value) VALUES (?, ?)",
            (f"file::{file_path}", f"{mtime}:{size}:{last_entry_idx}"),
        )
        conn.commit()
    finally:
        # Release the connection even if the write or commit raises.
        conn.close()
# Aggregate projection shared by the session list and the single-session
# summary, so both always return the same columns.
_SESSION_SUMMARY_SELECT = """
    SELECT session_id, agent_id,
           MAX(COALESCE(timestamp, indexed_at)) AS last_event_at,
           COUNT(*) AS event_count,
           SUM(CASE WHEN role = 'toolResult' THEN 1 ELSE 0 END) AS tool_result_count,
           SUM(CASE WHEN is_error = 1 THEN 1 ELSE 0 END) AS error_count,
           SUM(CASE WHEN role = 'toolResult' AND original_length > 5000 THEN 1 ELSE 0 END) AS noisy_tool_results,
           MAX(entry_idx) AS last_entry_idx
    FROM spooled_entries
"""


def _summarize_session_row(row) -> dict:
    """Convert one aggregate row into a summary dict with hints, health and title.

    Added keys:
        hints: Human-readable notes: ``"<n> error(s)"`` when any entry is an
            error, ``"<n> noisy tool outputs"`` when more than two tool
            results exceed 5000 characters of original length, and
            ``"tool-heavy"`` when tool results are over 70% of all events.
        health: ``"error"`` if there are errors, else ``"warning"`` if there
            are more than two noisy tool results, else ``"ok"``.
        display_title: Title derived from up to 50 activity rows of the
            session, falling back to a prefix of the session id.
    """
    summary = dict(row)
    # Aggregates can be NULL/missing; normalise to 0 before comparing.
    tool_count = summary.get("tool_result_count", 0) or 0
    error_count = summary.get("error_count", 0) or 0
    noisy = summary.get("noisy_tool_results", 0) or 0
    event_count = summary.get("event_count", 0) or 0

    hints = []
    if error_count > 0:
        hints.append(f"{error_count} error(s)")
    if noisy > 2:
        hints.append(f"{noisy} noisy tool outputs")
    if tool_count > 0 and event_count > 0 and tool_count / event_count > 0.7:
        hints.append("tool-heavy")
    summary["hints"] = hints

    if error_count > 0:
        summary["health"] = "error"
    elif noisy > 2:
        summary["health"] = "warning"
    else:
        summary["health"] = "ok"

    # NOTE: this runs one extra activity query per summarised session.
    summary["display_title"] = _derive_display_title(
        summary["session_id"],
        get_session_activity(summary["session_id"], limit=50),
    )
    return summary


def get_recent_sessions(limit: int = 25) -> list[dict]:
    """Return summaries of the most recently active sessions.

    One summary is produced per ``(session_id, agent_id)`` pair, ordered by
    the latest event time (``timestamp``, or ``indexed_at`` when the
    timestamp is NULL), newest first.

    Args:
        limit: Maximum number of summaries to return.

    Returns:
        A list of summary dicts holding the aggregate columns plus
        ``hints``, ``health`` and ``display_title``.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            _SESSION_SUMMARY_SELECT
            + """
            GROUP BY session_id, agent_id
            ORDER BY MAX(COALESCE(timestamp, indexed_at)) DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        # Close before the per-session lookups open their own connections.
        conn.close()
    return [_summarize_session_row(row) for row in rows]


def get_session_summary(session_id: str) -> dict | None:
    """Return the same summary shape as get_recent_sessions for one session.

    Args:
        session_id: Session to summarise.

    Returns:
        The summary dict, or ``None`` when the session has no spooled entries.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            _SESSION_SUMMARY_SELECT
            + """
            WHERE session_id = ?
            GROUP BY session_id, agent_id
            LIMIT 1
            """,
            (session_id,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    return _summarize_session_row(row)
def get_session_activity(session_id: str, limit: int = 200) -> list[dict]:
    """Return the latest entries of a session in ascending ``entry_idx`` order.

    The query selects the *limit* rows with the highest ``entry_idx`` and the
    result is then reversed, so callers receive the most recent window of
    activity ordered oldest-to-newest.

    Args:
        session_id: Session whose entries are fetched.
        limit: Maximum number of entries to return.

    Returns:
        A list of plain dicts, one per entry; empty when the session has no
        spooled entries.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT session_id, agent_id, entry_idx, role, entry_type, timestamp,
                   tool_name, clean_text, preview, original_length, is_error, indexed_at
            FROM spooled_entries
            WHERE session_id = ?
            ORDER BY entry_idx DESC
            LIMIT ?
            """,
            (session_id, limit),
        ).fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    return [dict(row) for row in reversed(rows)]