Robin Chiu
improve some tools.
e87f50f
import pandas as pd
import sqlite3
import os.path
import gradio as gr
import os
import shutil
import uuid
kb_df = pd.read_csv("./data/kb.csv", index_col=0)
def get_kb(db_name, knowledge=None):
if not knowledge:
result = kb_df[(kb_df['db_name']==db_name)]
else:
result = kb_df[(kb_df['db_name']==db_name) & (kb_df['knowledge'].str.contains(knowledge, case=False))]
return result
schema_df = pd.read_csv("./data/db_schema.csv", index_col=0)
def get_schema(db_name, table_name):
result = schema_df[(schema_df['db_name']==db_name) & (schema_df['table_name']==table_name)]
result = result[['schema', 'sample_data']]
return result
def get_tables(db_name):
result = schema_df[(schema_df['db_name']==db_name)]
result = result.drop_duplicates(subset=['table_name'])
tables = result['table_name'].to_list()
return tables
meaning_df = pd.read_csv("./data/column_meanings.csv", index_col=0)
def get_meaning(db_name, table_name):
result = meaning_df[(meaning_df['db_name']==db_name) & (meaning_df['table_name']==table_name)]
result = result[['table_name', 'column_name', 'meaning']]
return result
def search_meaning(db_name, keyword):
# Search in meaning column
result_meaning = meaning_df[(meaning_df['db_name']==db_name) & (meaning_df['meaning'].str.contains(keyword, case=False))]
# Search in column_name column
result_column = meaning_df[(meaning_df['db_name']==db_name) & (meaning_df['column_name'].str.contains(keyword, case=False))]
# Combine results and remove duplicates
result = pd.concat([result_meaning, result_column]).drop_duplicates()
result = result[['table_name', 'column_name', 'meaning']]
return result
def execute_sqlite_query(db_name, query):
"""
執行 SQLite 查詢並返回結果
參數:
db_name (str): 數據庫名稱
query (str): SQL 查詢字符串
返回:
dict: 包含查詢結果或錯誤信息的字典
成功時: {'success': True, 'columns': 列名列表, 'data': 查詢結果}
失敗時: {'success': False, 'error': 錯誤信息}
"""
db_path = f"./database/{db_name}_template.sqlite"
# 檢查數據庫文件是否存在
if not os.path.exists(db_path):
return {'success': False, 'error': f"數據庫文件不存在: {db_path}"}
uid = uuid.uuid4()
tmp_db_file = f'/tmp/{uid}_{os.path.basename(db_path)}'
shutil.copy(db_path, tmp_db_file)
# check the file exist
if not os.path.exists(tmp_db_file):
return {'success': False, 'error': f"cp {db_path} to {tmp_db_file} failed."}
# else:
# print(f"cp {db_path} to {tmp_db_file} ok.")
try:
# 連接到 SQLite 數據庫
conn = sqlite3.connect(tmp_db_file)
cursor = conn.cursor()
# # 將查詢按分號分開並執行每條查詢
# queries = query.split(';')
# for q in queries:
# q = q.strip()
# if q: # 確保不執行空查詢
# cursor.execute(q)
cursor.executescript(query)
# 獲取結果
try:
rows = cursor.fetchall()
# 獲取列名
columns = [description[0] for description in cursor.description] if cursor.description else []
except sqlite3.Error:
# 如果最後一個語句不是 SELECT,則沒有結果可獲取
rows = []
columns = []
# 提交事務
conn.commit()
# 關閉連接
conn.close()
os.remove(tmp_db_file)
return {
'success': True,
'columns': columns,
'data': rows
}
except sqlite3.Error as e:
# 捕獲 SQLite 錯誤
os.remove(tmp_db_file)
return {
'success': False,
'error': str(e)
}
except Exception as e:
# 捕獲其他錯誤
os.remove(tmp_db_file)
return {
'success': False,
'error': f"執行查詢時發生錯誤: {str(e)}"
}
get_kb('solar', 'PP')
get_schema('solar', 'alerts')
get_tables('solar')
get_meaning('solar', 'alerts')
result = execute_sqlite_query('solar', 'SELECT * from test')
# print(result)