SQL-Assignment / sql_tab.py
mertyazan's picture
Upload folder using huggingface_hub
426f5ad verified
raw
history blame
3.75 kB
import os
import re
import time
import pandas as pd
import numpy as np
import psycopg2.extras as extras
from psycopg2.pool import SimpleConnectionPool
DB_NAME = os.getenv("PGDATABASE", "mert")
DB_USER = os.getenv("PGUSER", "mert")
DB_PASS = os.getenv("POSTGRES_PASSWORD")
DB_HOST = os.getenv("PGHOST", "127.0.0.1")
DB_PORT = int(os.getenv("PGPORT", "5432"))
POOL_MAX = int(os.getenv("PG_POOL_MAX", "10"))
_pool: SimpleConnectionPool | None = None
def _get_pool():
global _pool
if _pool is None:
_pool = SimpleConnectionPool(
minconn=1, maxconn=POOL_MAX,
database=DB_NAME, user=DB_USER, password=DB_PASS,
host=DB_HOST, port=DB_PORT
)
return _pool
def _borrow_conn():
pool = _get_pool()
conn = pool.getconn()
conn.autocommit = True
try:
with conn.cursor() as cur:
cur.execute("SET statement_timeout = 10000;") # 10s
except Exception:
pass
return conn
def _return_conn(conn):
try:
_get_pool().putconn(conn)
except Exception:
try: conn.close()
except Exception: pass
WRITE_FIRST_KEYWORDS = {
"INSERT","UPDATE","DELETE","DROP","ALTER","CREATE","TRUNCATE",
"VACUUM","REINDEX","GRANT","REVOKE","MERGE","CALL","DO",
"ATTACH","DETACH"
}
def is_write_query(sql: str) -> bool:
"""
Returns True if the first *statement* is a write. Ignores function names like REPLACE().
Handles WITH ... (SELECT ...) vs WITH ... INSERT/UPDATE/DELETE/MERGE ...
"""
first_stmt = re.split(r";\s*", sql.strip(), maxsplit=1)[0]
# If it starts with WITH, decide based on the main statement following the CTEs
if re.match(r"^\s*WITH\b", first_stmt, flags=re.IGNORECASE):
# Heuristic: if an INSERT/UPDATE/DELETE/MERGE appears after the CTE block, treat as write
return bool(re.search(r"\)\s*(INSERT|UPDATE|DELETE|MERGE)\b", first_stmt, flags=re.IGNORECASE))
# Otherwise, just check the very first keyword
m = re.match(r"^\s*([A-Za-z]+)", first_stmt)
first_kw = m.group(1).upper() if m else ""
return first_kw in WRITE_FIRST_KEYWORDS
def enforce_limit(sql: str, limit: int) -> str:
"""
Adds LIMIT if:
- query starts with SELECT or WITH
- and no existing LIMIT present (naive but practical)
"""
first = sql.strip().strip(";")
if re.match(r"^(SELECT|WITH)\b", first, flags=re.IGNORECASE) and not re.search(r"\bLIMIT\b", first, flags=re.IGNORECASE):
return f"{first} LIMIT {int(limit)}"
return first
def run_sql(query: str, max_rows: int, allow_writes: bool):
if not query or not query.strip():
return pd.DataFrame(), "Provide a SQL query.", 0.0
if ";" in query.strip().rstrip(";"):
return pd.DataFrame(), "Multiple statements detected; please run one at a time.", 0.0
if not allow_writes and is_write_query(query):
return pd.DataFrame(), "Write operations are disabled. Enable the toggle to allow writes.", 0.0
sql_to_run = enforce_limit(query, max_rows)
started = time.perf_counter()
conn = None
try:
conn = _borrow_conn()
with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
cur.execute("SET LOCAL statement_timeout = 10000;")
cur.execute(sql_to_run)
rows = cur.fetchall() if cur.description else []
df = pd.DataFrame(rows)
except Exception as e:
return pd.DataFrame(), f"Error: {e}", 0.0
finally:
if conn: _return_conn(conn)
elapsed = time.perf_counter() - started
meta = f"Rows: {len(df)} | Time: {elapsed:.3f}s"
df.replace([np.inf, -np.inf], pd.NA, inplace=True)
df = df.where(pd.notnull(df), None)
return df, meta, elapsed