File size: 14,233 Bytes
399b80c
 
 
 
 
 
 
 
 
 
 
 
6596274
399b80c
 
 
 
 
 
6596274
 
 
 
 
46e3043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6596274
 
 
 
46e3043
 
 
6596274
 
 
 
 
 
 
 
 
 
 
 
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
"""
SQLite-backed persistence for tool quality data. Shares the same database file as SkillStore.

Storage location (default):
    <project_root>/.openspace/openspace.db

Tables managed by this module:
    tool_quality_records   — one row per tool (aggregate stats)
    tool_execution_history — rolling window of per-call records
    tool_quality_meta      — key-value metadata (global_execution_count)
"""

import os
import sqlite3
import threading
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple

try:
    import libsql_experimental as libsql
except ImportError:
    libsql = None

class _LibsqlCursorProxy:
    """Wrap a libsql cursor so it exposes the subset of the ``sqlite3.Cursor``
    API that this module relies on.

    Rows returned by ``fetchone``/``fetchall`` are passed through the owning
    connection proxy's ``row_factory`` (when one is set), mirroring how
    ``sqlite3.Connection.row_factory`` behaves.
    """

    def __init__(self, cursor, conn_proxy):
        self._cursor = cursor          # underlying libsql cursor
        self._conn_proxy = conn_proxy  # read for the current row_factory

    def execute(self, *args, **kwargs):
        """Run one statement; return ``self`` so ``.fetch*()`` can be chained."""
        self._cursor.execute(*args, **kwargs)
        return self

    def executemany(self, *args, **kwargs):
        """Run one statement once per parameter set; return ``self``.

        ``QualityStore._upsert_record`` bulk-inserts execution history with
        ``executemany``; without this method the Turso path raised
        ``AttributeError`` on every save that had execution history.
        """
        self._cursor.executemany(*args, **kwargs)
        return self

    def executescript(self, *args, **kwargs):
        """Run a multi-statement SQL script; return ``self``."""
        self._cursor.executescript(*args, **kwargs)
        return self

    def fetchone(self):
        """Return the next row (through ``row_factory`` if set) or ``None``."""
        row = self._cursor.fetchone()
        if row is not None and self._conn_proxy.row_factory:
            return self._conn_proxy.row_factory(self, row)
        return row

    def fetchall(self):
        """Return all remaining rows, each through ``row_factory`` if set."""
        rows = self._cursor.fetchall()
        if self._conn_proxy.row_factory:
            return [self._conn_proxy.row_factory(self, row) for row in rows]
        return rows

    @property
    def description(self):
        # Falls back to an empty list if the underlying cursor lacks it.
        return getattr(self._cursor, "description", [])

    @property
    def rowcount(self):
        return getattr(self._cursor, "rowcount", -1)

    @property
    def lastrowid(self):
        return getattr(self._cursor, "lastrowid", None)


class _LibsqlConnectionProxy:
    """Wrap a libsql connection to expose the ``sqlite3.Connection`` methods
    used by ``QualityStore``.

    ``row_factory`` is stored on this proxy and applied by
    ``_LibsqlCursorProxy`` when rows are fetched.
    """

    def __init__(self, conn):
        self._conn = conn
        self.row_factory = None

    def execute(self, *args, **kwargs):
        """Run one statement on a fresh cursor proxy and return that proxy."""
        return self.cursor().execute(*args, **kwargs)

    def executemany(self, *args, **kwargs):
        """Run one statement per parameter set on a fresh cursor proxy."""
        return self.cursor().executemany(*args, **kwargs)

    def executescript(self, *args, **kwargs):
        """Run a multi-statement SQL script on a fresh cursor proxy."""
        return self.cursor().executescript(*args, **kwargs)

    def cursor(self):
        return _LibsqlCursorProxy(self._conn.cursor(), self)

    # commit/rollback/close are no-ops if the wrapped connection lacks them.
    def commit(self):
        if hasattr(self._conn, "commit"):
            return self._conn.commit()

    def rollback(self):
        if hasattr(self._conn, "rollback"):
            return self._conn.rollback()

    def close(self):
        if hasattr(self._conn, "close"):
            return self._conn.close()

class _RowProxy:
    """Read-only row wrapper offering ``sqlite3.Row``-style access.

    Supports positional indexing (``row[0]``), column-name lookup
    (``row["tool_key"]``), ``keys()``, iteration over the values and
    ``len()``. An unknown column name raises ``KeyError``.
    """

    def __init__(self, row, description):
        self._row = row
        self._description = description
        # Column name (element 0 of each description entry) -> position.
        # When a name repeats, its last position wins.
        self._col_map = {entry[0]: pos for pos, entry in enumerate(description)}

    def __getitem__(self, item):
        if isinstance(item, int):
            return self._row[item]
        pos = self._col_map.get(item)
        if pos is None:
            raise KeyError(item)
        return self._row[pos]

    def keys(self):
        return self._col_map.keys()

    def __len__(self):
        return len(self._row)

    def __iter__(self):
        return iter(self._row)

