Spaces:
Sleeping
Sleeping
| """ | |
| Database connection management for PostgreSQL and Redis. | |
| """ | |
| import redis | |
| import psycopg2 | |
| import psycopg2.extras | |
| from config import Config | |
| class DatabaseManager: | |
| """Manages connections to PostgreSQL and Redis.""" | |
| def __init__(self, config: Config): | |
| self.config = config | |
| self._pg_conn = None | |
| self._redis_conn = None | |
| # -- PostgreSQL --------------------------------------------------------- | |
| def pg(self): | |
| """Return a live PostgreSQL connection, reconnecting if needed.""" | |
| if self._pg_conn is None or self._pg_conn.closed: | |
| self._pg_conn = psycopg2.connect( | |
| host=self.config.pg_host, | |
| port=self.config.pg_port, | |
| dbname=self.config.pg_database, | |
| user=self.config.pg_user, | |
| password=self.config.pg_password, | |
| ) | |
| self._pg_conn.autocommit = True | |
| return self._pg_conn | |
| def pg_query(self, sql: str, params=None) -> list[dict]: | |
| """Execute a SQL query and return rows as dicts.""" | |
| with self.pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: | |
| cur.execute(sql, params) | |
| if cur.description: | |
| return [dict(row) for row in cur.fetchall()] | |
| return [] | |
| def pg_execute(self, sql: str, params=None) -> str: | |
| """Execute a SQL statement and return status message.""" | |
| with self.pg.cursor() as cur: | |
| cur.execute(sql, params) | |
| return cur.statusmessage | |
| # -- Redis -------------------------------------------------------------- | |
| def rds(self) -> redis.Redis: | |
| """Return a live Redis connection.""" | |
| if self._redis_conn is None: | |
| self._redis_conn = redis.Redis( | |
| host=self.config.redis_host, | |
| port=self.config.redis_port, | |
| password=self.config.redis_password or None, | |
| db=self.config.redis_db, | |
| decode_responses=True, | |
| socket_timeout=5, | |
| ) | |
| return self._redis_conn | |
| def redis_ping(self) -> bool: | |
| try: | |
| return self.rds.ping() | |
| except redis.ConnectionError: | |
| return False | |
| def redis_get(self, key: str) -> str | None: | |
| return self.rds.get(key) | |
| def redis_delete(self, key: str) -> int: | |
| return self.rds.delete(key) | |
| def redis_keys(self, pattern: str) -> list[str]: | |
| """Return keys matching a pattern using SCAN (production-safe).""" | |
| keys = [] | |
| cursor = 0 | |
| while True: | |
| cursor, batch = self.rds.scan(cursor=cursor, match=pattern, count=200) | |
| keys.extend(batch) | |
| if cursor == 0: | |
| break | |
| return keys | |
| def redis_smembers(self, key: str) -> set: | |
| return self.rds.smembers(key) | |
| def redis_ttl(self, key: str) -> int: | |
| return self.rds.ttl(key) | |
| def redis_info(self, section: str = "all") -> dict: | |
| return self.rds.info(section) | |
| # -- Cleanup ------------------------------------------------------------ | |
| def close(self): | |
| if self._pg_conn and not self._pg_conn.closed: | |
| self._pg_conn.close() | |
| if self._redis_conn: | |
| self._redis_conn.close() | |