# MiSQL / app.py
# Al1Abdullah's picture
# Update app.py
# f25ba03 verified
# raw
# history blame
# 14.9 kB
from flask import Flask, request, render_template, jsonify, session
import mysql.connector
from mysql.connector import Error
import os
from groq import Groq
from dotenv import load_dotenv
import re
import uuid
# Load .env first so every os.getenv() call below can see its values.
load_dotenv()

app = Flask(__name__)
# The session stores each user's MySQL settings, so the signing key should be
# stable: a key regenerated at every start invalidates existing sessions on
# restart and differs between worker processes. Use SECRET_KEY when it is
# provided and fall back to a random per-process key otherwise.
app.secret_key = os.getenv('SECRET_KEY') or os.urandom(24)

# Default database configuration from .env (no localhost default)
default_db_config = {
    'host': os.getenv('DB_HOST', ''),
    'user': os.getenv('DB_USER', 'root'),
    'password': os.getenv('DB_PASSWORD', ''),
    # `or` also covers DB_PORT being set but empty, which int('') would reject.
    'port': int(os.getenv('DB_PORT') or 3306),
}

# Groq API configuration with error handling: a failure here leaves
# groq_client as None instead of aborting the import.
try:
    groq_client = Groq(api_key=os.getenv('GROQ_API_KEY'))
except Exception as e:
    groq_client = None
    print(f"Failed to initialize Groq client: {str(e)}")

# Temporary storage for current database name and schema.
# NOTE(review): these are module-level globals, so they are shared by every
# request this process handles, while the connection settings are per-session.
current_db_name = None
current_schema = {}
current_summary = {}
def get_db_connection(db_name=None):
    """Open a MySQL connection from the session's settings, else the defaults.

    Args:
        db_name: Optional database to select when connecting.

    Returns:
        A ``(connection, error_message)`` pair. On success the message is
        ``None``; on failure the connection is ``None``.
    """
    settings = dict(session.get('db_config', default_db_config))

    # Refuse to connect until both a host and a user have been supplied.
    if not (settings['host'] and settings['user']):
        return None, "Please configure a valid MySQL connection using the 'Configure MySQL Connection' modal."

    if db_name:
        settings['database'] = db_name

    try:
        return mysql.connector.connect(**settings), None
    except Error as exc:
        return None, f"Database connection failed: {str(exc)}. Ensure the MySQL server is running and accessible, and check your credentials."
def parse_sql_file(file_content):
    """Parse an SQL script into a database name and a list of statements.

    The script is processed line by line. ``--`` line comments and ``/* */``
    block comments that start a line (single- or multi-line) are dropped, as
    is a block comment that ends a line; the remaining text is joined with
    single spaces and split into statements at lines ending in ``;``.

    Args:
        file_content: The script as ``str`` or as UTF-8 encoded ``bytes``
            (a leading byte-order mark is tolerated).

    Returns:
        ``(db_name, statements)``. ``db_name`` is taken from the first
        ``CREATE DATABASE [IF NOT EXISTS] <name>`` found anywhere in the text,
        or is a generated ``temp_db_<8 hex chars>`` name when there is none.
        ``statements`` is the list of statement strings in file order.

    Raises:
        UnicodeDecodeError: If ``file_content`` is bytes that are not UTF-8.
    """
    if isinstance(file_content, bytes):
        # utf-8-sig strips a byte-order mark that would otherwise be glued to
        # the first statement and make it invalid SQL.
        file_content = file_content.decode('utf-8-sig')

    # Extract database name. The optional group keeps "IF NOT EXISTS" from
    # being mistaken for the name.
    db_name_match = re.search(
        r"CREATE\s+DATABASE\s+(?:IF\s+NOT\s+EXISTS\s+)?[`']?(\w+)[`']?",
        file_content,
        re.IGNORECASE,
    )
    db_name = db_name_match.group(1) if db_name_match else f"temp_db_{uuid.uuid4().hex[:8]}"

    # Matches one complete block comment sitting at the very end of a line.
    trailing_comment = re.compile(r"\s*/\*(?:(?!\*/).)*\*/$")

    statements = []
    pending = []  # stripped lines of the statement being assembled
    in_comment = False

    def flush():
        """Move the pending lines into ``statements`` unless they are empty."""
        statement = ' '.join(pending)
        pending.clear()
        # A bare ";" (e.g. what remains of "/*!40101 ... */;") is not a query.
        if statement.strip('; '):
            statements.append(statement)

    # Split SQL into statements
    for raw_line in file_content.splitlines():
        line = raw_line.strip()

        if in_comment:
            # Inside a multi-line comment: resume after its terminator, if any.
            _, closer, line = line.partition('*/')
            if not closer:
                continue
            in_comment = False
            line = line.strip()

        # Peel off block comments that start the line. One that also closes
        # on the same line must not leave the parser stuck in comment mode.
        while line.startswith('/*'):
            _, closer, line = line[2:].partition('*/')
            if not closer:
                in_comment = True
                break
            line = line.strip()

        # Drop a comment at the end of the line so the ';' check still works.
        line = trailing_comment.sub('', line)
        if not line or line.startswith('--'):
            continue

        pending.append(line)
        if line.endswith(';'):
            flush()

    # Keep a final statement that lacks a terminating ';'.
    if pending:
        flush()

    return db_name, statements
