"""
SQLite-backed persistence for tool quality data.

Shares the same database file as SkillStore. When the ``TURSO_DATABASE_URL``
environment variable is set and ``libsql_experimental`` is importable, a
remote Turso/libsql database is used instead; it is accessed through small
proxy classes that mimic the parts of the ``sqlite3`` connection/cursor API
this module relies on.

Storage location (default): <PROJECT_ROOT>/.openspace/openspace.db

Tables managed by this module:
    tool_quality_records   — one row per tool (aggregate stats)
    tool_execution_history — rolling window of per-call records
    tool_quality_meta      — key-value metadata (global_execution_count)
"""

import os
import sqlite3
import threading
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple

from .types import ToolQualityRecord, ExecutionRecord, DescriptionQuality
from openspace.utils.logging import Logger
from openspace.config.constants import PROJECT_ROOT

try:
    import libsql_experimental as libsql
except ImportError:
    libsql = None


logger = Logger.get_logger(__name__)


class _LibsqlCursorProxy:
    """Adapt a libsql cursor to the subset of the sqlite3 cursor API used here.

    ``execute`` / ``executemany`` / ``executescript`` return the proxy itself
    so calls can be chained (``conn.execute(...).fetchall()``), and fetched
    rows are passed through the owning connection proxy's ``row_factory``.
    """

    def __init__(self, cursor, conn_proxy):
        self._cursor = cursor
        self._conn_proxy = conn_proxy

    def execute(self, *args, **kwargs):
        self._cursor.execute(*args, **kwargs)
        return self

    def executemany(self, sql, seq_of_parameters):
        """Run *sql* once for every parameter tuple in *seq_of_parameters*.

        Uses the underlying cursor's ``executemany`` when it exists, otherwise
        falls back to one ``execute`` call per parameter tuple.
        """
        native = getattr(self._cursor, "executemany", None)
        if native is not None:
            native(sql, seq_of_parameters)
        else:
            for params in seq_of_parameters:
                self._cursor.execute(sql, params)
        return self

    def executescript(self, *args, **kwargs):
        self._cursor.executescript(*args, **kwargs)
        return self

    def fetchone(self):
        row = self._cursor.fetchone()
        if row is not None and self._conn_proxy.row_factory:
            return self._conn_proxy.row_factory(self, row)
        return row

    def fetchall(self):
        rows = self._cursor.fetchall()
        if self._conn_proxy.row_factory:
            return [self._conn_proxy.row_factory(self, row) for row in rows]
        return rows

    @property
    def description(self):
        return getattr(self._cursor, "description", [])

    @property
    def rowcount(self):
        return getattr(self._cursor, "rowcount", -1)

    @property
    def lastrowid(self):
        return getattr(self._cursor, "lastrowid", None)


class _LibsqlConnectionProxy:
    """Adapt a libsql connection to the subset of the sqlite3 API used here.

    Every ``execute*`` call opens a fresh :class:`_LibsqlCursorProxy`.
    ``commit`` / ``rollback`` / ``close`` are forwarded only when the wrapped
    connection provides them.
    """

    def __init__(self, conn):
        self._conn = conn
        # Mirrors sqlite3.Connection.row_factory; applied by the cursor proxy.
        self.row_factory = None

    def execute(self, *args, **kwargs):
        cursor = self.cursor()
        return cursor.execute(*args, **kwargs)

    def executemany(self, sql, seq_of_parameters):
        # Required by QualityStore._upsert_record; without it every save that
        # carries execution history failed with AttributeError on Turso.
        cursor = self.cursor()
        return cursor.executemany(sql, seq_of_parameters)

    def executescript(self, *args, **kwargs):
        cursor = self.cursor()
        return cursor.executescript(*args, **kwargs)

    def cursor(self):
        return _LibsqlCursorProxy(self._conn.cursor(), self)

    def commit(self):
        if hasattr(self._conn, "commit"):
            return self._conn.commit()

    def rollback(self):
        if hasattr(self._conn, "rollback"):
            return self._conn.rollback()

    def close(self):
        if hasattr(self._conn, "close"):
            return self._conn.close()


