# api/db.py
"""
Minimal PostgreSQL persistence for ClareVoice (pilot).
Uses psycopg2 SimpleConnectionPool + raw SQL.
Graceful degradation: if DATABASE_URL is unset or DB is unreachable,
all public functions silently no-op and the app continues normally.
Environment variable:
DATABASE_URL=postgres://user:password@host:5432/dbname
"""
from __future__ import annotations
import json
import logging
import os
import secrets
import uuid
from contextlib import contextmanager
from typing import Any, Dict, List, Optional
logger = logging.getLogger("clare.db")

# Connection string, read once at import time. An empty (or whitespace-only)
# value means persistence is disabled; init_db() checks this.
DATABASE_URL: str = (os.environ.get("DATABASE_URL") or "").strip()

# Shared connection pool. It stays None until init_db() succeeds, and every
# public helper below treats None as "DB disabled" and returns early.
_pool: Optional[Any] = None
def init_db() -> None:
    """
    Create the connection pool and make sure the schema exists.

    Intended to be called once on FastAPI startup. Safe to call when
    DATABASE_URL is absent: it logs a warning and returns, leaving persistence
    disabled. If the pool already exists, the call is a no-op.

    Any failure (driver import, connect, DDL/migration) is logged. Any
    partially created pool is closed, and ``_pool`` is reset to None, so every
    public helper silently no-ops afterwards.
    """
    global _pool
    if not DATABASE_URL:
        logger.warning("[db] DATABASE_URL not set — DB persistence disabled.")
        return
    if _pool is not None:
        # Already initialised. Building a second pool would leak the first
        # pool's open connections.
        return
    try:
        from psycopg2 import pool as pg_pool
        # ThreadedConnectionPool rather than SimpleConnectionPool: the helpers
        # in this module are synchronous and may be called from worker threads
        # (e.g. FastAPI runs plain `def` endpoints in a threadpool).
        # SimpleConnectionPool is documented as unsafe to share across threads.
        _pool = pg_pool.ThreadedConnectionPool(minconn=1, maxconn=10, dsn=DATABASE_URL)
        logger.info("[db] Connection pool created.")
        _create_tables()
        logger.info("[db] Tables ready.")
    except Exception as exc:
        logger.error("[db] init failed — DB persistence disabled: %s", exc)
        if _pool is not None:
            # The pool was created but table setup failed: release its
            # connections instead of just dropping the reference.
            try:
                _pool.closeall()
            except Exception as close_exc:
                logger.debug("[db] closing pool after failed init also failed: %s", close_exc)
        _pool = None
@contextmanager
def _get_conn():
    """
    Borrow a pooled connection for the duration of a ``with`` block.

    The transaction is committed when the block exits normally and rolled back
    when it raises. The connection is always handed back to the pool.

    Raises:
        RuntimeError: if the pool has not been initialised (see init_db()).
        Any exception raised inside the ``with`` block, or by ``commit()``,
        is re-raised unchanged after the rollback attempt.
    """
    if _pool is None:
        raise RuntimeError("DB pool not initialised")
    conn = _pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except Exception as rollback_exc:
            # On a dead connection rollback() raises too. Log it rather than
            # letting it replace the exception that actually caused the failure.
            logger.warning("[db] rollback failed: %s", rollback_exc)
        raise
    finally:
        _pool.putconn(conn)
