File size: 11,659 Bytes
330b6e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""

Connection pooling utilities for database and Redis connections.

This module provides optimized connection pooling for both PostgreSQL and Redis
to improve performance under concurrent load.

"""

import os
import logging
from typing import Optional, Dict, Any
from contextlib import contextmanager

import redis
from redis.connection import ConnectionPool
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool, StaticPool

# Module-level logger used by every pool class and helper in this file.
logger = logging.getLogger(__name__)


class DatabaseConnectionPool:
    """Manages optimized database connection pooling.

    Builds a SQLAlchemy engine whose pool sizing comes from environment
    variables (overridable per instance) and attaches pool event listeners
    for monitoring.
    """

    # PRAGMAs applied, in this order, to every new SQLite DBAPI connection.
    _SQLITE_PRAGMAS = (
        "PRAGMA foreign_keys=ON",
        "PRAGMA journal_mode=WAL",
        "PRAGMA synchronous=NORMAL",
        "PRAGMA cache_size=10000",
        "PRAGMA temp_store=MEMORY",
    )

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer setting from the environment.

        Args:
            name: Environment variable name.
            default: Value used when the variable is unset or blank.

        Returns:
            The parsed integer, or ``default`` for an unset/blank variable.

        Raises:
            ValueError: If the variable is set to a non-integer value.
        """
        # A variable that is set but empty (e.g. ``DB_POOL_SIZE=``) should
        # behave like an unset one instead of crashing ``int('')``.
        raw = os.getenv(name, '').strip()
        return int(raw) if raw else default

    def __init__(self, database_url: str, **kwargs: Any) -> None:
        """
        Initialize database connection pool.

        Args:
            database_url: Database connection URL
            **kwargs: Additional engine options; they override the defaults
                below. ``poolclass`` may be given to replace the default
                ``QueuePool``.
        """
        self.database_url = database_url

        # Default pool configuration optimized for chat workload
        default_config = {
            'pool_size': self._env_int('DB_POOL_SIZE', 10),
            'max_overflow': self._env_int('DB_MAX_OVERFLOW', 20),
            'pool_recycle': self._env_int('DB_POOL_RECYCLE', 3600),  # 1 hour
            'pool_pre_ping': True,  # Validate connections before use
            'pool_timeout': self._env_int('DB_POOL_TIMEOUT', 30),
            'echo': os.getenv('SQLALCHEMY_ECHO', 'False').lower() == 'true',
        }

        # Override with provided kwargs
        default_config.update(kwargs)

        # Pull ``poolclass`` out so it is passed exactly once; leaving it in
        # the dict would collide with the explicit keyword below.
        poolclass = default_config.pop('poolclass', QueuePool)

        # Create engine with optimized settings
        self.engine = create_engine(
            database_url,
            poolclass=poolclass,
            **default_config
        )

        # Add connection event listeners for monitoring
        self._setup_connection_events()

        logger.info("Database connection pool initialized", extra={
            'pool_size': default_config['pool_size'],
            'max_overflow': default_config['max_overflow'],
            'pool_recycle': default_config['pool_recycle'],
        })

    def _setup_connection_events(self) -> None:
        """Setup SQLAlchemy event listeners for connection monitoring."""

        # Decide by the engine's dialect rather than by searching the URL
        # text, so a non-SQLite URL that merely contains "sqlite" (for
        # example in a database name) never receives PRAGMA statements.
        if self.engine.dialect.name == 'sqlite':

            @event.listens_for(self.engine, "connect")
            def set_sqlite_pragma(dbapi_connection, connection_record):
                """Apply the SQLite PRAGMAs to a newly created connection."""
                cursor = dbapi_connection.cursor()
                try:
                    for pragma in self._SQLITE_PRAGMAS:
                        cursor.execute(pragma)
                finally:
                    # Close the cursor even when a PRAGMA fails.
                    cursor.close()

        @event.listens_for(self.engine, "checkout")
        def receive_checkout(dbapi_connection, connection_record, connection_proxy):
            """Log connection checkout for monitoring."""
            logger.debug("Database connection checked out from pool")

        @event.listens_for(self.engine, "checkin")
        def receive_checkin(dbapi_connection, connection_record):
            """Log connection checkin for monitoring."""
            logger.debug("Database connection returned to pool")

    def get_pool_status(self) -> Dict[str, Any]:
        """
        Get current pool status for monitoring.

        Returns:
            Dictionary with the keys ``pool_size``, ``checked_in``,
            ``checked_out``, ``overflow`` and ``invalid``.
        """
        pool = self.engine.pool

        status = {
            'pool_size': pool.size(),
            'checked_in': pool.checkedin(),
            'checked_out': pool.checkedout(),
            'overflow': pool.overflow(),
        }

        # Add invalid count if available (not all pool types have this)
        if hasattr(pool, 'invalid'):
            status['invalid'] = pool.invalid()
        else:
            status['invalid'] = 0

        return status

    @contextmanager
    def get_connection(self):
        """
        Context manager for getting database connections.

        Yields:
            Database connection obtained from ``engine.connect()``; it is
            closed when the ``with`` block exits, including on error.
        """
        connection = self.engine.connect()
        try:
            yield connection
        finally:
            connection.close()


