File size: 2,427 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from abc import ABC, abstractmethod
from typing import Optional, Literal


# Abstract base class for database store implementations
class DBStoreBase(ABC):
    """
    Abstract base class defining the interface for database storage operations.
    Subclasses must implement methods for inserting, deleting, updating, and retrieving metadata.
    """

    @abstractmethod
    def insert(self, metadata, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Insert metadata into a specified table.
        """
        pass

    @abstractmethod
    def insert_memory(self, metadata, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Insert memory metadata into a specified table.
        """
        pass

    @abstractmethod
    def insert_agent(self, metadata, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Insert agent metadata into a specified table.
        """
        pass

    @abstractmethod
    def insert_workflow(self, metadata, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Insert workflow metadata into a specified table.
        """
        pass

    @abstractmethod
    def insert_history(self, metadata, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Insert history metadata into a specified table.
        """
        pass

    @abstractmethod
    def delete(self, metadata_id, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Delete metadata by its ID from a specified table.
        """
        pass

    @abstractmethod
    def update(self, metadata_id, new_metadata=None, 
               store_type: Optional[Literal["memory", "agent", "workflow", "history"]]=None, table=None):
        """
        Update metadata by its ID in a specified table.
        """
        pass

    @abstractmethod
    def get_by_id(self, metadata_id, store_type: Optional[Literal["memory", "agent", "workflow", "history"]], table=None):
        """
        Retrieve metadata by its ID from a specified table.
        """
        pass

    @abstractmethod
    def col_info(self):
        """
        Retrieve information about the database collections (tables).
        """
        pass