def generate_schema_summary(schema, db_name):
    """Generate a dynamic summary of any MySQL database schema.

    Args:
        schema: Mapping of table name to a list of its column names.
        db_name: Name of the database; used in the description and to query
            INFORMATION_SCHEMA for foreign keys and indexes.

    Returns:
        A dict with the keys ``description`` (str), ``main_tables`` (table
        name -> up to three key column names), ``relationships`` (list of
        str) and ``suggestions`` (dict with ``evaluation``, ``note`` and
        ``recommendations``). When no connection can be opened, only the
        parts derived from ``schema`` are filled in.
    """
    summary = {
        'description': '',
        'main_tables': {},
        'relationships': [],
        'suggestions': {
            'evaluation': 'Good',
            'note': '',
            'recommendations': []
        }
    }

    # Infer description based on table names
    table_names = list(schema.keys())
    if any(table in table_names for table in ['patient', 'doctor', 'admission', 'appointment']):
        summary['description'] = f"{db_name} appears to be a Hospital Management Database for tracking entities like patients, staff, and appointments."
    elif any(table in table_names for table in ['customer', 'order', 'product', 'employee']):
        summary['description'] = f"{db_name} appears to be a Retail or E-commerce Database for managing customers, orders, and products."
    elif any(table in table_names for table in ['book', 'author', 'loan', 'member']):
        summary['description'] = f"{db_name} appears to be a Library Management Database for tracking books, authors, and loans."
    else:
        summary['description'] = f"{db_name} is a database with {len(table_names)} tables for managing various entities."

    # Select main tables: the (up to) 5 tables with the most columns.
    sorted_tables = sorted(schema.items(), key=lambda x: len(x[1]), reverse=True)[:5]
    for table, columns in sorted_tables:
        key_columns = [col for col in columns if 'id' in col.lower() or col in ['name', 'first_name', 'last_name', 'title', 'amount', 'status', 'price']]
        summary['main_tables'][table] = key_columns[:3]  # Limit to 3 key columns

    # Connect to database to detect relationships and suggestions
    conn, _ = get_db_connection(db_name)
    if conn:
        try:
            cursor = conn.cursor()
            try:
                # Detect foreign keys using INFORMATION_SCHEMA
                cursor.execute("""
SELECT TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = %s AND REFERENCED_TABLE_NAME IS NOT NULL
""", (db_name,))
                relationships = cursor.fetchall()
                for rel in relationships[:5]:  # Limit to 5 relationships
                    summary['relationships'].append(f"{rel[0]} links to {rel[2]} via {rel[1]}")

                # Fallback: Infer relationships from common column names
                if not relationships:
                    for table1, columns1 in schema.items():
                        for col1 in columns1:
                            if '_id' in col1 and col1 != f"{table1}_id":
                                target_table = col1.replace('_id', '')
                                if target_table in schema:
                                    summary['relationships'].append(f"{table1} likely links to {target_table} via {col1}")

                # Check for indexes and constraints
                cursor.execute("""
SELECT TABLE_NAME, NON_UNIQUE, INDEX_NAME
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = %s AND INDEX_NAME != 'PRIMARY'
""", (db_name,))
                indexes = cursor.fetchall()

                # Evaluate schema. Only declared foreign keys count here,
                # not the relationships inferred from column names above.
                has_foreign_keys = bool(relationships)
                has_indexes = bool(indexes)
                if has_foreign_keys and has_indexes:
                    summary['suggestions']['evaluation'] = 'Excellent'
                    summary['suggestions']['note'] = 'The schema is well-structured with defined foreign key constraints and indexes, supporting efficient queries.'
                elif has_foreign_keys:
                    summary['suggestions']['evaluation'] = 'Good'
                    summary['suggestions']['note'] = 'The schema has clear foreign key relationships but may lack sufficient indexes.'
                else:
                    summary['suggestions']['evaluation'] = 'Needs Improvement'
                    summary['suggestions']['note'] = 'The schema lacks explicit foreign key constraints, which may affect query reliability.'

                # Recommendations
                if not has_foreign_keys:
                    summary['suggestions']['recommendations'].append('Add explicit foreign key constraints to ensure data integrity.')
                if not has_indexes:
                    summary['suggestions']['recommendations'].append('Add indexes on frequently queried columns (e.g., foreign keys, date fields) to improve performance.')
                summary['suggestions']['recommendations'].append('Verify that date and numeric fields use appropriate data types for efficient querying.')
                cursor.close()
            except Error as e:
                summary['suggestions']['note'] = f'Analysis limited due to: {str(e)}'
        finally:
            # Release the connection even when a metadata query failed.
            conn.close()

    if not summary['relationships']:
        summary['relationships'] = ['Unable to detect relationships due to limited metadata access.']
    return summary
