| import duckdb |
| from pathlib import Path |
| from app.hf_store import HFStore |
| from app.metadata import metadata |
| from app.query_parser import query_parser |
| from typing import List, Dict, Any |
| import threading |
| import time |
|
|
| |
# Process-wide DuckDB connection pool keyed by workspace id. Each value is a
# dict with "conn" (the DuckDB connection), "registered" (set of "db.table"
# keys already exposed as views on it) and "last_used" (time.time() stamp).
_pool: Dict[str, dict] = dict()
# Lock guarding _pool.
_pool_lock = threading.Lock()
# Seconds an entry's "last_used" may lag behind now before eviction closes it.
_POOL_TTL = 5 * 60
# Eviction trims the pool down to at most this many workspace entries.
_MAX_POOL_SIZE = 20
# Value (in MB) applied to each new connection's DuckDB memory_limit setting.
_MEMORY_LIMIT_MB = 256
|
|
|
|
def _evict_idle_connections():
    """Close and drop pooled connections that expired or exceed the size cap.

    Runs under ``_pool_lock`` in two passes:

    1. every entry whose ``last_used`` stamp is more than ``_POOL_TTL``
       seconds old is closed and removed;
    2. if more than ``_MAX_POOL_SIZE`` entries remain, those with the oldest
       ``last_used`` are closed and removed until the cap is met.

    Errors raised by ``close()`` are ignored so one broken connection cannot
    stop the others from being evicted.

    NOTE(review): an evicted connection may still be referenced as
    ``self.conn`` by a live ``QueryEngine``; closing it here makes that
    engine's later queries fail — confirm engines are short-lived.
    """
    now = time.time()

    def _discard(key):
        # Best-effort close, then remove the entry from the pool.
        try:
            _pool[key]["conn"].close()
        except Exception:
            pass
        del _pool[key]

    with _pool_lock:
        expired = [
            key
            for key, entry in _pool.items()
            if now - entry["last_used"] > _POOL_TTL
        ]
        for key in expired:
            _discard(key)

        surplus = len(_pool) - _MAX_POOL_SIZE
        if surplus > 0:
            # sorted() is stable, so ties are taken in the same order that
            # repeated min() calls over the dict would pick them.
            oldest_first = sorted(_pool, key=lambda key: _pool[key]["last_used"])
            for key in oldest_first[:surplus]:
                _discard(key)
