File size: 8,258 Bytes
8bf4d58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""Long-term memory using vector store for persistent context."""

import logging
from typing import List, Dict, Optional, Any
from datetime import datetime
import uuid
from src.core.config import get_settings
from src.retrieval.vector_store import get_vector_store

logger = logging.getLogger(__name__)


class LongTermMemory:
    """Store and retrieve long-term memory entries through the vector store.

    Entries are written to the store returned by ``get_vector_store()`` and
    tagged through metadata (``type`` is ``"conversation"`` or ``"fact"``,
    plus an optional ``session_id``).  A memory collection name is derived
    and kept on the instance, but this class never passes it to the store.

    When ``settings.long_term_memory_enabled`` is false every public method
    is a no-op that returns an empty value (``""``, ``[]``, ``False`` or
    ``0``).  Failures raised by the store are logged with a traceback and
    reported through the same empty values rather than propagated.
    """

    # Number of entries fetched per round while purging a session.
    _DELETE_BATCH_SIZE = 1000

    def __init__(self, collection_name: Optional[str] = None):
        """Read settings and, when enabled, attach the shared vector store.

        Args:
            collection_name: Optional name recorded as the memory collection.
                Defaults to ``"<chroma_collection_name>_memory"``.
        """
        self.settings = get_settings()
        self.enabled = self.settings.long_term_memory_enabled

        # Define both attributes up front so that a disabled instance still
        # exposes them (as None) instead of raising AttributeError.
        self.vector_store = None
        self.memory_collection_name = None

        if not self.enabled:
            logger.info("Long-term memory is disabled")
            return

        # NOTE(review): the name is only recorded here; entries are told apart
        # from other documents by their metadata, not by a separate collection.
        self.memory_collection_name = (
            collection_name or f"{self.settings.chroma_collection_name}_memory"
        )
        self.vector_store = get_vector_store()

    @staticmethod
    def _entries_from_results(
        results: Dict[str, Any],
        include_distance: bool = False,
    ) -> List[Dict[str, Any]]:
        """Convert the store's column-oriented result into a list of dicts.

        ``results`` is expected to hold parallel sequences under ``"ids"``,
        ``"documents"`` and ``"metadatas"`` (and ``"distances"`` when
        ``include_distance`` is true).  A missing key or a short column raises
        ``KeyError``/``IndexError``, which callers treat as a failed lookup.
        """
        entries = []
        for index, entry_id in enumerate(results["ids"]):
            entry = {
                "id": entry_id,
                "content": results["documents"][index],
                "metadata": results["metadatas"][index],
            }
            if include_distance:
                entry["distance"] = results["distances"][index]
            entries.append(entry)
        return entries

    def store_conversation(
        self,
        session_id: str,
        messages: List[Dict[str, Any]],
        summary: Optional[str] = None,
    ) -> str:
        """
        Store a conversation in long-term memory.

        Args:
            session_id: Session identifier
            messages: List of messages (dicts with ``role`` and ``content``)
            summary: Optional conversation summary; also saved in metadata

        Returns:
            Memory entry ID, or ``""`` when disabled or when storing failed
        """
        if not self.enabled:
            return ""

        try:
            conversation_text = self._format_conversation(messages, summary)
            memory_id = str(uuid.uuid4())

            metadata = {
                "session_id": session_id,
                "timestamp": datetime.now().isoformat(),
                "message_count": len(messages),
                "type": "conversation",
            }
            if summary:
                metadata["summary"] = summary

            self.vector_store.add_documents(
                documents=[conversation_text],
                metadatas=[metadata],
                ids=[memory_id],
            )
        except Exception as exc:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error storing conversation: %s", exc)
            return ""

        logger.info("Stored conversation in long-term memory: %s", memory_id)
        return memory_id

    def search_memories(
        self,
        query: str,
        session_id: Optional[str] = None,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        """
        Search for relevant memories.

        Args:
            query: Search query
            session_id: Optional session ID to filter by
            n_results: Number of results to return

        Returns:
            List of dicts with ``id``, ``content``, ``metadata`` and
            ``distance``; empty when disabled or when the search failed
        """
        if not self.enabled:
            return []

        # NOTE(review): without a session_id no filter is applied, so anything
        # held in the shared store can match -- confirm this is intended.
        filter_dict = {"session_id": session_id} if session_id else None

        try:
            results = self.vector_store.search(
                query=query,
                n_results=n_results,
                filter=filter_dict,
            )
            return self._entries_from_results(results, include_distance=True)
        except Exception as exc:
            logger.exception("Error searching memories: %s", exc)
            return []

    def get_session_memories(
        self,
        session_id: str,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Get memories for a specific session, newest first.

        Args:
            session_id: Session identifier
            limit: Maximum number of memories to return

        Returns:
            List of dicts with ``id``, ``content`` and ``metadata`` sorted by
            descending ``timestamp``; empty when disabled or on failure
        """
        if not self.enabled:
            return []

        try:
            # NOTE(review): an empty query is used to approximate "fetch all
            # for this session"; which entries come back when more than
            # `limit` exist depends on the store -- confirm.
            results = self.vector_store.search(
                query="",
                n_results=limit,
                filter={"session_id": session_id},
            )
            memories = self._entries_from_results(results)

            # ISO-8601 strings sort chronologically; entries without a
            # timestamp (or without metadata at all) sort last.
            memories.sort(
                key=lambda entry: (entry["metadata"] or {}).get("timestamp", ""),
                reverse=True,
            )
            return memories
        except Exception as exc:
            logger.exception("Error getting session memories: %s", exc)
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """
        Delete a specific memory entry.

        Args:
            memory_id: Memory entry ID

        Returns:
            True if the store accepted the delete, False when disabled or
            when the store raised
        """
        if not self.enabled:
            return False

        try:
            self.vector_store.delete(ids=[memory_id])
        except Exception as exc:
            logger.exception("Error deleting memory: %s", exc)
            return False

        logger.info("Deleted memory: %s", memory_id)
        return True

    def delete_session_memories(self, session_id: str) -> int:
        """
        Delete all memories for a session.

        Entries are removed in batches of ``_DELETE_BATCH_SIZE`` until a
        lookup returns fewer entries than the batch size or returns nothing
        new, so sessions larger than one batch are purged completely.

        Args:
            session_id: Session identifier

        Returns:
            Number of memories deleted (the count reached so far if the
            store fails part-way through; 0 when disabled)
        """
        if not self.enabled:
            return 0

        removed = set()
        try:
            while True:
                batch = self.get_session_memories(
                    session_id, limit=self._DELETE_BATCH_SIZE
                )
                # Skip ids handled in an earlier round: if the store keeps
                # returning them the loop must stop instead of spinning.
                pending = [
                    memory_id
                    for memory_id in dict.fromkeys(m["id"] for m in batch)
                    if memory_id not in removed
                ]
                if not pending:
                    break

                self.vector_store.delete(ids=pending)
                removed.update(pending)

                # A short batch means the session has been exhausted.
                if len(batch) < self._DELETE_BATCH_SIZE:
                    break
        except Exception as exc:
            logger.exception("Error deleting session memories: %s", exc)
            return len(removed)

        if removed:
            logger.info(
                "Deleted %d memories for session: %s", len(removed), session_id
            )
        return len(removed)

    def _format_conversation(
        self,
        messages: List[Dict[str, Any]],
        summary: Optional[str] = None,
    ) -> str:
        """Render messages (and an optional summary) as one text document.

        The summary, when given, comes first and is followed by a blank line;
        each message becomes a ``"<role>: <content>"`` line.  A missing role
        is rendered as ``unknown`` and missing content as an empty string.
        """
        parts = []
        if summary:
            parts.append(f"Summary: {summary}\n")
        parts.append("Conversation:")
        parts.extend(
            f"{msg.get('role', 'unknown')}: {msg.get('content', '')}"
            for msg in messages
        )
        return "\n".join(parts)

    def store_fact(
        self,
        fact: str,
        session_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Store a fact or piece of information.

        Args:
            fact: Fact to store
            session_id: Optional session ID
            metadata: Optional additional metadata; its keys override the
                generated ``timestamp``, ``type`` and ``session_id`` values

        Returns:
            Memory entry ID, or ``""`` when disabled or when storing failed
        """
        if not self.enabled:
            return ""

        try:
            memory_id = str(uuid.uuid4())
            fact_metadata = {
                "timestamp": datetime.now().isoformat(),
                "type": "fact",
            }
            if session_id:
                fact_metadata["session_id"] = session_id
            if metadata:
                # Merged into a fresh dict, so the caller's dict is untouched.
                fact_metadata.update(metadata)

            self.vector_store.add_documents(
                documents=[fact],
                metadatas=[fact_metadata],
                ids=[memory_id],
            )
        except Exception as exc:
            logger.exception("Error storing fact: %s", exc)
            return ""

        logger.info("Stored fact in long-term memory: %s", memory_id)
        return memory_id