Spaces:
Running
Running
| """ | |
| backend/app/services/conversation_store.py | |
| SQLite-backed per-session conversation history with progressive summarisation. | |
| Stage 2 additions: | |
| - A `conversation_summaries` table stores one rolling summary paragraph per | |
| session. After each completed turn, GeminiClient.update_conversation_summary() | |
| is called asynchronously and the result is persisted here. | |
| - get_recent() is unchanged (raw turns still available for the 3-turn fallback). | |
| - get_summary() / set_summary() are thin wrappers on the new table. | |
| The raw `interactions` table is still the source of truth for reranker training. | |
| Summaries are only for live context injection and have no training significance. | |
| """ | |
from __future__ import annotations

import contextlib
import json
import logging
import os
import sqlite3
from datetime import datetime, timezone
logger = logging.getLogger(__name__)

# get_recent() truncates each stored answer to this many characters.
_ANSWER_PREVIEW_LEN = 120
# Default number of completed Q/A turns returned by get_recent().
_DEFAULT_MAX_TURNS = 3


class ConversationStore:
    """
    Thin read/write layer over SQLite for session history and rolling summaries.

    One instance is created at startup and shared across all requests via
    app.state. Every method opens a short-lived connection, so no cursor or
    connection state is shared between requests.
    """

    def __init__(self, db_path: str, github_log=None) -> None:
        """
        Args:
            db_path: Path to the SQLite database file (parent directories are
                created on demand).
            github_log: Optional durable-log client; when present,
                mark_last_negative() mirrors feedback to it.
        """
        self._db_path = db_path
        self._github_log = github_log
        self._ensure_summary_table()

    @contextlib.contextmanager
    def _connect(self):
        """
        Yield a connection wrapped in a transaction, then close it.

        A bare ``with sqlite3.connect(...)`` block only manages the
        transaction and leaks the connection until GC; this helper commits on
        success, rolls back on error, and always closes the connection.
        """
        conn = sqlite3.connect(self._db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _ensure_summary_table(self) -> None:
        """Create the conversation_summaries table idempotently at startup."""
        db_dir = os.path.dirname(self._db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS conversation_summaries (
                        session_id TEXT PRIMARY KEY,
                        summary TEXT NOT NULL DEFAULT '',
                        updated_at TEXT NOT NULL
                    )
                    """
                )
        except Exception as exc:
            # Non-fatal: summaries only enrich live context and are not the
            # source of truth, so startup continues without the table.
            logger.warning("Could not create conversation_summaries table: %s", exc)

    def get_summary(self, session_id: str) -> str:
        """Return the rolling summary for this session, or '' if none exists."""
        try:
            with self._connect() as conn:
                row = conn.execute(
                    "SELECT summary FROM conversation_summaries WHERE session_id = ?",
                    (session_id,),
                ).fetchone()
            return row[0] if row else ""
        except Exception as exc:
            logger.warning("get_summary failed: %s", exc)
            return ""

    def set_summary(self, session_id: str, summary: str) -> None:
        """Upsert the rolling summary for this session."""
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    INSERT INTO conversation_summaries (session_id, summary, updated_at)
                    VALUES (?, ?, ?)
                    ON CONFLICT(session_id) DO UPDATE SET
                        summary = excluded.summary,
                        updated_at = excluded.updated_at
                    """,
                    (session_id, summary, datetime.now(tz=timezone.utc).isoformat()),
                )
        except Exception as exc:
            logger.warning("set_summary failed: %s", exc)

    def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
        """
        Return the last `max_turns` completed Q/A pairs for `session_id`,
        oldest first. Each entry: {"q": str, "a": str}, with the answer
        truncated to _ANSWER_PREVIEW_LEN characters plus an ellipsis.
        """
        try:
            with self._connect() as conn:
                rows = conn.execute(
                    """
                    SELECT query, answer FROM interactions
                    WHERE session_id = ? AND answer != ''
                    ORDER BY id DESC
                    LIMIT ?
                    """,
                    (session_id, max_turns),
                ).fetchall()
        except sqlite3.OperationalError:
            # interactions table does not exist yet (fresh database) -- the
            # caller simply gets no history.
            return []
        except Exception as exc:
            logger.warning("ConversationStore.get_recent failed: %s", exc)
            return []
        turns = []
        # rows come back newest-first; reverse for chronological order.
        for query, answer in reversed(rows):
            a_preview = answer[:_ANSWER_PREVIEW_LEN]
            if len(answer) > _ANSWER_PREVIEW_LEN:
                a_preview += "\u2026"
            turns.append({"q": query, "a": a_preview})
        return turns

    def mark_last_negative(self, session_id: str) -> None:
        """Set feedback=-1 on the most recent interaction for this session."""
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    UPDATE interactions SET feedback = -1
                    WHERE id = (
                        SELECT id FROM interactions
                        WHERE session_id = ?
                        ORDER BY id DESC
                        LIMIT 1
                    )
                    """,
                    (session_id,),
                )
        except Exception as exc:
            logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
        # Mirror to the durable log regardless of SQLite success, so the
        # feedback survives a Space restart.
        if self._github_log is not None:
            self._github_log.append_feedback(session_id, feedback=-1)

    def populate_from_records(self, records: list[dict]) -> None:
        """
        Replay interaction records from the durable GitHub log into SQLite.
        Called at startup when SQLite is empty after a Space restart.

        Feedback records (type == "feedback") override the stored feedback of
        every interaction in their session; the last feedback record per
        session wins. NOTE(review): this per-session (not per-interaction)
        override mirrors the existing behaviour -- confirm it is intended.
        """
        db_dir = os.path.dirname(self._db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        interaction_records = [
            r for r in records
            if r.get("type") != "feedback" and r.get("query")
        ]
        if not interaction_records:
            return
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS interactions (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp TEXT,
                        session_id TEXT,
                        query TEXT,
                        answer TEXT,
                        chunks_used TEXT,
                        rerank_scores TEXT,
                        reranked_chunks_json TEXT,
                        latency_ms INTEGER,
                        cached BOOLEAN,
                        feedback INTEGER DEFAULT 0,
                        path TEXT DEFAULT 'rag'
                    )
                    """
                )
                # .get() (not r["session_id"]) so one malformed feedback
                # record cannot abort the entire replay via the broad except.
                feedback_corrections: dict[str, int] = {}
                for r in records:
                    if r.get("type") == "feedback":
                        feedback_corrections[r.get("session_id", "")] = r.get("feedback", 0)
                for r in interaction_records:
                    sid = r.get("session_id", "")
                    feedback = feedback_corrections.get(sid, r.get("feedback", 0))
                    conn.execute(
                        """
                        INSERT INTO interactions
                        (timestamp, session_id, query, answer, chunks_used,
                         rerank_scores, reranked_chunks_json, latency_ms, cached, feedback, path)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()),
                            sid,
                            r.get("query", ""),
                            r.get("answer", ""),
                            json.dumps(r.get("chunks_used", [])),
                            json.dumps(r.get("rerank_scores", [])),
                            json.dumps(r.get("reranked_chunks_json", [])),
                            r.get("latency_ms", 0),
                            r.get("cached", False),
                            feedback,
                            r.get("path", "rag"),
                        ),
                    )
            logger.info(
                "Reconstructed %d interactions from durable GitHub log into SQLite.",
                len(interaction_records),
            )
        except Exception as exc:
            logger.warning("ConversationStore.populate_from_records failed: %s", exc)