import sqlite3 import json import os import lancedb from typing import List, Dict, Optional from datetime import datetime from app.services.llm import llm_service class MemoryService: DB_PATH = "data/memory.db" LANCE_PATH = "data/memory_lancedb" EPISODE_TABLE = "episodes" def __init__(self): self._init_sqlite() self._init_lancedb() def _init_sqlite(self): """Initialize the SQLite database with necessary tables.""" if not os.path.exists("data"): os.makedirs("data") conn = sqlite3.connect(self.DB_PATH) cursor = conn.cursor() # Table for conversation history (short-term memory) cursor.execute(''' CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # Table for facts/long-term memory cursor.execute(''' CREATE TABLE IF NOT EXISTS facts ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, fact TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() def _init_lancedb(self): """Initialize LanceDB for episodic memory.""" self.db = lancedb.connect(self.LANCE_PATH) async def add_interaction(self, user_id: str, role: str, content: str): """Add a message interactions to the short-term memory.""" conn = sqlite3.connect(self.DB_PATH) cursor = conn.cursor() cursor.execute( 'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)', (user_id, role, content) ) conn.commit() conn.close() async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]: """Retrieve recent interactions for a user.""" conn = sqlite3.connect(self.DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute( 'SELECT role, content FROM interactions WHERE user_id = ? ORDER BY id DESC LIMIT ?', (user_id, limit) ) rows = cursor.fetchall() conn.close() return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)] async def add_fact(self, user_id: str, fact: str): """Add a specific fact to long-term memory.""" conn = sqlite3.connect(self.DB_PATH) cursor = conn.cursor() cursor.execute( 'INSERT INTO facts (user_id, fact) VALUES (?, ?)', (user_id, fact) ) conn.commit() conn.close() async def get_long_term_memory(self, user_id: str) -> List[str]: """Retrieve all stored facts for a user.""" conn = sqlite3.connect(self.DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute( 'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC', (user_id,) ) rows = cursor.fetchall() conn.close() return [row["fact"] for row in rows] # --- EPISODIC MEMORY (Phase 2 Upgrade) --- async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"): """ Summarizes a session/interaction into an 'Episode' and stores in LanceDB. """ # 1. Generate Embedding for the insight/content combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}" vector = await llm_service.get_embedding(combined_text) if not vector: print("MemoryService: Failed to generate embedding for episode.") return # 2. Data to store data = [{ "vector": vector, "user_id": user_id, "content": content, "insight": insight, "emotion": emotion, "timestamp": datetime.now().isoformat() }] # 3. Create or Update Table if self.EPISODE_TABLE in self.db.table_names(): tbl = self.db.open_table(self.EPISODE_TABLE) tbl.add(data) else: self.db.create_table(self.EPISODE_TABLE, data=data) print(f"MemoryService: Episode stored for user {user_id}") async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]: """ Retrieves relevant spiritual episodes using semantic search. """ if self.EPISODE_TABLE not in self.db.table_names(): return [] query_vec = await llm_service.get_embedding(query) if not query_vec: return [] try: tbl = self.db.open_table(self.EPISODE_TABLE) # Filter by user_id results = (tbl.search(query_vec) .where(f"user_id = '{user_id}'", prefilter=True) .limit(limit) .to_list()) return results except Exception as e: print(f"MemoryService: Search error: {str(e)}") return [] # --- Legacy Compatibility --- async def get_history(self, session_id: str) -> List[Dict[str, str]]: return await self.get_short_term_memory(session_id, limit=50) async def add_message(self, session_id: str, role: str, content: str): await self.add_interaction(session_id, role, content) # Singleton instance memory_service = MemoryService()