File size: 4,314 Bytes
ed2fe48
321debd
 
 
 
 
 
ed2fe48
 
e87f50f
ed2fe48
 
 
 
e87f50f
ed2fe48
 
e87f50f
ed2fe48
 
 
 
 
 
 
 
 
 
 
e87f50f
ed2fe48
 
e87f50f
 
 
 
 
 
 
 
 
 
 
ed2fe48
 
321debd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
612bd91
 
 
295aacc
 
321debd
 
 
15279d5
321debd
 
e87f50f
 
 
 
 
 
 
321debd
 
7ec1d7a
 
 
 
 
 
 
 
 
 
 
321debd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed2fe48
 
 
 
321debd
295aacc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import os.path
import re
import shutil
import sqlite3
import tempfile
import uuid

import gradio as gr
import pandas as pd


# Knowledge-base table, loaded once at import time from a path relative to the
# current working directory; the CSV's first column is used as the index.
kb_df = pd.read_csv("./data/kb.csv", index_col=0)
def get_kb(db_name, knowledge=None):
    """Return the knowledge-base rows for one database, optionally filtered.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        knowledge: Optional keyword matched case-insensitively against the
            ``knowledge`` column. It is interpreted as a regular expression;
            if it is not a valid pattern (e.g. ``"C++"``) it is matched as
            literal text instead of raising. Falsy values disable the filter.

    Returns:
        A DataFrame holding the matching rows of ``kb_df``.
    """
    in_db = kb_df['db_name'] == db_name
    if not knowledge:
        return kb_df[in_db]

    text = kb_df['knowledge']
    try:
        # na=False: rows with a missing ``knowledge`` value never match.
        matches = text.str.contains(knowledge, case=False, na=False)
    except re.error:
        # Keyword contains unbalanced regex metacharacters: search it verbatim.
        matches = text.str.contains(knowledge, case=False, na=False, regex=False)
    return kb_df[in_db & matches]

# Per-table schema listing, loaded once at import time from a path relative to
# the current working directory; the CSV's first column is used as the index.
schema_df = pd.read_csv("./data/db_schema.csv", index_col=0)
def get_schema(db_name, table_name):
    """Look up the schema and sample data recorded for one table.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        table_name: Value matched exactly against the ``table_name`` column.

    Returns:
        A DataFrame with the ``schema`` and ``sample_data`` columns of every
        matching row in ``schema_df``.
    """
    same_db = schema_df['db_name'] == db_name
    same_table = schema_df['table_name'] == table_name
    return schema_df.loc[same_db & same_table, ['schema', 'sample_data']]

def get_tables(db_name):
    """List the distinct table names recorded for a database.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.

    Returns:
        A list of table names in order of first appearance in ``schema_df``.
    """
    table_names = schema_df.loc[schema_df['db_name'] == db_name, 'table_name']
    return table_names.drop_duplicates().to_list()

# Column-meaning table, loaded once at import time from a path relative to the
# current working directory; the CSV's first column is used as the index.
meaning_df = pd.read_csv("./data/column_meanings.csv", index_col=0)
def get_meaning(db_name, table_name):
    """Fetch the column descriptions recorded for one table.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        table_name: Value matched exactly against the ``table_name`` column.

    Returns:
        A DataFrame with the ``table_name``, ``column_name`` and ``meaning``
        columns of every matching row in ``meaning_df``.
    """
    wanted_columns = ['table_name', 'column_name', 'meaning']
    same_db = meaning_df['db_name'] == db_name
    same_table = meaning_df['table_name'] == table_name
    return meaning_df.loc[same_db & same_table, wanted_columns]

