""" Database connection management for PostgreSQL and Redis. """ import redis import psycopg2 import psycopg2.extras from config import Config class DatabaseManager: """Manages connections to PostgreSQL and Redis.""" def __init__(self, config: Config): self.config = config self._pg_conn = None self._redis_conn = None # -- PostgreSQL --------------------------------------------------------- @property def pg(self): """Return a live PostgreSQL connection, reconnecting if needed.""" if self._pg_conn is None or self._pg_conn.closed: self._pg_conn = psycopg2.connect( host=self.config.pg_host, port=self.config.pg_port, dbname=self.config.pg_database, user=self.config.pg_user, password=self.config.pg_password, ) self._pg_conn.autocommit = True return self._pg_conn def pg_query(self, sql: str, params=None) -> list[dict]: """Execute a SQL query and return rows as dicts.""" with self.pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, params) if cur.description: return [dict(row) for row in cur.fetchall()] return [] def pg_execute(self, sql: str, params=None) -> str: """Execute a SQL statement and return status message.""" with self.pg.cursor() as cur: cur.execute(sql, params) return cur.statusmessage # -- Redis -------------------------------------------------------------- @property def rds(self) -> redis.Redis: """Return a live Redis connection.""" if self._redis_conn is None: self._redis_conn = redis.Redis( host=self.config.redis_host, port=self.config.redis_port, password=self.config.redis_password or None, db=self.config.redis_db, decode_responses=True, socket_timeout=5, ) return self._redis_conn def redis_ping(self) -> bool: try: return self.rds.ping() except redis.ConnectionError: return False def redis_get(self, key: str) -> str | None: return self.rds.get(key) def redis_delete(self, key: str) -> int: return self.rds.delete(key) def redis_keys(self, pattern: str) -> list[str]: """Return keys matching a pattern using SCAN (production-safe).""" keys = [] cursor = 0 while True: cursor, batch = self.rds.scan(cursor=cursor, match=pattern, count=200) keys.extend(batch) if cursor == 0: break return keys def redis_smembers(self, key: str) -> set: return self.rds.smembers(key) def redis_ttl(self, key: str) -> int: return self.rds.ttl(key) def redis_info(self, section: str = "all") -> dict: return self.rds.info(section) # -- Cleanup ------------------------------------------------------------ def close(self): if self._pg_conn and not self._pg_conn.closed: self._pg_conn.close() if self._redis_conn: self._redis_conn.close()