File size: 3,308 Bytes
40eb9bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Database connection management for PostgreSQL and Redis.
"""

import redis
import psycopg2
import psycopg2.extras
from config import Config


class DatabaseManager:
    """Manages connections to PostgreSQL and Redis."""

    def __init__(self, config: Config):
        self.config = config
        self._pg_conn = None
        self._redis_conn = None

    # -- PostgreSQL ---------------------------------------------------------

    @property
    def pg(self):
        """Return a live PostgreSQL connection, reconnecting if needed."""
        if self._pg_conn is None or self._pg_conn.closed:
            self._pg_conn = psycopg2.connect(
                host=self.config.pg_host,
                port=self.config.pg_port,
                dbname=self.config.pg_database,
                user=self.config.pg_user,
                password=self.config.pg_password,
            )
            self._pg_conn.autocommit = True
        return self._pg_conn

    def pg_query(self, sql: str, params=None) -> list[dict]:
        """Execute a SQL query and return rows as dicts."""
        with self.pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            if cur.description:
                return [dict(row) for row in cur.fetchall()]
            return []

    def pg_execute(self, sql: str, params=None) -> str:
        """Execute a SQL statement and return status message."""
        with self.pg.cursor() as cur:
            cur.execute(sql, params)
            return cur.statusmessage

    # -- Redis --------------------------------------------------------------

    @property
    def rds(self) -> redis.Redis:
        """Return a live Redis connection."""
        if self._redis_conn is None:
            self._redis_conn = redis.Redis(
                host=self.config.redis_host,
                port=self.config.redis_port,
                password=self.config.redis_password or None,
                db=self.config.redis_db,
                decode_responses=True,
                socket_timeout=5,
            )
        return self._redis_conn

    def redis_ping(self) -> bool:
        try:
            return self.rds.ping()
        except redis.ConnectionError:
            return False

    def redis_get(self, key: str) -> str | None:
        return self.rds.get(key)

    def redis_delete(self, key: str) -> int:
        return self.rds.delete(key)

    def redis_keys(self, pattern: str) -> list[str]:
        """Return keys matching a pattern using SCAN (production-safe)."""
        keys = []
        cursor = 0
        while True:
            cursor, batch = self.rds.scan(cursor=cursor, match=pattern, count=200)
            keys.extend(batch)
            if cursor == 0:
                break
        return keys

    def redis_smembers(self, key: str) -> set:
        return self.rds.smembers(key)

    def redis_ttl(self, key: str) -> int:
        return self.rds.ttl(key)

    def redis_info(self, section: str = "all") -> dict:
        return self.rds.info(section)

    # -- Cleanup ------------------------------------------------------------

    def close(self):
        if self._pg_conn and not self._pg_conn.closed:
            self._pg_conn.close()
        if self._redis_conn:
            self._redis_conn.close()