File size: 9,861 Bytes
85a0eea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aecadf5
 
 
 
 
 
 
 
 
 
85a0eea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303

# =============================================================================
# app/db_sync.py
# Internal SQLite IPC β€” app/* state & communication
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan KΓΌcΓΌkbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
#   This file lives exclusively in app/ and is ONLY started by app/app.py.
#   NO direct access to fundaments/*, .env, or Guardian (main.py).
#   DB path comes from app/.pyfun [DB_SYNC] β†’ SQLITE_PATH via app/config.py.
#
# CRITICAL RULES:
#   - This is NOT postgresql.py β€” cloud DB is Guardian-only!
#   - db_sync ONLY manages its own tables (hub_state, tool_cache)
#   - NEVER touch Guardian tables (users, sessions) β€” those belong to user_handler.py
#   - SQLite path is shared with user_handler.py via SQLITE_PATH
#   - app/* modules call db_sync.write() / db_sync.read() β€” never aiosqlite directly
#
# TABLE OWNERSHIP:
#   users, sessions  β†’ Guardian (fundaments/user_handler.py) β€” DO NOT TOUCH!
#   hub_state        β†’ db_sync (app/* internal state)
#   tool_cache       β†’ db_sync (app/* tool response cache)
# =============================================================================

import aiosqlite
import logging
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from . import config

logger = logging.getLogger("db_sync")

# =============================================================================
# Internal State
# =============================================================================
# Resolved SQLite file path; set once by initialize(), read by all sections.
_db_path: Optional[str] = None
# Guards against double initialization; checked by _check_init()/is_ready().
_initialized: bool = False


# =============================================================================
# Initialization β€” called by app/app.py (parameterless, sandboxed)
# =============================================================================

async def initialize() -> None:
    """
    Initialize db_sync: resolve the SQLite path and create app/* tables.

    Called exclusively by app/app.py (parameterless, sandboxed). Idempotent —
    a second call returns immediately without re-reading config.
    """
    global _db_path, _initialized

    if _initialized:
        return

    db_cfg   = config.get_db_sync()
    raw_path = db_cfg.get("SQLITE_PATH", "app/.hub_state.db")

    # HF Spaces: SPACE_ID is set → filesystem is read-only except /tmp/
    import os
    if os.getenv("SPACE_ID"):
        # BUGFIX: the path was hard-coded to "/tmp/(unknown)"; use the
        # configured basename so the DB file stays shared with
        # user_handler.py via SQLITE_PATH.
        filename = os.path.basename(raw_path)
        _db_path = f"/tmp/{filename}"
        logger.info(f"HF Space detected — SQLite relocated to {_db_path}")
    else:
        _db_path = raw_path

    await _init_tables()

    _initialized = True
    logger.info(f"db_sync initialized — path: {_db_path}")


# =============================================================================
# SECTION 1 β€” Table Setup (app/* tables only!)
# =============================================================================

async def _init_tables() -> None:
    """
    Create db_sync's own tables (hub_state, tool_cache) if they are missing.

    Guardian-owned tables (users, sessions) are never touched here — they
    belong to fundaments/user_handler.py.
    """
    ddl_statements = (
        # hub_state — generic key/value store for app/* modules
        """
        CREATE TABLE IF NOT EXISTS hub_state (
            key        TEXT PRIMARY KEY,
            value      TEXT,
            updated_at TEXT
        )
        """,
        # tool_cache — cached tool responses to reduce API calls
        """
        CREATE TABLE IF NOT EXISTS tool_cache (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            tool_name  TEXT NOT NULL,
            prompt     TEXT NOT NULL,
            response   TEXT NOT NULL,
            provider   TEXT,
            model      TEXT,
            created_at TEXT
        )
        """,
        # lookup index for cache_read()
        """
        CREATE INDEX IF NOT EXISTS idx_tool_cache_tool
        ON tool_cache(tool_name)
        """,
    )

    async with aiosqlite.connect(_db_path) as db:
        for ddl in ddl_statements:
            await db.execute(ddl)
        await db.commit()

    logger.info("db_sync tables ready.")


# =============================================================================
# SECTION 2 β€” Key/Value Store (hub_state table)
# =============================================================================

async def write(key: str, value: Any) -> None:
    """
    Store a value in the hub_state key/value table.

    The value is JSON-serialized, so dicts, lists, strings and numbers are
    all supported. An existing key is overwritten.

    Args:
        key:   Unique key string (e.g. 'scheduler.last_run').
        value: Any JSON-serializable value.
    """
    _check_init()

    serialized = json.dumps(value)
    timestamp  = datetime.now(timezone.utc).isoformat()

    async with aiosqlite.connect(_db_path) as db:
        await db.execute(
            "INSERT OR REPLACE INTO hub_state (key, value, updated_at) "
            "VALUES (?, ?, ?)",
            (key, serialized, timestamp),
        )
        await db.commit()


