File size: 5,781 Bytes
5e0532d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import sqlite3
import json
import os
import lancedb
from typing import List, Dict, Optional
from datetime import datetime
from app.services.llm import llm_service

class MemoryService:
    DB_PATH = "data/memory.db"
    LANCE_PATH = "data/memory_lancedb"
    EPISODE_TABLE = "episodes"

    def __init__(self):
        self._init_sqlite()
        self._init_lancedb()

    def _init_sqlite(self):
        """Initialize the SQLite database with necessary tables."""
        if not os.path.exists("data"):
            os.makedirs("data")
            
        conn = sqlite3.connect(self.DB_PATH)
        cursor = conn.cursor()
        
        # Table for conversation history (short-term memory)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS interactions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # Table for facts/long-term memory
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                fact TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        conn.commit()
        conn.close()

    def _init_lancedb(self):
        """Initialize LanceDB for episodic memory."""
        self.db = lancedb.connect(self.LANCE_PATH)

    async def add_interaction(self, user_id: str, role: str, content: str):
        """Add a message interactions to the short-term memory."""
        conn = sqlite3.connect(self.DB_PATH)
        cursor = conn.cursor()
        cursor.execute(
            'INSERT INTO interactions (user_id, role, content) VALUES (?, ?, ?)',
            (user_id, role, content)
        )
        conn.commit()
        conn.close()

    async def get_short_term_memory(self, user_id: str, limit: int = 10) -> List[Dict[str, str]]:
        """Retrieve recent interactions for a user."""
        conn = sqlite3.connect(self.DB_PATH)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(
            'SELECT role, content FROM interactions WHERE user_id = ? ORDER BY id DESC LIMIT ?',
            (user_id, limit)
        )
        rows = cursor.fetchall()
        conn.close()
        return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)]

    async def add_fact(self, user_id: str, fact: str):
        """Add a specific fact to long-term memory."""
        conn = sqlite3.connect(self.DB_PATH)
        cursor = conn.cursor()
        cursor.execute(
            'INSERT INTO facts (user_id, fact) VALUES (?, ?)',
            (user_id, fact)
        )
        conn.commit()
        conn.close()

    async def get_long_term_memory(self, user_id: str) -> List[str]:
        """Retrieve all stored facts for a user."""
        conn = sqlite3.connect(self.DB_PATH)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(
            'SELECT fact FROM facts WHERE user_id = ? ORDER BY timestamp DESC',
            (user_id,)
        )
        rows = cursor.fetchall()
        conn.close()
        return [row["fact"] for row in rows]

    # --- EPISODIC MEMORY (Phase 2 Upgrade) ---

    async def store_episode(self, user_id: str, content: str, insight: str, emotion: str = "neutral"):
        """
        Summarizes a session/interaction into an 'Episode' and stores in LanceDB.
        """
        # 1. Generate Embedding for the insight/content
        combined_text = f"Episode: {content} Insight: {insight} Emotion: {emotion}"
        vector = await llm_service.get_embedding(combined_text)
        
        if not vector:
            print("MemoryService: Failed to generate embedding for episode.")
            return

        # 2. Data to store
        data = [{
            "vector": vector,
            "user_id": user_id,
            "content": content,
            "insight": insight,
            "emotion": emotion,
            "timestamp": datetime.now().isoformat()
        }]

        # 3. Create or Update Table
        if self.EPISODE_TABLE in self.db.table_names():
            tbl = self.db.open_table(self.EPISODE_TABLE)
            tbl.add(data)
        else:
            self.db.create_table(self.EPISODE_TABLE, data=data)
        
        print(f"MemoryService: Episode stored for user {user_id}")

    async def retrieve_episodes(self, user_id: str, query: str, limit: int = 3) -> List[Dict]:
        """
        Retrieves relevant spiritual episodes using semantic search.
        """
        if self.EPISODE_TABLE not in self.db.table_names():
            return []

        query_vec = await llm_service.get_embedding(query)
        if not query_vec:
            return []

        try:
            tbl = self.db.open_table(self.EPISODE_TABLE)
            
            # Filter by user_id
            results = (tbl.search(query_vec)
                       .where(f"user_id = '{user_id}'", prefilter=True)
                       .limit(limit)
                       .to_list())
            
            return results
        except Exception as e:
            print(f"MemoryService: Search error: {str(e)}")
            return []

    # --- Legacy Compatibility ---
    async def get_history(self, session_id: str) -> List[Dict[str, str]]:
        return await self.get_short_term_memory(session_id, limit=50)

    async def add_message(self, session_id: str, role: str, content: str):
        await self.add_interaction(session_id, role, content)

# Singleton instance
memory_service = MemoryService()