File size: 2,805 Bytes
99f834c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""PostgreSQL database connector (via SQLAlchemy + psycopg2)."""
from __future__ import annotations

from typing import List, Optional

import pandas as pd
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import Engine

from core.database.base import ConnectionConfig, DatabaseConnector


class PostgreSQLConnector(DatabaseConnector):
    """Connects to a PostgreSQL database through a SQLAlchemy engine.

    Connection settings are read from ``config.params`` using the keys
    ``user``, ``password``, ``host``, ``port`` and ``dbname``.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        """Store *config* via the base class; no connection is opened yet."""
        super().__init__(config)
        self._engine: Optional[Engine] = None

    def _build_url(self) -> str:
        """Build the ``postgresql+psycopg2://`` URL from ``config.params``.

        The user name and password are percent-encoded so that characters
        with a meaning inside a URL (``@``, ``:``, ``/``, ``%``, ...) cannot
        corrupt it. Missing keys fall back to an empty user/dbname,
        ``localhost`` and port ``5432``.

        Returns:
            The connection URL. The password part is omitted entirely when
            the configured password is empty/falsy.
        """
        # Function-scope import: only this method needs it.
        from urllib.parse import quote

        params = self.config.params
        # safe="" also encodes "/" which quote() would otherwise leave alone.
        user = quote(str(params.get("user", "")), safe="")
        password = params.get("password", "")
        host = params.get("host", "localhost")
        port = params.get("port", 5432)
        dbname = params.get("dbname", "")
        if password:
            secret = quote(str(password), safe="")
            return f"postgresql+psycopg2://{user}:{secret}@{host}:{port}/{dbname}"
        return f"postgresql+psycopg2://{user}@{host}:{port}/{dbname}"

    def connect(self) -> None:
        """Create the engine and verify it with a ``SELECT 1`` round trip.

        Any engine left over from an earlier ``connect()`` is released first.
        If creating or probing the new engine fails, that engine is disposed
        and the connector is left disconnected.

        Raises:
            ConnectionError: If the engine cannot be created or the probe
                query fails; the original exception is chained as the cause.
        """
        # Release a previous engine so repeated connect() calls do not leak
        # connection pools and a failed reconnect cannot look "connected".
        self.disconnect()
        engine: Optional[Engine] = None
        try:
            engine = create_engine(self._build_url(), pool_pre_ping=True)
            # Test the connection
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            if engine is not None:
                engine.dispose()
            raise ConnectionError(f"PostgreSQL connection failed: {e}") from e
        self._engine = engine
        self._connected = True

    def disconnect(self) -> None:
        """Dispose the engine, if any, and mark the connector disconnected."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None
        self._connected = False

    def list_tables(self) -> List[str]:
        """Return the table names reported by the SQLAlchemy inspector.

        Raises:
            RuntimeError: If ``connect()`` has not succeeded.
        """
        self._require_connected()
        inspector = inspect(self._engine)
        return inspector.get_table_names()

    def get_columns(self, table: str) -> List[str]:
        """Return the column names of *table* as reported by the inspector.

        Raises:
            RuntimeError: If ``connect()`` has not succeeded.
        """
        self._require_connected()
        inspector = inspect(self._engine)
        return [col["name"] for col in inspector.get_columns(table)]

    def get_records(
        self,
        table: str,
        query: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """Read rows from *table* into a DataFrame.

        Args:
            table: Table name. It is emitted as a single double-quoted
                identifier; embedded double quotes are escaped by doubling.
            query: Optional SQL condition appended verbatim after ``WHERE``.
                It is NOT escaped, so it must come from a trusted source.
            limit: Optional maximum row count. A falsy value (``None`` or
                ``0``) adds no ``LIMIT`` clause.

        Returns:
            The result set as a ``pandas.DataFrame``.

        Raises:
            RuntimeError: If ``connect()`` has not succeeded.
            TypeError, ValueError: If *limit* cannot be converted to ``int``.
        """
        self._require_connected()
        # Doubling '"' keeps the table name inside the quoted identifier.
        quoted_table = table.replace('"', '""')
        sql = f'SELECT * FROM "{quoted_table}"'
        if query:
            sql += f" WHERE {query}"
        if limit:
            # int() guarantees only a number can reach the statement text.
            sql += f" LIMIT {int(limit)}"
        with self._engine.connect() as conn:  # type: ignore[union-attr]
            return pd.read_sql_query(text(sql), conn)

    def execute_raw(self, sql: str) -> pd.DataFrame:
        """Run caller-supplied SQL and return the result as a DataFrame.

        The statement is intended to be read-only, but nothing here enforces
        that; *sql* is executed as given.

        Raises:
            RuntimeError: If ``connect()`` has not succeeded.
        """
        self._require_connected()
        with self._engine.connect() as conn:  # type: ignore[union-attr]
            return pd.read_sql_query(text(sql), conn)

    def _require_connected(self) -> None:
        """Raise ``RuntimeError`` unless a live engine is available."""
        if not self._connected or self._engine is None:
            raise RuntimeError("Not connected. Call connect() first.")