# ---------------------------------------------------------------------------
# DDL
# ---------------------------------------------------------------------------
# login_sessions: one row per session_id. The session_id is supplied by the
# caller (see upsert_session). login_id is indexed so a user's sessions can be
# found. Executed by _create_tables(); IF NOT EXISTS makes it safe on every
# startup.
_CREATE_LOGIN_SESSIONS_SQL = """
CREATE TABLE IF NOT EXISTS login_sessions (
session_id TEXT PRIMARY KEY,
login_id TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS login_sessions_login_id_idx ON login_sessions (login_id);
"""
# chats: one row per conversation.
# - chat_id is TEXT with no column default. create_chat() generates it in
#   Python (secrets.token_hex(4), i.e. 8 hex chars). _migrate_schema()
#   converts older UUID-typed installs and drops any leftover default.
# - The two *_session_id columns point at the login_sessions row that
#   created / last modified the chat.
_CREATE_CHATS_SQL = """
CREATE TABLE IF NOT EXISTS chats (
chat_id TEXT PRIMARY KEY,
login_id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
chat_mode TEXT NOT NULL DEFAULT 'ask',
created_session_id TEXT REFERENCES login_sessions(session_id),
last_updated_session_id TEXT REFERENCES login_sessions(session_id),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS chats_login_id_idx ON chats (login_id);
"""
# interactions: one row per turn (user message + assistant reply).
# - session_id is NOT NULL and references login_sessions, so the session row
#   must exist before an interaction is inserted.
# - chat_id is optional. Its FK uses ON DELETE CASCADE, so deleting a chat
#   also deletes its interactions.
# - turn_index is computed at insert time by insert_interaction().
# - The *_ts columns hold timestamps passed in by the caller.
# - thumbs_rating / free_text_feedback are filled in later by
#   update_interaction_feedback(), which matches rows on run_id.
_CREATE_INTERACTIONS_SQL = """
CREATE TABLE IF NOT EXISTS interactions (
id UUID PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES login_sessions(session_id),
chat_id TEXT REFERENCES chats(chat_id) ON DELETE CASCADE,
login_id TEXT,
turn_index INTEGER NOT NULL,
user_message TEXT NOT NULL,
assistant_reply TEXT NOT NULL DEFAULT '',
learning_mode TEXT NOT NULL DEFAULT '',
total_tokens INTEGER NOT NULL DEFAULT 0,
estimated_cost DOUBLE PRECISION NOT NULL DEFAULT 0,
user_ts TIMESTAMPTZ,
first_token_ts TIMESTAMPTZ,
last_token_ts TIMESTAMPTZ,
suggestions_ts TIMESTAMPTZ,
doc_references TEXT[] NOT NULL DEFAULT '{}',
suggested_questions TEXT[] NOT NULL DEFAULT '{}',
error_flag BOOLEAN NOT NULL DEFAULT FALSE,
timeout_flag BOOLEAN NOT NULL DEFAULT FALSE,
run_id TEXT,
thumbs_rating TEXT,
free_text_feedback TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS interactions_session_id_idx ON interactions (session_id);
CREATE INDEX IF NOT EXISTS interactions_chat_id_idx ON interactions (chat_id);
CREATE INDEX IF NOT EXISTS interactions_login_id_idx ON interactions (login_id);
"""
# survey_responses: one row per submitted survey.
# - q1..q10 are SMALLINT answers, each with an optional free-text feedback
#   column; q11 is free text.
# - get_user_overview() averages q1..q10 as a rating.
# - The id default uses gen_random_uuid(), which is built into PostgreSQL 13+.
#   Older servers need the pgcrypto extension, or this CREATE TABLE fails.
#   NOTE(review): confirm the server version. insert_survey_response() always
#   supplies id explicitly, so the default is otherwise unused.
_CREATE_SURVEY_RESPONSES_SQL = """
CREATE TABLE IF NOT EXISTS survey_responses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
login_id TEXT,
submitted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
q1 SMALLINT, q1_feedback TEXT,
q2 SMALLINT, q2_feedback TEXT,
q3 SMALLINT, q3_feedback TEXT,
q4 SMALLINT, q4_feedback TEXT,
q5 SMALLINT, q5_feedback TEXT,
q6 SMALLINT, q6_feedback TEXT,
q7 SMALLINT, q7_feedback TEXT,
q8 SMALLINT, q8_feedback TEXT,
q9 SMALLINT, q9_feedback TEXT,
q10 SMALLINT, q10_feedback TEXT,
q11 TEXT
);
CREATE INDEX IF NOT EXISTS survey_responses_login_id_idx ON survey_responses (login_id);
"""
def _create_tables() -> None:
    """
    Ensure every table and index exists, then apply schema migrations.

    All CREATE ... IF NOT EXISTS statements run on one pooled connection and
    are committed together. _migrate_schema() then runs in its own
    transaction. Exceptions propagate to the caller.
    """
    ddl_statements = (
        _CREATE_LOGIN_SESSIONS_SQL,
        _CREATE_CHATS_SQL,
        _CREATE_INTERACTIONS_SQL,
        _CREATE_SURVEY_RESPONSES_SQL,
    )
    with _get_conn() as conn, conn.cursor() as cur:
        for ddl in ddl_statements:
            cur.execute(ddl)
    _migrate_schema()
