# nilenpatel - Initial release: pg_plan_cache agent (commit 40eb9bf)
"""
Database connection management for PostgreSQL and Redis.
"""
import redis
import psycopg2
import psycopg2.extras
from config import Config
class DatabaseManager:
    """Manages connections to PostgreSQL and Redis.

    Both connections are created lazily the first time the ``pg`` / ``rds``
    property is read, cached on the instance, and released by ``close()``.
    """

    def __init__(self, config: Config):
        """Store *config*; no connection is opened until first use."""
        self.config = config
        self._pg_conn = None
        self._redis_conn = None

    # -- PostgreSQL ---------------------------------------------------------

    @property
    def pg(self):
        """Return a live PostgreSQL connection, reconnecting if needed.

        A new connection is opened (with autocommit enabled) when none is
        cached yet or when the cached one reports itself as closed.
        """
        if self._pg_conn is None or self._pg_conn.closed:
            self._pg_conn = psycopg2.connect(
                host=self.config.pg_host,
                port=self.config.pg_port,
                dbname=self.config.pg_database,
                user=self.config.pg_user,
                password=self.config.pg_password,
            )
            self._pg_conn.autocommit = True
        return self._pg_conn

    def pg_query(self, sql: str, params=None) -> list[dict]:
        """Execute a SQL query and return rows as dicts.

        Args:
            sql: Statement text, optionally containing driver placeholders.
            params: Values bound to the placeholders, or None.

        Returns:
            One plain ``dict`` per row, or an empty list when the statement
            produced no result set.
        """
        with self.pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            # ``description`` is falsy when the statement returned no columns,
            # in which case there is nothing to fetch.
            if cur.description:
                return [dict(row) for row in cur.fetchall()]
            return []

    def pg_execute(self, sql: str, params=None) -> str:
        """Execute a SQL statement and return its status message.

        Args:
            sql: Statement text, optionally containing driver placeholders.
            params: Values bound to the placeholders, or None.

        Returns:
            The cursor's ``statusmessage`` after execution.
        """
        with self.pg.cursor() as cur:
            cur.execute(sql, params)
            return cur.statusmessage

    # -- Redis --------------------------------------------------------------

    @property
    def rds(self) -> redis.Redis:
        """Return the cached Redis client, creating it on first access.

        The client decodes responses to ``str`` and uses a 5 second socket
        timeout. An empty configured password is passed as ``None``.
        """
        if self._redis_conn is None:
            self._redis_conn = redis.Redis(
                host=self.config.redis_host,
                port=self.config.redis_port,
                password=self.config.redis_password or None,
                db=self.config.redis_db,
                decode_responses=True,
                socket_timeout=5,
            )
        return self._redis_conn

    def redis_ping(self) -> bool:
        """Return the result of PING, or False when Redis cannot be reached.

        Both connection failures and socket timeouts count as "unreachable".
        """
        try:
            return self.rds.ping()
        except (redis.ConnectionError, redis.TimeoutError):
            # redis-py's TimeoutError is not a ConnectionError subclass, so it
            # must be listed explicitly or a hung server would raise here.
            return False

    def redis_get(self, key: str) -> str | None:
        """Return the value stored at *key*, as returned by the client's GET."""
        return self.rds.get(key)

    def redis_delete(self, key: str) -> int:
        """Delete *key* and return the client's DELETE result."""
        return self.rds.delete(key)

    def redis_keys(self, pattern: str) -> list[str]:
        """Return keys matching a pattern using SCAN (production-safe).

        Args:
            pattern: Glob-style match pattern forwarded to SCAN.

        Returns:
            Each matching key exactly once, in first-seen order.
        """
        # SCAN may hand back the same key in more than one batch; a dict keeps
        # first-seen order while dropping the repeats.
        unique: dict[str, None] = {}
        cursor = 0
        while True:
            cursor, batch = self.rds.scan(cursor=cursor, match=pattern, count=200)
            unique.update(dict.fromkeys(batch))
            if cursor == 0:  # a zero cursor marks the end of the iteration
                break
        return list(unique)

    def redis_smembers(self, key: str) -> set:
        """Return the members of the set stored at *key*."""
        return self.rds.smembers(key)

    def redis_ttl(self, key: str) -> int:
        """Return the TTL the client reports for *key*."""
        return self.rds.ttl(key)

    def redis_info(self, section: str = "all") -> dict:
        """Return the client's INFO output for *section* (default ``"all"``)."""
        return self.rds.info(section)

    # -- Cleanup ------------------------------------------------------------

    def close(self):
        """Close both connections and forget them.

        Safe to call more than once. The Redis client is closed even if
        closing the PostgreSQL connection raises, and the cached handles are
        cleared so a later ``pg`` / ``rds`` access opens fresh connections.
        """
        # Detach first so the instance never keeps a half-closed handle.
        pg_conn, self._pg_conn = self._pg_conn, None
        redis_conn, self._redis_conn = self._redis_conn, None
        try:
            if pg_conn is not None and not pg_conn.closed:
                pg_conn.close()
        finally:
            if redis_conn is not None:
                redis_conn.close()