Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import time | |
| import pandas as pd | |
| import numpy as np | |
| import psycopg2.extras as extras | |
| from psycopg2.pool import SimpleConnectionPool | |
| DB_NAME = os.getenv("PGDATABASE", "mert") | |
| DB_USER = os.getenv("PGUSER", "mert") | |
| DB_PASS = os.getenv("POSTGRES_PASSWORD") | |
| DB_HOST = os.getenv("PGHOST", "127.0.0.1") | |
| DB_PORT = int(os.getenv("PGPORT", "5432")) | |
| POOL_MAX = int(os.getenv("PG_POOL_MAX", "10")) | |
| _pool: SimpleConnectionPool | None = None | |
| def _get_pool(): | |
| global _pool | |
| if _pool is None: | |
| _pool = SimpleConnectionPool( | |
| minconn=1, maxconn=POOL_MAX, | |
| database=DB_NAME, user=DB_USER, password=DB_PASS, | |
| host=DB_HOST, port=DB_PORT | |
| ) | |
| return _pool | |
| def _borrow_conn(): | |
| pool = _get_pool() | |
| conn = pool.getconn() | |
| conn.autocommit = True | |
| try: | |
| with conn.cursor() as cur: | |
| cur.execute("SET statement_timeout = 10000;") # 10s | |
| except Exception: | |
| pass | |
| return conn | |
| def _return_conn(conn): | |
| try: | |
| _get_pool().putconn(conn) | |
| except Exception: | |
| try: conn.close() | |
| except Exception: pass | |
| WRITE_FIRST_KEYWORDS = { | |
| "INSERT","UPDATE","DELETE","DROP","ALTER","CREATE","TRUNCATE", | |
| "VACUUM","REINDEX","GRANT","REVOKE","MERGE","CALL","DO", | |
| "ATTACH","DETACH" | |
| } | |
| def is_write_query(sql: str) -> bool: | |
| """ | |
| Returns True if the first *statement* is a write. Ignores function names like REPLACE(). | |
| Handles WITH ... (SELECT ...) vs WITH ... INSERT/UPDATE/DELETE/MERGE ... | |
| """ | |
| first_stmt = re.split(r";\s*", sql.strip(), maxsplit=1)[0] | |
| # If it starts with WITH, decide based on the main statement following the CTEs | |
| if re.match(r"^\s*WITH\b", first_stmt, flags=re.IGNORECASE): | |
| # Heuristic: if an INSERT/UPDATE/DELETE/MERGE appears after the CTE block, treat as write | |
| return bool(re.search(r"\)\s*(INSERT|UPDATE|DELETE|MERGE)\b", first_stmt, flags=re.IGNORECASE)) | |
| # Otherwise, just check the very first keyword | |
| m = re.match(r"^\s*([A-Za-z]+)", first_stmt) | |
| first_kw = m.group(1).upper() if m else "" | |
| return first_kw in WRITE_FIRST_KEYWORDS | |
| def enforce_limit(sql: str, limit: int) -> str: | |
| """ | |
| Adds LIMIT if: | |
| - query starts with SELECT or WITH | |
| - and no existing LIMIT present (naive but practical) | |
| """ | |
| first = sql.strip().strip(";") | |
| if re.match(r"^(SELECT|WITH)\b", first, flags=re.IGNORECASE) and not re.search(r"\bLIMIT\b", first, flags=re.IGNORECASE): | |
| return f"{first} LIMIT {int(limit)}" | |
| return first | |
| def run_sql(query: str, max_rows: int, allow_writes: bool): | |
| if not query or not query.strip(): | |
| return pd.DataFrame(), "Provide a SQL query.", 0.0 | |
| if ";" in query.strip().rstrip(";"): | |
| return pd.DataFrame(), "Multiple statements detected; please run one at a time.", 0.0 | |
| if not allow_writes and is_write_query(query): | |
| return pd.DataFrame(), "Write operations are disabled. Enable the toggle to allow writes.", 0.0 | |
| sql_to_run = enforce_limit(query, max_rows) | |
| started = time.perf_counter() | |
| conn = None | |
| try: | |
| conn = _borrow_conn() | |
| with conn.cursor(cursor_factory=extras.RealDictCursor) as cur: | |
| cur.execute("SET LOCAL statement_timeout = 10000;") | |
| cur.execute(sql_to_run) | |
| rows = cur.fetchall() if cur.description else [] | |
| df = pd.DataFrame(rows) | |
| except Exception as e: | |
| return pd.DataFrame(), f"Error: {e}", 0.0 | |
| finally: | |
| if conn: _return_conn(conn) | |
| elapsed = time.perf_counter() - started | |
| meta = f"Rows: {len(df)} | Time: {elapsed:.3f}s" | |
| df.replace([np.inf, -np.inf], pd.NA, inplace=True) | |
| df = df.where(pd.notnull(df), None) | |
| return df, meta, elapsed |