def _migrate_schema() -> None:
    """Idempotent ALTER TABLE migrations for existing installations.

    Brings databases created by earlier versions of this module up to the
    current schema. Each step is guarded (IF [NOT] EXISTS or a catalog check),
    so on an already-current database the statements change nothing.

    All steps run on one connection from _get_conn() and are committed
    together. If any step fails, the whole migration is rolled back and the
    exception propagates to _create_tables() / init_db().
    """
    with _get_conn() as conn:
        with conn.cursor() as cur:
            # Migrate chat_id UUID → TEXT (only if still UUID type).
            # The FK is dropped first so interactions.chat_id and
            # chats.chat_id can change type one after the other without the
            # constraint linking a uuid column to a text column in between.
            # The next statement re-creates it.
            # NOTE(review): the information_schema lookup is not filtered by
            # table_schema. It assumes only one visible table named 'chats'.
            cur.execute("""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'chats'
AND column_name = 'chat_id'
AND data_type = 'uuid'
) THEN
ALTER TABLE interactions DROP CONSTRAINT IF EXISTS interactions_chat_id_fkey;
ALTER TABLE interactions ALTER COLUMN chat_id TYPE TEXT USING chat_id::TEXT;
ALTER TABLE chats ALTER COLUMN chat_id TYPE TEXT USING chat_id::TEXT;
END IF;
END $$;
""")
            # Re-add FK if missing (e.g. after the type migration above).
            # The constraint name matches PostgreSQL's default
            # <table>_<column>_fkey name for the inline REFERENCES in
            # _CREATE_INTERACTIONS_SQL, so fresh installs skip this step.
            # The pg_constraint lookup is not schema-qualified either.
            cur.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'interactions_chat_id_fkey'
) THEN
ALTER TABLE interactions ADD CONSTRAINT interactions_chat_id_fkey
FOREIGN KEY (chat_id) REFERENCES chats(chat_id) ON DELETE CASCADE;
END IF;
END $$;
""")
            # Drop removed columns if still present. Any data they held is
            # discarded.
            cur.execute("ALTER TABLE interactions DROP COLUMN IF EXISTS latency_ms;")
            cur.execute("ALTER TABLE interactions DROP COLUMN IF EXISTS retrieved_ids;")
            cur.execute("ALTER TABLE interactions DROP COLUMN IF EXISTS cited_ids;")
            # Add new columns if missing. The definitions mirror
            # _CREATE_INTERACTIONS_SQL; the array columns' DEFAULT '{}' fills
            # existing rows with empty arrays so NOT NULL holds.
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS user_ts TIMESTAMPTZ;")
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS first_token_ts TIMESTAMPTZ;")
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS last_token_ts TIMESTAMPTZ;")
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS suggestions_ts TIMESTAMPTZ;")
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS doc_references TEXT[] NOT NULL DEFAULT '{}';")
            cur.execute("ALTER TABLE interactions ADD COLUMN IF NOT EXISTS suggested_questions TEXT[] NOT NULL DEFAULT '{}';")
            # Remove DEFAULT gen_random_uuid() from chats.chat_id if set.
            # chat_id values are now generated in Python by create_chat().
            # The statement is unconditional; with no default set it
            # changes nothing.
            cur.execute("""