def load_sql_file(file):
    """Load SQL file into MySQL database and generate schema summary.

    The database named by the script (or a generated temporary name) is
    dropped and recreated, the script's statements are executed in it, and
    the resulting tables and columns are read back. On success the
    module-level ``current_db_name``, ``current_schema`` and
    ``current_summary`` are updated.

    Args:
        file: A file-like object whose ``read()`` returns the SQL script.

    Returns:
        ``(True, schema, summary)`` on success, where ``schema`` maps table
        names to column-name lists; ``(False, error_message, None)`` on
        failure.
    """
    global current_db_name, current_schema, current_summary

    # The target database is created below and the second connection is
    # already bound to it. Replaying the script's own CREATE/DROP DATABASE or
    # USE statements would fail ("database exists") or move the session away
    # from the database whose schema is read back, so they are skipped.
    db_level_statement = re.compile(
        r"^\s*(?:(?:CREATE|DROP)\s+(?:DATABASE|SCHEMA)\b|USE\b)",
        re.IGNORECASE,
    )

    try:
        db_name, statements = parse_sql_file(file.read())
    except UnicodeDecodeError as e:
        # Not a mysql.connector Error, so it needs its own handler.
        return False, f"Failed to load SQL file: {str(e)}", None

    try:
        # Connect without specifying a database
        conn, error = get_db_connection()
        if error:
            return False, error, None
        try:
            cursor = conn.cursor()
            # Drop existing database if it exists
            cursor.execute(f"DROP DATABASE IF EXISTS `{db_name}`")
            cursor.execute(f"CREATE DATABASE `{db_name}`")
            conn.commit()
            cursor.close()
        finally:
            conn.close()

        # Connect to the new database
        conn, error = get_db_connection(db_name)
        if error:
            return False, error, None
        try:
            cursor = conn.cursor()
            # Execute SQL statements
            for statement in statements:
                if db_level_statement.match(statement):
                    continue
                cursor.execute(statement)
            conn.commit()

            # Extract schema
            cursor.execute("SHOW TABLES")
            tables = [row[0] for row in cursor.fetchall()]
            schema = {}
            for table in tables:
                cursor.execute(f"SHOW COLUMNS FROM `{table}`")
                schema[table] = [row[0] for row in cursor.fetchall()]

            # Generate summary
            summary = generate_schema_summary(schema, db_name)
            cursor.close()
        finally:
            # Close the connection on every path, including failed statements.
            conn.close()
    except Error as e:
        return False, f"Failed to load SQL file: {str(e)}", None

    current_db_name = db_name
    current_schema = schema
    current_summary = summary
    return True, schema, summary
def generate_sql_query(question, schema):
    """Ask the Groq chat API to turn a question into a MySQL query.

    Args:
        question: The user's natural-language question.
        schema: Mapping of table name to a list of column names; it is
            rendered into the prompt.

    Returns:
        The model's reply with Markdown code fences removed, or a string
        starting with ``ERROR:`` when the client is unavailable or the
        request fails.
    """
    if groq_client is None or not groq_client:
        return "ERROR: Groq client not initialized. Check API key and try again."

    # One "Table/Columns" paragraph per table.
    table_blocks = []
    for table, columns in schema.items():
        column_list = ', '.join(columns)
        table_blocks.append(f"Table: {table}\nColumns: {column_list}")
    schema_text = "\n".join(table_blocks)

    prompt = f"""
You are a SQL expert. Based on the following database schema, generate a valid MySQL query for the user's question. Only use tables and columns that exist in the schema. Use user-friendly aliases for column names (e.g., 'cust_id' becomes 'Customer ID', 'admission_date' becomes 'Admission Date'). Return ONLY the SQL query, without explanations, markdown, or code block formatting (e.g., no ```). If the question references non-existent tables or columns, return an error message starting with 'ERROR:'. Do not use GROUP BY or aggregation functions (e.g., SUM, COUNT, AVG) unless the question explicitly requests aggregation (e.g., 'sum of all bills', 'average cost', 'count of patients'). Treat 'total bill amount' as the individual bill amount (e.g., bill.amount) unless aggregation is clearly specified. For names, concatenate first_name and last_name if applicable (e.g., CONCAT(first_name, ' ', last_name) AS 'Full Name'). Use direct JOINs with correct foreign key relationships. Avoid subqueries unless absolutely necessary. Place filtering conditions (e.g., department name, status) in the WHERE clause, not JOIN clauses. Handle case sensitivity in string comparisons by using LOWER() for status fields (e.g., LOWER(status) = 'unpaid'). Verify table relationships before joining.
Schema:
{schema_text}
User Question: {question}
"""

    try:
        completion = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-70b-8192",
        )
        reply = completion.choices[0].message.content.strip()
        # The model sometimes wraps its answer in code fences despite the prompt.
        without_fences = re.sub(r'```(?:sql)?\n?', '', reply)  # Remove any markdown
        return without_fences.strip()
    except Exception as exc:
        return f"ERROR: Failed to generate SQL query: {str(exc)}"
