File size: 9,591 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Union
from enum import Enum



class DatabaseType(Enum):
    """Enumeration of supported database types"""
    MONGODB = "mongodb"
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    SQLITE = "sqlite"
    REDIS = "redis"
    ELASTICSEARCH = "elasticsearch"
    NEO4J = "neo4j"
    VECTOR = "vector"  # For vector databases like Pinecone, Weaviate, etc.


class QueryType(Enum):
    """Enumeration of query types"""
    SELECT = "select"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    CREATE = "create"
    DROP = "drop"
    ALTER = "alter"
    INDEX = "index"
    AGGREGATE = "aggregate"  # For MongoDB aggregation pipelines
    SEARCH = "search"  # For full-text search
    VECTOR_SEARCH = "vector_search"  # For vector similarity search


class DatabaseConnection:
    """Base class for database connection management"""
    
    def __init__(self, connection_string: str, **kwargs):
        self.connection_string = connection_string
        self.connection_params = kwargs
        self._connection = None
        self._is_connected = False
    
    @property
    def is_connected(self) -> bool:
        return self._is_connected
    
    @abstractmethod
    def connect(self) -> bool:
        """Establish connection to the database"""
        pass
    
    @abstractmethod
    def disconnect(self) -> bool:
        """Close connection to the database"""
        pass
    
    @abstractmethod
    def test_connection(self) -> bool:
        """Test if the connection is working"""
        pass


class DatabaseBase(ABC):
    """
    Abstract base class for database operations.
    Provides a common interface for different database types.
    """
    
    def __init__(self, 
                 connection_string: str = None,
                 database_name: str = None,
                 **kwargs):
        """
        Initialize the database base.
        
        Args:
            connection_string: Database connection string
            database_name: Name of the database to use
            **kwargs: Additional connection parameters
        """
        self.connection_string = connection_string
        self.database_name = database_name
        self.connection_params = kwargs
        self.db_type = self._get_database_type()
        self.connection = None
        self._is_initialized = False
        
        # Initialize connection if connection string is provided
        if connection_string:
            self.connect()
    
    @abstractmethod
    def _get_database_type(self) -> DatabaseType:
        """Return the database type"""
        pass
    
    @abstractmethod
    def connect(self) -> bool:
        """
        Establish connection to the database.
        
        Returns:
            bool: True if connection successful, False otherwise
        """
        pass
    
    @abstractmethod
    def disconnect(self) -> bool:
        """
        Close connection to the database.
        
        Returns:
            bool: True if disconnection successful, False otherwise
        """
        pass
    
    @abstractmethod
    def test_connection(self) -> bool:
        """
        Test if the database connection is working.
        
        Returns:
            bool: True if connection is working, False otherwise
        """
        pass
    
    @abstractmethod
    def execute_query(self, 
                     query: Union[str, Dict, List], 
                     query_type: QueryType = None,
                     **kwargs) -> Dict[str, Any]:
        """
        Execute a query on the database.
        
        Args:
            query: The query to execute (string for SQL, dict/list for NoSQL)
            query_type: Type of query being executed
            **kwargs: Additional query parameters
            
        Returns:
            Dict containing query results and metadata
        """
        pass
    
    @abstractmethod
    def get_database_info(self) -> Dict[str, Any]:
        """
        Get information about the database.
        
        Returns:
            Dict containing database information
        """
        pass
    
    @abstractmethod
    def list_collections(self) -> List[str]:
        """
        List all collections/tables in the database.
        
        Returns:
            List of collection/table names
        """
        pass
    
    @abstractmethod
    def get_collection_info(self, collection_name: str) -> Dict[str, Any]:
        """
        Get information about a specific collection/table.
        
        Args:
            collection_name: Name of the collection/table
            
        Returns:
            Dict containing collection/table information
        """
        pass
    
    @abstractmethod
    def get_schema(self, collection_name: str = None) -> Dict[str, Any]:
        """
        Get the schema/structure of the database or a specific collection.
        
        Args:
            collection_name: Name of the collection/table (optional)
            
        Returns:
            Dict containing schema information
        """
        pass
    
    def validate_query(self, query: Union[str, Dict, List]) -> Dict[str, Any]:
        """
        Validate a query before execution.
        
        Args:
            query: The query to validate
            
        Returns:
            Dict containing validation results
        """
        try:
            # Basic validation - child classes can override for specific validation
            if isinstance(query, str):
                if not query.strip():
                    return {"valid": False, "error": "Query cannot be empty"}
            elif isinstance(query, (dict, list)):
                if not query:
                    return {"valid": False, "error": "Query cannot be empty"}
            else:
                return {"valid": False, "error": f"Unsupported query type: {type(query)}"}
            
            return {"valid": True, "error": None}
            
        except Exception as e:
            return {"valid": False, "error": str(e)}
    
    def format_query_result(self, 
                           data: Any, 
                           query_type: QueryType,
                           execution_time: float = None,
                           **kwargs) -> Dict[str, Any]:
        """
        Format query results into a standard structure.
        
        Args:
            data: Raw query results
            query_type: Type of query that was executed
            execution_time: Time taken to execute the query
            **kwargs: Additional metadata
            
        Returns:
            Dict containing formatted results
        """
        return {
            "success": True,
            "data": data,
            "query_type": query_type.value if query_type else None,
            "execution_time": execution_time,
            "row_count": len(data) if isinstance(data, (list, tuple)) else 1,
            "metadata": kwargs
        }
    
    def format_error_result(self, 
                           error: str, 
                           query_type: QueryType = None,
                           **kwargs) -> Dict[str, Any]:
        """
        Format error results into a standard structure.
        
        Args:
            error: Error message
            query_type: Type of query that failed
            **kwargs: Additional error metadata
            
        Returns:
            Dict containing formatted error results
        """
        return {
            "success": False,
            "error": error,
            "query_type": query_type.value if query_type else None,
            "data": None,
            "execution_time": None,
            "row_count": 0,
            "metadata": kwargs
        }
    
    def get_supported_query_types(self) -> List[QueryType]:
        """
        Get list of supported query types for this database.
        
        Returns:
            List of supported QueryType enums
        """
        return [
            QueryType.SELECT,
            QueryType.INSERT,
            QueryType.UPDATE,
            QueryType.DELETE,
            QueryType.CREATE,
            QueryType.DROP
        ]
    
    def get_capabilities(self) -> Dict[str, Any]:
        """
        Get database capabilities and features.
        
        Returns:
            Dict containing database capabilities
        """
        return {
            "database_type": self.db_type.value,
            "supports_sql": False,  # Override in SQL databases
            "supports_aggregation": False,  # Override in document databases
            "supports_full_text_search": False,  # Override in search databases
            "supports_vector_search": False,  # Override in vector databases
            "supports_transactions": False,  # Override if supported
            "supports_indexing": True,
            "supported_query_types": [qt.value for qt in self.get_supported_query_types()],
            "connection_info": {
                "is_connected": self.connection is not None,
                "database_name": self.database_name
            }
        }
    
    def __enter__(self):
        """Context manager entry"""
        if not self.connection:
            self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.disconnect()
    
    def __del__(self):
        """Cleanup on deletion"""
        try:
            self.disconnect()
        except Exception:
            pass