File size: 9,609 Bytes
9b18003
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90c690e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b18003
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""
Persistence layer: Supabase (PostgreSQL) + Redis (Upstash).

Provides:
  - DB pool (asyncpg) for conversations, messages, executions
  - Redis pub/sub for realtime SSE/WebSocket bridging
  - Auto-init schema on startup
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from typing import Any, AsyncGenerator, Dict, List, Optional

# Module-level logger; every helper below reports failures through it.
logger = logging.getLogger(__name__)

# ─── Connection state ─────────────────────────────────────────────────────────

# Both handles start as None and are assigned by init_db() / init_redis().
# The public helpers check them first and return a "disabled" result
# (None, [] or False) while the corresponding backend is not connected.
_db_pool: Optional[Any] = None          # asyncpg.Pool
_redis: Optional[Any] = None            # redis.asyncio.Redis


async def init_db() -> bool:
    """Create the asyncpg pool from ``DATABASE_URL`` and ensure the schema.

    The URL is split into explicit keyword arguments for
    ``asyncpg.create_pool`` so that percent-encoded credentials (for example
    a password containing ``@``) are decoded exactly once.

    Returns:
        True when the pool was created and the schema statements ran.
        False when ``DATABASE_URL`` is unset or any step failed. On failure
        the module-level pool is restored to its previous value and the
        half-initialised pool is closed, so ``db_connected()`` never reports
        a pool whose initialisation did not complete.
    """
    global _db_pool
    db_url = os.environ.get("DATABASE_URL", "")
    if not db_url:
        logger.warning("DATABASE_URL not set — DB persistence disabled")
        return False

    previous_pool = _db_pool
    new_pool = None
    try:
        import asyncpg
        from urllib.parse import unquote, urlparse

        parsed = urlparse(db_url)
        new_pool = await asyncpg.create_pool(
            user=unquote(parsed.username or ""),
            password=unquote(parsed.password or ""),
            host=parsed.hostname or "",
            port=parsed.port or 5432,
            # A missing path or a bare "/" both fall back to "postgres"
            # instead of sending an empty database name.
            database=parsed.path.lstrip("/") or "postgres",
            min_size=1,
            max_size=8,
            ssl="require",
        )
        # _create_schema() reads the module-level pool, so publish it first.
        _db_pool = new_pool
        await _create_schema()
    except Exception as e:
        # Roll back the global and release whatever was opened.
        _db_pool = previous_pool
        if new_pool is not None:
            try:
                await new_pool.close()
            except Exception as close_err:
                logger.warning("DB pool cleanup failed: %s", close_err)
        logger.error("❌ DB connection failed: %s", e)
        return False

    logger.info("✅ Supabase/PostgreSQL connected")
    return True


async def init_redis() -> bool:
    """Create the Redis client from ``REDIS_URL`` and verify it with a PING.

    Returns:
        True when the client answered the PING and was published as the
        module-level client. False when ``REDIS_URL`` is unset or the
        connection attempt failed; in that case the module-level client is
        left untouched and the failed client is closed, so
        ``redis_connected()`` does not report a client that never worked.
    """
    global _redis
    redis_url = os.environ.get("REDIS_URL", "")
    if not redis_url:
        logger.warning("REDIS_URL not set — Redis realtime disabled")
        return False

    client = None
    try:
        import redis.asyncio as aioredis

        options: Dict[str, Any] = {
            "encoding": "utf-8",
            "decode_responses": True,
        }
        # TLS keyword arguments are only understood by the SSL connection
        # class; passing them with a plain redis:// URL makes the connection
        # constructor reject the unexpected argument.
        if redis_url.lower().startswith("rediss://"):
            # NOTE(review): None disables certificate verification. Kept to
            # preserve existing behaviour; consider "required" if the
            # server presents a verifiable certificate.
            options["ssl_cert_reqs"] = None

        client = aioredis.from_url(redis_url, **options)
        await client.ping()
    except Exception as e:
        if client is not None:
            try:
                # aclose() replaces the deprecated close() on newer redis-py.
                closer = getattr(client, "aclose", None) or client.close
                await closer()
            except Exception as close_err:
                logger.warning("Redis cleanup failed: %s", close_err)
        logger.error("❌ Redis connection failed: %s", e)
        return False

    # Publish the client only after it has proven to be usable.
    _redis = client
    logger.info("✅ Redis (Upstash) connected")
    return True