ALTER TABLE chats ALTER COLUMN chat_id DROP DEFAULT;
""")
# ---------------------------------------------------------------------------
# Public helpers
# ---------------------------------------------------------------------------
def upsert_session(
    *,
    session_id: str,
    login_id: str,
    learning_mode: str = "",
) -> None:
    """
    Record that ``session_id`` is active for ``login_id``.

    The first call inserts the login_sessions row. Later calls for the same
    session_id only bump ``updated_at``; the stored login_id is left as-is.

    ``learning_mode`` is accepted for caller compatibility but is not stored
    (login_sessions has no column for it). Does nothing when the DB is
    disabled. Errors are logged, never raised.
    """
    if _pool is None:
        return
    upsert_sql = """
INSERT INTO login_sessions
(session_id, login_id)
VALUES (%s, %s)
ON CONFLICT (session_id) DO UPDATE SET
updated_at = now();
"""
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(upsert_sql, (session_id, login_id))
    except Exception as exc:
        logger.error("[db] upsert_session failed: %s", exc)
# ---------------------------------------------------------------------------
# Chat CRUD stubs (schema ready; DB wiring deferred — localStorage is active store)
# ---------------------------------------------------------------------------
def create_chat(
    *,
    login_id: str,
    name: str,
    chat_mode: str,
    created_session_id: Optional[str] = None,
) -> Optional[str]:
    """
    Insert a new chat row and return its generated chat_id.

    chat_id is 8 lowercase hex characters (``secrets.token_hex(4)``). That is
    only 32 bits of randomness, so a new id can collide with an existing
    chat's primary key. On such a collision a fresh id is drawn and the
    insert is retried, up to three attempts in total.

    The creating session is stored as both created_session_id and
    last_updated_session_id.

    Returns:
        The new chat_id, or None if the DB is unavailable or the insert
        failed. Failures are logged.
    """
    if _pool is None:
        return None
    max_attempts = 3
    for _attempt in range(max_attempts):
        chat_id = secrets.token_hex(4)
        try:
            with _get_conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO chats (chat_id, login_id, name, chat_mode,
                                           created_session_id, last_updated_session_id)
                        VALUES (%s, %s, %s, %s, %s, %s);
                        """,
                        (chat_id, login_id, name, chat_mode, created_session_id, created_session_id),
                    )
        except Exception as exc:
            # SQLSTATE 23505 = unique_violation. The chats table's only
            # unique constraint is the chat_id primary key, so this means an
            # id collision: retry with a fresh id.
            if getattr(exc, "pgcode", None) == "23505":
                continue
            logger.error("[db] create_chat failed: %s", exc)
            return None
        return chat_id
    logger.error("[db] create_chat failed: no unique chat_id after %d attempts", max_attempts)
    return None
def get_chats_for_user(login_id: str) -> List[Dict[str, Any]]:
    """
    List the chats owned by ``login_id``, most recently updated first.

    Each chat is returned as a dict keyed by column name. Returns an empty
    list when the DB is disabled or the query fails; failures are logged.
    """
    if _pool is None:
        return []
    query = """
SELECT chat_id, login_id, name, chat_mode,
created_session_id, last_updated_session_id,
created_at, updated_at
FROM chats
WHERE login_id = %s
ORDER BY updated_at DESC;
"""
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(query, (login_id,))
            field_names = [column[0] for column in cur.description]
            chats = [dict(zip(field_names, record)) for record in cur.fetchall()]
        return chats
    except Exception as exc:
        logger.error("[db] get_chats_for_user failed: %s", exc)
        return []
def rename_chat(
    *,
    chat_id: str,
    name: str,
    session_id: Optional[str] = None,
) -> None:
    """
    Give chat ``chat_id`` a new ``name`` and bump its ``updated_at``.

    When ``session_id`` is provided it becomes last_updated_session_id;
    otherwise the existing value is kept (COALESCE). An unknown chat_id
    updates nothing. Does nothing when the DB is disabled. Errors are
    logged, never raised.
    """
    if _pool is None:
        return
    update_sql = """
UPDATE chats
SET name = %s,
last_updated_session_id = COALESCE(%s, last_updated_session_id),
updated_at = now()
WHERE chat_id = %s;
"""
    params = (name, session_id, chat_id)
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(update_sql, params)
    except Exception as exc:
        logger.error("[db] rename_chat failed: %s", exc)
