File size: 12,576 Bytes
9f63939
ed2fe48
 
 
e87f50f
ed2fe48
e87f50f
 
 
 
 
6185b4f
ed2fe48
 
07cc8e5
ed2fe48
 
 
07cc8e5
 
ed2fe48
 
 
 
 
 
07cc8e5
ed2fe48
 
 
6185b4f
ed2fe48
6185b4f
ed2fe48
 
 
e87f50f
 
6185b4f
 
ed2fe48
e87f50f
 
ed2fe48
6185b4f
 
e87f50f
 
 
ed2fe48
 
e87f50f
ed2fe48
e87f50f
6185b4f
ed2fe48
e87f50f
ed2fe48
 
 
 
e87f50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed2fe48
e87f50f
 
 
 
 
 
 
 
 
 
 
 
 
6185b4f
ed2fe48
6185b4f
ed2fe48
 
 
 
6185b4f
 
ed2fe48
 
 
6185b4f
e87f50f
 
 
ed2fe48
 
 
 
e87f50f
6185b4f
ed2fe48
e87f50f
ed2fe48
e87f50f
 
ed2fe48
e87f50f
 
ed2fe48
e87f50f
 
 
 
 
9f63939
ed2fe48
9f63939
ed2fe48
 
 
9f63939
 
ed2fe48
 
 
9f63939
ed2fe48
 
 
 
 
 
 
9f63939
ed2fe48
 
 
 
6185b4f
ed2fe48
 
 
 
 
 
 
 
 
 
 
 
 
e87f50f
 
 
ed2fe48
 
 
 
e87f50f
ed2fe48
 
e87f50f
ed2fe48
 
 
 
e87f50f
ed2fe48
e87f50f
321debd
e87f50f
321debd
e87f50f
 
 
 
 
 
321debd
e87f50f
 
 
 
321debd
e87f50f
 
 
 
321debd
e87f50f
321debd
e87f50f
 
 
 
 
321debd
e87f50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321debd
 
 
 
 
e87f50f
 
321debd
 
e87f50f
 
321debd
e87f50f
321debd
6185b4f
ed2fe48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6185b4f
ed2fe48
 
 
 
 
 
 
e87f50f
 
ed2fe48
6185b4f
ed2fe48
 
e87f50f
321debd
 
 
 
 
 
 
 
 
ed2fe48
9f63939
e87f50f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import gradio as gr
import pandas as pd
import sys
import os
import json

from utils.tools import get_kb, get_schema, get_tables, get_meaning, execute_sqlite_query, search_meaning


def get_message(msg):
    return {"status": "error", "headers":['message'], "data": [[f"{msg}"]]}

@gr.mcp.tool()
def get_all_databases() -> list:
    """
    Get all available database names from the schema file.
    
    This function reads the database schema CSV file and extracts unique database names.
    
    Returns:
        list: A sorted list of unique database names available in the system.
    
    Example:
        >>> databases = get_all_databases()
        >>> print(databases)
        ['db1', 'db2', 'db3']
    """
    # 從 schema_df 中獲取所有唯一的 db_name
    schema_df = pd.read_csv("./data/db_schema.csv")
    return sorted(schema_df['db_name'].unique().tolist())

def kb_query(db_name, knowledge_keyword):
    """
    Query the knowledge base for a specific database with optional keyword filtering.
    
    This function retrieves knowledge base information for a specified database.
    If keywords are provided, it filters the results based on those keywords.
    Multiple keywords can be separated by commas, 'or', or spaces.
    
    Args:
        db_name (str): The name of the database to query. Must not be empty.
        knowledge_keyword (str): Optional keywords to filter knowledge base results.
                               Multiple keywords can be separated by commas, 'or', or spaces.
                               If empty or None, returns all knowledge for the database.
    
    Returns:
        dict: Dictionary containing query results with structure:
              - Success: {"status": "success", "headers": [...], "data": [...]}
              - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]}
    
    Example:
        >>> result = kb_query("sales_db", "customer, order")
        >>> print(result)
        # Returns dict with customer and order-related knowledge from sales_db
    """
    if not db_name:
        return get_message("Please select db_name.")
    
    if not knowledge_keyword:
        result = get_kb(db_name)
    else:
        # Split keywords by comma, 'or', or space
        keywords = [k.strip() for k in knowledge_keyword.replace('or', ',').replace(' ', ',').split(',') if k.strip()]

        # Get results for each keyword and combine them
        combined_results = []
        for keyword in keywords:
            keyword_result = get_kb(db_name, keyword)
            # 使用 append 而不是 extend,因為 get_kb 返回 DataFrame
            if not keyword_result.empty:
                combined_results.append(keyword_result)
    
        # Remove duplicates if any
        # Convert DataFrames to a single DataFrame and remove duplicate rows
        if combined_results:
            result = pd.concat(combined_results, ignore_index=True).drop_duplicates()
        else:
            result = pd.DataFrame()

    if len(result) == 0:
        return get_message("Not Found.")
    
    # Convert DataFrame to JSON format with columns and rows
    data = {
        "columns": result.columns.tolist(),
        "rows": result.values.tolist()
    }
    return {"status": "success", "headers": result.columns.tolist(), "data":result.values.tolist()}
    # return json.dumps(
    #     {"status": "success", "data": data},
    #     ensure_ascii=False,
    #     indent=2
    # )

