selfevolveagent / evoagentx /tools /database_base.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Union
from enum import Enum
class DatabaseType(Enum):
"""Enumeration of supported database types"""
MONGODB = "mongodb"
POSTGRESQL = "postgresql"
MYSQL = "mysql"
SQLITE = "sqlite"
REDIS = "redis"
ELASTICSEARCH = "elasticsearch"
NEO4J = "neo4j"
VECTOR = "vector" # For vector databases like Pinecone, Weaviate, etc.
class QueryType(Enum):
"""Enumeration of query types"""
SELECT = "select"
INSERT = "insert"
UPDATE = "update"
DELETE = "delete"
CREATE = "create"
DROP = "drop"
ALTER = "alter"
INDEX = "index"
AGGREGATE = "aggregate" # For MongoDB aggregation pipelines
SEARCH = "search" # For full-text search
VECTOR_SEARCH = "vector_search" # For vector similarity search
class DatabaseConnection:
"""Base class for database connection management"""
def __init__(self, connection_string: str, **kwargs):
self.connection_string = connection_string
self.connection_params = kwargs
self._connection = None
self._is_connected = False
@property
def is_connected(self) -> bool:
return self._is_connected
@abstractmethod
def connect(self) -> bool:
"""Establish connection to the database"""
pass
@abstractmethod
def disconnect(self) -> bool:
"""Close connection to the database"""
pass
@abstractmethod
def test_connection(self) -> bool:
"""Test if the connection is working"""
pass
class DatabaseBase(ABC):
"""
Abstract base class for database operations.
Provides a common interface for different database types.
"""
def __init__(self,
connection_string: str = None,
database_name: str = None,
**kwargs):
"""
Initialize the database base.
Args:
connection_string: Database connection string
database_name: Name of the database to use
**kwargs: Additional connection parameters
"""
self.connection_string = connection_string
self.database_name = database_name
self.connection_params = kwargs
self.db_type = self._get_database_type()
self.connection = None
self._is_initialized = False
# Initialize connection if connection string is provided
if connection_string:
self.connect()
@abstractmethod
def _get_database_type(self) -> DatabaseType:
"""Return the database type"""
pass
@abstractmethod
def connect(self) -> bool:
"""
Establish connection to the database.
Returns:
bool: True if connection successful, False otherwise
"""
pass
@abstractmethod
def disconnect(self) -> bool:
"""
Close connection to the database.
Returns:
bool: True if disconnection successful, False otherwise
"""
pass
@abstractmethod
def test_connection(self) -> bool:
"""
Test if the database connection is working.
Returns:
bool: True if connection is working, False otherwise
"""
pass
@abstractmethod
def execute_query(self,
query: Union[str, Dict, List],
query_type: QueryType = None,
**kwargs) -> Dict[str, Any]:
"""
Execute a query on the database.
Args:
query: The query to execute (string for SQL, dict/list for NoSQL)
query_type: Type of query being executed
**kwargs: Additional query parameters
Returns:
Dict containing query results and metadata
"""
pass
@abstractmethod
def get_database_info(self) -> Dict[str, Any]:
"""
Get information about the database.
Returns:
Dict containing database information
"""
pass
@abstractmethod
def list_collections(self) -> List[str]:
"""
List all collections/tables in the database.
Returns:
List of collection/table names
"""
pass
@abstractmethod
def get_collection_info(self, collection_name: str) -> Dict[str, Any]:
"""
Get information about a specific collection/table.
Args:
collection_name: Name of the collection/table
Returns:
Dict containing collection/table information
"""
pass
@abstractmethod
def get_schema(self, collection_name: str = None) -> Dict[str, Any]:
"""
Get the schema/structure of the database or a specific collection.
Args:
collection_name: Name of the collection/table (optional)
Returns:
Dict containing schema information
"""
pass
def validate_query(self, query: Union[str, Dict, List]) -> Dict[str, Any]:
"""
Validate a query before execution.
Args:
query: The query to validate
Returns:
Dict containing validation results
"""
try:
# Basic validation - child classes can override for specific validation
if isinstance(query, str):
if not query.strip():
return {"valid": False, "error": "Query cannot be empty"}
elif isinstance(query, (dict, list)):
if not query:
return {"valid": False, "error": "Query cannot be empty"}
else:
return {"valid": False, "error": f"Unsupported query type: {type(query)}"}
return {"valid": True, "error": None}
except Exception as e:
return {"valid": False, "error": str(e)}
def format_query_result(self,
data: Any,
query_type: QueryType,
execution_time: float = None,
**kwargs) -> Dict[str, Any]:
"""
Format query results into a standard structure.
Args:
data: Raw query results
query_type: Type of query that was executed
execution_time: Time taken to execute the query
**kwargs: Additional metadata
Returns:
Dict containing formatted results
"""
return {
"success": True,
"data": data,
"query_type": query_type.value if query_type else None,
"execution_time": execution_time,
"row_count": len(data) if isinstance(data, (list, tuple)) else 1,
"metadata": kwargs
}
def format_error_result(self,
error: str,
query_type: QueryType = None,
**kwargs) -> Dict[str, Any]:
"""
Format error results into a standard structure.
Args:
error: Error message
query_type: Type of query that failed
**kwargs: Additional error metadata
Returns:
Dict containing formatted error results
"""
return {
"success": False,
"error": error,
"query_type": query_type.value if query_type else None,
"data": None,
"execution_time": None,
"row_count": 0,
"metadata": kwargs
}
def get_supported_query_types(self) -> List[QueryType]:
"""
Get list of supported query types for this database.
Returns:
List of supported QueryType enums
"""
return [
QueryType.SELECT,
QueryType.INSERT,
QueryType.UPDATE,
QueryType.DELETE,
QueryType.CREATE,
QueryType.DROP
]
def get_capabilities(self) -> Dict[str, Any]:
"""
Get database capabilities and features.
Returns:
Dict containing database capabilities
"""
return {
"database_type": self.db_type.value,
"supports_sql": False, # Override in SQL databases
"supports_aggregation": False, # Override in document databases
"supports_full_text_search": False, # Override in search databases
"supports_vector_search": False, # Override in vector databases
"supports_transactions": False, # Override if supported
"supports_indexing": True,
"supported_query_types": [qt.value for qt in self.get_supported_query_types()],
"connection_info": {
"is_connected": self.connection is not None,
"database_name": self.database_name
}
}
def __enter__(self):
"""Context manager entry"""
if not self.connection:
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.disconnect()
def __del__(self):
"""Cleanup on deletion"""
try:
self.disconnect()
except Exception:
pass