Spaces:
Running
Running
| """ | |
| SQLite in-memory database management. | |
| Creates fresh DB instances per episode with deterministic seed data. | |
| """ | |
import re
import sqlite3
import time
from typing import Dict, Any, List
class EpisodeDatabase:
    """
    Manages a single SQLite in-memory database for one episode.
    Seeded with deterministic data per task.

    The connection is opened with ``check_same_thread=False`` and rows are
    returned through ``sqlite3.Row`` so they can be converted to dicts.
    """

    # Wall-clock budget for one execute_query() call, in seconds.
    QUERY_TIMEOUT_SECONDS = 5.0

    # Number of SQLite VM instructions between two deadline checks.
    _PROGRESS_CHECK_INTERVAL = 1000

    # Keywords that mark a statement as modifying; the order decides which
    # keyword is named in the error message when several are present.
    _BLOCKED_KEYWORDS = ("DROP", "DELETE", "UPDATE", "INSERT", "CREATE", "ALTER",
                         "TRUNCATE", "REPLACE", "ATTACH", "DETACH")

    # String literals, quoted identifiers and comments: text that can contain
    # a keyword without being one, so it is blanked out before scanning.
    _NON_CODE_RE = re.compile(
        r"'[^']*'"              # 'string literal' ('' escapes split in two)
        r'|"[^"]*"'             # "quoted identifier"
        r"|--[^\n]*"            # -- line comment
        r"|/\*.*?(?:\*/|\Z)",   # /* block comment */ (may run to end of input)
        re.DOTALL,
    )

    # Whole-word match on any blocked keyword. The trailing look-around
    # exempts ``REPLACE(`` only, which is the scalar function rather than the
    # ``REPLACE INTO`` statement.
    _BLOCKED_RE = re.compile(
        r"\b(" + "|".join(_BLOCKED_KEYWORDS) + r")\b(?!(?<=REPLACE)\s*\()"
    )

    def __init__(self, task_id: str, schema_sql: str, seed_data_sql: str):
        """
        Open the in-memory database and load schema plus seed data.

        Args:
            task_id: Identifier of the task this database belongs to.
            schema_sql: SQL script with the DDL statements.
            seed_data_sql: SQL script with the seed INSERT statements.

        Raises:
            sqlite3.Error: If either script fails; the connection is closed
                before the error propagates.
        """
        self.task_id = task_id
        self.conn = sqlite3.connect(":memory:", check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        try:
            self.conn.execute("PRAGMA foreign_keys = ON")
            self._setup(schema_sql, seed_data_sql)
        except Exception:
            # Do not leak the connection when construction fails half-way.
            self.conn.close()
            raise

    def _setup(self, schema_sql: str, seed_data_sql: str):
        """Create schema and insert seed data."""
        # executescript() lets SQLite find the statement boundaries, so a
        # ";" inside a string literal, comment or trigger body is harmless.
        for script in (schema_sql, seed_data_sql):
            self.conn.executescript(script)
        self.conn.commit()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _result(success: bool, rows, error_message, elapsed_ms: float) -> Dict[str, Any]:
        """Build the result dict returned by execute_query()."""
        return {
            "success": success,
            "rows": rows,
            "row_count": len(rows) if rows is not None else None,
            "error_message": error_message,
            "execution_time_ms": round(elapsed_ms, 2),
        }

    def execute_query(self, query: str) -> Dict[str, Any]:
        """
        Execute a read-only SQL query safely.

        A query is rejected without being run when it contains one of the
        blocked keywords as a whole word outside of string literals, quoted
        identifiers and comments. Execution is aborted once it has run for
        longer than ``QUERY_TIMEOUT_SECONDS`` (5 seconds by default).

        Args:
            query: The SQL text to run (a single statement).

        Returns:
            A dict with the keys ``success``, ``rows`` (list of dicts or
            None), ``row_count``, ``error_message`` and
            ``execution_time_ms``. Database errors are reported through the
            dict rather than raised.
        """
        scrubbed = self._NON_CODE_RE.sub(" ", query).upper()
        found = set(self._BLOCKED_RE.findall(scrubbed))
        for kw in self._BLOCKED_KEYWORDS:
            if kw in found:
                return self._result(
                    False,
                    None,
                    f"BLOCKED: Only SELECT queries are allowed. '{kw}' is not permitted.",
                    0.0,
                )

        timeout = self.QUERY_TIMEOUT_SECONDS
        timed_out = False
        start = time.perf_counter()

        def _abort_if_overdue() -> int:
            # A non-zero return value makes SQLite interrupt the statement.
            nonlocal timed_out
            if time.perf_counter() - start > timeout:
                timed_out = True
                return 1
            return 0

        self.conn.set_progress_handler(_abort_if_overdue, self._PROGRESS_CHECK_INTERVAL)
        try:
            cursor = self.conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
        except (sqlite3.Error, sqlite3.Warning) as e:
            # sqlite3.Warning is what older Python versions raise for
            # multi-statement input; it is not a sqlite3.Error subclass.
            elapsed = (time.perf_counter() - start) * 1000
            if timed_out:
                message = f"Query exceeded the {timeout:g} second execution timeout."
            else:
                message = str(e)
            return self._result(False, None, message, elapsed)
        finally:
            self.conn.set_progress_handler(None, self._PROGRESS_CHECK_INTERVAL)

        elapsed = (time.perf_counter() - start) * 1000
        # Convert Row objects to dicts
        return self._result(True, [dict(row) for row in rows], None, elapsed)

    def get_schema(self) -> Dict[str, List[Dict[str, str]]]:
        """
        Return schema info: tables and their columns.

        Returns:
            A dict mapping each table name to a list of column descriptions
            with the keys ``name``, ``type``, ``nullable`` and
            ``primary_key`` (the last two as "YES"/"NO").
        """
        schema = {}
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        tables = [row[0] for row in cursor.fetchall()]
        for table in tables:
            # Quote the name so keyword or space-containing names still work.
            cursor.execute(f"PRAGMA table_info({self._quote_identifier(table)})")
            schema[table] = [
                {
                    "name": col[1],
                    "type": col[2],
                    # table_info column 3 is the NOT NULL flag.
                    "nullable": "YES" if col[3] == 0 else "NO",
                    # table_info column 5 is the 1-based position in the
                    # primary key, 0 when the column is not part of it.
                    "primary_key": "YES" if col[5] > 0 else "NO",
                }
                for col in cursor.fetchall()
            ]
        return schema

    def get_sample_rows(self, table_name: str, limit: int = 3) -> List[Dict[str, Any]]:
        """
        Get sample rows from a table.

        Args:
            table_name: Name of the table, treated as one identifier.
            limit: Maximum number of rows to return.

        Returns:
            The rows as dicts, or an empty list when the query fails or
            *limit* is not an integer.
        """
        try:
            row_limit = int(limit)
        except (TypeError, ValueError):
            return []
        result = self.execute_query(
            f"SELECT * FROM {self._quote_identifier(table_name)} LIMIT {row_limit}"
        )
        return result.get("rows") or []

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()