| """PostgreSQL database connector (via SQLAlchemy + psycopg2).""" |
| from __future__ import annotations |
|
|
| from typing import List, Optional |
|
|
| import pandas as pd |
| from sqlalchemy import create_engine, inspect, text |
| from sqlalchemy.engine import Engine |
|
|
| from core.database.base import ConnectionConfig, DatabaseConnector |
|
|
|
|
| class PostgreSQLConnector(DatabaseConnector): |
| """Connects to a PostgreSQL database.""" |
|
|
| def __init__(self, config: ConnectionConfig) -> None: |
| super().__init__(config) |
| self._engine: Optional[Engine] = None |
|
|
| def _build_url(self) -> str: |
| p = self.config.params |
| user = p.get("user", "") |
| password = p.get("password", "") |
| host = p.get("host", "localhost") |
| port = p.get("port", 5432) |
| dbname = p.get("dbname", "") |
| if password: |
| return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}" |
| return f"postgresql+psycopg2://{user}@{host}:{port}/{dbname}" |
|
|
| def connect(self) -> None: |
| try: |
| url = self._build_url() |
| self._engine = create_engine(url, pool_pre_ping=True) |
| |
| with self._engine.connect() as conn: |
| conn.execute(text("SELECT 1")) |
| self._connected = True |
| except Exception as e: |
| raise ConnectionError(f"PostgreSQL connection failed: {e}") from e |
|
|
| def disconnect(self) -> None: |
| if self._engine: |
| self._engine.dispose() |
| self._engine = None |
| self._connected = False |
|
|
| def list_tables(self) -> List[str]: |
| self._require_connected() |
| inspector = inspect(self._engine) |
| return inspector.get_table_names() |
|
|
| def get_columns(self, table: str) -> List[str]: |
| self._require_connected() |
| inspector = inspect(self._engine) |
| return [col["name"] for col in inspector.get_columns(table)] |
|
|
| def get_records( |
| self, |
| table: str, |
| query: Optional[str] = None, |
| limit: Optional[int] = None, |
| ) -> pd.DataFrame: |
| self._require_connected() |
| sql = f'SELECT * FROM "{table}"' |
| if query: |
| sql += f" WHERE {query}" |
| if limit: |
| sql += f" LIMIT {limit}" |
| with self._engine.connect() as conn: |
| return pd.read_sql_query(text(sql), conn) |
|
|
| def execute_raw(self, sql: str) -> pd.DataFrame: |
| """Run arbitrary read-only SQL.""" |
| self._require_connected() |
| with self._engine.connect() as conn: |
| return pd.read_sql_query(text(sql), conn) |
|
|
| def _require_connected(self) -> None: |
| if not self._connected or self._engine is None: |
| raise RuntimeError("Not connected. Call connect() first.") |
|
|