def _dict_factory(cursor, row):
    """Row factory for the libSQL path.

    Wrap ``row`` in a ``_RowProxy`` when ``cursor`` exposes a non-empty
    ``description``; otherwise hand the raw row back unchanged.
    """
    description = getattr(cursor, "description", None)
    if not description:
        return row
    return _RowProxy(row, description)

from .types import ToolQualityRecord, ExecutionRecord, DescriptionQuality
from openspace.utils.logging import Logger
from openspace.config.constants import PROJECT_ROOT

# Module-level logger for this store.
logger = Logger.get_logger(__name__)


# Schema for the quality tables. QualityStore runs this script with
# ``executescript`` when it is constructed. Every statement uses IF NOT EXISTS,
# so running it against an existing database leaves objects that already
# exist untouched.
#
#   tool_quality_records   — one row per tool_key. Holds aggregate counters,
#                            the optional description-quality scores
#                            (desc_* columns, written as NULL when no
#                            evaluation exists) and first_seen/last_updated
#                            stored as ISO-8601 text.
#   tool_execution_history — per-call rows keyed by tool_key. QualityStore
#                            deletes and re-inserts a tool's rows on every
#                            save. ON DELETE CASCADE is only enforced when
#                            foreign keys are enabled: the SQLite branch sets
#                            PRAGMA foreign_keys=ON, the Turso branch does not.
#   tool_quality_meta      — TEXT key/value pairs; QualityStore stores
#                            'global_execution_count' here as a string.
_DDL = """
CREATE TABLE IF NOT EXISTS tool_quality_records (
    tool_key                TEXT PRIMARY KEY,
    backend                 TEXT NOT NULL,
    server                  TEXT NOT NULL DEFAULT 'default',
    tool_name               TEXT NOT NULL,
    total_calls             INTEGER NOT NULL DEFAULT 0,
    success_count           INTEGER NOT NULL DEFAULT 0,
    total_execution_time_ms REAL    NOT NULL DEFAULT 0.0,
    llm_flagged_count       INTEGER NOT NULL DEFAULT 0,
    description_hash        TEXT,
    desc_clarity            REAL,
    desc_completeness       REAL,
    desc_evaluated_at       TEXT,
    desc_reasoning          TEXT,
    first_seen              TEXT NOT NULL,
    last_updated            TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tqr_backend ON tool_quality_records(backend);
CREATE INDEX IF NOT EXISTS idx_tqr_flagged ON tool_quality_records(llm_flagged_count);

CREATE TABLE IF NOT EXISTS tool_execution_history (
    id                INTEGER PRIMARY KEY AUTOINCREMENT,
    tool_key          TEXT NOT NULL
        REFERENCES tool_quality_records(tool_key) ON DELETE CASCADE,
    timestamp         TEXT NOT NULL,
    success           INTEGER NOT NULL,
    execution_time_ms REAL    NOT NULL DEFAULT 0.0,
    error_message     TEXT
);
CREATE INDEX IF NOT EXISTS idx_teh_key ON tool_execution_history(tool_key);
CREATE INDEX IF NOT EXISTS idx_teh_ts  ON tool_execution_history(timestamp);

CREATE TABLE IF NOT EXISTS tool_quality_meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);
"""