def search_meaning(db_name, keyword):
    """Search a database's column descriptions for a keyword.

    The keyword is matched case-insensitively against both the ``meaning``
    and the ``column_name`` columns of ``meaning_df``. It is interpreted as a
    regular expression; if it is not a valid pattern (e.g. ``"C++"``) it is
    matched as literal text instead of raising.

    Args:
        db_name: Value matched exactly against the ``db_name`` column.
        keyword: Text (or regular expression) to look for.

    Returns:
        A DataFrame with the ``table_name``, ``column_name`` and ``meaning``
        columns. Rows matched through ``meaning`` come first, followed by rows
        matched only through ``column_name``; duplicate rows are removed.
    """
    def _contains(column):
        # na=False: rows with a missing value in ``column`` never match.
        text = meaning_df[column]
        try:
            return text.str.contains(keyword, case=False, na=False)
        except re.error:
            # Keyword is not a valid regex: search it verbatim.
            return text.str.contains(keyword, case=False, na=False, regex=False)

    in_db = meaning_df['db_name'] == db_name
    by_meaning = meaning_df[in_db & _contains('meaning')]
    by_column = meaning_df[in_db & _contains('column_name')]

    # De-duplicate on the full rows before narrowing to the output columns.
    combined = pd.concat([by_meaning, by_column]).drop_duplicates()
    return combined[['table_name', 'column_name', 'meaning']]

def _split_sql_statements(script):
    """Split an SQL script into its individual statements.

    Fragments between semicolons are accumulated until SQLite itself reports
    a complete statement, so semicolons inside string literals, comments or
    trigger bodies do not cause a split.
    """
    *terminated, tail = script.split(";")
    statements = []
    pending = ""
    for fragment in terminated:
        pending += fragment + ";"
        if sqlite3.complete_statement(pending):
            # Skip empty statements such as a stray ";".
            if pending.strip(" \t\r\n;"):
                statements.append(pending)
            pending = ""
    # Whatever is left has no terminating semicolon (e.g. a lone "SELECT 1").
    pending += tail
    if pending.strip():
        statements.append(pending)
    return statements


def execute_sqlite_query(db_name, query):
    """Run an SQL script against a throw-away copy of a template database.

    The template ``./database/{db_name}_template.sqlite`` is copied to a
    uniquely named file in the system temp directory, the script is executed
    against that copy, and the copy is deleted afterwards, so the template is
    never modified.

    Args:
        db_name (str): Database name used to build the template path.
        query (str): One or more SQL statements separated by semicolons.

    Returns:
        dict: On success ``{'success': True, 'columns': [...], 'data': [...]}``
        where ``columns`` and ``data`` come from the last statement that
        produced a result set (both empty if none did). On failure
        ``{'success': False, 'error': <message>}``.
    """
    db_path = f"./database/{db_name}_template.sqlite"

    # Fail early if the template database does not exist.
    if not os.path.exists(db_path):
        return {'success': False, 'error': f"數據庫文件不存在: {db_path}"}

    tmp_db_file = os.path.join(
        tempfile.gettempdir(), f"{uuid.uuid4()}_{os.path.basename(db_path)}"
    )

    conn = None
    try:
        try:
            shutil.copy(db_path, tmp_db_file)
        except OSError:
            return {'success': False, 'error': f"cp {db_path} to {tmp_db_file} failed."}

        conn = sqlite3.connect(tmp_db_file)
        cursor = conn.cursor()

        columns = []
        rows = []
        # executescript() discards result rows, so run the statements one by
        # one and keep the output of the last one that returns a result set.
        for statement in _split_sql_statements(query):
            cursor.execute(statement)
            if cursor.description is not None:
                columns = [description[0] for description in cursor.description]
                rows = cursor.fetchall()

        # Commit so that errors raised only at commit time are reported too.
        conn.commit()

        return {
            'success': True,
            'columns': columns,
            'data': rows
        }

    except sqlite3.Error as e:
        return {
            'success': False,
            'error': str(e)
        }
    except Exception as e:
        # Boundary handler: callers expect an error dict, never an exception.
        return {
            'success': False,
            'error': f"執行查詢時發生錯誤: {str(e)}"
        }
    finally:
        # Always release the connection before deleting the scratch copy.
        if conn is not None:
            conn.close()
        try:
            os.remove(tmp_db_file)
        except OSError:
            # Best-effort cleanup; the copy may never have been created.
            pass

# Import-time smoke calls: each helper is invoked once against the 'solar'
# database. The return values are discarded, except for the final query whose
# result dict is bound to the module-level name `result`.
get_kb('solar', 'PP')
get_schema('solar', 'alerts')
get_tables('solar')
get_meaning('solar', 'alerts')
result = execute_sqlite_query('solar', 'SELECT * from test')
# print(result)