| """Database connection helper for the ML pipeline.""" | |
| import psycopg2 | |
| import psycopg2.extras | |
| from config import DB | |
| def get_conn(): | |
| """Return a new psycopg2 connection.""" | |
| return psycopg2.connect( | |
| host=DB["host"], | |
| port=DB["port"], | |
| dbname=DB["dbname"], | |
| user=DB["user"], | |
| password=DB["password"], | |
| ) | |
| def fetch_all(sql, params=None): | |
| with get_conn() as conn: | |
| with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: | |
| cur.execute(sql, params or ()) | |
| return cur.fetchall() | |
| def fetch_one(sql, params=None): | |
| with get_conn() as conn: | |
| with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: | |
| cur.execute(sql, params or ()) | |
| return cur.fetchone() | |
| def execute(sql, params=None): | |
| with get_conn() as conn: | |
| with conn.cursor() as cur: | |
| cur.execute(sql, params or ()) | |
| conn.commit() | |
| def execute_batch(sql, params_list, page_size=500): | |
| with get_conn() as conn: | |
| with conn.cursor() as cur: | |
| psycopg2.extras.execute_batch(cur, sql, params_list, page_size=page_size) | |
| conn.commit() | |