Spaces:
Running
Running
File size: 5,255 Bytes
d0fbfac 740a66e e6490d0 740a66e d0fbfac 740a66e d0fbfac bfc9e9b d0fbfac 740a66e d0fbfac 89037a2 d0fbfac 89037a2 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 740a66e e6490d0 89037a2 d0fbfac 740a66e 89037a2 740a66e d0fbfac 740a66e bfc9e9b 740a66e bfc9e9b d0fbfac 740a66e d0fbfac 8642c86 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import pymysql
import sqlite3
import os
from dotenv import load_dotenv
from urllib.parse import urlparse, unquote
class Database:
    """SQLite-backed demo database with query and schema helpers.

    Constructing an instance rebuilds a small demo database (``employees``
    and ``sales`` tables) at a fixed path under ``/tmp``. The remaining
    methods open a fresh connection per call and close it before returning.
    """

    def __init__(self):
        """Load ``.env``, pin the SQLite path and rebuild the demo data."""
        load_dotenv()
        # Force a safe path: .env settings are deliberately ignored for SQLite
        # to prevent permission errors. /tmp is used because it is expected to
        # be writable on Hugging Face/Linux.
        self.db_path = "/tmp/chatbot_v2.db"
        self.type = "sqlite"
        # NOTE(review): the leading glyphs in the log strings of this class
        # look like mis-decoded emoji; they are kept byte-for-byte.
        print(f" β‘ DATABASE PATH FORCED: {self.db_path}")
        # Always rebuild the demo data to ensure it exists.
        self._init_sqlite_data()

    def get_connection(self):
        """Return a new database connection for the configured backend.

        For ``self.type == "sqlite"`` the connection uses ``sqlite3.Row`` as
        its row factory so rows can be read by column name. Any other type
        falls through to a PyMySQL connection.
        """
        if self.type == "sqlite":
            # Connect to the safe temp file.
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            return conn
        # Fallback for MySQL (if you ever switch back).
        # NOTE(review): host/user/password/db_name/port are not assigned by
        # __init__ in this class, so they must be set on the instance before
        # this branch can work - confirm with the caller.
        return pymysql.connect(
            host=self.host, user=self.user, password=self.password,
            database=self.db_name, port=self.port,
            cursorclass=pymysql.cursors.DictCursor
        )

    def _init_sqlite_data(self) -> None:
        """Drop and recreate the demo tables, then insert the seed rows.

        Failures are reported with a printed message and are not re-raised.
        """
        # Bind the name before the try block so the cleanup in ``finally``
        # is safe even when sqlite3.connect() itself raises.
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # 1. Wipe clean to remove old errors ('sales' first, since it
            #    references 'employees').
            cursor.execute("DROP TABLE IF EXISTS sales")
            cursor.execute("DROP TABLE IF EXISTS employees")
            # 2. Create 'employees' (lowercase, plural).
            cursor.execute("""
                CREATE TABLE employees (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    department TEXT,
                    salary REAL,
                    hire_date TEXT
                )
            """)
            # 3. Create 'sales'.
            cursor.execute("""
                CREATE TABLE sales (
                    sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    employee_id INTEGER,
                    amount REAL,
                    sale_date TEXT,
                    FOREIGN KEY (employee_id) REFERENCES employees(id)
                )
            """)
            # 4. Insert data.
            data_employees = [
                ('Alice', 'Sales', 70000, '2023-01-15'),
                ('Bob', 'Engineering', 90000, '2022-05-20'),
                ('Charlie', 'Marketing', 60000, '2023-03-10'),
                ('David', 'Engineering', 95000, '2021-11-05'),
                ('Eve', 'Sales', 72000, '2023-02-28'),
            ]
            cursor.executemany(
                "INSERT INTO employees (name, department, salary, hire_date) VALUES (?, ?, ?, ?)",
                data_employees,
            )
            data_sales = [
                (1, 500.00, '2023-06-01'),
                (1, 1200.50, '2023-06-03'),
            ]
            cursor.executemany(
                "INSERT INTO sales (employee_id, amount, sale_date) VALUES (?, ?, ?)",
                data_sales,
            )
            conn.commit()
            print(" β Internal Database & Data Built Successfully!")
        except Exception as e:
            print(f" β FATAL DB ERROR: {e}")
        finally:
            if conn is not None:
                conn.close()

    def run_query(self, query):
        """Execute ``query`` and return all fetched rows.

        For SQLite each row is converted to a plain ``dict``; for other
        backends the cursor's ``fetchall()`` result is returned unchanged.
        On any error a one-element list ``["Error: <message>"]`` is returned
        instead of raising. The connection is closed without a commit.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            # Handle different cursor types.
            if self.type == "sqlite":
                return [dict(row) for row in cursor.fetchall()]
            return cursor.fetchall()
        except Exception as e:
            return [f"Error: {e}"]
        finally:
            conn.close()

    def get_tables(self):
        """Return the table names listed in ``sqlite_master``.

        SQLite's internal ``sqlite_sequence`` table is filtered out. Returns
        an empty list (after printing the error) if the lookup fails.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = [row[0] for row in cursor.fetchall() if row[0] != 'sqlite_sequence']
            print(f" π TABLES FOUND: {tables}")
            return tables
        except Exception as e:
            print(f"Error fetching tables: {e}")
            return []
        finally:
            conn.close()

    def get_table_schema(self, table_name):
        """Return ``"<column> (<type>)"`` strings for ``table_name``.

        Returns an empty list when the table has no columns to report or
        when the lookup fails (the error is printed).
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            # Quote the identifier (doubling embedded double quotes) so names
            # containing spaces or punctuation work and cannot inject SQL.
            quoted_name = '"' + str(table_name).replace('"', '""') + '"'
            cursor.execute(f"PRAGMA table_info({quoted_name})")
            columns = []
            for row in cursor.fetchall():
                # sqlite3.Row supports access by column name; any other row
                # type is read positionally (name at index 1, type at 2).
                by_name = isinstance(row, sqlite3.Row)
                col_name = row['name'] if by_name else row[1]
                col_type = row['type'] if by_name else row[2]
                columns.append(f"{col_name} ({col_type})")
            return columns
        except Exception as e:
            # Report the failure the same way get_tables() does.
            print(f"Error fetching schema for {table_name}: {e}")
            return []
        finally:
            conn.close()

    def get_schema(self) -> str:
        """Return a text description of every table and its columns.

        Each table is rendered as a ``Table: <name>`` line, a ``Columns:``
        line, one `` - <column> (<type>)`` line per column and a blank line.
        """
        parts = []
        for table in self.get_tables():
            parts.append(f"Table: {table}\nColumns:\n")
            parts.extend(f" - {col}\n" for col in self.get_table_schema(table))
            parts.append("\n")
        return "".join(parts)