# ORA / app/services/memory.py
# Abdalkaderdev's picture
# Initial ORA deployment
# 5e0532d
# raw
# history blame
# 5.78 kB
import sqlite3
import json
import os
import lancedb
from typing import List, Dict, Optional
from datetime import datetime
from app.services.llm import llm_service
class MemoryService:
    """Persistence layer for per-user conversational memory.

    Three stores are managed:

    * ``interactions`` (SQLite) - message history, read back as short-term
      memory.
    * ``facts`` (SQLite) - individual facts, read back as long-term memory.
    * ``episodes`` (LanceDB) - embedded episode records, queried by vector
      similarity.

    The public methods are coroutines, but the SQLite and LanceDB calls inside
    them are ordinary blocking calls executed on the caller's thread.
    """

    DB_PATH = "data/memory.db"
    LANCE_PATH = "data/memory_lancedb"
    EPISODE_TABLE = "episodes"

    def __init__(self):
        """Create the SQLite schema (if missing) and open LanceDB."""
        self._init_sqlite()
        self._init_lancedb()

    # --- Internal helpers ---

    def _run_sql(self, sql: str, params: tuple = ()) -> list:
        """Run one statement on a fresh connection and return its rows.

        The connection is always closed, even when the statement fails, so a
        failing query cannot leak a database handle.

        Args:
            sql: A single SQL statement using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            The fetched rows as ``sqlite3.Row`` objects (empty for statements
            that produce no result set).
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(sql, params).fetchall()
            conn.commit()
            return rows
        finally:
            conn.close()

    @staticmethod
    def _sql_quote(value: str) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal and alter the surrounding filter expression.
        """
        return "'" + str(value).replace("'", "''") + "'"

    def _init_sqlite(self):
        """Initialize the SQLite database with necessary tables."""
        # Derive the directory from DB_PATH so an overridden path still works;
        # exist_ok avoids the check-then-create race.
        db_dir = os.path.dirname(self.DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            # Table for conversation history (short-term memory)
            conn.execute('''
                CREATE TABLE IF NOT EXISTS interactions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Table for facts/long-term memory
            conn.execute('''
                CREATE TABLE IF NOT EXISTS facts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT NOT NULL,
                    fact TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def _init_lancedb(self):
        """Initialize LanceDB for episodic memory."""
        self.db = lancedb.connect(self.LANCE_PATH)

    # --- SHORT-TERM / LONG-TERM MEMORY (SQLite) ---

    async def add_interaction(self, user_id: str, role: str, content: str):
        """Append one message to a user's interaction history.

        Args:
            user_id: Owner of the message.
            role: Speaker label stored with the message.
            content: Message text.
        """
        self._run_sql(
            'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)',
            (user_id, role, content),
        )

    async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]:
        """Retrieve the most recent interactions for a user.

        Args:
            user_id: Owner of the messages.
            limit: Maximum number of messages to return.

        Returns:
            Up to ``limit`` of the newest messages as ``{"role", "content"}``
            dicts, ordered oldest first.
        """
        rows = self._run_sql(
            'SELECT role, content FROM interactions WHERE user_id = ? ORDER BY id DESC LIMIT ?',
            (user_id, limit),
        )
        # The query selects newest-first to apply LIMIT; flip to chronological.
        return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)]

    async def add_fact(self, user_id: str, fact: str):
        """Add a specific fact to a user's long-term memory."""
        self._run_sql(
            'INSERT INTO facts (user_id, fact) VALUES (?, ?)',
            (user_id, fact),
        )

    async def get_long_term_memory(self, user_id: str) -> List[str]:
        """Retrieve all stored facts for a user, newest first."""
        # CURRENT_TIMESTAMP has one-second resolution, so the autoincrement id
        # breaks ties and keeps the ordering deterministic.
        rows = self._run_sql(
            'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC, id DESC',
            (user_id,),
        )
        return [row["fact"] for row in rows]

    # --- EPISODIC MEMORY (Phase 2 Upgrade) ---

    async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"):
        """Embed an episode and store it in the LanceDB episode table.

        Args:
            user_id: Owner of the episode.
            content: Text of the episode.
            insight: Insight associated with the episode.
            emotion: Emotion label stored with the episode.

        Nothing is stored (and a message is printed) when no embedding could
        be generated.
        """
        # 1. Generate Embedding for the insight/content
        combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}"
        vector = await llm_service.get_embedding(combined_text)
        if not vector:
            print("MemoryService: Failed to generate embedding for episode.")
            return

        # 2. Data to store
        data = [{
            "vector": vector,
            "user_id": user_id,
            "content": content,
            "insight": insight,
            "emotion": emotion,
            "timestamp": datetime.now().isoformat(),
        }]

        # 3. Create or Update Table
        if self.EPISODE_TABLE in self.db.table_names():
            self.db.open_table(self.EPISODE_TABLE).add(data)
        else:
            self.db.create_table(self.EPISODE_TABLE, data=data)
        print(f"MemoryService: Episode stored for user {user_id}")

    async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]:
        """Retrieve a user's episodes most similar to *query*.

        Args:
            user_id: Only episodes stored for this user are considered.
            query: Text that is embedded and used as the search vector.
            limit: Maximum number of episodes to return.

        Returns:
            The matching episode records, or an empty list when the table does
            not exist, the query cannot be embedded, or the search fails.
        """
        if self.EPISODE_TABLE not in self.db.table_names():
            return []

        query_vec = await llm_service.get_embedding(query)
        if not query_vec:
            return []

        try:
            tbl = self.db.open_table(self.EPISODE_TABLE)
            # Filter by user_id. The value is quoted/escaped because the
            # filter is a SQL expression string, not a bound parameter.
            user_filter = f"user_id = {self._sql_quote(user_id)}"
            return (
                tbl.search(query_vec)
                .where(user_filter, prefilter=True)
                .limit(limit)
                .to_list()
            )
        except Exception as e:
            # Best-effort lookup: report the failure and return no episodes.
            print(f"MemoryService: Search error: {str(e)}")
            return []

    # --- Legacy Compatibility ---

    async def get_history(self, session_id: str) -> List[Dict[str, str]]:
        """Return up to the 50 latest messages, using *session_id* as the user id."""
        return await self.get_short_term_memory(session_id, limit=50)

    async def add_message(self, session_id: str, role: str, content: str):
        """Store a message, using *session_id* as the user id."""
        await self.add_interaction(session_id, role, content)
# Singleton instance.
# NOTE: constructing it here runs MemoryService.__init__ at import time, which
# calls _init_sqlite() and _init_lancedb() before any caller uses the object.
memory_service = MemoryService()