|
|
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Union
|
|
|
|
|
|
|
|
|
|
|
class DatabaseType(Enum):
    """Enumeration of supported database types.

    Each member's value is the lowercase string identifier of the backend,
    so a member can be recovered from its identifier with
    ``DatabaseType("sqlite")``.
    """

    MONGODB = "mongodb"
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    SQLITE = "sqlite"
    REDIS = "redis"
    ELASTICSEARCH = "elasticsearch"
    NEO4J = "neo4j"
    VECTOR = "vector"
|
|
|
|
|
|
|
|
class QueryType(Enum):
    """Enumeration of query types.

    Each member's value is the lowercase string form of its name; that
    string is what result dictionaries carry under ``"query_type"``.
    """

    SELECT = "select"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    CREATE = "create"
    DROP = "drop"
    ALTER = "alter"
    INDEX = "index"
    AGGREGATE = "aggregate"
    SEARCH = "search"
    VECTOR_SEARCH = "vector_search"
|
|
|
|
|
|
|
|
class DatabaseConnection(ABC):
    """Base class for database connection management.

    Stores the connection string and any extra keyword parameters, and
    tracks a private connected flag exposed through ``is_connected``.

    The class derives from ``ABC`` so that the ``@abstractmethod``
    decorators below are actually enforced: a subclass must implement
    ``connect``, ``disconnect`` and ``test_connection`` before it can be
    instantiated.
    """

    def __init__(self, connection_string: str, **kwargs):
        """Record the connection settings; no connection is opened here.

        Args:
            connection_string: Database connection string.
            **kwargs: Additional connection parameters, stored unchanged
                in ``connection_params``.
        """
        self.connection_string = connection_string
        self.connection_params = kwargs
        self._connection = None
        self._is_connected = False

    @property
    def is_connected(self) -> bool:
        """Return the current value of the private connected flag."""
        return self._is_connected

    @abstractmethod
    def connect(self) -> bool:
        """Establish connection to the database"""
        pass

    @abstractmethod
    def disconnect(self) -> bool:
        """Close connection to the database"""
        pass

    @abstractmethod
    def test_connection(self) -> bool:
        """Test if the connection is working"""
        pass
