chat2excel / sql_generator.py
dinhquangson's picture
Upload folder using huggingface_hub
fb68502 verified
import os
import gradio as gr
import pandas as pd
import numpy as np
from neo4j import GraphDatabase
from openai import OpenAI
from dotenv import load_dotenv
import re
import duckdb # Thêm thư viện duckdb
import matplotlib.pyplot as plt
# Tải các biến môi trường
load_dotenv()
# Cấu hình OpenAI client với Ollama
client = OpenAI(
api_key="ollama", # Ollama không yêu cầu API key thực sự
base_url=os.getenv("OLLAMA_API_URL", "http://localhost:11434/v1")
)
# Cấu hình Neo4j
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "abcd@1234")
# Biến toàn cục để lưu trữ schema builder và dataframes
current_schema_builder = None
current_dataframes = None # Thêm biến lưu trữ dataframes từ Excel
class Neo4jSchemaBuilder:
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self.driver.close()
def clear_database(self):
with self.driver.session() as session:
session.run("MATCH (n) DETACH DELETE n")
def create_table_node(self, table_name, columns):
with self.driver.session() as session:
session.run("""
CREATE (t:Table {name: $table_name})
""", table_name=table_name)
for col in columns:
col_name = col['name']
col_type = col['type']
col_desc = col.get('description', '')
# Tạo node cho cột
session.run("""
MATCH (t:Table {name: $table_name})
CREATE (c:Column {name: $col_name, type: $col_type, description: $col_desc})
CREATE (t)-[:HAS_COLUMN]->(c)
""", table_name=table_name, col_name=col_name, col_type=col_type, col_desc=col_desc)
# Nếu là khóa chính
if col.get('is_primary_key', False):
session.run("""
MATCH (c:Column {name: $col_name})
SET c.is_primary_key = true
""", col_name=col_name)
def create_foreign_key_relationship(self, from_table, from_column, to_table, to_column, relationship_name=None):
with self.driver.session() as session:
rel_name = relationship_name or f"REFERENCES"
# Tạo quan hệ giữa hai cột
session.run("""
MATCH (c1:Column {name: $from_column})<-[:HAS_COLUMN]-(t1:Table {name: $from_table})
MATCH (c2:Column {name: $to_column})<-[:HAS_COLUMN]-(t2:Table {name: $to_table})
CREATE (c1)-[:REFERENCES {name: $rel_name}]->(c2)
""", from_table=from_table, from_column=from_column,
to_table=to_table, to_column=to_column, rel_name=rel_name)
def get_schema_info(self):
with self.driver.session() as session:
# Lấy thông tin về các bảng và cột
tables_result = session.run("""
MATCH (t:Table)
OPTIONAL MATCH (t)-[:HAS_COLUMN]->(c)
RETURN t.name as table_name, collect({name: c.name, type: c.type, description: c.description, is_primary_key: c.is_primary_key}) as columns
""")
tables = {}
for record in tables_result:
table_name = record["table_name"]
columns = record["columns"]
tables[table_name] = columns
# Lấy thông tin về các khóa ngoại
relationships_result = session.run("""
MATCH (c1:Column)-[r:REFERENCES]->(c2:Column)
MATCH (t1:Table)-[:HAS_COLUMN]->(c1)
MATCH (t2:Table)-[:HAS_COLUMN]->(c2)
RETURN t1.name as from_table, c1.name as from_column, t2.name as to_table, c2.name as to_column, r.name as relationship_name
""")
relationships = []
for record in relationships_result:
relationships.append({
"from_table": record["from_table"],
"from_column": record["from_column"],
"to_table": record["to_table"],
"to_column": record["to_column"],
"relationship_name": record["relationship_name"]
})
return {"tables": tables, "relationships": relationships}
def find_related_tables(self, keywords):
with self.driver.session() as session:
# Tìm các bảng liên quan đến từ khóa
query = """
MATCH (t:Table)
WHERE any(keyword IN $keywords WHERE toLower(t.name) CONTAINS toLower(keyword))
RETURN t.name as table_name
UNION
MATCH (t:Table)-[:HAS_COLUMN]->(c:Column)
WHERE any(keyword IN $keywords WHERE toLower(c.name) CONTAINS toLower(keyword) OR toLower(c.description) CONTAINS toLower(keyword))
RETURN t.name as table_name
"""
result = session.run(query, keywords=keywords)
tables = [record["table_name"] for record in result]
# Nếu không tìm thấy bảng nào, trả về tất cả các bảng
if not tables:
all_tables_query = "MATCH (t:Table) RETURN t.name as table_name"
all_tables_result = session.run(all_tables_query)
tables = [record["table_name"] for record in all_tables_result]
return tables
def get_path_between_tables(self, table1, table2):
with self.driver.session() as session:
query = """
MATCH path = shortestPath((t1:Table {name: $table1})-[*]-(t2:Table {name: $table2}))
RETURN path
"""
result = session.run(query, table1=table1, table2=table2)
paths = []
for record in result:
path = record["path"]
paths.append(path)
return paths
# Hàm xử lý khi tải file Excel lên
def upload_excel(file):
global current_schema_builder
if file is None:
return "Vui lòng tải lên file Excel."
try:
# Xử lý file Excel
schema_builder, message = process_excel_file(file.name)
if schema_builder:
current_schema_builder = schema_builder
# Lấy thông tin schema để hiển thị
schema_info = schema_builder.get_schema_info()
schema_text = "Schema được tạo từ file Excel:\n\n"
# Hiển thị thông tin các bảng
for table_name, columns in schema_info["tables"].items():
schema_text += f"Bảng: {table_name}\n"
for col in columns:
if col["name"]: # Kiểm tra trường hợp cột null
pk_text = " (PRIMARY KEY)" if col.get("is_primary_key") else ""
schema_text += f" - {col['name']} ({col['type']}){pk_text}\n"
schema_text += "\n"
# Hiển thị thông tin các mối quan hệ
schema_text += "Các mối quan hệ:\n"
for rel in schema_info["relationships"]:
schema_text += f" {rel['from_table']}.{rel['from_column']} -> {rel['to_table']}.{rel['to_column']}\n"
return message + "\n\n" + schema_text
else:
return message
except Exception as e:
return f"Lỗi khi xử lý file: {str(e)}"
def extract_keywords(query, openai_client=None):
# Sử dụng Ollama với model qwen3:4b để trích xuất từ khóa
if openai_client:
try:
response = openai_client.chat.completions.create(
model="qwen3:4b",
messages=[
{"role": "system", "content": "Bạn là một assistant giúp trích xuất các từ khóa quan trọng từ câu hỏi để tìm kiếm trong cơ sở dữ liệu. Hãy trả về danh sách các từ khóa, mỗi từ khóa trên một dòng."},
{"role": "user", "content": f"Trích xuất các từ khóa quan trọng từ câu hỏi sau: \"{query}\""}
]
)
keywords_text = response.choices[0].message.content
keywords = [kw.strip() for kw in keywords_text.split('\n') if kw.strip()]
return keywords
except Exception as e:
print(f"Error using Ollama API: {e}")
# Phương pháp đơn giản nếu không có Ollama
words = re.findall(r'\b\w+\b', query.lower())
stopwords = ['là', 'và', 'của', 'cho', 'từ', 'đến', 'những', 'các', 'tất', 'cả', 'có', 'không', 'trong']
keywords = [word for word in words if len(word) > 2 and word not in stopwords]
return keywords
def generate_sql(query, schema_info, related_tables, openai_client=None, error_history=None): # Thêm error_history
# Chuẩn bị thông tin schema để đưa vào prompt
schema_text = "Thông tin về schema:\n"
for table_name, columns in schema_info["tables"].items():
if table_name in related_tables:
schema_text += f"Bảng \"{table_name}\":\n"
for col in columns:
if col["name"]: # Kiểm tra trường hợp cột null
pk_text = " (PRIMARY KEY)" if col.get("is_primary_key") else ""
desc_text = f" - {col.get('description', '')}" if col.get('description') else ""
schema_text += f" \"{col['name']}\" ({col['type']}){pk_text}{desc_text}\n"
# Thêm thông tin về các mối quan hệ
schema_text += "\nCác mối quan hệ:\n"
for rel in schema_info["relationships"]:
if rel["from_table"] in related_tables or rel["to_table"] in related_tables:
schema_text += f" {rel['from_table']}.{rel['from_column']} -> {rel['to_table']}.{rel['to_column']}\n"
# Thêm thông tin lỗi từ các lần thử trước
messages = [
{"role": "system", "content": "Bạn là một assistant giúp sinh câu lệnh SQL từ câu hỏi tiếng Việt. Hãy trả về câu lệnh SQL hoàn chỉnh dựa trên schema đã cung cấp. Tên trường và bảng để trong hai dấu \" không dùng dâu _"},
{"role": "user", "content": f"Schema của cơ sở dữ liệu:\n{schema_text}\n\nCâu hỏi: {query}\n\nHãy sinh câu lệnh SQL để trả lời câu hỏi trên:"}
]
if error_history:
for error in error_history:
messages.append({"role": "assistant", "content": error['sql']})
messages.append({"role": "user", "content": f"Lỗi khi thực thi SQL: {error['error']}. Vui lòng sửa lại câu lệnh SQL."})
# Sử dụng Ollama để sinh SQL
if openai_client:
try:
response = openai_client.chat.completions.create(
model="qwen3:4b",
messages=messages
)
sql = response.choices[0].message.content
# Tách phần SQL khỏi các giải thích (nếu có)
sql_pattern = r"```sql\n(.*?)\n```"
sql_match = re.search(sql_pattern, sql, re.DOTALL)
if sql_match:
sql = sql_match.group(1)
return sql
except Exception as e:
print(f"Error using Ollama API: {e}")
return f"-- Không thể kết nối với Ollama API\n-- Chi tiết lỗi: {str(e)}\n"
# Trường hợp không có Ollama
tables_str = ", ".join(related_tables)
return f"-- Vui lòng cấu hình Ollama API để sinh SQL chính xác\nSELECT * FROM {tables_str} LIMIT 10;"
def process_excel_file(file_path):
global current_dataframes # Lưu dataframes vào biến toàn cục
try:
# Đọc file Excel
xl = pd.ExcelFile(file_path)
# Tạo schema builder
schema_builder = Neo4jSchemaBuilder(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
# Xóa dữ liệu cũ
schema_builder.clear_database()
# Thông tin về các bảng và quan hệ
tables_info = {}
relationships = []
# Lưu các dataframe vào biến toàn cục
current_dataframes = {}
for sheet_name in xl.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name)
current_dataframes[sheet_name] = df
print(df)
# Các cột của bảng
columns = []
for col in df.columns:
# Xác định kiểu dữ liệu của cột
sample_value = df[col].dropna().iloc[0] if not df[col].dropna().empty else None
if pd.isna(sample_value):
data_type = "VARCHAR"
elif isinstance(sample_value, (int, np.integer)):
data_type = "INTEGER"
elif isinstance(sample_value, (float, np.floating)):
data_type = "FLOAT"
elif isinstance(sample_value, pd.Timestamp):
data_type = "DATETIME"
else:
data_type = "VARCHAR"
# Kiểm tra xem có phải là khóa chính không (giả định cột đầu tiên là khóa chính)
is_primary_key = (col == df.columns[0])
columns.append({
"name": col,
"type": data_type,
"is_primary_key": is_primary_key,
"description": ""
})
tables_info[sheet_name] = columns
# Tạo node cho bảng
schema_builder.create_table_node(sheet_name, columns)
# Tìm kiếm các quan hệ có thể có (dựa trên tên cột)
for col in df.columns:
# Nếu tên cột có định dạng like "table_id", có thể là khóa ngoại
if "_id" in col.lower() or "id_" in col.lower():
referenced_table = col.lower().replace("_id", "").replace("id_", "")
if referenced_table in xl.sheet_names and referenced_table != sheet_name:
# Thêm vào danh sách các mối quan hệ cần tạo
relationships.append({
"from_table": sheet_name,
"from_column": col,
"to_table": referenced_table,
"to_column": xl.parse(referenced_table).columns[0], # Giả định cột đầu tiên là khóa chính
"relationship_name": f"REFERS_TO_{referenced_table.upper()}"
})
# Tạo các mối quan hệ
for rel in relationships:
schema_builder.create_foreign_key_relationship(
rel["from_table"], rel["from_column"],
rel["to_table"], rel["to_column"],
rel["relationship_name"]
)
message = f"Đã xử lý file Excel thành công. Đã tạo {len(tables_info)} bảng và {len(relationships)} mối quan hệ."
return schema_builder, message
except Exception as e:
return None, f"Lỗi khi xử lý file Excel: {str(e)}"
def manual_sql_execution(sql_text):
global current_dataframes
if current_dataframes is None:
return None, "Vui lòng tải lên file Excel trước."
# Trích xuất SQL từ code block nếu có
sql_match = re.search(r"```sql\n(.*?)\n```", sql_text, re.DOTALL)
if sql_match:
sql = sql_match.group(1).strip()
else:
sql = sql_text.strip()
try:
result_df, error = execute_sql(sql, current_dataframes)
return result_df, error or ""
except Exception as e:
return None, str(e)
def execute_sql(sql, dataframes):
"""Thực thi câu lệnh SQL trên các dataframe"""
if not dataframes:
return None, "Không có dữ liệu được tải. Vui lòng tải file Excel trước."
conn = duckdb.connect()
try:
# Đăng ký các dataframe vào duckdb
for table_name, df in dataframes.items():
conn.register(table_name, df)
# Thực thi truy vấn
result = conn.execute(sql).fetchdf()
return result, None
except Exception as e:
return None, str(e)
finally:
# Unregister từng bảng thay vì dùng unregister_all()
for table_name in dataframes:
try:
conn.unregister(table_name) # <-- Sửa ở đây
except Exception as e:
print(f"Lỗi khi unregister {table_name}: {str(e)}")
conn.close()
def generate_chart(result_df, label_column, value_column, chart_type="bar"):
if result_df is None or result_df.empty:
return None, "Không có dữ liệu để vẽ biểu đồ"
if not label_column or not value_column:
return None, "Vui lòng chọn cột nhãn và cột giá trị"
try:
# Kiểm tra sự tồn tại của cột
if label_column not in result_df.columns:
return None, f"Cột nhãn '{label_column}' không tồn tại"
if value_column not in result_df.columns:
return None, f"Cột giá trị '{value_column}' không tồn tại"
# Kiểm tra giá trị số cho cột giá trị (trừ pie chart)
if chart_type != "pie" and not np.issubdtype(result_df[value_column].dtype, np.number):
return None, f"Cột giá trị '{value_column}' phải là số"
# Tạo biểu đồ
plt.figure(figsize=(10, 6))
if chart_type == "bar":
plt.bar(result_df[label_column].astype(str), result_df[value_column])
plt.title(f"Biểu đồ cột {value_column} theo {label_column}")
plt.xlabel(label_column)
plt.ylabel(value_column)
plt.xticks(rotation=45)
elif chart_type == "line":
plt.plot(result_df[label_column].astype(str), result_df[value_column], marker='o')
plt.title(f"Biểu đồ đường {value_column} theo {label_column}")
plt.xlabel(label_column)
plt.ylabel(value_column)
plt.xticks(rotation=45)
elif chart_type == "pie":
# Kiểm tra giá trị không âm cho pie chart
if any(result_df[value_column] < 0):
return None, "Pie chart yêu cầu tất cả giá trị không âm"
plt.pie(result_df[value_column], labels=result_df[label_column], autopct='%1.1f%%')
plt.title(f"Biểu đồ tròn {value_column}")
else:
return None, f"Loại chart '{chart_type}' không được hỗ trợ"
plt.tight_layout()
return plt.gcf(), None
except Exception as e:
return None, f"Lỗi khi tạo biểu đồ: {str(e)}"
def update_dropdowns(result_df):
if result_df is None or result_df.empty:
return gr.Dropdown(choices=[], value=None), gr.Dropdown(choices=[], value=None)
label_columns = [col.strip() for col in result_df.columns.tolist()]
numeric_columns = [col.strip() for col in result_df.select_dtypes(include=[np.number]).columns.tolist()]
label_value = label_columns[0] if label_columns else None
value_value = numeric_columns[0] if numeric_columns else None
return (
gr.Dropdown(choices=label_columns, value=label_value),
gr.Dropdown(choices=numeric_columns, value=value_value)
)
def generate_sql_from_question(question, max_retries):
global current_schema_builder, current_dataframes
if current_schema_builder is None:
return "Vui lòng tải lên file Excel trước.", None, None
if not question:
return "Vui lòng nhập câu hỏi.", None, None
try:
keywords = extract_keywords(question, client)
print(f"keywords: {keywords}")
related_tables = current_schema_builder.find_related_tables(keywords)
print(f"related_tables: {related_tables}")
schema_info = current_schema_builder.get_schema_info()
print(f"schema_info: {schema_info}")
error_history = []
final_sql = None
result_df = None
execution_error = None
for attempt in range(int(max_retries)):
# Sinh SQL với thông tin lỗi từ các lần trước
sql = generate_sql(question, schema_info, related_tables, client, error_history)
print(f"sql: {sql}")
if not sql:
continue
# Thực thi SQL
df, error = execute_sql(sql, current_dataframes)
if error is None:
final_sql = sql
result_df = df
break
else:
error_history.append({'sql': sql, 'error': error})
execution_error = error
else:
# Nếu vượt quá số lần thử
error_msg = f"Không thể sinh SQL đúng sau {max_retries} lần thử.\nLỗi cuối cùng: {execution_error}"
return error_msg, None, execution_error
# Định dạng kết quả
sql_text = f"SQL được sinh (lần {attempt + 1}):\n```sql\n{final_sql}\n```"
return sql_text, result_df, None
except Exception as e:
return f"Lỗi khi sinh SQL: {str(e)}", None, str(e)
# Cập nhật giao diện
with gr.Blocks(title="Ứng dụng sinh SQL từ Excel") as app:
gr.Markdown("# Ứng dụng sinh SQL từ Excel sử dụng Neo4j và Graph Schema")
with gr.Tab("Tải lên Excel"):
with gr.Row():
with gr.Column():
excel_file = gr.File(label="Tải lên file Excel")
upload_button = gr.Button("Xử lý file Excel")
with gr.Column():
schema_output = gr.Textbox(label="Kết quả xử lý schema", lines=15)
with gr.Tab("Sinh SQL"):
with gr.Row():
with gr.Column():
question_input = gr.Textbox(label="Nhập câu hỏi bằng tiếng Việt", placeholder="Ví dụ: Danh sách khách hàng đã mua sản phẩm X")
max_retries_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Số lần thử lại tối đa")
generate_button = gr.Button("Sinh và Chạy SQL")
sql_output = gr.Textbox(label="Kết quả SQL", lines=10)
manual_run_button = gr.Button("Chạy SQL Manual")
with gr.Column():
result_output = gr.Dataframe(label="Kết quả truy vấn")
error_output = gr.Textbox(label="Thông báo lỗi")
with gr.Tab("Sinh Chart"):
with gr.Row():
label_dropdown = gr.Dropdown(label="Chọn cột nhãn")
value_dropdown = gr.Dropdown(label="Chọn cột giá trị")
chart_type_dropdown = gr.Dropdown(
choices=["bar", "line", "pie"],
value="bar",
label="Chọn loại biểu đồ"
)
chart_button = gr.Button("Sinh Chart")
chart_output = gr.Plot(label="Biểu đồ kết quả")
error_output = gr.Textbox(label="Thông báo lỗi")
with gr.Tab("Hướng dẫn"):
gr.Markdown("""
## Hướng dẫn sử dụng
### Bước 1: Tải lên file Excel
- Tải lên file Excel chứa dữ liệu của bạn
- Mỗi sheet sẽ được xem như một bảng trong cơ sở dữ liệu
- Hàng đầu tiên sẽ được sử dụng làm tên cột
- Cột đầu tiên của mỗi bảng sẽ được giả định là khóa chính
- Hệ thống sẽ tự động phát hiện các mối quan hệ dựa trên tên cột
### Bước 2: Nhập câu hỏi và sinh SQL
- Nhập câu hỏi bằng tiếng Việt
- Hệ thống sẽ trích xuất từ khóa từ câu hỏi
- Dựa vào từ khóa, hệ thống sẽ tìm các bảng liên quan trong graph schema
- Sau đó, hệ thống sẽ sinh câu lệnh SQL để trả lời câu hỏi
### Cấu hình
- Cần cài đặt Neo4j và cấu hình kết nối
- Cần cài đặt Ollama và cấu hình model qwen3:4b and qwen3:4b:
```bash
ollama pull qwen3:4b
ollama pull qwen3:4b
```
- Đặt URL API của Ollama trong file .env:
```
OLLAMA_API_URL=http://localhost:11434/v1
```
""")
# Kết nối sự kiện
result_output.change(
fn=update_dropdowns,
inputs=result_output,
outputs=[label_dropdown, value_dropdown]
)
upload_button.click(
fn=upload_excel,
inputs=excel_file,
outputs=schema_output
)
generate_button.click(
fn=generate_sql_from_question,
inputs=[question_input, max_retries_slider],
outputs=[sql_output, result_output, error_output]
)
# Thêm sự kiện cho nút manual
manual_run_button.click(
fn=manual_sql_execution,
inputs=[sql_output],
outputs=[result_output, error_output]
)
# Thêm kết nối sự kiện cho nút sinh chart
chart_button.click(
fn=generate_chart,
inputs=[result_output, label_dropdown, value_dropdown, chart_type_dropdown],
outputs=[chart_output, error_output]
)
# Khởi chạy ứng dụng
if __name__ == "__main__":
app.launch(share=True)