|
|
class QueryEngine:
    """Run SQL over a workspace's Parquet files through a pooled DuckDB connection.

    Every engine created for the same workspace id shares one in-memory
    DuckDB connection, plus its set of registered ``database.table`` keys,
    taken from the module-level ``_pool``. Parquet files are exposed lazily
    as views named ``"<database>"."<table>"``.

    NOTE(review): the shared connection may be used by several engines from
    different threads; DuckDB's Python connection object is not documented
    as thread-safe (a per-thread ``conn.cursor()`` is the usual pattern) —
    confirm how engines are used concurrently.
    """

    def __init__(self, user_store: HFStore):
        """Attach to (or create) the pooled connection for the store's workspace.

        Args:
            user_store: store used to resolve local Parquet paths. Its optional
                ``workspace_id`` attribute selects the pool entry; a missing or
                falsy value falls back to ``'default'``.
        """
        self.store = user_store
        self._workspace_id = getattr(user_store, 'workspace_id', None) or 'default'
        # True only when this instance created the pooled connection. Kept
        # for callers that inspect it; close() does not consult it.
        self._owned = False

        # Expire idle/surplus workspaces before possibly adding a new one.
        _evict_idle_connections()

        with _pool_lock:
            entry = _pool.get(self._workspace_id)
            if entry is None:
                entry = {
                    "conn": self._new_connection(),
                    "registered": set(),
                    "last_used": time.time(),
                }
                _pool[self._workspace_id] = entry
                self._owned = True
            entry["last_used"] = time.time()
            # Bind the entry while still holding the lock so a concurrent
            # eviction cannot delete it between the check and the lookup.
            self.conn = entry["conn"]
            self._registered_tables = entry["registered"]

    @staticmethod
    def _new_connection():
        """Open an in-memory DuckDB connection with the module's resource limits."""
        conn = duckdb.connect(':memory:')
        try:
            conn.execute(f"SET memory_limit='{_MEMORY_LIMIT_MB}MB'")
            conn.execute("SET max_temp_directory_size='1GB'")
            conn.execute("SET threads=2")
        except Exception:
            # Don't leak a half-configured connection.
            conn.close()
            raise
        return conn

    def _touch(self):
        """Refresh this workspace's ``last_used`` so eviction reflects real use.

        Only updates the pool entry if it still holds this engine's
        connection (it may have been evicted and recreated meanwhile).
        Must not be called while ``_pool_lock`` is held.
        """
        with _pool_lock:
            entry = _pool.get(self._workspace_id)
            if entry is not None and entry["conn"] is self.conn:
                entry["last_used"] = time.time()

    def _register_table(self, database: str, table: str):
        """Expose ``<database>/<table>.parquet`` as a view, once per connection.

        Identifiers go through ``query_parser.sanitize_identifier`` before
        being interpolated into SQL. If the Parquet file does not exist locally,
        nothing is registered (the schema is still created).
        """
        database = query_parser.sanitize_identifier(database)
        table = query_parser.sanitize_identifier(table)

        key = f"{database}.{table}"
        if key in self._registered_tables:
            return

        try:
            self.conn.execute(f'CREATE SCHEMA IF NOT EXISTS "{database}"')
        except Exception:
            # Best effort: a failure here surfaces on CREATE VIEW below.
            pass

        parquet_path = self.store.local("data", database, f"{table}.parquet")
        if parquet_path.exists():
            # Escape single quotes for the SQL string literal.
            safe_path = str(parquet_path).replace("'", "''")
            self.conn.execute(f"""
                CREATE OR REPLACE VIEW "{database}"."{table}" AS
                SELECT * FROM read_parquet('{safe_path}')
            """)
            self._registered_tables.add(key)

    def _auto_discover_tables(self):
        """Register every table listed by ``metadata.list_tables`` (best effort).

        ``_register_table`` creates each schema itself, so one pass suffices.
        Tables that fail to register are reported and skipped.
        """
        for table_info in metadata.list_tables(self.store):
            try:
                self._register_table(table_info['database'], table_info['table'])
            except Exception as e:
                print(f"Warning: Failed to register table {table_info!r}: {e}")

    def execute_sql(self, sql: str, params: List[Any] = None, auto_discover=True, allow_write=True, allow_schema_ops=False):
        """Validate and run ``sql``, returning the result as a plain dict.

        Args:
            sql: statement to run; rejected up front if
                ``query_parser.validate_query`` reports it invalid.
            params: positional values for ``?`` placeholders, if any.
            auto_discover: register every ``database.table`` that
                ``query_parser.extract_tables`` finds in ``sql`` before running.
            allow_write: forwarded to ``query_parser.validate_query``.
            allow_schema_ops: forwarded to ``query_parser.validate_query``.

        Returns:
            ``{'ok': True, 'data': records, 'columns': names, 'rows': count}``
            on success, otherwise ``{'ok': False, 'error': message}``.
        """
        is_valid, error = query_parser.validate_query(sql, allow_write=allow_write, allow_schema_ops=allow_schema_ops)
        if not is_valid:
            return {
                'ok': False,
                'error': f"Query validation failed: {error}"
            }

        self._touch()

        if auto_discover:
            for table_name in query_parser.extract_tables(sql):
                if '.' not in table_name:
                    # Only schema-qualified names map to a Parquet file.
                    continue
                db, tbl = table_name.split('.', 1)
                try:
                    self._register_table(db, tbl)
                except Exception as e:
                    # Best effort: the query below reports any resulting error.
                    print(f"Warning: Failed to register {db}.{tbl}: {e}")

        try:
            if params:
                result = self.conn.execute(sql, params).fetchdf()
            else:
                result = self.conn.execute(sql).fetchdf()

            return {
                'ok': True,
                'data': result.to_dict('records'),
                'columns': list(result.columns),
                'rows': len(result)
            }
        except Exception as e:
            return {
                'ok': False,
                'error': str(e)
            }

    def query_table(self, database: str, table: str, filters: Dict = None, limit: int = None, offset: int = None):
        """Query one table with optional filters, limit and offset.

        The SQL comes from ``query_parser.build_safe_query`` and the filter
        values are bound as parameters.
        """
        self._register_table(database, table)

        sql = query_parser.build_safe_query(database, table, filters, limit, offset)

        # NOTE(review): assumes build_safe_query emits one placeholder per
        # filter, in the dict's iteration order — confirm against query_parser.
        params = list(filters.values()) if filters else []

        return self.execute_sql(sql, params=params, auto_discover=False)

    def insert_row(self, database: str, table: str, row: Dict):
        """Append one row to a table's Parquet file.

        The existing Parquet file is loaded into a DuckDB table, ``row`` is
        inserted with a parameterized INSERT, and the table is written back
        over the same file. The temporary table is always dropped afterwards
        so the next read re-registers a view over the updated file.

        Args:
            database: database (schema) name; sanitized before use.
            table: table name; sanitized before use.
            row: mapping of column name -> value; column names are sanitized.

        Returns:
            ``{'ok': True}`` on success, otherwise ``{'ok': False, 'error': msg}``.
        """
        database = query_parser.sanitize_identifier(database)
        table = query_parser.sanitize_identifier(table)

        if not row:
            return {'ok': False, 'error': 'Row must contain at least one column.'}

        parquet_path = self.store.local("data", database, f"{table}.parquet")
        if not parquet_path.exists():
            return {'ok': False, 'error': 'Table does not exist. Create it first.'}

        qualified = f'"{database}"."{table}"'
        safe_path = str(parquet_path).replace("'", "''")
        safe_columns = [query_parser.sanitize_identifier(c) for c in row]
        column_list = ', '.join(f'"{c}"' for c in safe_columns)
        placeholders = ', '.join('?' for _ in safe_columns)
        insert_sql = f'INSERT INTO {qualified} ({column_list}) VALUES ({placeholders})'

        self._touch()
        try:
            self.conn.execute(f'CREATE SCHEMA IF NOT EXISTS "{database}"')

            # A view with this name (from _register_table) would block
            # CREATE TABLE; drop it and forget the registration.
            try:
                self.conn.execute(f'DROP VIEW IF EXISTS {qualified}')
            except Exception:
                pass
            self._registered_tables.discard(f"{database}.{table}")

            self.conn.execute(f"""
                CREATE OR REPLACE TABLE {qualified} AS
                SELECT * FROM read_parquet('{safe_path}')
            """)
            self.conn.execute(insert_sql, list(row.values()))
            self.conn.execute(f"""
                COPY (SELECT * FROM {qualified})
                TO '{safe_path}' (FORMAT PARQUET)
            """)
            return {'ok': True}
        except Exception as e:
            return {'ok': False, 'error': str(e)}
        finally:
            # Leaving the materialized table behind would make the next
            # CREATE OR REPLACE VIEW in _register_table fail and keep the
            # whole table in memory; drop it (best effort).
            try:
                self.conn.execute(f'DROP TABLE IF EXISTS {qualified}')
            except Exception:
                pass

    def get_table_stats(self, database: str, table: str):
        """Return ``{'ok': ..., 'data': [{'row_count': n}], ...}`` for one table."""
        self._register_table(database, table)

        db = query_parser.sanitize_identifier(database)
        tbl = query_parser.sanitize_identifier(table)

        stats_sql = f"""
            SELECT
                COUNT(*) as row_count
            FROM "{db}"."{tbl}"
        """

        return self.execute_sql(stats_sql, auto_discover=False)

    def cross_database_query(self, sql: str, params: List[Any] = None):
        """Register every known table, then validate and run ``sql``."""
        self._auto_discover_tables()
        return self.execute_sql(sql, params=params, auto_discover=False)

    def close(self):
        """Close this engine's connection and drop it from the pool.

        The pool entry is removed only if it still holds this engine's
        connection. If eviction already replaced it with a newer connection
        for the same workspace, that newer entry is left alone.

        NOTE(review): other engines for the same workspace share
        ``self.conn`` and will fail after this call.
        """
        with _pool_lock:
            entry = _pool.get(self._workspace_id)
            if entry is not None and entry["conn"] is self.conn:
                del _pool[self._workspace_id]
        self.conn.close()
|
|
def create_query_engine(user_store: HFStore) -> QueryEngine:
    """Build and return a new :class:`QueryEngine` bound to ``user_store``.

    Thin factory so callers need not construct the class directly.
    """
    engine = QueryEngine(user_store)
    return engine
|
|