AbhijitClemson's picture
Update db.py
d695885 verified
# db.py
import os
from urllib.parse import quote_plus

from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# Load variables from a .env file (if one is found) into the process
# environment before the settings below are read.
load_dotenv()

# Connection settings; each one is None when its variable is not set.
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# These remain None when the configuration is incomplete, so importing this
# module never fails just because the database is not set up.
DATABASE_URL = None
engine = None
SessionLocal = None

if all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
    # Percent-encode the credentials: characters such as "@", ":" or "/" in a
    # user name or password would otherwise be parsed as URL delimiters.
    # NOTE(review): values are expected to be stored raw (not pre-encoded)
    # in the environment.
    _credentials = f"{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    # DB_PORT is not part of the guard above, so leave the port segment out
    # when it is unset instead of emitting a literal ":None" in the URL; the
    # driver then falls back to its default port.
    _location = f"{DB_HOST}:{DB_PORT}" if DB_PORT else DB_HOST
    DATABASE_URL = f"postgresql+psycopg://{_credentials}@{_location}/{DB_NAME}"
    engine = create_engine(DATABASE_URL, pool_pre_ping=True)
    SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
def _require_engine():
    """Raise ``RuntimeError`` if the module-level ``engine`` was never created.

    Returns ``None`` without doing anything when an engine is available.
    """
    if engine is not None:
        return
    message = (
        "Database is not configured. Set DB_HOST, DB_PORT, DB_NAME, DB_USER, and DB_PASSWORD."
    )
    raise RuntimeError(message)
def fetch_all(query, params=None):
    """Execute a SQL string and return every resulting row as a ``dict``.

    Args:
        query: SQL text; it is wrapped in ``text()`` before execution.
        params: Optional bind parameters. A falsy value is replaced by an
            empty dict.

    Returns:
        A list with one dict per row, built from each row's mapping view.
        The list is empty when the query yields no rows.

    Raises:
        RuntimeError: If the database engine has not been configured.
    """
    _require_engine()
    statement = text(query)
    binds = params or {}
    with engine.connect() as connection:
        # mappings() yields the same mapping view that row._mapping exposes.
        mapped_rows = connection.execute(statement, binds).mappings()
        return [dict(mapping) for mapping in mapped_rows]
def fetch_one(query, params=None):
    """Execute a SQL string and return its first row as a ``dict``.

    Args:
        query: SQL text; it is wrapped in ``text()`` before execution.
        params: Optional bind parameters. A falsy value is replaced by an
            empty dict.

    Returns:
        A dict built from the first row's mapping view, or ``None`` when
        ``fetchone()`` produces no (truthy) row.

    Raises:
        RuntimeError: If the database engine has not been configured.
    """
    _require_engine()
    statement = text(query)
    binds = params or {}
    with engine.connect() as connection:
        first_row = connection.execute(statement, binds).fetchone()
        if not first_row:
            return None
        return dict(first_row._mapping)
def execute_query(query, params=None):
    """Execute a SQL string inside an ``engine.begin()`` block.

    Args:
        query: SQL text; it is wrapped in ``text()`` before execution.
        params: Optional bind parameters. A falsy value is replaced by an
            empty dict.

    Returns:
        The ``rowcount`` reported by the executed statement's result.

    Raises:
        RuntimeError: If the database engine has not been configured.
    """
    _require_engine()
    statement = text(query)
    binds = params or {}
    with engine.begin() as connection:
        outcome = connection.execute(statement, binds)
    # Read rowcount only after the begin() block has exited, as before.
    affected = outcome.rowcount
    return affected