def schema_query(db_name, table_name):
    """
    Query the schema structure for a specific table in a database.
    
    This function retrieves detailed schema information for a specified table
    within a given database, including column definitions, data types, and constraints.
    
    Args:
        db_name (str): The name of the database containing the table. Must not be empty.
        table_name (str): The name of the table to query schema for. Must not be empty.
    
    Returns:
        dict: Dictionary containing query results with structure:
              - Success: {"status": "success", "headers": [...], "data": [...]}
              - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]}
    
    Example:
        >>> result = schema_query("sales_db", "customers")
        >>> print(result)
        # Returns dict with column definitions for customers table
    """
    if not db_name or not table_name:
        return get_message("請選擇資料庫和資料表")
    
    # 取得 schema 資訊
    schema_result = get_schema(db_name, table_name)
    
    if len(schema_result) == 0:
        return get_message("Not Found.")
    
    return {
        "status": "success",
        "headers": schema_result.columns.tolist(),
        "data": schema_result.values.tolist()
    }

def tables_query(db_name):
    """
    Get list of all tables available in a specific database.
    
    This function retrieves all table names that exist within the specified database.
    
    Args:
        db_name (str): The name of the database to query tables from. 
                      If empty or None, returns empty list.
    
    Returns:
        list: List of table names in the specified database. Returns empty list
              if database name is not provided or no tables found.
    
    Example:
        >>> tables = tables_query("sales_db")
        >>> print(tables)
        ['customers', 'orders', 'products', 'inventory']
    """
    if not db_name:
        return []
    
    return get_tables(db_name)

def meaning_query(db_name, table_name):
    """
    Query the meaning and description of columns in a specific table.
    
    This function retrieves detailed explanations and meanings for each column
    in the specified table, helping users understand the purpose and content
    of each field.
    
    Args:
        db_name (str): The name of the database containing the table. Must not be empty.
        table_name (str): The name of the table to query column meanings for. Must not be empty.
    
    Returns:
        dict: Dictionary containing query results with structure:
              - Success: {"status": "success", "headers": [...], "data": [...]}
              - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]}
    
    Example:
        >>> result = meaning_query("sales_db", "customers")
        >>> print(result)
        # Returns dict with explanations for each column in customers table
    """
    if not db_name or not table_name:
        return get_message("請選擇資料庫和資料表")
    
    result = get_meaning(db_name, table_name)
    
    if len(result) == 0:
        return get_message("Not Found.")
    
    return {"status": "success", "headers": result.columns.tolist(), "data": result.values.tolist()}

