Spaces:
Running
Running
File size: 4,314 Bytes
ed2fe48 321debd ed2fe48 e87f50f ed2fe48 e87f50f ed2fe48 e87f50f ed2fe48 e87f50f ed2fe48 e87f50f ed2fe48 321debd 612bd91 295aacc 321debd 15279d5 321debd e87f50f 321debd 7ec1d7a 321debd ed2fe48 321debd 295aacc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import pandas as pd
import sqlite3
import os.path
import gradio as gr
import os
import shutil
import uuid
# Knowledge-base table, read once when the module is imported; the first CSV
# column is used as the index.
kb_df = pd.read_csv("./data/kb.csv", index_col=0)


def get_kb(db_name, knowledge=None):
    """Return the knowledge-base rows recorded for ``db_name``.

    Args:
        db_name: Value compared for equality with the ``db_name`` column.
        knowledge: Optional search pattern. When truthy, only rows whose
            ``knowledge`` column contains it (case-insensitive; pandas
            ``str.contains`` treats it as a regular expression by default)
            are kept. When falsy, every row of the database is returned.

    Returns:
        A DataFrame holding the matching rows of ``kb_df``.
    """
    row_filter = kb_df['db_name'] == db_name
    if knowledge:
        row_filter = row_filter & kb_df['knowledge'].str.contains(knowledge, case=False)
    return kb_df[row_filter]
# Schema table, read once when the module is imported; the first CSV column is
# used as the index.
schema_df = pd.read_csv("./data/db_schema.csv", index_col=0)


def get_schema(db_name, table_name):
    """Return the ``schema`` and ``sample_data`` columns for one table.

    Args:
        db_name: Value compared for equality with the ``db_name`` column.
        table_name: Value compared for equality with the ``table_name`` column.

    Returns:
        A two-column DataFrame (``schema``, ``sample_data``) with the rows of
        ``schema_df`` that match both arguments.
    """
    same_db = schema_df['db_name'] == db_name
    same_table = schema_df['table_name'] == table_name
    matching_rows = schema_df[same_db & same_table]
    return matching_rows[['schema', 'sample_data']]
def get_tables(db_name):
    """List the distinct table names that ``schema_df`` records for ``db_name``.

    Args:
        db_name: Value compared for equality with the ``db_name`` column.

    Returns:
        A list of table names; each name appears once, at the position of its
        first occurrence among the matching rows.
    """
    db_rows = schema_df[schema_df['db_name'] == db_name]
    unique_rows = db_rows.drop_duplicates(subset=['table_name'])
    return unique_rows['table_name'].to_list()
# Column-meaning table, read once when the module is imported; the first CSV
# column is used as the index.
meaning_df = pd.read_csv("./data/column_meanings.csv", index_col=0)


def get_meaning(db_name, table_name):
    """Return the documented column meanings for one table.

    Args:
        db_name: Value compared for equality with the ``db_name`` column.
        table_name: Value compared for equality with the ``table_name`` column.

    Returns:
        A DataFrame with the columns ``table_name``, ``column_name`` and
        ``meaning`` for the rows of ``meaning_df`` that match both arguments.
    """
    same_db = meaning_df['db_name'] == db_name
    same_table = meaning_df['table_name'] == table_name
    matching_rows = meaning_df[same_db & same_table]
    return matching_rows[['table_name', 'column_name', 'meaning']]
def search_meaning(db_name, keyword):
    """Search the column meanings of ``db_name`` for ``keyword``.

    The keyword is looked up case-insensitively in both the ``meaning`` and
    the ``column_name`` columns (via pandas ``str.contains``, which treats it
    as a regular expression by default).

    Args:
        db_name: Value compared for equality with the ``db_name`` column.
        keyword: Pattern to look for.

    Returns:
        A DataFrame with the columns ``table_name``, ``column_name`` and
        ``meaning``: hits on ``meaning`` first, then hits on ``column_name``,
        with fully duplicated rows removed.
    """
    in_db = meaning_df['db_name'] == db_name
    # Rows whose description mentions the keyword.
    meaning_hits = meaning_df[in_db & meaning_df['meaning'].str.contains(keyword, case=False)]
    # Rows whose column name mentions the keyword.
    column_hits = meaning_df[in_db & meaning_df['column_name'].str.contains(keyword, case=False)]
    # A row matching both searches would otherwise be listed twice.
    combined = pd.concat([meaning_hits, column_hits]).drop_duplicates()
    return combined[['table_name', 'column_name', 'meaning']]