class QualityStore:
    """Persistence for tool quality data.

    Storage backends:

    * Local SQLite (default). Unless ``db_path`` is given, this is the same
      ``.db`` file as ``SkillStore``
      (``<project_root>/.openspace/openspace.db``). Each subsystem creates its
      own tables independently.
    * Turso / libSQL, used when the ``TURSO_DATABASE_URL`` environment
      variable is set *and* ``libsql_experimental`` is importable.

    All methods share one connection, and every database operation runs
    while holding ``self._mu`` (a ``threading.Lock``).
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open the database connection and create the tables if missing.

        Args:
            db_path: SQLite database file. Defaults to
                ``PROJECT_ROOT / ".openspace" / "openspace.db"``; the
                ``.openspace`` directory is created in that case. For the
                SQLite backend, missing parent directories of a caller-supplied
                path are also created. The value is always stored on
                ``self._db_path``, even when the Turso backend is used.
        """
        if db_path is None:
            db_dir = PROJECT_ROOT / ".openspace"
            db_dir.mkdir(parents=True, exist_ok=True)
            db_path = db_dir / "openspace.db"

        self._db_path = Path(db_path)
        self._mu = threading.Lock()

        turso_url = os.environ.get("TURSO_DATABASE_URL")
        turso_token = os.environ.get("TURSO_AUTH_TOKEN")

        if turso_url and libsql is not None:
            raw_conn = libsql.connect(turso_url, auth_token=turso_token)
            self._conn = _LibsqlConnectionProxy(raw_conn)
            # The proxy applies this factory itself when rows are fetched,
            # giving name-based row access like sqlite3.Row below.
            self._conn.row_factory = _dict_factory
            logger.debug(f"QualityStore ready (Turso) at {turso_url}")
        else:
            if turso_url:
                # Make the fallback visible: otherwise data silently lands in
                # the local file while the operator believes Turso is in use.
                logger.warning(
                    "TURSO_DATABASE_URL is set but libsql_experimental is not "
                    f"installed; falling back to local SQLite at {self._db_path}"
                )
            # sqlite3.connect cannot create missing directories, so a custom
            # db_path under a new directory would otherwise fail to open.
            self._db_path.parent.mkdir(parents=True, exist_ok=True)
            self._conn = sqlite3.connect(
                str(self._db_path),
                timeout=30.0,
                check_same_thread=False,
            )
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA busy_timeout=30000")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.row_factory = sqlite3.Row
            logger.debug(f"QualityStore ready (SQLite) at {self._db_path}")

        self._init_tables()

    def _init_tables(self) -> None:
        """Create this module's tables and indexes (idempotent via IF NOT EXISTS)."""
        with self._mu:
            self._conn.executescript(_DDL)
            self._conn.commit()

    def load_all(self) -> Tuple[Dict[str, ToolQualityRecord], int]:
        """Load all quality records and the global execution count.

        Returns:
            ``(records, global_count)``:

            * ``records`` maps ``tool_key`` to a ``ToolQualityRecord``. Its
              ``recent_executions`` holds at most
              ``ToolQualityRecord.MAX_RECENT_EXECUTIONS`` entries, restored in
              ascending row-id order (the order they were saved in).
            * ``global_count`` is the stored ``global_execution_count``, or 0
              when none has been saved yet.
        """
        with self._mu:
            rows = self._conn.execute(
                "SELECT * FROM tool_quality_records"
            ).fetchall()

            records: Dict[str, ToolQualityRecord] = {}
            for row in rows:
                tool_key = row["tool_key"]
                record = ToolQualityRecord(
                    tool_key=tool_key,
                    backend=row["backend"],
                    server=row["server"],
                    tool_name=row["tool_name"],
                    total_calls=row["total_calls"],
                    success_count=row["success_count"],
                    total_execution_time_ms=row["total_execution_time_ms"],
                    llm_flagged_count=row["llm_flagged_count"],
                    description_hash=row["description_hash"],
                    first_seen=datetime.fromisoformat(row["first_seen"]),
                    last_updated=datetime.fromisoformat(row["last_updated"]),
                )

                # Description quality (all-or-nothing: clarity present → all present)
                if row["desc_clarity"] is not None:
                    record.description_quality = DescriptionQuality(
                        clarity=row["desc_clarity"],
                        completeness=row["desc_completeness"],
                        evaluated_at=datetime.fromisoformat(row["desc_evaluated_at"]),
                        reasoning=row["desc_reasoning"] or "",
                    )

                # Recent execution history: fetch the newest N by id, then
                # reverse so the list ends with the most recent call.
                # NOTE(review): one query per tool (N+1); fine for modest
                # tool counts.
                exec_rows = self._conn.execute(
                    "SELECT timestamp, success, execution_time_ms, error_message "
                    "FROM tool_execution_history "
                    "WHERE tool_key = ? ORDER BY id DESC LIMIT ?",
                    (tool_key, ToolQualityRecord.MAX_RECENT_EXECUTIONS),
                ).fetchall()
                record.recent_executions = [
                    ExecutionRecord(
                        timestamp=datetime.fromisoformat(er["timestamp"]),
                        success=bool(er["success"]),
                        execution_time_ms=er["execution_time_ms"],
                        error_message=er["error_message"],
                    )
                    for er in reversed(exec_rows)
                ]

                records[tool_key] = record

            # Global metadata (stored as TEXT, see save paths)
            meta_row = self._conn.execute(
                "SELECT value FROM tool_quality_meta "
                "WHERE key = 'global_execution_count'"
            ).fetchone()
            global_count = int(meta_row["value"]) if meta_row else 0

            logger.info(
                f"Loaded {len(records)} quality records from SQLite "
                f"(global_count={global_count})"
            )
            return records, global_count

    async def save_all(
        self,
        records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Persist all records and the global execution count (bulk).

        NOTE: declared ``async`` but awaits nothing; the database work runs
        synchronously on the calling thread. Failures are logged and rolled
        back, not raised.
        """
        self._save_all_sync(records, global_execution_count)

    async def save_record(
        self,
        record: ToolQualityRecord,
        all_records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Persist a single record (incremental — much cheaper than save_all).

        Args:
            record: The record to upsert, together with its execution history.
            all_records: Not used by this method; kept for interface
                compatibility.
            global_execution_count: Value stored under
                ``global_execution_count`` in ``tool_quality_meta``.

        Like ``save_all``, this runs synchronously despite being ``async``.
        On failure the transaction is rolled back and the error is logged,
        not raised.
        """
        with self._mu:
            try:
                self._upsert_record(record)
                self._conn.execute(
                    "INSERT OR REPLACE INTO tool_quality_meta "
                    "(key, value) VALUES (?, ?)",
                    ("global_execution_count", str(global_execution_count)),
                )
                self._conn.commit()
            except Exception as e:
                self._conn.rollback()
                logger.error(f"Failed to save record {record.tool_key}: {e}")

    def clear(self) -> None:
        """Delete all quality data in one transaction.

        Raises:
            Whatever the database driver raises. The transaction is rolled
            back first, so no partial delete or open write transaction is
            left behind.
        """
        with self._mu:
            try:
                # History first: its rows reference tool_quality_records.
                self._conn.execute("DELETE FROM tool_execution_history")
                self._conn.execute("DELETE FROM tool_quality_records")
                self._conn.execute("DELETE FROM tool_quality_meta")
                self._conn.commit()
            except Exception:
                self._conn.rollback()
                raise
        logger.info("Quality data cleared")

    def close(self) -> None:
        """Close the database connection (best effort; errors are only logged).

        Takes ``self._mu`` so the connection is not closed underneath an
        in-flight operation from another thread.
        """
        with self._mu:
            try:
                self._conn.close()
            except Exception as e:
                logger.debug(f"Ignoring error while closing QualityStore: {e}")

    def _save_all_sync(
        self,
        records: Dict[str, ToolQualityRecord],
        global_execution_count: int = 0,
    ) -> None:
        """Synchronous full save (used by async wrapper and migration).

        All records and the meta row are written in one transaction: a
        single commit at the end, or a rollback of the whole batch if any
        statement fails. Errors are logged, not raised.
        """
        with self._mu:
            try:
                for record in records.values():
                    self._upsert_record(record)
                self._conn.execute(
                    "INSERT OR REPLACE INTO tool_quality_meta "
                    "(key, value) VALUES (?, ?)",
                    ("global_execution_count", str(global_execution_count)),
                )
                self._conn.commit()
            except Exception as e:
                self._conn.rollback()
                logger.error(f"Failed to bulk-save quality records: {e}")

    def _upsert_record(self, record: ToolQualityRecord) -> None:
        """Upsert one tool_quality_records row + its execution history.

        Caller MUST hold ``self._mu``.  Does NOT commit — caller manages
        the transaction boundary.

        Timestamps are written as ISO-8601 strings. The desc_* columns are
        NULL when ``record.description_quality`` is falsy. The connection
        must provide ``executemany``, which is used for the history rows.
        """
        dq = record.description_quality
        self._conn.execute(
            """INSERT OR REPLACE INTO tool_quality_records
            (tool_key, backend, server, tool_name,
             total_calls, success_count, total_execution_time_ms,
             llm_flagged_count, description_hash,
             desc_clarity, desc_completeness, desc_evaluated_at, desc_reasoning,
             first_seen, last_updated)
            VALUES (?,?,?,?, ?,?,?, ?,?, ?,?,?,?, ?,?)""",
            (
                record.tool_key,
                record.backend,
                record.server,
                record.tool_name,
                record.total_calls,
                record.success_count,
                record.total_execution_time_ms,
                record.llm_flagged_count,
                record.description_hash,
                dq.clarity if dq else None,
                dq.completeness if dq else None,
                dq.evaluated_at.isoformat() if dq else None,
                dq.reasoning if dq else None,
                record.first_seen.isoformat(),
                record.last_updated.isoformat(),
            ),
        )

        # Sync execution history: delete + re-insert.
        # For ≤ MAX_RECENT_EXECUTIONS rows this is fast and avoids
        # complex diff logic between in-memory and DB state.
        self._conn.execute(
            "DELETE FROM tool_execution_history WHERE tool_key = ?",
            (record.tool_key,),
        )
        if record.recent_executions:
            self._conn.executemany(
                "INSERT INTO tool_execution_history "
                "(tool_key, timestamp, success, execution_time_ms, error_message) "
                "VALUES (?,?,?,?,?)",
                [
                    (
                        record.tool_key,
                        e.timestamp.isoformat(),
                        int(e.success),
                        e.execution_time_ms,
                        e.error_message,
                    )
                    for e in record.recent_executions
                ],
            )