# File: RobotPai/src/database/connection_pool.py
# Repository page residue: atr0p05 - "Upload 291 files" - commit 8a682b5 (verified)
"""
Database connection pooling with retry logic
"""
import asyncio
import inspect
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp
from tenacity import (
    AsyncRetrying,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from src.utils.logging import get_logger
logger = get_logger(__name__)
@dataclass
class ConnectionStats:
    """Mutable counters describing the state of a connection pool.

    Every field has a default, so ``ConnectionStats()`` starts with all
    counters at zero and no recorded error.
    """

    # Number of connections successfully created.
    created: int = 0
    # Number of connections currently handed out.
    active: int = 0
    # Number of connections currently waiting to be handed out.
    idle: int = 0
    # Number of connection creation attempts that failed.
    failed: int = 0
    # Accumulated time, in seconds, spent waiting for a connection.
    total_wait_time: float = 0
    # Text of the most recent error, if any.
    last_error: Optional[str] = None
    # When the most recent error was recorded, if any.
    last_error_time: Optional[datetime] = None
class DatabaseConnection:
    """Handle pairing a connection id with its owning pool and raw client.

    Attributes:
        id: Identifier given by the pool when the handle was created.
        pool: The pool object this handle delegates to.
        created_at: Timestamp taken when the handle was constructed.
        last_used: Timestamp of construction or of the latest ``execute``.
        in_use: Flag telling whether the handle is currently checked out.
        connection: The underlying client object; ``None`` until assigned.
    """

    def __init__(self, connection_id: str, pool):
        """Create an idle handle that has no underlying client yet."""
        self.id = connection_id
        self.pool = pool
        self.created_at = datetime.now()
        self.last_used = datetime.now()
        self.connection = None
        self.in_use = False

    async def execute(self, query: str, params: Optional[Dict] = None) -> Any:
        """Run ``query`` through the pool's ``execute_with_retry``.

        Refreshes ``last_used`` first, then forwards the underlying client,
        the query and ``params`` to the pool and returns whatever it returns.
        """
        self.last_used = datetime.now()
        outcome = await self.pool.execute_with_retry(
            self.connection, query, params
        )
        return outcome

    def release(self):
        """Clear ``in_use`` and pass this handle to ``pool.release_connection``."""
        self.in_use = False
        owner = self.pool
        owner.release_connection(self)
class DatabasePool:
    """Pool of Supabase client connections with retry and health checks.

    ``initialize()`` creates ``pool_size`` connections up front and starts a
    background health-check task.  Connections are borrowed through the
    ``acquire()`` async context manager; when none becomes free within
    ``connection_timeout`` seconds, up to ``max_overflow`` additional
    connections are created on demand.

    Args:
        url: Supabase project URL passed to ``create_client``.
        key: Supabase API key passed to ``create_client``.
        pool_size: Number of connections created by ``initialize()``.
        max_overflow: Extra connections allowed beyond ``pool_size``.
        connection_timeout: Seconds to wait for a free connection.
        query_timeout: Seconds allowed for a single query attempt.
        max_retries: Maximum attempts per query (at least one is made).
    """

    #: Seconds between two passes of the background health check.
    HEALTH_CHECK_INTERVAL = 60

    def __init__(
        self,
        url: str,
        key: str,
        pool_size: int = 10,
        max_overflow: int = 5,
        connection_timeout: float = 30.0,
        query_timeout: float = 60.0,
        max_retries: int = 3
    ):
        self.url = url
        self.key = key
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.connection_timeout = connection_timeout
        self.query_timeout = query_timeout
        self.max_retries = max_retries
        # Connection tracking
        self.connections: List["DatabaseConnection"] = []
        self.available_connections: asyncio.Queue = asyncio.Queue()
        self.stats = ConnectionStats()
        # Connections currently handed out; makes releasing idempotent so a
        # manual release plus the context-manager exit cannot queue a
        # connection twice.
        self._checked_out = set()
        # Successful acquire() calls; denominator of the average wait time.
        self._acquire_count = 0
        # Health monitoring
        self._health_check_task = None
        self._closed = False

    async def initialize(self):
        """Create the initial connections and start the health-check task.

        A connection that fails to be created is logged and counted in
        ``stats.failed``; the remaining connections are still created.
        """
        logger.info(f"Initializing database pool with {self.pool_size} connections...")
        for i in range(self.pool_size):
            try:
                conn = await self._create_connection(f"conn_{i}")
            except Exception as e:
                logger.error(f"Failed to create connection {i}: {e}")
                self.stats.failed += 1
                continue
            self.connections.append(conn)
            # The queue is unbounded, so put_nowait cannot raise QueueFull.
            self.available_connections.put_nowait(conn)
            self.stats.created += 1
            self.stats.idle += 1
        # Avoid stacking several health-check tasks on repeated calls.
        if self._health_check_task is None or self._health_check_task.done():
            self._health_check_task = asyncio.create_task(self._health_check_loop())
        logger.info(f"Pool initialized with {len(self.connections)} connections")

    async def _create_connection(self, connection_id: str) -> "DatabaseConnection":
        """Build a ``DatabaseConnection`` wrapping a new Supabase client."""
        conn = DatabaseConnection(connection_id, self)
        # Imported lazily so the module can be imported without supabase.
        from supabase import create_client
        conn.connection = create_client(self.url, self.key)
        return conn

    async def _checkout(self) -> "DatabaseConnection":
        """Take a connection out of the pool, creating an overflow one if needed.

        Raises:
            TimeoutError: No connection became free within
                ``connection_timeout`` and the overflow limit is reached.
        """
        try:
            connection = await asyncio.wait_for(
                self.available_connections.get(),
                timeout=self.connection_timeout
            )
        except asyncio.TimeoutError:
            if len(self.connections) >= self.pool_size + self.max_overflow:
                raise TimeoutError("Connection pool exhausted")
            logger.warning("Pool exhausted, creating overflow connection")
            connection = await self._create_connection(
                f"overflow_{len(self.connections)}"
            )
            self.connections.append(connection)
            self.stats.created += 1
            # A fresh overflow connection was never counted as idle, so the
            # idle counter must not be decremented for it.
        else:
            self.stats.idle -= 1
        connection.in_use = True
        self.stats.active += 1
        self._checked_out.add(connection)
        return connection

    def release_connection(self, connection) -> None:
        """Return a checked-out connection to the pool.

        Called by ``acquire()`` on exit and by ``DatabaseConnection.release``.
        Releasing a connection that is not checked out is a no-op.  Once the
        pool is closed the connection is not queued again.
        """
        if connection not in self._checked_out:
            return
        self._checked_out.discard(connection)
        connection.in_use = False
        self.stats.active -= 1
        self.stats.idle += 1
        if not self._closed:
            self.available_connections.put_nowait(connection)

    @asynccontextmanager
    async def acquire(self):
        """Borrow a connection for the duration of an ``async with`` block.

        Yields:
            A ``DatabaseConnection`` marked as in use.

        Raises:
            TimeoutError: The pool and its overflow allowance are exhausted.
        """
        start_time = time.time()
        connection = await self._checkout()
        self.stats.total_wait_time += time.time() - start_time
        self._acquire_count += 1
        try:
            yield connection
        finally:
            self.release_connection(connection)

    async def _run_query(self, connection, query: str):
        """Run ``select("*")`` on the table named ``query`` and return the result.

        A client whose ``execute`` is a plain function (such as the one
        returned by ``supabase.create_client``) is run in the default
        executor so it does not block the event loop; a coroutine ``execute``
        is awaited directly.
        """
        request = connection.table(query).select("*")
        if inspect.iscoroutinefunction(request.execute):
            return await request.execute()
        loop = asyncio.get_running_loop()
        outcome = await loop.run_in_executor(None, request.execute)
        # Covers callables that are not coroutine functions but still hand
        # back an awaitable.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def execute_with_retry(self, connection, query: str, params: Optional[Dict] = None):
        """Execute a query, retrying on client errors and timeouts.

        Args:
            connection: Underlying client exposing ``table(...)``.
            query: Used as the table name; all columns are selected.
            params: Accepted for interface compatibility; currently unused.

        Returns:
            The result of the query's ``execute()`` call.

        Raises:
            tenacity.RetryError: Every attempt failed with
                ``aiohttp.ClientError`` or ``asyncio.TimeoutError``.
            Exception: Any other error is propagated without retrying.
        """
        retrying = AsyncRetrying(
            # Honour the configured limit, but always try at least once.
            stop=stop_after_attempt(max(1, self.max_retries)),
            wait=wait_exponential(multiplier=1, min=4, max=10),
            retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
        )
        async for attempt in retrying:
            with attempt:
                try:
                    # On timeout only the wait is abandoned; a query already
                    # running in a worker thread cannot be interrupted.
                    return await asyncio.wait_for(
                        self._run_query(connection, query),
                        timeout=self.query_timeout
                    )
                except Exception as e:
                    logger.error(f"Query execution failed: {e}")
                    self.stats.last_error = str(e)
                    self.stats.last_error_time = datetime.now()
                    raise

    async def execute(self, query: str, params: Optional[Dict] = None) -> Any:
        """Execute a query using a connection borrowed from the pool."""
        async with self.acquire() as conn:
            return await conn.execute(query, params)

    async def _health_check_loop(self):
        """Probe idle connections periodically until the pool is closed."""
        while not self._closed:
            try:
                await asyncio.sleep(self.HEALTH_CHECK_INTERVAL)
                idle_connections = [c for c in self.connections if not c.in_use]
                for conn in idle_connections:
                    if self._closed:
                        break
                    # The snapshot may be stale: a connection can be checked
                    # out while an earlier probe was awaited.
                    if conn.in_use:
                        continue
                    try:
                        # Simple health check query
                        await conn.execute("health_check", {})
                    except Exception as e:
                        logger.warning(f"Health check failed for {conn.id}: {e}")
                        # Mark connection for recreation
                        # In production, implement connection recreation logic
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Health check loop error: {e}")

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the pool statistics as a plain dict.

        ``average_wait_time`` is the mean number of seconds spent waiting
        per successful ``acquire()`` call, or 0 before the first one.
        """
        acquisitions = self._acquire_count
        last_error_time = self.stats.last_error_time
        return {
            'created': self.stats.created,
            'active': self.stats.active,
            'idle': self.stats.idle,
            'failed': self.stats.failed,
            'average_wait_time': (
                self.stats.total_wait_time / acquisitions
                if acquisitions > 0 else 0
            ),
            'last_error': self.stats.last_error,
            'last_error_time': (
                last_error_time.isoformat() if last_error_time else None
            ),
            'pool_size': self.pool_size,
            'max_overflow': self.max_overflow
        }

    async def close(self):
        """Mark the pool closed and stop the health-check task.

        The Supabase clients need no explicit close, so only the background
        task is shut down.  Calling ``close()`` again is harmless.
        """
        logger.info("Closing database pool...")
        self._closed = True
        task, self._health_check_task = self._health_check_task, None
        if task is not None:
            task.cancel()
            # Wait for the task to finish; gather() keeps its cancellation
            # from surfacing here while still letting close() be cancelled.
            await asyncio.gather(task, return_exceptions=True)
        logger.info("Database pool closed")