# File: ORA/app/services/memory.py
# Author: Abdalkaderdev
# Commit: 5e0532d ("Initial ORA deployment")
import sqlite3
import json
import os
import lancedb
from typing import List, Dict, Optional
from datetime import datetime
from app.services.llm import llm_service
class MemoryService:
    """Persistence layer for conversational memory.

    Three kinds of memory are kept:

    * short-term: raw chat interactions, stored in the SQLite ``interactions`` table;
    * long-term: free-text facts, stored in the SQLite ``facts`` table;
    * episodic: embedded episode summaries, stored in a LanceDB table and
      retrieved by vector similarity.

    The SQLite-backed methods are declared ``async`` but perform blocking
    ``sqlite3`` calls; a fresh connection is opened and closed per call.
    """

    # Location of the SQLite database file.
    DB_PATH = "data/memory.db"
    # Directory handed to ``lancedb.connect``.
    LANCE_PATH = "data/memory_lancedb"
    # Name of the LanceDB table that holds episodes.
    EPISODE_TABLE = "episodes"

    def __init__(self):
        """Create the SQLite schema (if missing) and connect to LanceDB."""
        self._init_sqlite()
        self._init_lancedb()

    # --- Internal helpers ---

    def _write(self, sql: str, params: tuple = ()) -> None:
        """Run a single write statement and commit it.

        The connection is always closed, even when the statement fails;
        ``with conn`` commits on success and rolls back on error.
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            with conn:
                conn.execute(sql, params)
        finally:
            conn.close()

    def _query(self, sql: str, params: tuple = ()) -> List[sqlite3.Row]:
        """Run a read statement and return all rows as ``sqlite3.Row`` objects.

        The connection is always closed, even when the query fails.
        """
        conn = sqlite3.connect(self.DB_PATH)
        conn.row_factory = sqlite3.Row
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    @staticmethod
    def _user_filter(user_id: str) -> str:
        """Build the LanceDB ``where`` clause restricting results to one user.

        Single quotes are doubled (standard SQL string-literal escaping) so a
        quote inside ``user_id`` cannot terminate the literal and alter the
        filter expression.
        """
        escaped = str(user_id).replace("'", "''")
        return f"user_id = '{escaped}'"

    def _init_sqlite(self):
        """Initialize the SQLite database with necessary tables."""
        # Create the directory that actually contains DB_PATH (rather than a
        # hard-coded name); exist_ok avoids the check-then-create race.
        db_dir = os.path.dirname(self.DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            with conn:
                # Table for conversation history (short-term memory)
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS interactions (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        role TEXT NOT NULL,
                        content TEXT NOT NULL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Table for facts/long-term memory
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS facts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        fact TEXT NOT NULL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
        finally:
            conn.close()

    def _init_lancedb(self):
        """Initialize LanceDB for episodic memory."""
        self.db = lancedb.connect(self.LANCE_PATH)

    # --- SHORT-TERM / LONG-TERM MEMORY (SQLite) ---

    async def add_interaction(self, user_id: str, role: str, content: str):
        """Add a message interaction to the short-term memory.

        Args:
            user_id: Owner of the conversation.
            role: Speaker role stored alongside the message.
            content: Message text.
        """
        self._write(
            'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)',
            (user_id, role, content),
        )

    async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]:
        """Retrieve recent interactions for a user.

        Args:
            user_id: Owner of the conversation.
            limit: Maximum number of most-recent interactions to return.

        Returns:
            Up to ``limit`` ``{"role", "content"}`` dicts, oldest first.
        """
        rows = self._query(
            'SELECT role, content FROM interactions WHERE user_id = ? ORDER BY id DESC LIMIT ?',
            (user_id, limit),
        )
        # The query selects newest-first so LIMIT keeps the latest rows;
        # reverse to present them in chronological order.
        return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)]

    async def add_fact(self, user_id: str, fact: str):
        """Add a specific fact to long-term memory."""
        self._write(
            'INSERT INTO facts (user_id, fact) VALUES (?, ?)',
            (user_id, fact),
        )

    async def get_long_term_memory(self, user_id: str) -> List[str]:
        """Retrieve all stored facts for a user, newest first.

        ``CURRENT_TIMESTAMP`` only has one-second resolution, so ``id`` is used
        as a tie-breaker to keep the order deterministic for facts stored
        within the same second.
        """
        rows = self._query(
            'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC, id DESC',
            (user_id,),
        )
        return [row["fact"] for row in rows]

    # --- EPISODIC MEMORY (Phase 2 Upgrade) ---

    async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"):
        """Embed an episode and store it in LanceDB.

        Args:
            user_id: Owner of the episode.
            content: Episode text.
            insight: Insight associated with the episode.
            emotion: Emotion label stored with the episode.

        If no embedding is returned, a message is printed and nothing is stored.
        """
        # 1. Generate Embedding for the insight/content
        combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}"
        vector = await llm_service.get_embedding(combined_text)
        if not vector:
            print("MemoryService: Failed to generate embedding for episode.")
            return

        # 2. Data to store
        data = [{
            "vector": vector,
            "user_id": user_id,
            "content": content,
            "insight": insight,
            "emotion": emotion,
            "timestamp": datetime.now().isoformat()
        }]

        # 3. Create or Update Table (the first episode creates the table)
        if self.EPISODE_TABLE in self.db.table_names():
            tbl = self.db.open_table(self.EPISODE_TABLE)
            tbl.add(data)
        else:
            self.db.create_table(self.EPISODE_TABLE, data=data)
        print(f"MemoryService: Episode stored for user {user_id}")

    async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]:
        """Retrieve a user's most relevant episodes using semantic search.

        Args:
            user_id: Only episodes stored for this user are considered.
            query: Text that is embedded and used as the search vector.
            limit: Maximum number of episodes to return.

        Returns:
            The matching rows as dicts, or ``[]`` when the episode table does
            not exist, no embedding is produced, or the search raises.
        """
        if self.EPISODE_TABLE not in self.db.table_names():
            return []

        query_vec = await llm_service.get_embedding(query)
        if not query_vec:
            return []

        try:
            tbl = self.db.open_table(self.EPISODE_TABLE)
            # Filter by user_id before the vector search; the value is escaped
            # so it cannot break out of the string literal.
            results = (tbl.search(query_vec)
                       .where(self._user_filter(user_id), prefilter=True)
                       .limit(limit)
                       .to_list())
            return results
        except Exception as e:
            # Best-effort lookup: report the failure and fall back to no episodes.
            print(f"MemoryService: Search error: {str(e)}")
            return []

    # --- Legacy Compatibility ---

    async def get_history(self, session_id: str) -> List[Dict[str, str]]:
        """Legacy alias: return the last 50 interactions for ``session_id``."""
        return await self.get_short_term_memory(session_id, limit=50)

    async def add_message(self, session_id: str, role: str, content: str):
        """Legacy alias for :meth:`add_interaction` keyed by ``session_id``."""
        await self.add_interaction(session_id, role, content)
# Singleton instance shared by importers of this module.
# NOTE: constructing MemoryService runs its SQLite and LanceDB initialisation,
# so that setup happens when this module is first imported.
memory_service = MemoryService()