File size: 15,207 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import os
import json
from typing import List, Dict, Any, Optional, Union

from pydantic import Field

from ..core.module import BaseModule
from .storages_config import StoreConfig
from .db_stores import DBStoreBase, DBStoreFactory
from .graph_stores import GraphStoreFactory, GraphStoreBase
from .vectore_stores import VectorStoreFactory, VectorStoreBase
from .schema import TableType, AgentStore, WorkflowStore, MemoryStore, HistoryStore, IndexStore


class StorageHandler(BaseModule):
    """
    Implementation of a storage handler for managing various storage backends.

    StorageHandler provides an abstraction for reading and writing data (e.g., memory, agents, workflows).
    It supports multiple storage types, including database, vector, and graph storage, initialized via factories.
    """
    storageConfig: StoreConfig = Field(..., description="Configuration for all storage backends")
    storageDB: Optional[Union[DBStoreBase, Any]] = Field(None, description="Database storage backend")
    vector_store: Optional[Union[VectorStoreBase, Any]] = Field(None, description="Single vector storage backend")
    graph_store: Optional[Union[GraphStoreBase, Any]] = Field(None, description="Optional graph storage backend")

    def init_module(self):
        """
        Initialize all storage backends based on the provided configuration.

        Creates the parent directory for a file-based database path (skipped for
        in-memory databases, empty paths, and bare filenames), then calls the
        individual initialization methods for database, vector, and graph stores.
        """
        # BUGFIX: the original guard chained `is not None` / `!= ":memory:"` /
        # `not path` with `or`, which is a tautology (always true), so
        # os.makedirs(os.path.dirname(path)) ran even for None/":memory:" and
        # raised for bare filenames where dirname(path) == "".
        path = self.storageConfig.path
        if path and path != ":memory:":
            parent_dir = os.path.dirname(path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

        self._init_db_store()
        self._init_vector_store()
        self._init_graph_store()

    def _init_db_store(self):
        """
        Initialize the database storage backend using the DBStoreFactory.
        Sets the storageDB attribute with the created instance.
        """
        db_config = self.storageConfig.dbConfig
        self.storageDB = DBStoreFactory.create(db_config.db_name, db_config)

    def _init_vector_store(self):
        """
        Initialize the vector storage backend using the VectorStoreFactory.
        Sets the vector_store attribute if a vector configuration is provided;
        any previously created store is simply replaced by the new instance.
        """
        vector_config = self.storageConfig.vectorConfig
        if vector_config is not None:
            self.vector_store = VectorStoreFactory().create(
                store_type=vector_config.vector_name,
                store_config=vector_config.model_dump()
            )

    def _init_graph_store(self):
        """
        Initialize the graph storage backend using the GraphStoreFactory.
        Sets the graph_store attribute if a graph configuration is provided.
        """
        graph_config = self.storageConfig.graphConfig
        if graph_config is not None:
            self.graph_store = GraphStoreFactory().create(
                store_type=graph_config.graph_name,
                store_config=graph_config.model_dump()
            )

    def _load_record(self, record_id: str, store_type: str, table: str,
                     store_model: Any) -> Optional[Dict[str, Any]]:
        """
        Fetch a single record by ID and parse its JSON columns.

        Attributes:
            record_id (str): Primary identifier of the record.
            store_type (str): Store type passed through to the DB backend.
            table (str): Table to query (may be None for backends that resolve it).
            store_model: Pydantic model describing the record's schema.

        Returns:
            Optional[Dict[str, Any]]: Parsed record, or None if not found.
        """
        result = self.storageDB.get_by_id(record_id, store_type=store_type, table=table)
        if result is not None:
            result = self.parse_result(result, store_model)
        return result

    def _upsert_record(self, record_id: str, metadata: Dict[str, Any],
                       store_type: str, table: str):
        """
        Insert a record, or update it in place if a record with the same ID exists.

        Attributes:
            record_id (str): Primary identifier of the record.
            metadata (Dict[str, Any]): Record payload to persist.
            store_type (str): Store type passed through to the DB backend.
            table (str): Target table (may be None for backends that resolve it).
        """
        existing = self.storageDB.get_by_id(record_id, store_type=store_type, table=table)
        if existing:
            self.storageDB.update(record_id, new_metadata=metadata, store_type=store_type, table=table)
        else:
            self.storageDB.insert(metadata=metadata, store_type=store_type, table=table)

    def load(self, tables: Optional[List[str]] = None, *args, **kwargs) -> Dict[str, Any]:
        """
        Load all data from the database storage.

        Attributes:
            tables (Optional[List[str]]): List of table names to load; if None, loads all tables.

        Returns:
            Dict[str, List[Dict[str, Any]]]: A dictionary with table names as keys and lists of
                row dictionaries as values; JSON-encoded columns are returned raw, so you should
                parse the values yourself (e.g. via parse_result).
        """
        result = {}
        table_info = self.storageDB.col_info()

        if tables is None:
            tables_to_load = [t.value for t in TableType]
        else:
            tables_to_load = tables

        # Load data for each table
        for table_name in tables_to_load:
            table_data = []
            # Only query tables that actually exist; this existence check against
            # col_info() also means the f-string below never interpolates an
            # arbitrary caller-supplied name into SQL.
            if any(t["table_name"] == table_name for t in table_info):
                cursor = self.storageDB.connection.cursor()
                cursor.execute(f"SELECT * FROM {table_name}")
                # Column names come from the schema metadata, in table order
                columns = next(t["columns"].keys() for t in table_info if t["table_name"] == table_name)
                rows = cursor.fetchall()
                table_data = [dict(zip(columns, row)) for row in rows]
            result[table_name] = table_data

        return result

    def save(self, data: Dict[str, Any], *args, **kwargs):
        """
        Save all provided data to the database storage.

        Attributes:
            data (Dict[str, Any]): Dictionary with table names as keys and lists of records to save.

        Raises:
            ValueError: If an unknown table name is provided.
        """
        for table_name, records in data.items():
            # Enum value lookup replaces the original linear scan over TableType
            try:
                store_type = TableType(table_name)
            except ValueError:
                raise ValueError(f"Unknown table: {table_name}") from None
            # Insert each record
            for record in records:
                self.storageDB.insert(metadata=record, store_type=store_type, table=table_name)

    def parse_result(self, results: Dict[str, str],
                     store: Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]) -> Dict[str, Any]:
        """
        Parse database results, converting JSON strings to Python objects where applicable.

        Attributes:
            results (Dict[str, str]): Raw database results with column names as keys.
            store (Union[AgentStore, WorkflowStore, MemoryStore, HistoryStore]): Pydantic model for validation.

        Returns:
            Dict[str, Any]: Parsed results with JSON strings deserialized to Python objects.
        """
        for k, v in store.model_fields.items():
            # Plain-string fields are stored verbatim; anything else was JSON-encoded
            if v.annotation not in [Optional[str], str]:
                try:
                    results[k] = json.loads(results[k])
                except (json.JSONDecodeError, KeyError, TypeError):
                    # Leave missing/unparseable values as-is (None for absent keys)
                    results[k] = results.get(k)
        return results

    def load_memory(self, memory_id: str, table: Optional[str]=None, **kwargs) -> Dict[str, Any]:
        """
        Load a single long-term memory data.

        Attributes:
            memory_id (str): The ID of the long-term memory.
            table (Optional[str]): The table name; defaults to 'memory' if None.

        Returns:
            Dict[str, Any]: The data that can be used to create a LongTermMemory instance.
        """
        table = table or TableType.store_memory.value
        return self._load_record(memory_id, store_type="memory", table=table, store_model=MemoryStore)

    def save_memory(self, memory_data: Dict[str, Any], table: Optional[str]=None, **kwargs):
        """
        Save or update a single memory.

        Attributes:
            memory_data (Dict[str, Any]): The long-term memory's data, must include 'memory_id'.
            table (Optional[str]): The table name; defaults to 'memory' if None.

        Raises:
            ValueError: If the 'memory_id' field is missing.
        """
        table = table or TableType.store_memory.value
        memory_id = memory_data.get("memory_id")
        if not memory_id:
            raise ValueError("Memory data must include a 'memory_id' field")
        self._upsert_record(memory_id, memory_data, store_type="memory", table=table)

    def load_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs) -> Dict[str, Any]:
        """
        Load a single agent's data.

        Attributes:
            agent_name (str): The unique name of the agent to retrieve.
            table (Optional[str]): The table name; defaults to 'agent' if None.

        Returns:
            Dict[str, Any]: The data that can be used to create an Agent instance, or None if not found.
        """
        table = table or TableType.store_agent.value
        return self._load_record(agent_name, store_type="agent", table=table, store_model=AgentStore)

    def remove_agent(self, agent_name: str, table: Optional[str]=None, *args, **kwargs):
        """
        Remove an agent from storage if the agent exists.

        Attributes:
            agent_name (str): The name of the agent to be deleted.
            table (Optional[str]): The table name; defaults to 'agent' if None.

        Raises:
            ValueError: If the agent does not exist in the specified table.
        """
        table = table or TableType.store_agent.value
        success = self.storageDB.delete(agent_name, store_type="agent", table=table)
        if not success:
            raise ValueError(f"Agent with name {agent_name} not found in table {table}")

    def save_agent(self, agent_data: Dict[str, Any], table: Optional[str]=None, *args, **kwargs):
        """
        Save or update a single agent's data.

        Attributes:
            agent_data (Dict[str, Any]): The agent's data, must include a 'name' key.
            table (Optional[str]): The table name; defaults to 'agent' if None.

        Raises:
            ValueError: If the 'name' field is missing.
        """
        table = table or TableType.store_agent.value
        agent_name = agent_data.get("name")
        if not agent_name:
            raise ValueError("Agent data must include a 'name' field")
        self._upsert_record(agent_name, agent_data, store_type="agent", table=table)

    def load_workflow(self, workflow_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]:
        """
        Load a single workflow's data.

        Attributes:
            workflow_id (str): The ID of the workflow.
            table (Optional[str]): The table name; defaults to 'workflow' if None.

        Returns:
            Dict[str, Any]: The data that can be used to create a WorkFlow instance, or None if not found.
        """
        table = table or TableType.store_workflow.value
        return self._load_record(workflow_id, store_type="workflow", table=table, store_model=WorkflowStore)

    def save_workflow(self, workflow_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs):
        """
        Save or update a workflow's data.

        Attributes:
            workflow_data (Dict[str, Any]): The workflow's data, must include a 'name' field
                (used as the record ID).
            table (Optional[str]): The table name; defaults to 'workflow' if None.

        Raises:
            ValueError: If the 'name' field is missing.
        """
        table = table or TableType.store_workflow.value
        workflow_id = workflow_data.get("name")
        if not workflow_id:
            raise ValueError("Workflow data must include a 'name' field")
        self._upsert_record(workflow_id, workflow_data, store_type="workflow", table=table)

    def load_history(self, memory_id: str, table: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]:
        """
        Load a single history entry.

        Attributes:
            memory_id (str): The ID of the memory associated with the history entry.
            table (Optional[str]): The table name; defaults to 'history' if None.

        Returns:
            Dict[str, Any]: The history data, or None if not found.
        """
        table = table or TableType.store_history.value
        return self._load_record(memory_id, store_type="history", table=table, store_model=HistoryStore)

    def save_history(self, history_data: Dict[str, Any], table: Optional[str] = None, *args, **kwargs):
        """
        Save or update a single history entry.

        When an entry already exists, the stored 'old_memory' value is carried
        over into the new record before updating, preserving the original
        pre-change memory snapshot.

        Attributes:
            history_data (Dict[str, Any]): The history data, must include a 'memory_id' field.
            table (Optional[str]): The table name; defaults to 'history' if None.

        Raises:
            ValueError: If the 'memory_id' field is missing.
        """
        table = table or TableType.store_history.value
        memory_id = history_data.get("memory_id")
        if not memory_id:
            raise ValueError("History data must include a 'memory_id' field")
        # Not using _upsert_record here: the update path must first copy the
        # existing record's old_memory into the new payload.
        existing = self.storageDB.get_by_id(memory_id, store_type="history", table=table)
        if existing:
            prior = HistoryStore.model_validate(self.parse_result(existing, HistoryStore))
            history_data["old_memory"] = prior.old_memory
            self.storageDB.update(memory_id, new_metadata=history_data, store_type="history", table=table)
        else:
            self.storageDB.insert(metadata=history_data, store_type="history", table=table)

    def load_index(self, corpus_id: str, table: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Load a single index record by corpus ID.

        Attributes:
            corpus_id (str): The ID of the corpus whose index to load.
            table (Optional[str]): The table name; unlike the other loaders there is
                no TableType default here, so None is passed through to the DB
                backend. NOTE(review): confirm the backend resolves table=None.

        Returns:
            Optional[Dict[str, Any]]: The parsed index data, or None if not found.
        """
        return self._load_record(corpus_id, store_type="indexing", table=table, store_model=IndexStore)

    def save_index(self, index_data: Dict[str, Any], table: Optional[str] = None):
        """
        Save or update a single index record.

        Attributes:
            index_data (Dict[str, Any]): The index data, must include a 'corpus_id' field.
            table (Optional[str]): The table name; None is passed through to the DB backend.

        Raises:
            ValueError: If the 'corpus_id' field is missing.
        """
        corpus_id = index_data.get("corpus_id")
        if not corpus_id:
            raise ValueError("Index data must include a 'corpus_id' field")
        self._upsert_record(corpus_id, index_data, store_type="indexing", table=table)