""" backend/app/services/conversation_store.py SQLite-backed per-session conversation history with progressive summarisation. Stage 2 additions: - A `conversation_summaries` table stores one rolling summary paragraph per session. After each completed turn, GeminiClient.update_conversation_summary() is called asynchronously and the result is persisted here. - get_recent() is unchanged (raw turns still available for the 3-turn fallback). - get_summary() / set_summary() are thin wrappers on the new table. The raw `interactions` table is still the source of truth for reranker training. Summaries are only for live context injection and have no training significance. """ from __future__ import annotations import json import logging import sqlite3 from datetime import datetime, timezone logger = logging.getLogger(__name__) _ANSWER_PREVIEW_LEN = 120 _DEFAULT_MAX_TURNS = 3 class ConversationStore: """ Thin read/write layer over SQLite for session history and rolling summaries. One instance is created at startup and shared across all requests via app.state. """ def __init__(self, db_path: str, github_log=None) -> None: self._db_path = db_path self._github_log = github_log self._ensure_summary_table() def _ensure_summary_table(self) -> None: """Create the conversation_summaries table idempotently at startup.""" import os db_dir = os.path.dirname(self._db_path) if db_dir: os.makedirs(db_dir, exist_ok=True) try: with sqlite3.connect(self._db_path) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS conversation_summaries ( session_id TEXT PRIMARY KEY, summary TEXT NOT NULL DEFAULT '', updated_at TEXT NOT NULL ) """ ) except Exception as exc: logger.warning("Could not create conversation_summaries table: %s", exc) def get_summary(self, session_id: str) -> str: """Return the rolling summary for this session, or '' if none exists.""" try: with sqlite3.connect(self._db_path) as conn: row = conn.execute( "SELECT summary FROM conversation_summaries WHERE session_id = ?", (session_id,), ).fetchone() return row[0] if row else "" except Exception as exc: logger.warning("get_summary failed: %s", exc) return "" def set_summary(self, session_id: str, summary: str) -> None: """Upsert the rolling summary for this session.""" try: with sqlite3.connect(self._db_path) as conn: conn.execute( """ INSERT INTO conversation_summaries (session_id, summary, updated_at) VALUES (?, ?, ?) ON CONFLICT(session_id) DO UPDATE SET summary = excluded.summary, updated_at = excluded.updated_at """, (session_id, summary, datetime.now(tz=timezone.utc).isoformat()), ) except Exception as exc: logger.warning("set_summary failed: %s", exc) def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]: """ Return the last `max_turns` completed Q/A pairs for `session_id`, oldest first. Each entry: {"q": str, "a": str}. """ try: with sqlite3.connect(self._db_path) as conn: rows = conn.execute( """ SELECT query, answer FROM interactions WHERE session_id = ? AND answer != '' ORDER BY id DESC LIMIT ? """, (session_id, max_turns), ).fetchall() except sqlite3.OperationalError: return [] except Exception as exc: logger.warning("ConversationStore.get_recent failed: %s", exc) return [] turns = [] for query, answer in reversed(rows): a_preview = answer[:_ANSWER_PREVIEW_LEN] if len(answer) > _ANSWER_PREVIEW_LEN: a_preview += "\u2026" turns.append({"q": query, "a": a_preview}) return turns def mark_last_negative(self, session_id: str) -> None: """Set feedback=-1 on the most recent interaction for this session.""" try: with sqlite3.connect(self._db_path) as conn: conn.execute( """ UPDATE interactions SET feedback = -1 WHERE id = ( SELECT id FROM interactions WHERE session_id = ? ORDER BY id DESC LIMIT 1 ) """, (session_id,), ) except Exception as exc: logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc) if self._github_log is not None: self._github_log.append_feedback(session_id, feedback=-1) def populate_from_records(self, records: list[dict]) -> None: """ Replay interaction records from the durable GitHub log into SQLite. Called at startup when SQLite is empty after a Space restart. """ import os db_dir = os.path.dirname(self._db_path) if db_dir: os.makedirs(db_dir, exist_ok=True) interaction_records = [ r for r in records if r.get("type") != "feedback" and r.get("query") ] if not interaction_records: return try: with sqlite3.connect(self._db_path) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, session_id TEXT, query TEXT, answer TEXT, chunks_used TEXT, rerank_scores TEXT, reranked_chunks_json TEXT, latency_ms INTEGER, cached BOOLEAN, feedback INTEGER DEFAULT 0, path TEXT DEFAULT 'rag' ) """ ) feedback_corrections: dict[str, int] = {} for r in records: if r.get("type") == "feedback": feedback_corrections[r["session_id"]] = r.get("feedback", 0) for r in interaction_records: sid = r.get("session_id", "") feedback = feedback_corrections.get(sid, r.get("feedback", 0)) conn.execute( """ INSERT INTO interactions (timestamp, session_id, query, answer, chunks_used, rerank_scores, reranked_chunks_json, latency_ms, cached, feedback, path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()), sid, r.get("query", ""), r.get("answer", ""), json.dumps(r.get("chunks_used", [])), json.dumps(r.get("rerank_scores", [])), json.dumps(r.get("reranked_chunks_json", [])), r.get("latency_ms", 0), r.get("cached", False), feedback, r.get("path", "rag"), ), ) logger.info( "Reconstructed %d interactions from durable GitHub log into SQLite.", len(interaction_records), ) except Exception as exc: logger.warning("ConversationStore.populate_from_records failed: %s", exc)