def delete_chat(*, chat_id: str) -> None:
    """
    Delete a chat row. Safe no-op if the DB is unavailable.

    The interactions.chat_id foreign key is declared ``ON DELETE CASCADE``,
    so every interaction belonging to this chat is deleted along with it
    (they are not set to NULL). An unknown chat_id deletes nothing. Errors
    are logged, never raised.
    """
    if _pool is None:
        return
    try:
        with _get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("DELETE FROM chats WHERE chat_id = %s;", (chat_id,))
    except Exception as exc:
        logger.error("[db] delete_chat failed: %s", exc)
def insert_interaction(
    *,
    session_id: str,
    chat_id: Optional[str] = None,
    login_id: str,
    user_message: str,
    assistant_reply: str = "",
    learning_mode: str = "",
    total_tokens: int = 0,
    estimated_cost: float = 0.0,
    user_ts=None,
    first_token_ts=None,
    last_token_ts=None,
    suggestions_ts=None,
    doc_references: Optional[List[str]] = None,
    suggested_questions: Optional[List[str]] = None,
    error_flag: bool = False,
    timeout_flag: bool = False,
    run_id: Optional[str] = None,
) -> Optional[str]:
    """
    Insert one interaction row (a user message and the assistant's reply).

    ``turn_index`` is one past the current maximum in the conversation, or 1
    for the first turn. The conversation is:
    - the chat, when ``chat_id`` is given;
    - otherwise, the chat-less turns of ``session_id``.

    NOTE(review): the index is computed in the INSERT itself. Two concurrent
    inserts into the same conversation can therefore get the same
    turn_index (there is no unique constraint on it).

    Returns:
        The new row's UUID as a string, or None if the DB is unavailable or
        the insert failed. Failures are logged.
    """
    if _pool is None:
        return None
    if chat_id is not None:
        scope_sql, scope_value = "chat_id = %s", chat_id
    else:
        # `chat_id = NULL` is never true in SQL. Without this branch every
        # chat-less turn would get turn_index 1, so number them per session.
        scope_sql, scope_value = "chat_id IS NULL AND session_id = %s", session_id
    interaction_id = str(uuid.uuid4())
    # scope_sql is one of two fixed literals above; all values still go
    # through %s parameters.
    insert_sql = f"""
        INSERT INTO interactions (
            id, session_id, chat_id, login_id, turn_index,
            user_message, assistant_reply, learning_mode,
            total_tokens, estimated_cost,
            user_ts, first_token_ts, last_token_ts, suggestions_ts,
            doc_references, suggested_questions,
            error_flag, timeout_flag, run_id
        ) VALUES (
            %s, %s, %s, %s,
            COALESCE((SELECT MAX(turn_index) + 1 FROM interactions WHERE {scope_sql}), 1),
            %s, %s, %s,
            %s, %s,
            %s, %s, %s, %s,
            %s, %s,
            %s, %s, %s
        );
    """
    params = (
        interaction_id, session_id, chat_id, login_id,
        scope_value,  # for the turn_index subquery
        user_message, assistant_reply, learning_mode,
        total_tokens, estimated_cost,
        user_ts, first_token_ts, last_token_ts, suggestions_ts,
        doc_references or [], suggested_questions or [],
        error_flag, timeout_flag, run_id,
    )
    try:
        with _get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(insert_sql, params)
        return interaction_id
    except Exception as exc:
        logger.error("[db] insert_interaction failed: %s", exc)
        return None
