"""
backend/app/services/conversation_store.py
SQLite-backed per-session conversation history with progressive summarisation.
Stage 2 additions:
- A `conversation_summaries` table stores one rolling summary paragraph per
session. After each completed turn, GeminiClient.update_conversation_summary()
is called asynchronously and the result is persisted here.
- get_recent() is unchanged (raw turns still available for the 3-turn fallback).
- get_summary() / set_summary() are thin wrappers on the new table.
The raw `interactions` table is still the source of truth for reranker training.
Summaries are only for live context injection and have no training significance.
"""
from __future__ import annotations

import contextlib
import json
import logging
import os
import sqlite3
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
_ANSWER_PREVIEW_LEN = 120
_DEFAULT_MAX_TURNS = 3
class ConversationStore:
"""
Thin read/write layer over SQLite for session history and rolling summaries.
One instance is created at startup and shared across all requests via app.state.
"""
def __init__(self, db_path: str, github_log=None) -> None:
self._db_path = db_path
self._github_log = github_log
self._ensure_summary_table()
def _ensure_summary_table(self) -> None:
"""Create the conversation_summaries table idempotently at startup."""
import os
db_dir = os.path.dirname(self._db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
try:
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS conversation_summaries (
session_id TEXT PRIMARY KEY,
summary TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL
)
"""
)
except Exception as exc:
logger.warning("Could not create conversation_summaries table: %s", exc)
def get_summary(self, session_id: str) -> str:
"""Return the rolling summary for this session, or '' if none exists."""
try:
with sqlite3.connect(self._db_path) as conn:
row = conn.execute(
"SELECT summary FROM conversation_summaries WHERE session_id = ?",
(session_id,),
).fetchone()
return row[0] if row else ""
except Exception as exc:
logger.warning("get_summary failed: %s", exc)
return ""
def set_summary(self, session_id: str, summary: str) -> None:
"""Upsert the rolling summary for this session."""
try:
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
INSERT INTO conversation_summaries (session_id, summary, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
summary = excluded.summary,
updated_at = excluded.updated_at
""",
(session_id, summary, datetime.now(tz=timezone.utc).isoformat()),
)
except Exception as exc:
logger.warning("set_summary failed: %s", exc)
def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
"""
Return the last `max_turns` completed Q/A pairs for `session_id`,
oldest first. Each entry: {"q": str, "a": str}.
"""
try:
with sqlite3.connect(self._db_path) as conn:
rows = conn.execute(
"""
SELECT query, answer FROM interactions
WHERE session_id = ? AND answer != ''
ORDER BY id DESC
LIMIT ?
""",
(session_id, max_turns),
).fetchall()
except sqlite3.OperationalError:
return []
except Exception as exc:
logger.warning("ConversationStore.get_recent failed: %s", exc)
return []
turns = []
for query, answer in reversed(rows):
a_preview = answer[:_ANSWER_PREVIEW_LEN]
if len(answer) > _ANSWER_PREVIEW_LEN:
a_preview += "\u2026"
turns.append({"q": query, "a": a_preview})
return turns
def mark_last_negative(self, session_id: str) -> None:
"""Set feedback=-1 on the most recent interaction for this session."""
try:
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
UPDATE interactions SET feedback = -1
WHERE id = (
SELECT id FROM interactions
WHERE session_id = ?
ORDER BY id DESC
LIMIT 1
)
""",
(session_id,),
)
except Exception as exc:
logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
if self._github_log is not None:
self._github_log.append_feedback(session_id, feedback=-1)
def populate_from_records(self, records: list[dict]) -> None:
"""
Replay interaction records from the durable GitHub log into SQLite.
Called at startup when SQLite is empty after a Space restart.
"""
import os
db_dir = os.path.dirname(self._db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
interaction_records = [
r for r in records
if r.get("type") != "feedback" and r.get("query")
]
if not interaction_records:
return
try:
with sqlite3.connect(self._db_path) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
session_id TEXT,
query TEXT,
answer TEXT,
chunks_used TEXT,
rerank_scores TEXT,
reranked_chunks_json TEXT,
latency_ms INTEGER,
cached BOOLEAN,
feedback INTEGER DEFAULT 0,
path TEXT DEFAULT 'rag'
)
"""
)
feedback_corrections: dict[str, int] = {}
for r in records:
if r.get("type") == "feedback":
feedback_corrections[r["session_id"]] = r.get("feedback", 0)
for r in interaction_records:
sid = r.get("session_id", "")
feedback = feedback_corrections.get(sid, r.get("feedback", 0))
conn.execute(
"""
INSERT INTO interactions
(timestamp, session_id, query, answer, chunks_used,
rerank_scores, reranked_chunks_json, latency_ms, cached, feedback, path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()),
sid,
r.get("query", ""),
r.get("answer", ""),
json.dumps(r.get("chunks_used", [])),
json.dumps(r.get("rerank_scores", [])),
json.dumps(r.get("reranked_chunks_json", [])),
r.get("latency_ms", 0),
r.get("cached", False),
feedback,
r.get("path", "rag"),
),
)
logger.info(
"Reconstructed %d interactions from durable GitHub log into SQLite.",
len(interaction_records),
)
except Exception as exc:
logger.warning("ConversationStore.populate_from_records failed: %s", exc)