class _RowProxy:
    """Row wrapper supporting both positional and column-name indexing.

    Built from a DB-API ``description`` (sequence whose items start with the
    column name), similar in spirit to ``sqlite3.Row``.
    """

    def __init__(self, row, description):
        self._row = row
        self._description = description
        self._col_map = {col[0]: idx for idx, col in enumerate(description)}

    def __getitem__(self, item):
        if isinstance(item, int):
            return self._row[item]
        if item in self._col_map:
            return self._row[self._col_map[item]]
        raise KeyError(item)

    def keys(self):
        return self._col_map.keys()

    def __iter__(self):
        return iter(self._row)

    def __len__(self):
        return len(self._row)


def _dict_factory(cursor, row):
    """Row factory for the libsql proxy: wrap rows in :class:`_RowProxy`.

    Returns the raw row unchanged when the cursor exposes no description.
    """
    if hasattr(cursor, "description") and cursor.description:
        return _RowProxy(row, cursor.description)
    return row


_DDL = """
CREATE TABLE IF NOT EXISTS tool_quality_records (
    tool_key                TEXT PRIMARY KEY,
    backend                 TEXT NOT NULL,
    server                  TEXT NOT NULL DEFAULT 'default',
    tool_name               TEXT NOT NULL,
    total_calls             INTEGER NOT NULL DEFAULT 0,
    success_count           INTEGER NOT NULL DEFAULT 0,
    total_execution_time_ms REAL NOT NULL DEFAULT 0.0,
    llm_flagged_count       INTEGER NOT NULL DEFAULT 0,
    description_hash        TEXT,
    desc_clarity            REAL,
    desc_completeness       REAL,
    desc_evaluated_at       TEXT,
    desc_reasoning          TEXT,
    first_seen              TEXT NOT NULL,
    last_updated            TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tqr_backend ON tool_quality_records(backend);
CREATE INDEX IF NOT EXISTS idx_tqr_flagged ON tool_quality_records(llm_flagged_count);

CREATE TABLE IF NOT EXISTS tool_execution_history (
    id                INTEGER PRIMARY KEY AUTOINCREMENT,
    tool_key          TEXT NOT NULL
                      REFERENCES tool_quality_records(tool_key) ON DELETE CASCADE,
    timestamp         TEXT NOT NULL,
    success           INTEGER NOT NULL,
    execution_time_ms REAL NOT NULL DEFAULT 0.0,
    error_message     TEXT
);
CREATE INDEX IF NOT EXISTS idx_teh_key ON tool_execution_history(tool_key);
CREATE INDEX IF NOT EXISTS idx_teh_ts ON tool_execution_history(timestamp);

CREATE TABLE IF NOT EXISTS tool_quality_meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
"""


