# personabot-api/app/services/conversation_store.py  (deploy 583b552 / 4ef165a)
"""
backend/app/services/conversation_store.py
SQLite-backed per-session conversation history with progressive summarisation.
Stage 2 additions:
- A `conversation_summaries` table stores one rolling summary paragraph per
session. After each completed turn, GeminiClient.update_conversation_summary()
is called asynchronously and the result is persisted here.
- get_recent() is unchanged (raw turns still available for the 3-turn fallback).
- get_summary() / set_summary() are thin wrappers on the new table.
The raw `interactions` table is still the source of truth for reranker training.
Summaries are only for live context injection and have no training significance.
"""
from __future__ import annotations

import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
logger = logging.getLogger(__name__)

# Answers longer than this are truncated (with an ellipsis) when returned
# as conversational context by get_recent().
_ANSWER_PREVIEW_LEN = 120
# Default number of most-recent completed turns returned by get_recent().
_DEFAULT_MAX_TURNS = 3


class ConversationStore:
    """
    Thin read/write layer over SQLite for session history and rolling summaries.

    One instance is created at startup and shared across all requests via
    ``app.state``.  Every public method opens a short-lived connection, and all
    SQLite failures are logged and degraded to a no-op / empty result rather
    than raised to the caller (history is best-effort, never fatal).
    """

    def __init__(self, db_path: str, github_log=None) -> None:
        """
        Args:
            db_path: Filesystem path of the SQLite database file.  Parent
                directories are created on demand.
            github_log: Optional durable-log client; when provided,
                mark_last_negative() mirrors feedback into it.
        """
        self._db_path = db_path
        self._github_log = github_log
        self._ensure_summary_table()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------

    def _ensure_db_dir(self) -> None:
        """Create the directory that holds the DB file, if it is missing."""
        db_dir = os.path.dirname(self._db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

    def _connect(self) -> sqlite3.Connection:
        """
        Open a fresh connection to the database.

        Callers must wrap the result in ``closing(...)``: sqlite3's
        ``with conn:`` only commits/rolls back the transaction — it does NOT
        close the connection, which would otherwise linger until GC.
        """
        return sqlite3.connect(self._db_path)

    def _ensure_summary_table(self) -> None:
        """Create the conversation_summaries table idempotently at startup."""
        self._ensure_db_dir()
        try:
            with closing(self._connect()) as conn, conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS conversation_summaries (
                        session_id TEXT PRIMARY KEY,
                        summary TEXT NOT NULL DEFAULT '',
                        updated_at TEXT NOT NULL
                    )
                    """
                )
        except Exception as exc:
            # Best-effort: a failure here only disables summaries, not the app.
            logger.warning("Could not create conversation_summaries table: %s", exc)

    # ------------------------------------------------------------------
    # rolling summaries
    # ------------------------------------------------------------------

    def get_summary(self, session_id: str) -> str:
        """Return the rolling summary for this session, or '' if none exists."""
        try:
            with closing(self._connect()) as conn:
                row = conn.execute(
                    "SELECT summary FROM conversation_summaries WHERE session_id = ?",
                    (session_id,),
                ).fetchone()
            return row[0] if row else ""
        except Exception as exc:
            logger.warning("get_summary failed: %s", exc)
            return ""

    def set_summary(self, session_id: str, summary: str) -> None:
        """Upsert the rolling summary for this session."""
        try:
            with closing(self._connect()) as conn, conn:
                conn.execute(
                    """
                    INSERT INTO conversation_summaries (session_id, summary, updated_at)
                    VALUES (?, ?, ?)
                    ON CONFLICT(session_id) DO UPDATE SET
                        summary = excluded.summary,
                        updated_at = excluded.updated_at
                    """,
                    (session_id, summary, datetime.now(tz=timezone.utc).isoformat()),
                )
        except Exception as exc:
            logger.warning("set_summary failed: %s", exc)

    # ------------------------------------------------------------------
    # raw history
    # ------------------------------------------------------------------

    def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
        """
        Return the last ``max_turns`` completed Q/A pairs for ``session_id``,
        oldest first.  Each entry: ``{"q": str, "a": str}``; answers longer
        than _ANSWER_PREVIEW_LEN characters are truncated with an ellipsis.
        """
        try:
            with closing(self._connect()) as conn:
                rows = conn.execute(
                    """
                    SELECT query, answer FROM interactions
                    WHERE session_id = ? AND answer != ''
                    ORDER BY id DESC
                    LIMIT ?
                    """,
                    (session_id, max_turns),
                ).fetchall()
        except sqlite3.OperationalError:
            # interactions table may not exist yet (fresh DB) — no history.
            return []
        except Exception as exc:
            logger.warning("ConversationStore.get_recent failed: %s", exc)
            return []
        turns = []
        for query, answer in reversed(rows):  # DESC query -> reverse to oldest-first
            a_preview = answer[:_ANSWER_PREVIEW_LEN]
            if len(answer) > _ANSWER_PREVIEW_LEN:
                a_preview += "\u2026"
            turns.append({"q": query, "a": a_preview})
        return turns

    def mark_last_negative(self, session_id: str) -> None:
        """
        Set feedback=-1 on the most recent interaction for this session.

        The durable GitHub log is appended to even if the SQLite write fails,
        so the correction survives a Space restart either way.
        """
        try:
            with closing(self._connect()) as conn, conn:
                conn.execute(
                    """
                    UPDATE interactions SET feedback = -1
                    WHERE id = (
                        SELECT id FROM interactions
                        WHERE session_id = ?
                        ORDER BY id DESC
                        LIMIT 1
                    )
                    """,
                    (session_id,),
                )
        except Exception as exc:
            logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
        if self._github_log is not None:
            self._github_log.append_feedback(session_id, feedback=-1)

    # ------------------------------------------------------------------
    # startup replay
    # ------------------------------------------------------------------

    def populate_from_records(self, records: list[dict]) -> None:
        """
        Replay interaction records from the durable GitHub log into SQLite.
        Called at startup when SQLite is empty after a Space restart.

        ``records`` is assumed to be in append (chronological) order — TODO
        confirm against the GitHub log writer.  Under that assumption a
        feedback record corrects only the most recent interaction of its
        session seen so far, mirroring mark_last_negative(); previously the
        correction was blanket-applied to every interaction of the session,
        which corrupted the feedback column used for reranker training.
        """
        self._ensure_db_dir()
        rows: list[dict] = []
        latest_by_session: dict[str, dict] = {}
        for r in records:
            if r.get("type") == "feedback":
                # Retroactively correct the session's most recent turn only.
                target = latest_by_session.get(r.get("session_id", ""))
                if target is not None:
                    target["feedback"] = r.get("feedback", 0)
            elif r.get("query"):
                sid = r.get("session_id", "")
                row = {
                    "timestamp": r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()),
                    "session_id": sid,
                    "query": r.get("query", ""),
                    "answer": r.get("answer", ""),
                    "chunks_used": json.dumps(r.get("chunks_used", [])),
                    "rerank_scores": json.dumps(r.get("rerank_scores", [])),
                    "reranked_chunks_json": json.dumps(r.get("reranked_chunks_json", [])),
                    "latency_ms": r.get("latency_ms", 0),
                    "cached": r.get("cached", False),
                    "feedback": r.get("feedback", 0),
                    "path": r.get("path", "rag"),
                }
                rows.append(row)
                latest_by_session[sid] = row
        if not rows:
            return
        try:
            with closing(self._connect()) as conn, conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS interactions (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp TEXT,
                        session_id TEXT,
                        query TEXT,
                        answer TEXT,
                        chunks_used TEXT,
                        rerank_scores TEXT,
                        reranked_chunks_json TEXT,
                        latency_ms INTEGER,
                        cached BOOLEAN,
                        feedback INTEGER DEFAULT 0,
                        path TEXT DEFAULT 'rag'
                    )
                    """
                )
                # Named-placeholder executemany: one C-level loop, id order
                # follows `rows` order so get_recent() sees true chronology.
                conn.executemany(
                    """
                    INSERT INTO interactions
                        (timestamp, session_id, query, answer, chunks_used,
                         rerank_scores, reranked_chunks_json, latency_ms,
                         cached, feedback, path)
                    VALUES (:timestamp, :session_id, :query, :answer, :chunks_used,
                            :rerank_scores, :reranked_chunks_json, :latency_ms,
                            :cached, :feedback, :path)
                    """,
                    rows,
                )
            logger.info(
                "Reconstructed %d interactions from durable GitHub log into SQLite.",
                len(rows),
            )
        except Exception as exc:
            logger.warning("ConversationStore.populate_from_records failed: %s", exc)