def _split_sql_statements(script):
    """Split ``script`` into the individual SQL statements it contains.

    A semicolon only ends a statement when ``sqlite3.complete_statement``
    agrees, so semicolons inside string literals, quoted identifiers,
    comments or trigger bodies do not cut a statement in half. Pieces that
    consist solely of whitespace and semicolons are dropped. Text after the
    last semicolon is returned as a final statement.
    """
    statements = []
    pending = ""
    *terminated, remainder = script.split(";")
    for fragment in terminated:
        pending += fragment + ";"
        if sqlite3.complete_statement(pending):
            if pending.strip(" \t\r\n;"):
                statements.append(pending.strip())
            pending = ""
    pending += remainder
    if pending.strip():
        statements.append(pending.strip())
    return statements


def execute_sqlite_query(db_name, query):
    """Run an SQL script against a throw-away copy of a template database.

    The template ``./database/{db_name}_template.sqlite`` is copied to a
    uniquely named file under ``/tmp``; the script runs against that copy and
    the copy is deleted afterwards, so the template itself is never modified.

    ``query`` may contain several ``;``-separated statements. They are
    executed one by one (``Cursor.executescript`` cannot be used here because
    it discards every result row), and the rows of the last statement that
    produced a result set are returned.

    Args:
        db_name (str): Database name used to build the template path.
        query (str): SQL text to execute.

    Returns:
        dict: On success ``{'success': True, 'columns': [...], 'data': [...]}``
        where ``columns`` holds the column names and ``data`` the row tuples
        (both empty when no statement returned rows). On failure
        ``{'success': False, 'error': <message>}``.
    """
    # NOTE(review): db_name is interpolated into the path unchecked; confirm
    # that callers only pass trusted database names.
    db_path = f"./database/{db_name}_template.sqlite"

    # Bail out early when the template database is missing.
    if not os.path.exists(db_path):
        return {'success': False, 'error': f"數據庫文件不存在: {db_path}"}

    uid = uuid.uuid4()
    tmp_db_file = f'/tmp/{uid}_{os.path.basename(db_path)}'

    try:
        try:
            shutil.copy(db_path, tmp_db_file)
        except OSError:
            # A failed copy is reported like every other failure instead of
            # escaping as an exception.
            return {'success': False, 'error': f"cp {db_path} to {tmp_db_file} failed."}

        # isolation_level=None keeps SQLite in autocommit mode, which is how
        # executescript() ran the statements before.
        conn = sqlite3.connect(tmp_db_file, isolation_level=None)
        try:
            cursor = conn.cursor()
            columns = []
            rows = []
            for statement in _split_sql_statements(query):
                cursor.execute(statement)
                # description is None for statements without a result set.
                if cursor.description is not None:
                    columns = [description[0] for description in cursor.description]
                    rows = cursor.fetchall()
            # Ends a transaction the script opened explicitly and left open.
            conn.commit()
        finally:
            # Close on every path so the temp file can always be removed.
            conn.close()

        return {
            'success': True,
            'columns': columns,
            'data': rows
        }
    except sqlite3.Error as e:
        # SQLite-level failure (syntax error, missing table, ...).
        return {
            'success': False,
            'error': str(e)
        }
    except Exception as e:
        # Anything else, e.g. a non-string query.
        return {
            'success': False,
            'error': f"執行查詢時發生錯誤: {str(e)}"
        }
    finally:
        # Single cleanup point for the temporary copy.
        try:
            os.remove(tmp_db_file)
        except FileNotFoundError:
            pass
# Smoke calls that run whenever the module is imported or executed, all
# against the 'solar' database. The return values of the four lookups are
# discarded.
get_kb('solar', 'PP')
get_schema('solar', 'alerts')
get_tables('solar')
get_meaning('solar', 'alerts')
# The outcome dict of the query is kept in the module-level name `result`.
result = execute_sqlite_query('solar', 'SELECT * from test')
# print(result)
|