Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import json | |
| import os | |
| import lancedb | |
| from typing import List, Dict, Optional | |
| from datetime import datetime | |
| from app.services.llm import llm_service | |
| class MemoryService: | |
| DB_PATH = "data/memory.db" | |
| LANCE_PATH = "data/memory_lancedb" | |
| EPISODE_TABLE = "episodes" | |
| def __init__(self): | |
| self._init_sqlite() | |
| self._init_lancedb() | |
| def _init_sqlite(self): | |
| """Initialize the SQLite database with necessary tables.""" | |
| if not os.path.exists("data"): | |
| os.makedirs("data") | |
| conn = sqlite3.connect(self.DB_PATH) | |
| cursor = conn.cursor() | |
| # Table for conversation history (short-term memory) | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS interactions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL, | |
| role TEXT NOT NULL, | |
| content TEXT NOT NULL, | |
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Table for facts/long-term memory | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS facts ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id TEXT NOT NULL, | |
| fact TEXT NOT NULL, | |
| timestamp DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| def _init_lancedb(self): | |
| """Initialize LanceDB for episodic memory.""" | |
| self.db = lancedb.connect(self.LANCE_PATH) | |
| async def add_interaction(self, user_id: str, role: str, content: str): | |
| """Add a message interactions to the short-term memory.""" | |
| conn = sqlite3.connect(self.DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| 'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)', | |
| (user_id, role, content) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]: | |
| """Retrieve recent interactions for a user.""" | |
| conn = sqlite3.connect(self.DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| 'SELECT role, content FROM interactions WHERE user_id = ? ORDER BY id DESC LIMIT ?', | |
| (user_id, limit) | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)] | |
| async def add_fact(self, user_id: str, fact: str): | |
| """Add a specific fact to long-term memory.""" | |
| conn = sqlite3.connect(self.DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| 'INSERT INTO facts (user_id, fact) VALUES (?, ?)', | |
| (user_id, fact) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| async def get_long_term_memory(self, user_id: str) -> List[str]: | |
| """Retrieve all stored facts for a user.""" | |
| conn = sqlite3.connect(self.DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| 'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC', | |
| (user_id,) | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [row["fact"] for row in rows] | |
| # --- EPISODIC MEMORY (Phase 2 Upgrade) --- | |
| async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"): | |
| """ | |
| Summarizes a session/interaction into an 'Episode' and stores in LanceDB. | |
| """ | |
| # 1. Generate Embedding for the insight/content | |
| combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}" | |
| vector = await llm_service.get_embedding(combined_text) | |
| if not vector: | |
| print("MemoryService: Failed to generate embedding for episode.") | |
| return | |
| # 2. Data to store | |
| data = [{ | |
| "vector": vector, | |
| "user_id": user_id, | |
| "content": content, | |
| "insight": insight, | |
| "emotion": emotion, | |
| "timestamp": datetime.now().isoformat() | |
| }] | |
| # 3. Create or Update Table | |
| if self.EPISODE_TABLE in self.db.table_names(): | |
| tbl = self.db.open_table(self.EPISODE_TABLE) | |
| tbl.add(data) | |
| else: | |
| self.db.create_table(self.EPISODE_TABLE, data=data) | |
| print(f"MemoryService: Episode stored for user {user_id}") | |
| async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]: | |
| """ | |
| Retrieves relevant spiritual episodes using semantic search. | |
| """ | |
| if self.EPISODE_TABLE not in self.db.table_names(): | |
| return [] | |
| query_vec = await llm_service.get_embedding(query) | |
| if not query_vec: | |
| return [] | |
| try: | |
| tbl = self.db.open_table(self.EPISODE_TABLE) | |
| # Filter by user_id | |
| results = (tbl.search(query_vec) | |
| .where(f"user_id = '{user_id}'", prefilter=True) | |
| .limit(limit) | |
| .to_list()) | |
| return results | |
| except Exception as e: | |
| print(f"MemoryService: Search error: {str(e)}") | |
| return [] | |
| # --- Legacy Compatibility --- | |
| async def get_history(self, session_id: str) -> List[Dict[str, str]]: | |
| return await self.get_short_term_memory(session_id, limit=50) | |
| async def add_message(self, session_id: str, role: str, content: str): | |
| await self.add_interaction(session_id, role, content) | |
| # Singleton instance | |
| memory_service = MemoryService() | |