class QualityStore:
    """SQLite-backed persistence for tool quality data.

    By default uses the same ``.db`` file as ``SkillStore``
    (``<PROJECT_ROOT>/.openspace/openspace.db``). Each subsystem creates its
    own tables independently. If ``TURSO_DATABASE_URL`` is set and
    ``libsql_experimental`` is installed, a remote libsql connection is used
    instead of the local file.

    All database access goes through ``self._mu`` so one connection can be
    shared across threads (the SQLite connection is opened with
    ``check_same_thread=False``).
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open (and if needed create) the quality tables.

        Args:
            db_path: Local SQLite file to use. Defaults to
                ``<PROJECT_ROOT>/.openspace/openspace.db``; ignored when the
                Turso connection is used.
        """
        if db_path is None:
            db_dir = PROJECT_ROOT / ".openspace"
            db_dir.mkdir(parents=True, exist_ok=True)
            db_path = db_dir / "openspace.db"

        self._db_path = Path(db_path)
        self._mu = threading.Lock()

        turso_url = os.environ.get("TURSO_DATABASE_URL")
        turso_token = os.environ.get("TURSO_AUTH_TOKEN")

        if turso_url and libsql is not None:
            raw_conn = libsql.connect(turso_url, auth_token=turso_token)
            self._conn = _LibsqlConnectionProxy(raw_conn)
            self._conn.row_factory = _dict_factory
            logger.debug(f"QualityStore ready (Turso) at {turso_url}")
        else:
            if turso_url:
                # Make the fallback visible instead of silently ignoring the
                # configured remote database.
                logger.warning(
                    "TURSO_DATABASE_URL is set but libsql_experimental is not "
                    "installed; falling back to local SQLite"
                )
            self._conn = sqlite3.connect(
                str(self._db_path),
                timeout=30.0,
                check_same_thread=False,
            )
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA busy_timeout=30000")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.row_factory = sqlite3.Row
            logger.debug(f"QualityStore ready (SQLite) at {self._db_path}")

        self._init_tables()

    def _init_tables(self) -> None:
        """Create tables and indexes if they do not exist yet."""
        with self._mu:
            self._conn.executescript(_DDL)
            self._conn.commit()

    def load_all(self) -> Tuple[Dict[str, ToolQualityRecord], int]:
        """Load all quality records and the global execution count.

        Returns:
            ``(records, global_count)``: ``records`` maps ``tool_key`` to a
            rebuilt ``ToolQualityRecord`` (with its description quality when
            ``desc_clarity`` is stored, and at most
            ``ToolQualityRecord.MAX_RECENT_EXECUTIONS`` most recent executions
            in chronological order); ``global_count`` is the stored
            ``global_execution_count`` or 0 when absent.
        """
        with self._mu:
            rows = self._conn.execute(
                "SELECT * FROM tool_quality_records"
            ).fetchall()

            records: Dict[str, ToolQualityRecord] = {}
            for row in rows:
                tool_key = row["tool_key"]
                record = ToolQualityRecord(
                    tool_key=tool_key,
                    backend=row["backend"],
                    server=row["server"],
                    tool_name=row["tool_name"],
                    total_calls=row["total_calls"],
                    success_count=row["success_count"],
                    total_execution_time_ms=row["total_execution_time_ms"],
                    llm_flagged_count=row["llm_flagged_count"],
                    description_hash=row["description_hash"],
                    first_seen=datetime.fromisoformat(row["first_seen"]),
                    last_updated=datetime.fromisoformat(row["last_updated"]),
                )

                # Description quality (all-or-nothing: clarity present → all
                # present; _upsert_record writes these columns together).
                if row["desc_clarity"] is not None:
                    record.description_quality = DescriptionQuality(
                        clarity=row["desc_clarity"],
                        completeness=row["desc_completeness"],
                        evaluated_at=datetime.fromisoformat(row["desc_evaluated_at"]),
                        reasoning=row["desc_reasoning"] or "",
                    )

                # Recent execution history: newest N by id, then reversed so
                # the in-memory list is chronological again.
                exec_rows = self._conn.execute(
                    "SELECT timestamp, success, execution_time_ms, error_message "
                    "FROM tool_execution_history "
                    "WHERE tool_key = ? ORDER BY id DESC LIMIT ?",
                    (tool_key, ToolQualityRecord.MAX_RECENT_EXECUTIONS),
                ).fetchall()
                record.recent_executions = [
                    ExecutionRecord(
                        timestamp=datetime.fromisoformat(er["timestamp"]),
                        success=bool(er["success"]),
                        execution_time_ms=er["execution_time_ms"],
                        error_message=er["error_message"],
                    )
                    for er in reversed(exec_rows)
                ]

                records[tool_key] = record

            # Global metadata
            meta_row = self._conn.execute(
                "SELECT value FROM tool_quality_meta "
                "WHERE key = 'global_execution_count'"
            ).fetchone()
            global_count = int(meta_row["value"]) if meta_row else 0

        logger.info(
            f"Loaded {len(records)} quality records from SQLite "
            f"(global_count={global_count})"
        )
        return records, global_count

    async def save_all(
        self,
        records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Persist all records (bulk) in one transaction.

        Note: the database work runs synchronously on the calling thread, so
        it blocks the event loop for its duration. Failures are logged and
        rolled back rather than raised.
        """
        self._save_all_sync(records, global_execution_count)

    async def save_record(
        self,
        record: ToolQualityRecord,
        all_records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Persist a single record (incremental — much cheaper than save_all).

        Args:
            record: The record to upsert (including its execution history).
            all_records: Not used by this implementation; kept for interface
                compatibility.
            global_execution_count: Value stored under
                ``global_execution_count`` in ``tool_quality_meta``.

        Failures are logged and rolled back rather than raised.
        """
        with self._mu:
            try:
                self._upsert_record(record)
                self._conn.execute(
                    "INSERT OR REPLACE INTO tool_quality_meta "
                    "(key, value) VALUES (?, ?)",
                    ("global_execution_count", str(global_execution_count)),
                )
                self._conn.commit()
            except Exception as e:
                self._conn.rollback()
                logger.error(f"Failed to save record {record.tool_key}: {e}")

    def clear(self) -> None:
        """Delete all quality data.

        Raises:
            Exception: Whatever the database raised; the partial transaction
                is rolled back first so the connection is left clean.
        """
        with self._mu:
            try:
                self._conn.execute("DELETE FROM tool_execution_history")
                self._conn.execute("DELETE FROM tool_quality_records")
                self._conn.execute("DELETE FROM tool_quality_meta")
                self._conn.commit()
            except Exception:
                self._conn.rollback()
                raise
        logger.info("Quality data cleared")

    def close(self) -> None:
        """Close the database connection (best-effort, never raises)."""
        try:
            self._conn.close()
        except Exception as e:
            logger.debug(f"QualityStore close failed: {e}")

    def _save_all_sync(
        self,
        records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Synchronous full save (used by async wrapper and migration).

        Upserts every record and the global counter in a single transaction;
        on any error the transaction is rolled back and the error is logged.
        """
        with self._mu:
            try:
                for record in records.values():
                    self._upsert_record(record)
                self._conn.execute(
                    "INSERT OR REPLACE INTO tool_quality_meta "
                    "(key, value) VALUES (?, ?)",
                    ("global_execution_count", str(global_execution_count)),
                )
                self._conn.commit()
            except Exception as e:
                self._conn.rollback()
                logger.error(f"Failed to bulk-save quality records: {e}")

    def _upsert_record(self, record: ToolQualityRecord) -> None:
        """Upsert one tool_quality_records row + its execution history.

        Caller MUST hold ``self._mu``. Does NOT commit — caller manages the
        transaction boundary.
        """
        dq = record.description_quality
        # INSERT OR REPLACE resolves the primary-key conflict by deleting the
        # old row; with foreign_keys=ON (SQLite path) that may cascade to the
        # history rows — harmless, because history is rewritten below anyway.
        self._conn.execute(
            """INSERT OR REPLACE INTO tool_quality_records
               (tool_key, backend, server, tool_name,
                total_calls, success_count, total_execution_time_ms,
                llm_flagged_count, description_hash,
                desc_clarity, desc_completeness, desc_evaluated_at, desc_reasoning,
                first_seen, last_updated)
               VALUES (?,?,?,?, ?,?,?, ?,?, ?,?,?,?, ?,?)""",
            (
                record.tool_key,
                record.backend,
                record.server,
                record.tool_name,
                record.total_calls,
                record.success_count,
                record.total_execution_time_ms,
                record.llm_flagged_count,
                record.description_hash,
                dq.clarity if dq else None,
                dq.completeness if dq else None,
                dq.evaluated_at.isoformat() if dq else None,
                dq.reasoning if dq else None,
                record.first_seen.isoformat(),
                record.last_updated.isoformat(),
            ),
        )

        # Sync execution history: delete + re-insert.
        # For ≤ MAX_RECENT_EXECUTIONS rows this is fast and avoids
        # complex diff logic between in-memory and DB state.
        self._conn.execute(
            "DELETE FROM tool_execution_history WHERE tool_key = ?",
            (record.tool_key,),
        )
        if record.recent_executions:
            self._conn.executemany(
                "INSERT INTO tool_execution_history "
                "(tool_key, timestamp, success, execution_time_ms, error_message) "
                "VALUES (?,?,?,?,?)",
                [
                    (
                        record.tool_key,
                        e.timestamp.isoformat(),
                        int(e.success),
                        e.execution_time_ms,
                        e.error_message,
                    )
                    for e in record.recent_executions
                ],
            )