def meaning_search(db_name, keyword):
    """
    Search for column meanings using one or more keywords in a specific database.
    
    This function searches for columns whose names or meanings contain the specified
    keyword(s), helping users find relevant fields across all tables in the database.
    Multiple keywords can be separated by commas or spaces.
    
    Args:
        db_name (str): The name of the database to search in. Must not be empty.
        keyword (str): The search keyword(s) to match against column names or meanings.
                      Multiple keywords can be separated by commas or spaces. Must not be empty.
    
    Returns:
        dict: Dictionary containing query results with structure:
              - Success: {"status": "success", "headers": [...], "data": [...]}
              - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]}
    
    Example:
        >>> result = meaning_search("sales_db", "customer")
        >>> print(result)
        # Returns dict with all columns containing "customer" in their name or meaning
        
        >>> result = meaning_search("sales_db", "customer, order, product")
        >>> print(result)
        # Returns dict with all columns containing "customer", "order", or "product" in their name or meaning
    """
    if not db_name or not keyword:
        return get_message("Please input keyword.")
    
    # Split keywords by comma or space
    keywords = [k.strip() for k in keyword.replace(' ', ',').split(',') if k.strip()]
    
    # Get results for each keyword and combine them
    combined_results = []
    for kw in keywords:
        keyword_result = search_meaning(db_name, kw)
        # Append non-empty results
        if not keyword_result.empty:
            combined_results.append(keyword_result)
    
    # Remove duplicates if any
    # Convert DataFrames to a single DataFrame and remove duplicate rows
    if combined_results:
        result = pd.concat(combined_results, ignore_index=True).drop_duplicates()
    else:
        result = pd.DataFrame()
    
    if len(result) == 0:
        return get_message("Not Found.")
    
    return {"status": "success", "headers": result.columns.tolist(), "data": result.values.tolist()}

def execute_sqlite(db_name: str, query: str):

    if not db_name or not query:
        return pd.DataFrame({"message": ["請選擇資料庫和SQL"]})
    
    result = execute_sqlite_query(db_name, query)
    
    # if len(result) == 0:
    #     return pd.DataFrame({"message": ["沒有找到相關資料"]})
    
    if result['success']:
        # return pd.DataFrame(result['data'], columns=result['columns'])
        return pd.DataFrame({"type":["OK"], "message": ["Success"]})
    else:
        return pd.DataFrame({"type":["Error"], "message": [result['error']]})


# 建立 Gradio 界面
with gr.Blocks(title="資料庫查詢工具") as demo:
    gr.Markdown("# 資料庫查詢工具")
    gr.Markdown("這個工具可以幫助您查詢資料庫的知識庫、資料表結構和欄位意義。")
    
    # 獲取所有可用的資料庫
    all_dbs = get_all_databases()
    
    with gr.Tab("知識庫查詢"):
        with gr.Row():
            kb_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None)
            kb_keyword = gr.Textbox(label="知識關鍵字 (可選)")
        kb_search = gr.Button("查詢知識庫")
        kb_result = gr.DataFrame(label="查詢結果")
        kb_search.click(kb_query, inputs=[kb_db, kb_keyword], outputs=kb_result)
        gr.api(get_all_databases)

    with gr.Tab("資料表查詢"):
        with gr.Row():
            kb_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None)
        kb_search = gr.Button("查詢資料表")
        kb_result = gr.DataFrame(label="查詢結果")
        kb_search.click(tables_query, inputs=[kb_db], outputs=kb_result)
    
    with gr.Tab("資料表結構查詢"):
        with gr.Row():
            schema_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None)
            schema_table = gr.Text(label="選擇資料表")
        schema_search = gr.Button("查詢資料表結構")
        schema_result = gr.DataFrame(label="查詢結果")
        
        # 當資料庫選擇變更時,更新資料表下拉選單
        # schema_db.change(update_tables, inputs=schema_db, outputs=schema_table)
        schema_search.click(schema_query, inputs=[schema_db, schema_table], outputs=schema_result)
    
    with gr.Tab("欄位意義查詢"):
        with gr.Row():
            meaning_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None)
            meaning_keyword = gr.Text(label="Keyword")
        search_button = gr.Button("查詢欄位意義")
        meaning_result = gr.DataFrame(label="查詢結果")
        
        # 當資料庫選擇變更時,更新資料表下拉選單
        # meaning_db.change(update_tables, inputs=meaning_db, outputs=meaning_table)
        search_button.click(meaning_search, inputs=[meaning_db, meaning_keyword], outputs=meaning_result)

    with gr.Tab("sqlite SQL 執行"):
        with gr.Row():
            sql_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None)
            sql_query  = gr.Textbox(label="sqlite SQL")
        sql_exec = gr.Button("執行 SQL")
        sql_result = gr.DataFrame(label="查詢結果")
        sql_exec.click(execute_sqlite, inputs=[sql_db, sql_query], outputs=sql_result)

# 啟動 Gradio 應用程式
if __name__ == "__main__":
    demo.launch(mcp_server=True, server_name="0.0.0.0", allowed_paths=["/"], share=True)