async def read(key: str, default: Any = None) -> Any:
    """
    Fetch a value from the hub_state key/value table.

    Args:
        key:     Key string to look up.
        default: Value returned when the key does not exist. Default: None.

    Returns:
        The JSON-deserialized value, the raw stored text when it is not
        valid JSON, or ``default`` when the key is absent.
    """
    _check_init()

    async with aiosqlite.connect(_db_path) as db:
        cursor = await db.execute(
            "SELECT value FROM hub_state WHERE key = ?", (key,)
        )
        result = await cursor.fetchone()

    if result is None:
        return default

    raw = result[0]
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Value predates JSON serialization or is NULL — hand it back as-is.
        return raw


async def delete(key: str) -> None:
    """
    Remove a key from hub_state. A missing key is silently ignored.

    Args:
        key: Key string to delete.
    """
    _check_init()

    async with aiosqlite.connect(_db_path) as db:
        await db.execute(
            "DELETE FROM hub_state WHERE key = ?",
            (key,),
        )
        await db.commit()


# =============================================================================
# SECTION 3 β€” Tool Cache (tool_cache table)
# =============================================================================

async def cache_write(
    tool_name: str,
    prompt: str,
    response: str,
    provider: Optional[str] = None,
    model: Optional[str] = None,
) -> None:
    """
    Cache a tool response to reduce redundant API calls.

    After the insert, the table is trimmed to MAX_CACHE_ENTRIES (from
    app/.pyfun [DB_SYNC], default 1000) by deleting the oldest rows.

    Args:
        tool_name: Tool name (e.g. 'llm_complete', 'web_search').
        prompt:    The input prompt/query that was used.
        response:  The response to cache.
        provider:  Provider name used (optional).
        model:     Model name used (optional).
    """
    _check_init()

    db_cfg      = config.get_db_sync()
    max_entries = int(db_cfg.get("MAX_CACHE_ENTRIES", "1000"))
    now         = datetime.now(timezone.utc).isoformat()

    async with aiosqlite.connect(_db_path) as db:
        await db.execute("""
            INSERT INTO tool_cache (tool_name, prompt, response, provider, model, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (tool_name, prompt, response, provider, model, now))

        # Enforce MAX_CACHE_ENTRIES — delete oldest if exceeded.
        # id DESC tiebreaker: rows sharing an identical ISO timestamp would
        # otherwise be kept/dropped nondeterministically.
        await db.execute("""
            DELETE FROM tool_cache WHERE id NOT IN (
                SELECT id FROM tool_cache ORDER BY created_at DESC, id DESC LIMIT ?
            )
        """, (max_entries,))

        await db.commit()


async def cache_read(tool_name: str, prompt: str) -> Optional[str]:
    """
    Look up the most recent cached response for a (tool_name, prompt) pair.

    Args:
        tool_name: Tool name to look up.
        prompt:    The exact prompt/query to match.

    Returns:
        Cached response string, or None when no entry matches.
    """
    _check_init()

    sql = """
        SELECT response FROM tool_cache
        WHERE tool_name = ? AND prompt = ?
        ORDER BY created_at DESC LIMIT 1
    """

    async with aiosqlite.connect(_db_path) as db:
        cursor = await db.execute(sql, (tool_name, prompt))
        hit = await cursor.fetchone()

    if hit is None:
        return None
    return hit[0]


# =============================================================================
# SECTION 4 β€” Read-Only Query (for mcp.py db_query tool)
# =============================================================================

async def query(sql: str) -> List[Dict]:
    """
    Execute a read-only SELECT query on the internal hub state database.

    Only SELECT statements are permitted — write operations are blocked.
    Called by mcp.py's db_query tool when db_sync.py is active.

    Args:
        sql: SQL SELECT statement to execute.

    Returns:
        List of result rows as dicts (column name → value).

    Raises:
        ValueError: If the statement does not start with SELECT.
    """
    _check_init()

    # Cheap guard: anything not starting with SELECT is rejected outright.
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries are permitted in db_query tool.")

    async with aiosqlite.connect(_db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor  = await db.execute(sql)
        fetched = await cursor.fetchall()

    return [dict(row) for row in fetched]


# =============================================================================
# SECTION 5 β€” Helpers
# =============================================================================

def _check_init() -> None:
    """Guard helper: raise RuntimeError unless initialize() has completed."""
    if _initialized and _db_path:
        return
    raise RuntimeError("db_sync not initialized — call initialize() first.")


def is_ready() -> bool:
    """Report whether initialize() has completed and a DB path is set."""
    return bool(_initialized and _db_path is not None)


# =============================================================================
# Direct execution guard
# =============================================================================

if __name__ == "__main__":
    print("WARNING: Run via main.py β†’ app.py, not directly.")