|
|
|
|
|
|
|
|
class DatabaseBase(ABC):
    """
    Abstract base class for database operations.

    Provides a common interface for different database types. Subclasses
    implement the connection and query primitives; this class supplies
    query validation, standard result/error dictionaries, a default
    capability report and context-manager support.
    """

    def __init__(self,
                 connection_string: Optional[str] = None,
                 database_name: Optional[str] = None,
                 **kwargs):
        """
        Initialize the database base.

        Note that ``_get_database_type()`` and, when a connection string is
        given, ``connect()`` are called from here, i.e. before any code a
        subclass places after its ``super().__init__()`` call has run.

        Args:
            connection_string: Database connection string. When truthy,
                ``connect()`` is called immediately.
            database_name: Name of the database to use
            **kwargs: Additional connection parameters, stored unchanged in
                ``connection_params``
        """
        self.connection_string = connection_string
        self.database_name = database_name
        self.connection_params = kwargs
        self.db_type = self._get_database_type()
        self.connection = None
        # Set here only; nothing else in this class modifies the flag.
        self._is_initialized = False

        if connection_string:
            self.connect()

    @abstractmethod
    def _get_database_type(self) -> "DatabaseType":
        """Return the database type"""
        pass

    @abstractmethod
    def connect(self) -> bool:
        """
        Establish connection to the database.

        Returns:
            bool: True if connection successful, False otherwise
        """
        pass

    @abstractmethod
    def disconnect(self) -> bool:
        """
        Close connection to the database.

        Returns:
            bool: True if disconnection successful, False otherwise
        """
        pass

    @abstractmethod
    def test_connection(self) -> bool:
        """
        Test if the database connection is working.

        Returns:
            bool: True if connection is working, False otherwise
        """
        pass

    @abstractmethod
    def execute_query(self,
                      query: Union[str, Dict, List],
                      query_type: "Optional[QueryType]" = None,
                      **kwargs) -> Dict[str, Any]:
        """
        Execute a query on the database.

        Args:
            query: The query to execute (string for SQL, dict/list for NoSQL)
            query_type: Type of query being executed
            **kwargs: Additional query parameters

        Returns:
            Dict containing query results and metadata
        """
        pass

    @abstractmethod
    def get_database_info(self) -> Dict[str, Any]:
        """
        Get information about the database.

        Returns:
            Dict containing database information
        """
        pass

    @abstractmethod
    def list_collections(self) -> List[str]:
        """
        List all collections/tables in the database.

        Returns:
            List of collection/table names
        """
        pass

    @abstractmethod
    def get_collection_info(self, collection_name: str) -> Dict[str, Any]:
        """
        Get information about a specific collection/table.

        Args:
            collection_name: Name of the collection/table

        Returns:
            Dict containing collection/table information
        """
        pass

    @abstractmethod
    def get_schema(self, collection_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the schema/structure of the database or a specific collection.

        Args:
            collection_name: Name of the collection/table (optional)

        Returns:
            Dict containing schema information
        """
        pass

    def validate_query(self, query: Union[str, Dict, List]) -> Dict[str, Any]:
        """
        Validate a query before execution.

        Only the shape of the query is checked: it must be a non-blank
        string or a non-empty dict/list. The query text itself is not
        parsed.

        Args:
            query: The query to validate

        Returns:
            Dict with ``"valid"`` (bool) and ``"error"`` (message or None)
        """
        try:
            if isinstance(query, str):
                is_empty = not query.strip()
            elif isinstance(query, (dict, list)):
                is_empty = not query
            else:
                return {"valid": False, "error": f"Unsupported query type: {type(query)}"}

            if is_empty:
                return {"valid": False, "error": "Query cannot be empty"}
            return {"valid": True, "error": None}

        except Exception as e:
            # Validation never raises; any failure is reported in the result.
            return {"valid": False, "error": str(e)}

    def format_query_result(self,
                            data: Any,
                            query_type: "Optional[QueryType]",
                            execution_time: Optional[float] = None,
                            **kwargs) -> Dict[str, Any]:
        """
        Format query results into a standard structure.

        Args:
            data: Raw query results
            query_type: Type of query that was executed (may be None)
            execution_time: Time taken to execute the query
            **kwargs: Additional metadata, returned under ``"metadata"``

        Returns:
            Dict containing formatted results. ``"row_count"`` is
            ``len(data)`` for a list or tuple, 0 when ``data`` is None, and
            1 for any other single value.
        """
        if isinstance(data, (list, tuple)):
            row_count = len(data)
        elif data is None:
            # No data means no rows; this matches format_error_result,
            # which pairs data=None with row_count=0.
            row_count = 0
        else:
            row_count = 1

        return {
            "success": True,
            "data": data,
            "query_type": query_type.value if query_type else None,
            "execution_time": execution_time,
            "row_count": row_count,
            "metadata": kwargs,
        }

    def format_error_result(self,
                            error: str,
                            query_type: "Optional[QueryType]" = None,
                            **kwargs) -> Dict[str, Any]:
        """
        Format error results into a standard structure.

        Args:
            error: Error message
            query_type: Type of query that failed
            **kwargs: Additional error metadata, returned under ``"metadata"``

        Returns:
            Dict containing formatted error results
        """
        return {
            "success": False,
            "error": error,
            "query_type": query_type.value if query_type else None,
            "data": None,
            "execution_time": None,
            "row_count": 0,
            "metadata": kwargs,
        }

    def get_supported_query_types(self) -> "List[QueryType]":
        """
        Get list of supported query types for this database.

        This base implementation returns the six basic types; subclasses
        can override it to report a different set.

        Returns:
            List of supported QueryType enums
        """
        return [
            QueryType.SELECT,
            QueryType.INSERT,
            QueryType.UPDATE,
            QueryType.DELETE,
            QueryType.CREATE,
            QueryType.DROP,
        ]

    def get_capabilities(self) -> Dict[str, Any]:
        """
        Get database capabilities and features.

        The base implementation reports every optional feature except
        indexing as unsupported.

        Returns:
            Dict containing database capabilities
        """
        return {
            "database_type": self.db_type.value,
            "supports_sql": False,
            "supports_aggregation": False,
            "supports_full_text_search": False,
            "supports_vector_search": False,
            "supports_transactions": False,
            "supports_indexing": True,
            "supported_query_types": [qt.value for qt in self.get_supported_query_types()],
            "connection_info": {
                "is_connected": self.connection is not None,
                "database_name": self.database_name,
            },
        }

    def __enter__(self):
        """Context manager entry: connect if no connection is held yet."""
        # Compare against None rather than relying on truthiness: some
        # driver handles (e.g. PyMongo Database/Collection objects) raise
        # on bool(), and get_capabilities() uses the same None test.
        if self.connection is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()

    def __del__(self):
        """Cleanup on deletion"""
        # Best-effort only: errors raised while the object is being
        # finalized are deliberately ignored.
        try:
            self.disconnect()
        except Exception:
            pass