"""Database connection helper for the ML pipeline.""" import psycopg2 import psycopg2.extras from config import DB def get_conn(): """Return a new psycopg2 connection.""" return psycopg2.connect( host=DB["host"], port=DB["port"], dbname=DB["dbname"], user=DB["user"], password=DB["password"], ) def fetch_all(sql, params=None): with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, params or ()) return cur.fetchall() def fetch_one(sql, params=None): with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, params or ()) return cur.fetchone() def execute(sql, params=None): with get_conn() as conn: with conn.cursor() as cur: cur.execute(sql, params or ()) conn.commit() def execute_batch(sql, params_list, page_size=500): with get_conn() as conn: with conn.cursor() as cur: psycopg2.extras.execute_batch(cur, sql, params_list, page_size=page_size) conn.commit()