"""Lookup helpers over CSV metadata plus sandboxed SQLite query execution.

The CSV files under ``./data`` are loaded once at import time into
module-level DataFrames; the ``get_*`` / ``search_*`` helpers filter them.
``execute_sqlite_query`` runs SQL against a throw-away copy of a template
database so the template itself is never modified.
"""
import os
import os.path
import shutil
import sqlite3
import tempfile
import uuid

import gradio as gr
import pandas as pd

kb_df = pd.read_csv("./data/kb.csv", index_col=0)


def get_kb(db_name: str, knowledge=None) -> pd.DataFrame:
    """Return knowledge-base rows for ``db_name``.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        knowledge: Optional pattern matched case-insensitively against the
            ``knowledge`` column (interpreted by ``Series.str.contains``, so
            regular-expression syntax applies). When falsy, no keyword
            filter is applied.

    Returns:
        The matching rows of ``kb_df``.
    """
    in_db = kb_df['db_name'] == db_name
    if not knowledge:
        return kb_df[in_db]
    # na=False: rows whose 'knowledge' cell is missing never match.
    matches = kb_df['knowledge'].str.contains(knowledge, case=False, na=False)
    return kb_df[in_db & matches]


schema_df = pd.read_csv("./data/db_schema.csv", index_col=0)


def get_schema(db_name: str, table_name: str) -> pd.DataFrame:
    """Return the ``schema`` and ``sample_data`` columns for one table.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        table_name: Value matched exactly against the ``table_name`` column.
    """
    selected = schema_df[
        (schema_df['db_name'] == db_name)
        & (schema_df['table_name'] == table_name)
    ]
    return selected[['schema', 'sample_data']]


def get_tables(db_name: str) -> list:
    """Return the distinct table names recorded for ``db_name``.

    Names appear in the order of their first occurrence in ``schema_df``.
    """
    in_db = schema_df[schema_df['db_name'] == db_name]
    return in_db.drop_duplicates(subset=['table_name'])['table_name'].to_list()


meaning_df = pd.read_csv("./data/column_meanings.csv", index_col=0)


def get_meaning(db_name: str, table_name: str) -> pd.DataFrame:
    """Return the column meanings recorded for one table.

    Returns:
        The ``table_name``, ``column_name`` and ``meaning`` columns of the
        rows matching both ``db_name`` and ``table_name``.
    """
    selected = meaning_df[
        (meaning_df['db_name'] == db_name)
        & (meaning_df['table_name'] == table_name)
    ]
    return selected[['table_name', 'column_name', 'meaning']]


def search_meaning(db_name: str, keyword: str) -> pd.DataFrame:
    """Search column meanings of ``db_name`` for ``keyword``.

    The keyword is matched case-insensitively (via ``Series.str.contains``)
    against both the ``meaning`` and the ``column_name`` columns.

    Returns:
        De-duplicated matches with the ``table_name``, ``column_name`` and
        ``meaning`` columns; rows matched on ``meaning`` come first,
        followed by rows matched on ``column_name``.
    """
    in_db = meaning_df['db_name'] == db_name
    # na=False: missing cells never match instead of yielding NaN in the mask.
    by_meaning = meaning_df[
        in_db & meaning_df['meaning'].str.contains(keyword, case=False, na=False)
    ]
    by_column = meaning_df[
        in_db
        & meaning_df['column_name'].str.contains(keyword, case=False, na=False)
    ]
    combined = pd.concat([by_meaning, by_column]).drop_duplicates()
    return combined[['table_name', 'column_name', 'meaning']]


def _split_sql_statements(script):
    """Split an SQL script into individual statements.

    Boundaries are detected with ``sqlite3.complete_statement`` rather than a
    plain ``split(';')`` so that a semicolon inside a string literal, a
    comment or a trigger body does not cut a statement in half. Fragments
    made up only of whitespace and semicolons are dropped.
    """
    statements = []
    start = 0
    end = script.find(";")
    while end != -1:
        candidate = script[start:end + 1]
        if sqlite3.complete_statement(candidate):
            statements.append(candidate)
            start = end + 1
        end = script.find(";", end + 1)
    # Whatever follows the last complete statement (no terminating ';').
    statements.append(script[start:])
    return [stmt for stmt in statements if stmt.strip(" \t\r\n;")]


def _remove_quietly(path):
    """Delete ``path`` if possible; cleanup is deliberately best-effort."""
    try:
        os.remove(path)
    except OSError:
        # The file may not exist (failed copy) or may already be gone; a
        # cleanup failure must not mask the real query result.
        pass


def execute_sqlite_query(db_name, query):
    """Execute an SQL script against a temporary copy of a template database.

    The template ``./database/{db_name}_template.sqlite`` is copied into the
    system temporary directory, the script is executed against the copy, and
    the copy is always deleted afterwards.

    Args:
        db_name (str): Database name used to build the template path.
        query (str): One or more SQL statements separated by semicolons.

    Returns:
        dict: On success
            ``{'success': True, 'columns': [...], 'data': [...]}`` holding
            the column names and rows of the last statement that produced a
            result set (both empty when no statement did). On failure
            ``{'success': False, 'error': <message>}``.
    """
    db_path = f"./database/{db_name}_template.sqlite"

    # Report a missing template with a dedicated message.
    if not os.path.exists(db_path):
        return {'success': False, 'error': f"數據庫文件不存在: {db_path}"}

    tmp_db_file = os.path.join(
        tempfile.gettempdir(),
        f"{uuid.uuid4()}_{os.path.basename(db_path)}",
    )
    try:
        shutil.copy(db_path, tmp_db_file)
    except OSError:
        _remove_quietly(tmp_db_file)
        return {'success': False, 'error': f"cp {db_path} to {tmp_db_file} failed."}

    try:
        columns = []
        rows = []
        # isolation_level=None keeps SQLite in autocommit mode, so explicit
        # BEGIN/COMMIT inside the script behave as they did under
        # executescript().
        conn = sqlite3.connect(tmp_db_file, isolation_level=None)
        try:
            cursor = conn.cursor()
            # executescript() discards result rows, so each statement is run
            # through execute() and its rows are collected here.
            for statement in _split_sql_statements(query):
                cursor.execute(statement)
                if cursor.description is not None:
                    columns = [column[0] for column in cursor.description]
                    rows = cursor.fetchall()
            conn.commit()
        finally:
            conn.close()
        return {
            'success': True,
            'columns': columns,
            'data': rows
        }
    except sqlite3.Error as e:
        # SQLite-level failure (syntax error, missing table, ...).
        return {
            'success': False,
            'error': str(e)
        }
    except Exception as e:
        # Anything else, e.g. a non-string query.
        return {
            'success': False,
            'error': f"執行查詢時發生錯誤: {str(e)}"
        }
    finally:
        _remove_quietly(tmp_db_file)


# Smoke-test calls; they run at import time, as in the original script.
get_kb('solar', 'PP')
get_schema('solar', 'alerts')
get_tables('solar')
get_meaning('solar', 'alerts')
result = execute_sqlite_query('solar', 'SELECT * from test')
# print(result)