# Spaces:
# Running
# Running
import os
import sqlite3
from contextlib import closing
from urllib.parse import urlparse, unquote

import pymysql
from dotenv import load_dotenv
class Database:
    """SQLite-backed demo database used by the chatbot.

    Constructing an instance (re)creates the database file at ``db_path``
    and seeds it with demo ``employees`` and ``sales`` rows. The remaining
    methods run queries against it and describe its schema as text.
    """

    def __init__(self):
        """Load ``.env``, pin the SQLite path and rebuild the demo data."""
        load_dotenv()
        # The SQLite location is hard-coded instead of being read from .env:
        # /tmp is expected to be writable on Hugging Face / Linux hosts, which
        # avoids permission errors.
        self.db_path = "/tmp/chatbot_v2.db"
        self.type = "sqlite"
        print(f" β‘ DATABASE PATH FORCED: {self.db_path}")
        # Always rebuild the demo data to ensure it exists.
        self._init_sqlite_data()

    def get_connection(self):
        """Open and return a new database connection.

        For SQLite the connection yields ``sqlite3.Row`` rows and may be used
        from threads other than the one that created it. The caller is
        responsible for closing the returned connection.
        """
        if self.type == "sqlite":
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            return conn
        # Fallback for MySQL (if the backend is ever switched back).
        # NOTE(review): host/user/password/db_name/port are not assigned
        # anywhere in this class; they must be set before using this branch.
        return pymysql.connect(
            host=self.host, user=self.user, password=self.password,
            database=self.db_name, port=self.port,
            cursorclass=pymysql.cursors.DictCursor
        )

    def _init_sqlite_data(self):
        """Drop and recreate the demo tables, then insert the demo rows.

        Any failure (including failing to open the database file) is reported
        on stdout and swallowed, so construction never raises from here.
        """
        employees = [
            ('Alice', 'Sales', 70000, '2023-01-15'),
            ('Bob', 'Engineering', 90000, '2022-05-20'),
            ('Charlie', 'Marketing', 60000, '2023-03-10'),
            ('David', 'Engineering', 95000, '2021-11-05'),
            ('Eve', 'Sales', 72000, '2023-02-28'),
        ]
        sales = [
            (1, 500.00, '2023-06-01'),
            (1, 1200.50, '2023-06-03'),
        ]
        try:
            # Connecting inside the try (and closing via the context manager)
            # means a failed connect is reported like any other error instead
            # of tripping over an unassigned connection during cleanup.
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()

                # 1. Wipe clean so stale tables from earlier runs cannot linger.
                cursor.execute("DROP TABLE IF EXISTS sales")
                cursor.execute("DROP TABLE IF EXISTS employees")

                # 2. Create 'employees' (lowercase, plural).
                cursor.execute("""
                    CREATE TABLE employees (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT,
                        department TEXT,
                        salary REAL,
                        hire_date TEXT
                    )
                """)

                # 3. Create 'sales'.
                cursor.execute("""
                    CREATE TABLE sales (
                        sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        employee_id INTEGER,
                        amount REAL,
                        sale_date TEXT,
                        FOREIGN KEY (employee_id) REFERENCES employees(id)
                    )
                """)

                # 4. Insert the demo rows.
                cursor.executemany(
                    "INSERT INTO employees (name, department, salary, hire_date) VALUES (?, ?, ?, ?)",
                    employees,
                )
                cursor.executemany(
                    "INSERT INTO sales (employee_id, amount, sale_date) VALUES (?, ?, ?)",
                    sales,
                )
                conn.commit()
                print(" β Internal Database & Data Built Successfully!")
        except Exception as e:
            print(f" β FATAL DB ERROR: {e}")

    def run_query(self, query, params=None):
        """Execute ``query`` and return all result rows.

        Args:
            query: SQL text to execute.
            params: Optional bound parameters for placeholders in ``query``.
                When omitted the query is executed exactly as given.

        Returns:
            For SQLite, a list of dicts (one per row). For other backends,
            whatever the cursor's ``fetchall()`` returns. If execution fails,
            a one-element list containing ``"Error: <message>"``.
        """
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()
                # Only pass params when supplied, so queries containing
                # literal placeholder characters keep working unchanged.
                if params is None:
                    cursor.execute(query)
                else:
                    cursor.execute(query, params)
                rows = cursor.fetchall()
                # sqlite3.Row objects are converted to plain dicts.
                if self.type == "sqlite":
                    return [dict(row) for row in rows]
                return rows
            except Exception as e:
                return [f"Error: {e}"]

    def get_tables(self):
        """Return the names of all tables, excluding ``sqlite_sequence``.

        Returns an empty list (after printing the error) if the lookup fails.
        """
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                tables = [
                    row[0] for row in cursor.fetchall()
                    if row[0] != 'sqlite_sequence'
                ]
                print(f" π TABLES FOUND: {tables}")
                return tables
            except Exception as e:
                print(f"Error fetching tables: {e}")
                return []

    def get_table_schema(self, table_name):
        """Return ``"<column> (<type>)"`` strings for ``table_name``.

        The table name is quoted as an SQL identifier, so names containing
        spaces or punctuation are looked up literally rather than being
        spliced into the statement. Returns an empty list if the table has
        no columns / does not exist, or (after printing the error) if the
        lookup fails.
        """
        # Double any embedded quote and wrap in double quotes: the standard
        # SQL identifier-quoting form accepted by SQLite.
        quoted = '"' + str(table_name).replace('"', '""') + '"'
        with closing(self.get_connection()) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute(f"PRAGMA table_info({quoted})")
                columns = []
                for row in cursor.fetchall():
                    if isinstance(row, sqlite3.Row):
                        col_name, col_type = row['name'], row['type']
                    else:
                        # Plain tuples: positions 1 and 2 hold name and type.
                        col_name, col_type = row[1], row[2]
                    columns.append(f"{col_name} ({col_type})")
                return columns
            except Exception as e:
                print(f"Error fetching schema for {table_name}: {e}")
                return []

    def get_schema(self):
        """Return a text description of every table and its columns.

        Each table contributes a ``Table: <name>`` line, a ``Columns:`` line,
        one `` - <column> (<type>)`` line per column, and a trailing blank
        line.
        """
        sections = []
        for table in self.get_tables():
            lines = [f"Table: {table}", "Columns:"]
            lines.extend(f" - {col}" for col in self.get_table_schema(table))
            sections.append("\n".join(lines) + "\n\n")
        return "".join(sections)