# PlainSQL-Agent / src/db_connector.py
# LalitChaudhari3's picture
# Update src/db_connector.py
# 740a66e verified
import pymysql
import sqlite3
import os
from dotenv import load_dotenv
from urllib.parse import urlparse, unquote
class Database:
    """Thin wrapper around the demo SQLite database.

    On construction the database file at ``self.db_path`` is rebuilt from
    scratch with two tables, ``employees`` and ``sales``.  The remaining
    methods open a short-lived connection per call, run one statement and
    close the connection again.

    Attributes:
        db_path: Filesystem path of the SQLite database file.
        type: Backend selector; ``__init__`` always sets it to ``"sqlite"``.
    """

    def __init__(self):
        """Load ``.env``, pin the SQLite path and rebuild the demo data."""
        load_dotenv()
        # 🔒 FORCE SAFE PATH: We ignore .env settings for SQLite to prevent
        # permission errors. We use /tmp/ which is ALWAYS writable on
        # Hugging Face/Linux.
        self.db_path = "/tmp/chatbot_v2.db"
        self.type = "sqlite"
        print(f" ⚡ DATABASE PATH FORCED: {self.db_path}")
        # Always rebuild the demo data to ensure it exists.
        self._init_sqlite_data()

    def get_connection(self):
        """Open and return a new database connection.

        For ``self.type == "sqlite"`` the connection uses ``sqlite3.Row`` as
        its row factory so rows can be addressed by column name.  The caller
        is responsible for closing the returned connection.

        NOTE(review): the MySQL fallback reads ``self.host``, ``self.user``,
        ``self.password``, ``self.db_name`` and ``self.port``, none of which
        are set by ``__init__``; they must be assigned before switching
        ``self.type`` away from ``"sqlite"``.
        """
        if self.type == "sqlite":
            # Connect to the safe temp file.
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            return conn
        # Fallback for MySQL (if you ever switch back).
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.db_name,
            port=self.port,
            cursorclass=pymysql.cursors.DictCursor,
        )

    def _init_sqlite_data(self):
        """Drop and recreate the demo tables, then insert the sample rows.

        Best-effort: any failure is printed and swallowed so construction of
        the object never raises from here.
        """
        # Bind the name up front so the ``finally`` clause is safe even when
        # ``sqlite3.connect`` itself fails.
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # 1. Wipe clean to remove old errors.
            cursor.execute("DROP TABLE IF EXISTS sales")
            cursor.execute("DROP TABLE IF EXISTS employees")

            # 2. Create 'employees' (lowercase, plural).
            cursor.execute("""
                CREATE TABLE employees (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    department TEXT,
                    salary REAL,
                    hire_date TEXT
                )
            """)

            # 3. Create 'sales'.
            cursor.execute("""
                CREATE TABLE sales (
                    sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    employee_id INTEGER,
                    amount REAL,
                    sale_date TEXT,
                    FOREIGN KEY (employee_id) REFERENCES employees(id)
                )
            """)

            # 4. Insert data.
            data_employees = [
                ('Alice', 'Sales', 70000, '2023-01-15'),
                ('Bob', 'Engineering', 90000, '2022-05-20'),
                ('Charlie', 'Marketing', 60000, '2023-03-10'),
                ('David', 'Engineering', 95000, '2021-11-05'),
                ('Eve', 'Sales', 72000, '2023-02-28'),
            ]
            cursor.executemany(
                "INSERT INTO employees (name, department, salary, hire_date) "
                "VALUES (?, ?, ?, ?)",
                data_employees,
            )
            data_sales = [
                (1, 500.00, '2023-06-01'),
                (1, 1200.50, '2023-06-03'),
            ]
            cursor.executemany(
                "INSERT INTO sales (employee_id, amount, sale_date) "
                "VALUES (?, ?, ?)",
                data_sales,
            )
            conn.commit()
            print(" ✅ Internal Database & Data Built Successfully!")
        except Exception as e:
            print(f" ❌ FATAL DB ERROR: {e}")
        finally:
            if conn is not None:
                conn.close()

    def run_query(self, query):
        """Execute ``query`` and return all fetched rows.

        Args:
            query: A single SQL statement, executed as-is.

        Returns:
            For SQLite, a list of ``dict`` (one per row).  For the other
            backend, whatever ``cursor.fetchall()`` returns.  On any error a
            one-element list ``["Error: <message>"]`` is returned instead of
            raising.

        This method never calls ``commit()`` on the connection.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            # Handle different cursor types.
            if self.type == "sqlite":
                return [dict(row) for row in cursor.fetchall()]
            return cursor.fetchall()
        except Exception as e:
            return [f"Error: {e}"]
        finally:
            conn.close()

    def get_tables(self):
        """Return the names of all user tables listed in ``sqlite_master``.

        SQLite's internal ``sqlite_sequence`` bookkeeping table is excluded.
        Returns an empty list (after printing the error) if the lookup fails.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = [
                row[0] for row in cursor.fetchall()
                if row[0] != 'sqlite_sequence'
            ]
            print(f" 🔍 TABLES FOUND: {tables}")
            return tables
        except Exception as e:
            print(f"Error fetching tables: {e}")
            return []
        finally:
            conn.close()

    def get_table_schema(self, table_name):
        """Return the columns of ``table_name`` as ``"name (TYPE)"`` strings.

        Args:
            table_name: Name of the table to inspect.  It is quoted as an SQL
                identifier, so names containing spaces, reserved words or
                quote characters are handled and cannot inject extra SQL.

        Returns:
            A list of column descriptions in declaration order, or an empty
            list if the table does not exist or the lookup fails.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            # PRAGMA arguments cannot be bound as parameters, so quote the
            # name as an identifier (embedded double quotes are doubled).
            quoted = '"' + str(table_name).replace('"', '""') + '"'
            cursor.execute(f"PRAGMA table_info({quoted})")
            columns = []
            for row in cursor.fetchall():
                # sqlite3.Row supports lookup by column name; plain tuples
                # carry the name at index 1 and the type at index 2.
                if isinstance(row, sqlite3.Row):
                    col_name, col_type = row['name'], row['type']
                else:
                    col_name, col_type = row[1], row[2]
                columns.append(f"{col_name} ({col_type})")
            return columns
        except Exception as e:
            print(f"Error fetching schema for {table_name}: {e}")
            return []
        finally:
            conn.close()

    def get_schema(self):
        """Return a plain-text description of every table and its columns.

        Each table contributes a ``Table: <name>`` line, a ``Columns:`` line,
        one `` - <column>`` line per column, and a trailing blank line.
        """
        parts = []
        for table in self.get_tables():
            parts.append(f"Table: {table}\nColumns:\n")
            parts.extend(
                f" - {col}\n" for col in self.get_table_schema(table)
            )
            parts.append("\n")
        return "".join(parts)