File size: 3,233 Bytes
ed2fe48
321debd
 
 
 
 
 
ed2fe48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321debd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
612bd91
 
 
2ec8afb
 
321debd
 
 
03502b1
321debd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed2fe48
 
 
 
321debd
03502b1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import pandas as pd
import sqlite3
import os.path
import gradio as gr
import os
import shutil
import uuid


kb_df = pd.read_csv("./data/kb.csv")
def get_kb(db_name, knowledge=None):
    if not knowledge:
        result = kb_df[(kb_df['db_name']==db_name)]
    else:
        result = kb_df[(kb_df['db_name']==db_name) & (kb_df['knowledge'].str.contains(knowledge))]
    return result

schema_df = pd.read_csv("./data/db_schema.csv")
def get_schema(db_name, table_name):
    result = schema_df[(schema_df['db_name']==db_name) & (schema_df['table_name']==table_name)]
    result = result[['schema', 'sample_data']]
    return result

def get_tables(db_name):
    result = schema_df[(schema_df['db_name']==db_name)]
    result = result.drop_duplicates(subset=['table_name'])
    tables = result['table_name'].to_list()
    return tables

meaning_df = pd.read_csv("./data/column_meanings.csv")
def get_meaning(db_name, table_name):
    result = meaning_df[(meaning_df['db_name']==db_name) & (meaning_df['table_name']==table_name)]
    result = result[['column_name', 'meaning']]
    return result

def execute_sqlite_query(db_name, query):
    """
    執行 SQLite 查詢並返回結果
    
    參數:
        db_name (str): 數據庫名稱
        query (str): SQL 查詢字符串
        
    返回:
        dict: 包含查詢結果或錯誤信息的字典
            成功時: {'success': True, 'columns': 列名列表, 'data': 查詢結果}
            失敗時: {'success': False, 'error': 錯誤信息}
    """
    
    db_path = f"./database/{db_name}_template.sqlite"
    
    # 檢查數據庫文件是否存在
    if not os.path.exists(db_path):
        return {'success': False, 'error': f"數據庫文件不存在: {db_path}"}
    
    uid = uuid.uuid4()
    tmp_db_file = f'/tmp/{uid}_{os.path.basename(db_path)}'
    shutil.copy(db_path, tmp_db_file)
    # check the file exist 
    if not os.path.exists(tmp_db_file):
        return {'success': False, 'error': f"cp {db_path} to {tmp_db_file} failed."}
    else:
        print(f"cp {db_path} to {tmp_db_file} ok.")

    try:
        # 連接到 SQLite 數據庫
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # 執行查詢
        cursor.execute(query)
        
        # 獲取結果
        rows = cursor.fetchall()
        
        # 獲取列名
        columns = [description[0] for description in cursor.description] if cursor.description else []
        
        # 關閉連接
        conn.close()

        os.remove(tmp_db_file)
        return {
            'success': True,
            'columns': columns,
            'data': rows
        }
        
    except sqlite3.Error as e:
        # 捕獲 SQLite 錯誤
        os.remove(tmp_db_file)
        return {
            'success': False,
            'error': str(e)
        }
    except Exception as e:
        # 捕獲其他錯誤
        os.remove(tmp_db_file)
        return {
            'success': False,
            'error': f"執行查詢時發生錯誤: {str(e)}"
        }

get_kb('solar', 'PP')
get_schema('solar', 'alerts')
get_tables('solar')
get_meaning('solar', 'alerts')
result = execute_sqlite_query('solar', 'SELECT * from test')
print(result)