Spaces:
Sleeping
Sleeping
File size: 5,781 Bytes
5e0532d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Dict, List, Optional

import lancedb

from app.services.llm import llm_service

class MemoryService:
    """Per-user memory backed by SQLite (chat turns, facts) and LanceDB (episodes).

    * ``interactions`` table (SQLite): recent chat turns, i.e. short-term memory.
    * ``facts`` table (SQLite): free-form facts about a user, i.e. long-term memory.
    * ``EPISODE_TABLE`` (LanceDB): embedded episode records retrieved by vector
      similarity, i.e. episodic memory.

    Every SQLite operation opens and closes its own connection, so no SQLite
    connection object is kept on the instance.
    """

    DB_PATH = "data/memory.db"
    LANCE_PATH = "data/memory_lancedb"
    EPISODE_TABLE = "episodes"

    def __init__(self):
        self._init_sqlite()
        self._init_lancedb()

    # --- Storage helpers ---

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the directory that contains *path* if it does not exist yet."""
        parent = os.path.dirname(path)
        # exist_ok=True avoids the race of an exists()-check followed by makedirs().
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _connect(self) -> sqlite3.Connection:
        """Open a new SQLite connection to ``DB_PATH``; the caller must close it."""
        return sqlite3.connect(self.DB_PATH)

    @staticmethod
    def _sql_string_literal(value: str) -> str:
        """Return *value* quoted as a single-quoted SQL string literal.

        Embedded single quotes are doubled (standard SQL escaping), so the value
        cannot terminate the literal and inject extra filter clauses.
        """
        return "'" + value.replace("'", "''") + "'"

    def _init_sqlite(self):
        """Create the SQLite database file, its directory and its tables if missing."""
        self._ensure_parent_dir(self.DB_PATH)
        # closing() releases the connection even if a statement fails; the inner
        # ``with conn`` commits on success and rolls back on error.
        with closing(self._connect()) as conn:
            with conn:
                # Table for conversation history (short-term memory)
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS interactions (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        role TEXT NOT NULL,
                        content TEXT NOT NULL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Table for facts/long-term memory
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS facts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        fact TEXT NOT NULL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

    def _init_lancedb(self):
        """Connect to the LanceDB database used for episodic memory."""
        self._ensure_parent_dir(self.LANCE_PATH)
        self.db = lancedb.connect(self.LANCE_PATH)

    # --- SHORT/LONG-TERM MEMORY (SQLite) ---
    # NOTE: these coroutines call sqlite3 synchronously, so each query runs on
    # the event-loop thread for its duration.

    async def add_interaction(self, user_id: str, role: str, content: str):
        """Append one message to the user's short-term memory.

        Args:
            user_id: Owner of the message (legacy callers pass a session id).
            role: Speaker role, stored verbatim.
            content: Message text.
        """
        with closing(self._connect()) as conn:
            with conn:
                conn.execute(
                    'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)',
                    (user_id, role, content),
                )

    async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]:
        """Return the user's most recent *limit* messages, oldest first.

        Returns:
            A list of ``{"role": ..., "content": ...}`` dicts in insertion order.
        """
        with closing(self._connect()) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                'SELECT role, content FROM interactions '
                'WHERE user_id = ? ORDER BY id DESC LIMIT ?',
                (user_id, limit),
            ).fetchall()
        # The query is newest-first so LIMIT keeps the latest rows; flip to chronological.
        return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)]

    async def add_fact(self, user_id: str, fact: str):
        """Store one fact in the user's long-term memory."""
        with closing(self._connect()) as conn:
            with conn:
                conn.execute(
                    'INSERT INTO facts (user_id, fact) VALUES (?, ?)',
                    (user_id, fact),
                )

    async def get_long_term_memory(self, user_id: str) -> List[str]:
        """Return every stored fact for the user, newest first.

        ``CURRENT_TIMESTAMP`` has one-second resolution, so ``id`` is used as a
        tie-breaker to keep facts stored within the same second newest-first.
        """
        with closing(self._connect()) as conn:
            rows = conn.execute(
                'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC, id DESC',
                (user_id,),
            ).fetchall()
        return [fact for (fact,) in rows]

    # --- EPISODIC MEMORY (Phase 2 Upgrade) ---

    async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"):
        """Embed an episode (content, insight, emotion) and store it in LanceDB.

        The embedding is computed from one combined text of all three fields.
        If no embedding is returned, a message is printed and nothing is stored.
        The episode table is created on first use.
        """
        # 1. Generate Embedding for the insight/content
        combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}"
        vector = await llm_service.get_embedding(combined_text)
        if not vector:
            print("MemoryService: Failed to generate embedding for episode.")
            return

        # 2. Data to store
        data = [{
            "vector": vector,
            "user_id": user_id,
            "content": content,
            "insight": insight,
            "emotion": emotion,
            "timestamp": datetime.now().isoformat(),
        }]

        # 3. Create or Update Table
        if self.EPISODE_TABLE in self.db.table_names():
            tbl = self.db.open_table(self.EPISODE_TABLE)
            tbl.add(data)
        else:
            self.db.create_table(self.EPISODE_TABLE, data=data)

        print(f"MemoryService: Episode stored for user {user_id}")

    async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]:
        """Return up to *limit* of the user's episodes most similar to *query*.

        Returns an empty list when the episode table does not exist yet, when
        the query embedding is empty, or when the search raises. Rows come from
        LanceDB's ``to_list()``; presumably they include the stored columns plus
        search metadata — TODO confirm against the LanceDB version in use.
        """
        if self.EPISODE_TABLE not in self.db.table_names():
            return []

        query_vec = await llm_service.get_embedding(query)
        if not query_vec:
            return []

        # Escape the id: it is spliced into a SQL filter string, and a raw quote
        # would break the filter or let a caller widen it to other users' episodes.
        user_filter = f"user_id = {self._sql_string_literal(user_id)}"
        try:
            tbl = self.db.open_table(self.EPISODE_TABLE)
            # Filter by user_id before the vector search (prefilter) so the
            # limit applies to this user's episodes only.
            results = (tbl.search(query_vec)
                       .where(user_filter, prefilter=True)
                       .limit(limit)
                       .to_list())
            return results
        except Exception as e:
            print(f"MemoryService: Search error: {str(e)}")
            return []

    # --- Legacy Compatibility ---

    async def get_history(self, session_id: str) -> List[Dict[str, str]]:
        """Legacy alias: the last 50 messages for *session_id*, oldest first."""
        return await self.get_short_term_memory(session_id, limit=50)

    async def add_message(self, session_id: str, role: str, content: str):
        """Legacy alias for :meth:`add_interaction`, keyed by session id."""
        await self.add_interaction(session_id, role, content)
# Shared module-level instance. Creating it at import time runs
# MemoryService.__init__, which sets up the SQLite and LanceDB stores.
memory_service: MemoryService = MemoryService()
|