def get_messages_for_chat(chat_id: str) -> List[Dict[str, Any]]:
    """
    Fetch every interaction stored under ``chat_id``, lowest turn_index first.

    Each dict carries id, turn_index, user_message, assistant_reply, the four
    *_ts timestamps, doc_references and suggested_questions. One row holds a
    whole turn, so callers presumably split it into a user message and an
    assistant message.

    Returns an empty list when the DB is disabled or the query fails;
    failures are logged.
    """
    if _pool is None:
        return []
    select_sql = """
SELECT id, turn_index, user_message, assistant_reply,
user_ts, first_token_ts, last_token_ts, suggestions_ts,
doc_references, suggested_questions
FROM interactions
WHERE chat_id = %s
ORDER BY turn_index ASC;
"""
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(select_sql, (chat_id,))
            header = [desc[0] for desc in cur.description]
            messages = [dict(zip(header, row)) for row in cur.fetchall()]
        return messages
    except Exception as exc:
        logger.error("[db] get_messages_for_chat failed: %s", exc)
        return []
def get_user_overview() -> List[Dict[str, Any]]:
    """
    Return one summary row per login_id that has at least one interaction.

    Columns:
    - login_id, sessions (distinct session count), turns (interaction count)
    - avg_latency_ms: mean of last_token_ts - user_ts, in milliseconds
    - survey_completed: whether any survey response exists for the login
    - avg_survey_rating: mean of each response's q1..q10 average
    - first_turn_at, last_turn_at, duration_minutes (span between them)

    Survey responses are aggregated per login_id before the join. Joining
    raw responses would repeat every interaction once per response and
    inflate ``turns``. A response is rated only when q1 is set; a NULL in
    q2..q10 also makes its sum NULL, so it is skipped by AVG.

    Returns an empty list when the DB is disabled or the query fails;
    failures are logged.
    """
    if _pool is None:
        return []
    try:
        with _get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    WITH survey AS (
                        SELECT
                            login_id,
                            MAX(submitted_at) AS last_submitted_at,
                            AVG(
                                CASE WHEN q1 IS NOT NULL THEN
                                    (q1 + q2 + q3 + q4 + q5 +
                                     q6 + q7 + q8 + q9 + q10)::numeric / 10
                                END
                            ) AS avg_rating
                        FROM survey_responses
                        WHERE login_id IS NOT NULL
                        GROUP BY login_id
                    )
                    SELECT
                        i.login_id,
                        COUNT(DISTINCT i.session_id) AS sessions,
                        COUNT(i.id) AS turns,
                        ROUND(AVG(
                            EXTRACT(EPOCH FROM (i.last_token_ts - i.user_ts)) * 1000
                        )::numeric, 0) AS avg_latency_ms,
                        (MAX(s.last_submitted_at) IS NOT NULL) AS survey_completed,
                        ROUND(MAX(s.avg_rating), 2) AS avg_survey_rating,
                        MIN(i.created_at) AS first_turn_at,
                        MAX(i.created_at) AS last_turn_at,
                        -- ::numeric: on PostgreSQL < 14 EXTRACT returns double precision,
                        -- and round(double precision, integer) does not exist.
                        ROUND(
                            (EXTRACT(EPOCH FROM (MAX(i.created_at) - MIN(i.created_at))) / 60)::numeric,
                            1
                        ) AS duration_minutes
                    FROM interactions i
                    LEFT JOIN survey s ON s.login_id = i.login_id
                    WHERE i.login_id IS NOT NULL
                    GROUP BY i.login_id
                    ORDER BY i.login_id;
                """)
                cols = [d[0] for d in cur.description]
                return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as exc:
        logger.error("[db] get_user_overview failed: %s", exc)
        return []
def get_interactions_for_user(login_id: str) -> List[Dict[str, Any]]:
    """
    List every interaction recorded for ``login_id`` in creation order.

    Each row is a dict keyed by column name and includes the feedback columns
    (thumbs_rating, free_text_feedback). Returns an empty list when the DB is
    disabled or the query fails; failures are logged.
    """
    if _pool is None:
        return []
    sql = """
