| |
| import os |
| from sqlalchemy import create_engine, text |
| from sqlalchemy.orm import sessionmaker |
| from dotenv import load_dotenv |
|
|
# Load a .env file (via python-dotenv) before the DB_* settings are read with
# os.getenv, so values defined there are visible to this module.
load_dotenv()
|
|
| DB_HOST = os.getenv("DB_HOST") |
| DB_PORT = os.getenv("DB_PORT") |
| DB_NAME = os.getenv("DB_NAME") |
| DB_USER = os.getenv("DB_USER") |
| DB_PASSWORD = os.getenv("DB_PASSWORD") |
|
|
| DATABASE_URL = None |
| engine = None |
| SessionLocal = None |
|
|
| if all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]): |
| DATABASE_URL = f"postgresql+psycopg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" |
| engine = create_engine(DATABASE_URL, pool_pre_ping=True) |
| SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) |
|
|
|
|
def _require_engine():
    """Ensure the module-level ``engine`` exists before it is used.

    Returns:
        None when ``engine`` is set.

    Raises:
        RuntimeError: If ``engine`` is still ``None``.
    """
    if engine is not None:
        return
    message = "Database is not configured. Set DB_HOST, DB_PORT, DB_NAME, DB_USER, and DB_PASSWORD."
    raise RuntimeError(message)
|
|
def fetch_all(query, params=None):
    """Execute a SQL string and return all result rows as dictionaries.

    Args:
        query: SQL text; it is wrapped with ``text()`` before execution.
        params: Bound parameter values. A falsy value (including ``None``)
            is replaced by an empty dict.

    Returns:
        A list with one ``dict`` per row, keyed by column name.

    Raises:
        RuntimeError: If the engine has not been configured.
    """
    _require_engine()
    with engine.connect() as connection:
        outcome = connection.execute(text(query), params or {})
        # Materialise the rows while the connection is still open.
        records = [dict(mapping) for mapping in outcome.mappings()]
    return records
|
|
def fetch_one(query, params=None):
    """Execute a SQL string and return the first result row, if any.

    Args:
        query: SQL text; it is wrapped with ``text()`` before execution.
        params: Bound parameter values. A falsy value (including ``None``)
            is replaced by an empty dict.

    Returns:
        The first row as a ``dict`` keyed by column name, or ``None`` when
        ``fetchone()`` yields nothing.

    Raises:
        RuntimeError: If the engine has not been configured.
    """
    _require_engine()
    with engine.connect() as connection:
        first = connection.execute(text(query), params or {}).fetchone()
    if not first:
        return None
    return dict(first._mapping)
|
|
def execute_query(query, params=None):
    """Execute a SQL string inside ``engine.begin()`` and report its row count.

    Args:
        query: SQL text; it is wrapped with ``text()`` before execution.
        params: Bound parameter values. A falsy value (including ``None``)
            is replaced by an empty dict.

    Returns:
        The ``rowcount`` attribute of the execution result.

    Raises:
        RuntimeError: If the engine has not been configured.
    """
    _require_engine()
    # engine.begin() is SQLAlchemy's transactional context: it commits when
    # the block exits normally and rolls back if an exception escapes.
    with engine.begin() as connection:
        affected = connection.execute(text(query), params or {}).rowcount
    return affected
|
|