async def close():
    """Close the DB pool and the Redis client, then forget both handles.

    The module-level handles are reset to None before closing so that
    ``db_connected()`` / ``redis_connected()`` and the helpers stop using
    resources that are being torn down. The Redis client is closed even if
    closing the pool raises; such an exception still propagates to the
    caller.
    """
    global _db_pool, _redis
    pool, client = _db_pool, _redis
    _db_pool = None
    _redis = None
    try:
        if pool is not None:
            await pool.close()
    finally:
        if client is not None:
            # aclose() replaces the deprecated close() on newer redis-py.
            closer = getattr(client, "aclose", None) or client.close
            await closer()


# ─── Schema init ──────────────────────────────────────────────────────────────

async def _create_schema():
    """Create the tables and indexes used by this module if they are missing.

    Runs one multi-statement script on a connection taken from the
    module-level pool: the ``conversations``, ``messages`` and ``executions``
    tables plus their lookup indexes. Every statement uses ``IF NOT EXISTS``,
    so calling this on an already-initialised database changes nothing.

    Any error raised by acquiring the connection or executing the script
    propagates to the caller.
    """
    async with _db_pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                user_id TEXT NOT NULL DEFAULT 'anonymous',
                title TEXT DEFAULT 'New conversation',
                model TEXT DEFAULT 'gemini-2.0-flash',
                provider TEXT DEFAULT 'gemini',
                task_type TEXT DEFAULT 'general',
                created_at TIMESTAMPTZ DEFAULT NOW(),
                updated_at TIMESTAMPTZ DEFAULT NOW()
            );

            CREATE TABLE IF NOT EXISTS messages (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
                role TEXT NOT NULL CHECK(role IN ('user','assistant','system','tool')),
                content TEXT NOT NULL,
                provider TEXT,
                model TEXT,
                metadata JSONB DEFAULT '{}',
                created_at TIMESTAMPTZ DEFAULT NOW()
            );

            CREATE TABLE IF NOT EXISTS executions (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                conversation_id UUID REFERENCES conversations(id) ON DELETE SET NULL,
                language TEXT DEFAULT 'python',
                code TEXT NOT NULL,
                output TEXT DEFAULT '',
                error TEXT DEFAULT '',
                exit_code INT DEFAULT 0,
                duration_ms INT DEFAULT 0,
                sandbox_id TEXT,
                created_at TIMESTAMPTZ DEFAULT NOW()
            );

            CREATE INDEX IF NOT EXISTS idx_messages_conv ON messages(conversation_id, created_at);
            CREATE INDEX IF NOT EXISTS idx_executions_conv ON executions(conversation_id);
            CREATE INDEX IF NOT EXISTS idx_conversations_user ON conversations(user_id, updated_at DESC);
        """)
    # The original message carried a mis-encoded check-mark ("βœ…").
    logger.info("✅ DB schema ready")


# ─── Conversation helpers ─────────────────────────────────────────────────────

async def create_conversation(
    user_id: str = "anonymous",
    title: str = "New conversation",
    model: str = "gemini-2.0-flash",
    provider: str = "gemini",
) -> Optional[dict]:
    """Insert a conversation row and return it as a plain dict.

    Returns:
        The inserted row (all columns, via ``RETURNING *``) as a dict, or
        None when the DB pool is not available or the insert failed. The
        failure is logged, not raised.
    """
    if _db_pool:
        try:
            async with _db_pool.acquire() as conn:
                inserted = await conn.fetchrow(
                    """INSERT INTO conversations(user_id, title, model, provider)
                   VALUES($1,$2,$3,$4) RETURNING *""",
                    user_id,
                    title,
                    model,
                    provider,
                )
            return dict(inserted)
        except Exception as e:
            logger.error("create_conversation failed: %s", e)
    return None


async def list_conversations(user_id: str = "anonymous", limit: int = 50) -> List[dict]:
    """Return up to *limit* conversations of *user_id*, most recently updated first.

    Returns an empty list when the DB pool is not available or the query
    fails; the failure is logged, not raised.
    """
    conversations: List[dict] = []
    if _db_pool:
        query = "SELECT * FROM conversations WHERE user_id=$1 ORDER BY updated_at DESC LIMIT $2"
        try:
            async with _db_pool.acquire() as conn:
                records = await conn.fetch(query, user_id, limit)
            conversations = list(map(dict, records))
        except Exception as e:
            logger.error("list_conversations failed: %s", e)
            return []
    return conversations


async def get_conversation_messages(conv_id: str, limit: int = 40) -> List[dict]:
    """Return up to *limit* messages of a conversation in creation order.

    Returns an empty list when the DB pool is not available or the query
    fails; the failure is logged, not raised.

    NOTE(review): ascending order combined with LIMIT yields the *oldest*
    ``limit`` messages. Confirm callers do not expect the most recent ones.
    """
    if _db_pool:
        query = "SELECT * FROM messages WHERE conversation_id=$1 ORDER BY created_at ASC LIMIT $2"
        try:
            async with _db_pool.acquire() as conn:
                records = await conn.fetch(query, conv_id, limit)
            return list(map(dict, records))
        except Exception as e:
            logger.error("get_conversation_messages failed: %s", e)
    return []


async def save_message(
    conv_id: str,
    role: str,
    content: str,
    provider: Optional[str] = None,
    model: Optional[str] = None,
) -> bool:
    """Store a message and bump its conversation's ``updated_at``.

    Both statements run inside one transaction: if the timestamp update
    fails, the insert is rolled back, so a False return never leaves a
    stored message behind.

    Returns:
        True when both statements succeeded. False when the DB pool is not
        available or any statement failed (the failure is logged).
    """
    if not _db_pool:
        return False
    try:
        async with _db_pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute(
                    """INSERT INTO messages(conversation_id, role, content, provider, model)
                   VALUES($1,$2,$3,$4,$5)""",
                    conv_id, role, content, provider, model
                )
                await conn.execute(
                    "UPDATE conversations SET updated_at=NOW() WHERE id=$1", conv_id
                )
        return True
    except Exception as e:
        logger.error("save_message failed: %s", e)
        return False


async def save_execution(
    conv_id: Optional[str],
    language: str,
    code: str,
    output: str,
    error: str,
    exit_code: int,
    duration_ms: int,
) -> bool:
    """Record one code execution, optionally linked to a conversation.

    Returns:
        True when the row was inserted. False when the DB pool is not
        available or the insert failed (the failure is logged).
    """
    if _db_pool:
        values = (conv_id, language, code, output, error, exit_code, duration_ms)
        try:
            async with _db_pool.acquire() as conn:
                await conn.execute(
                    """INSERT INTO executions(conversation_id, language, code, output, error, exit_code, duration_ms)
                   VALUES($1,$2,$3,$4,$5,$6,$7)""",
                    *values,
                )
        except Exception as e:
            logger.error("save_execution failed: %s", e)
            return False
        return True
    return False


async def delete_conversation(conv_id: str) -> bool:
    """Delete the conversation with the given id.

    Returns:
        True when the DELETE statement ran without error, whether or not a
        row matched. False when the DB pool is not available or the
        statement failed (the failure is logged).
    """
    if _db_pool:
        try:
            async with _db_pool.acquire() as conn:
                await conn.execute("DELETE FROM conversations WHERE id=$1", conv_id)
        except Exception as e:
            logger.error("delete_conversation failed: %s", e)
            return False
        return True
    return False


# ─── Redis helpers ────────────────────────────────────────────────────────────

async def publish_event(room: str, event: dict) -> bool:
    """Publish *event* as JSON on the ``room:<room>`` Redis channel.

    Returns:
        True when the publish call completed. False when Redis is not
        available, or when serialising or publishing failed (logged as a
        warning).
    """
    if _redis:
        try:
            channel = f"room:{room}"
            payload = json.dumps(event)
            await _redis.publish(channel, payload)
        except Exception as e:
            logger.warning("Redis publish failed: %s", e)
            return False
        return True
    return False


async def redis_ping() -> bool:
    """Return True when the Redis client answers a PING.

    Returns False when no client is set or the PING raises.
    """
    if _redis:
        try:
            await _redis.ping()
        except Exception:
            return False
        return True
    return False


def db_connected() -> bool:
    """Report whether a DB pool handle is currently set."""
    pool = _db_pool
    return pool is not None


def redis_connected() -> bool:
    """Report whether a Redis client handle is currently set."""
    client = _redis
    return client is not None


def get_redis():
    """Return the module-level Redis client, or None when it is not set."""
    client = _redis
    return client


def get_db():
    """Return the module-level DB pool, or None when it is not set."""
    pool = _db_pool
    return pool