class RedisConnectionPool:
    """Manages optimized Redis connection pooling.

    Creates a redis-py ``ConnectionPool`` from a URL, wraps it in a single
    shared ``redis.Redis`` client and verifies connectivity with a ping.
    """

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer setting from the environment.

        Args:
            name: Environment variable name.
            default: Value used when the variable is unset or blank.

        Returns:
            The parsed integer, or ``default`` for an unset/blank variable.

        Raises:
            ValueError: If the variable is set to a non-integer value.
        """
        # A variable that is set but empty should behave like an unset one
        # instead of crashing ``int('')``.
        raw = os.getenv(name, '').strip()
        return int(raw) if raw else default

    def __init__(self, redis_url: str, **kwargs: Any) -> None:
        """
        Initialize Redis connection pool.

        Args:
            redis_url: Redis connection URL
            **kwargs: Additional pool options; they override the defaults
                below.

        Raises:
            redis.RedisError: If the initial ping fails. The pool is
                disconnected before the error is re-raised.
        """
        self.redis_url = redis_url

        # Default pool configuration optimized for chat workload
        default_config = {
            'max_connections': self._env_int('REDIS_MAX_CONNECTIONS', 20),
            'retry_on_timeout': True,
            'socket_timeout': self._env_int('REDIS_SOCKET_TIMEOUT', 5),
            'socket_connect_timeout': self._env_int('REDIS_CONNECT_TIMEOUT', 5),
            'socket_keepalive': True,
            'socket_keepalive_options': {},
            'health_check_interval': self._env_int('REDIS_HEALTH_CHECK_INTERVAL', 30),
        }

        # Override with provided kwargs
        default_config.update(kwargs)

        # Create connection pool
        self.connection_pool = ConnectionPool.from_url(
            redis_url,
            **default_config
        )

        # Create Redis client with the pool
        self.redis_client = redis.Redis(connection_pool=self.connection_pool)

        # Test connection
        try:
            self.redis_client.ping()
        except redis.RedisError as e:
            logger.error("Failed to initialize Redis connection pool: %s", e)
            # The caller never receives this instance, so release whatever
            # the failed ping left in the pool instead of leaking sockets.
            try:
                self.connection_pool.disconnect()
            except Exception as cleanup_error:
                # Cleanup is best-effort; the original error is what matters.
                logger.debug(
                    "Error disconnecting Redis pool after failed init: %s",
                    cleanup_error,
                )
            raise

        logger.info("Redis connection pool initialized", extra={
            'max_connections': default_config['max_connections'],
            'socket_timeout': default_config['socket_timeout'],
        })

    def get_client(self) -> redis.Redis:
        """
        Get Redis client with connection pooling.

        Returns:
            The Redis client instance created in ``__init__``.
        """
        return self.redis_client

    def get_pool_status(self) -> Dict[str, Any]:
        """
        Get current pool status for monitoring.

        Returns:
            Dictionary with the keys ``max_connections``,
            ``created_connections``, ``available_connections`` and
            ``in_use_connections``. A counter the pool object does not
            expose is reported as 0.
        """
        pool = self.connection_pool

        status = {
            'max_connections': getattr(pool, 'max_connections', 0),
        }

        # Add connection counts if available (attributes may vary by Redis version)
        if hasattr(pool, '_created_connections'):
            status['created_connections'] = pool._created_connections
        elif hasattr(pool, 'created_connections'):
            status['created_connections'] = pool.created_connections
        else:
            status['created_connections'] = 0

        if hasattr(pool, '_available_connections'):
            status['available_connections'] = len(pool._available_connections)
        else:
            status['available_connections'] = 0

        if hasattr(pool, '_in_use_connections'):
            status['in_use_connections'] = len(pool._in_use_connections)
        else:
            status['in_use_connections'] = 0

        return status

    def health_check(self) -> bool:
        """
        Perform health check on Redis connection.

        Returns:
            True if the ping succeeds, False if it raises
            ``redis.RedisError`` (the failure is logged as a warning).
        """
        try:
            self.redis_client.ping()
            return True
        except redis.RedisError as e:
            logger.warning("Redis health check failed: %s", e)
            return False

    def close(self) -> None:
        """Close all connections in the pool.

        Errors raised while disconnecting are logged, not propagated.
        """
        try:
            self.connection_pool.disconnect()
            logger.info("Redis connection pool closed")
        except Exception as e:
            logger.error("Error closing Redis connection pool: %s", e)


class ConnectionPoolManager:
    """Owns the database pool and, when configured, the Redis pool."""

    def __init__(self, database_url: str, redis_url: Optional[str] = None):
        """
        Build the database pool and, if a Redis URL is given, the Redis pool.

        A Redis pool that fails to initialize is logged as a warning and
        left as ``None``; the database pool is always created.

        Args:
            database_url: Database connection URL
            redis_url: Redis connection URL (optional); an empty value or
                the literal string ``'None'`` means "no Redis"
        """
        self.database_pool = DatabaseConnectionPool(database_url)
        self.redis_pool = None

        redis_requested = bool(redis_url) and redis_url != 'None'
        if redis_requested:
            try:
                self.redis_pool = RedisConnectionPool(redis_url)
            except Exception as e:
                logger.warning(f"Failed to initialize Redis pool: {e}")

        logger.info("Connection pool manager initialized")

    def get_database_engine(self) -> Engine:
        """Return the engine held by the database pool."""
        database_pool = self.database_pool
        return database_pool.engine

    def get_redis_client(self) -> Optional[redis.Redis]:
        """Return the pooled Redis client, or None when no Redis pool exists."""
        if not self.redis_pool:
            return None
        return self.redis_pool.get_client()

    def get_status(self) -> Dict[str, Any]:
        """
        Collect the status of every pool this manager owns.

        Returns:
            Dictionary with a ``database`` entry and a ``redis`` entry; the
            latter is None without a Redis pool, otherwise the Redis pool
            status plus a ``healthy`` flag from its health check.
        """
        database_status = self.database_pool.get_pool_status()

        redis_status = None
        if self.redis_pool:
            redis_status = self.redis_pool.get_pool_status()
            redis_status['healthy'] = self.redis_pool.health_check()

        return {
            'database': database_status,
            'redis': redis_status,
        }

    def close_all(self):
        """Dispose the database engine, then close the Redis pool if present.

        A failure while disposing the engine is logged and does not prevent
        the Redis pool from being closed.
        """
        try:
            self.database_pool.engine.dispose()
            logger.info("Database connection pool closed")
        except Exception as e:
            logger.error(f"Error closing database pool: {e}")

        redis_pool = self.redis_pool
        if redis_pool:
            redis_pool.close()


# Global connection pool manager instance. Stays None until
# initialize_connection_pools() creates it; cleanup_connection_pools()
# resets it to None.
_connection_pool_manager: Optional[ConnectionPoolManager] = None


def initialize_connection_pools(database_url: str, redis_url: Optional[str] = None) -> ConnectionPoolManager:
    """
    Create the global connection pool manager on first use.

    If a manager already exists it is returned unchanged and the arguments
    of this call are ignored.

    Args:
        database_url: Database connection URL
        redis_url: Redis connection URL (optional)

    Returns:
        The global ConnectionPoolManager instance
    """
    global _connection_pool_manager

    manager = _connection_pool_manager
    if manager is None:
        manager = ConnectionPoolManager(database_url, redis_url)
        _connection_pool_manager = manager

    return manager


def get_connection_pool_manager() -> Optional[ConnectionPoolManager]:
    """
    Get the global connection pool manager.

    Returns:
        ConnectionPoolManager instance or None if not initialized
    """
    return _connection_pool_manager


def cleanup_connection_pools():
    """Close the global manager's pools and forget the manager.

    Does nothing when no manager has been initialized.
    """
    global _connection_pool_manager

    manager = _connection_pool_manager
    if not manager:
        return

    manager.close_all()
    _connection_pool_manager = None