def execute_sql_query(query):
    """Execute SQL query on the current database.

    Args:
        query: The SQL text to run against ``current_db_name``.

    Returns:
        ``(True, rows, None)`` on success, where ``rows`` is a list of dicts
        keyed by column name; ``(False, error_message, None)`` when no
        database is loaded, the connection fails, or execution raises.
    """
    # NOTE(review): `query` is model-generated text derived from user input
    # and is executed as-is with the configured credentials; consider
    # restricting it to read-only statements.
    if not current_db_name:
        return False, "No database loaded. Please upload an SQL file.", None
    conn, error = get_db_connection(current_db_name)
    if error:
        return False, error, None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(query)
        results = cursor.fetchall()
        conn.commit()
        cursor.close()
        return True, results, None
    except Error as e:
        return False, f"SQL execution failed: {str(e)}", None
    finally:
        # Runs on success and failure alike, so a bad query no longer leaks
        # the connection.
        conn.close()
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the main page and handle its two POST forms.

    A POST carrying an ``sql_file`` upload loads that script into MySQL; a
    POST carrying a ``question`` field generates a query for the loaded
    schema and executes it. Every path ends by rendering ``index.html`` with
    the error (if any), schema, summary, query results and generated query.
    """
    error = None
    schema = current_schema
    summary = current_summary
    results = None
    generated_query = None

    if not groq_client:
        error = "Groq client not initialized. Please check GROQ_API_KEY and restart the app."

    if request.method == 'POST':
        if 'sql_file' in request.files:
            file = request.files['sql_file']
            # Compare case-insensitively so "DUMP.SQL" is accepted too.
            if file and file.filename.lower().endswith('.sql'):
                success, result, new_summary = load_sql_file(file)
                if success:
                    schema = result
                    summary = new_summary
                else:
                    # Leave schema and summary as they were: a failed upload
                    # returns no summary, and replacing the displayed one
                    # with None would desynchronise it from the schema.
                    error = result
            else:
                error = "Please upload a valid .sql file."
        elif 'question' in request.form:
            question = request.form['question']
            if not current_db_name or not current_schema:
                error = "No database loaded. Please upload an SQL file first."
            else:
                generated_query = generate_sql_query(question, current_schema)
                if not generated_query.startswith('ERROR:'):
                    success, result, _ = execute_sql_query(generated_query)
                    if success:
                        results = result
                    else:
                        error = result
                else:
                    error = generated_query

    return render_template('index.html', error=error, schema=schema, summary=summary, results=results, query=generated_query)
@app.route('/configure_db', methods=['POST'])
def configure_db():
    """Handle MySQL connection configuration.

    Reads ``host``, ``user``, ``password`` and ``port`` from the submitted
    form, checks that a connection can be opened with exactly those values,
    and only then stores them in the session as ``db_config``. Every outcome
    renders ``index.html`` with either an error or a success message.
    """
    host = request.form.get('host', '').strip()
    user = request.form.get('user', '').strip()
    password = request.form.get('password', '')
    port = request.form.get('port', '3306').strip()

    if not host or not user:
        return render_template('index.html', error="Host and user are required.", schema=current_schema, summary=current_summary)
    try:
        port = int(port)
    except ValueError:
        return render_template('index.html', error="Port must be a valid number.", schema=current_schema, summary=current_summary)

    # Test connection with the submitted settings themselves. Going through
    # get_db_connection() here would test the previously stored (or default)
    # configuration instead of the one the user just entered.
    test_config = {'host': host, 'user': user, 'password': password, 'port': port}
    try:
        conn = mysql.connector.connect(**test_config)
    except Error as e:
        error = f"Database connection failed: {str(e)}. Ensure the MySQL server is running and accessible, and check your credentials."
        return render_template('index.html', error=error, schema=current_schema, summary=current_summary)
    conn.close()

    # Store in session
    session['db_config'] = test_config
    return render_template('index.html', error=None, schema=current_schema, summary=current_summary, success="MySQL connection configured successfully.")
# Script entry point: serve on all interfaces, on PORT (default 7860).
if __name__ == '__main__':
    listen_port = int(os.getenv('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port, debug=False)