SELECT
id, session_id, turn_index,
user_message, assistant_reply,
learning_mode, total_tokens,
user_ts, first_token_ts, last_token_ts,
thumbs_rating, free_text_feedback, created_at
FROM interactions
WHERE login_id = %s
ORDER BY created_at ASC;
"""
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(sql, (login_id,))
            keys = [meta[0] for meta in cur.description]
            history = [dict(zip(keys, values)) for values in cur.fetchall()]
        return history
    except Exception as exc:
        logger.error("[db] get_interactions_for_user failed: %s", exc)
        return []
def insert_survey_response(*, login_id: Optional[str] = None, responses: Dict[str, Any]) -> Optional[str]:
    """
    Store one survey submission and return its new row id.

    ``responses`` is read with ``.get``, so missing keys become NULL. It
    supplies q1..q10 with their matching ``qN_feedback`` entries, plus q11.

    Returns:
        The generated UUID as a string, or None when the DB is disabled or
        the insert fails. Failures are logged.
    """
    if _pool is None:
        return None
    new_id = str(uuid.uuid4())
    insert_sql = """
INSERT INTO survey_responses (
id, login_id,
q1, q1_feedback, q2, q2_feedback, q3, q3_feedback,
q4, q4_feedback, q5, q5_feedback, q6, q6_feedback,
q7, q7_feedback, q8, q8_feedback, q9, q9_feedback,
q10, q10_feedback, q11
) VALUES (
%s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s
)
"""
    try:
        # q1..q10 each contribute an (answer, feedback) pair in column order;
        # q11 stands alone at the end.
        answers: List[Any] = []
        for number in range(1, 11):
            answers.append(responses.get(f"q{number}"))
            answers.append(responses.get(f"q{number}_feedback"))
        answers.append(responses.get("q11"))
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(insert_sql, (new_id, login_id, *answers))
        return new_id
    except Exception as exc:
        logger.error("[db] insert_survey_response failed: %s", exc)
        return None
def get_history_for_session(session_id: str, max_turns: int = 10) -> List[tuple]:
    """
    Return the most recent ``max_turns`` (user_message, assistant_reply) pairs
    for a session, oldest first.

    The newest rows are picked by created_at (turn_index breaks ties) and then
    re-ordered chronologically. Rows are ordered by time rather than
    turn_index alone because turn_index is numbered per chat by
    insert_interaction, so it is not a chronological key across a session.

    ``max_turns=None`` returns the whole session (LIMIT NULL).

    Returns [] if the DB is unavailable, ``max_turns`` is zero or negative,
    no rows match, or the query fails (failures are logged).
    """
    if _pool is None:
        return []
    if max_turns is not None and max_turns <= 0:
        # LIMIT 0 would return nothing anyway, and a negative LIMIT is a
        # PostgreSQL error. Skip the round trip.
        return []
    try:
        with _get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT user_message, assistant_reply
                    FROM (
                        SELECT user_message, assistant_reply, created_at, turn_index
                        FROM interactions
                        WHERE session_id = %s
                        ORDER BY created_at DESC, turn_index DESC
                        LIMIT %s
                    ) AS recent
                    ORDER BY created_at ASC, turn_index ASC;
                    """,
                    (session_id, max_turns),
                )
                return list(cur.fetchall())  # list of (user_message, assistant_reply)
    except Exception as exc:
        logger.error("[db] get_history_for_session failed: %s", exc)
        return []
def update_interaction_feedback(
    *,
    run_id: str,
    thumbs_rating: str,
    free_text_feedback: str = "",
) -> None:
    """
    Attach a thumbs rating and optional comment to the interaction with ``run_id``.

    Both columns are overwritten on every row whose run_id matches; an
    unknown run_id updates nothing. Does nothing when the DB is disabled or
    ``run_id`` is empty. Errors are logged, never raised.
    """
    db_ready = _pool is not None
    if not (db_ready and run_id):
        return
    feedback_sql = """
UPDATE interactions
SET thumbs_rating = %s,
free_text_feedback = %s
WHERE run_id = %s;
"""
    try:
        with _get_conn() as conn, conn.cursor() as cur:
            cur.execute(feedback_sql, (thumbs_rating, free_text_feedback, run_id))
    except Exception as exc:
        logger.error("[db] update_interaction_feedback failed: %s", exc)
|