Spaces:
Running
Running
| import gradio as gr | |
| import pandas as pd | |
| import sys | |
| import os | |
| import json | |
| from utils.tools import get_kb, get_schema, get_tables, get_meaning, execute_sqlite_query, search_meaning | |
| def get_message(msg): | |
| return {"status": "error", "headers":['message'], "data": [[f"{msg}"]]} | |
| def get_all_databases() -> list: | |
| """ | |
| Get all available database names from the schema file. | |
| This function reads the database schema CSV file and extracts unique database names. | |
| Returns: | |
| list: A sorted list of unique database names available in the system. | |
| Example: | |
| >>> databases = get_all_databases() | |
| >>> print(databases) | |
| ['db1', 'db2', 'db3'] | |
| """ | |
| # 從 schema_df 中獲取所有唯一的 db_name | |
| schema_df = pd.read_csv("./data/db_schema.csv") | |
| return sorted(schema_df['db_name'].unique().tolist()) | |
| def kb_query(db_name, knowledge_keyword): | |
| """ | |
| Query the knowledge base for a specific database with optional keyword filtering. | |
| This function retrieves knowledge base information for a specified database. | |
| If keywords are provided, it filters the results based on those keywords. | |
| Multiple keywords can be separated by commas, 'or', or spaces. | |
| Args: | |
| db_name (str): The name of the database to query. Must not be empty. | |
| knowledge_keyword (str): Optional keywords to filter knowledge base results. | |
| Multiple keywords can be separated by commas, 'or', or spaces. | |
| If empty or None, returns all knowledge for the database. | |
| Returns: | |
| dict: Dictionary containing query results with structure: | |
| - Success: {"status": "success", "headers": [...], "data": [...]} | |
| - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]} | |
| Example: | |
| >>> result = kb_query("sales_db", "customer, order") | |
| >>> print(result) | |
| # Returns dict with customer and order-related knowledge from sales_db | |
| """ | |
| if not db_name: | |
| return get_message("Please select db_name.") | |
| if not knowledge_keyword: | |
| result = get_kb(db_name) | |
| else: | |
| # Split keywords by comma, 'or', or space | |
| keywords = [k.strip() for k in knowledge_keyword.replace('or', ',').replace(' ', ',').split(',') if k.strip()] | |
| # Get results for each keyword and combine them | |
| combined_results = [] | |
| for keyword in keywords: | |
| keyword_result = get_kb(db_name, keyword) | |
| # 使用 append 而不是 extend,因為 get_kb 返回 DataFrame | |
| if not keyword_result.empty: | |
| combined_results.append(keyword_result) | |
| # Remove duplicates if any | |
| # Convert DataFrames to a single DataFrame and remove duplicate rows | |
| if combined_results: | |
| result = pd.concat(combined_results, ignore_index=True).drop_duplicates() | |
| else: | |
| result = pd.DataFrame() | |
| if len(result) == 0: | |
| return get_message("Not Found.") | |
| # Convert DataFrame to JSON format with columns and rows | |
| data = { | |
| "columns": result.columns.tolist(), | |
| "rows": result.values.tolist() | |
| } | |
| return {"status": "success", "headers": result.columns.tolist(), "data":result.values.tolist()} | |
| # return json.dumps( | |
| # {"status": "success", "data": data}, | |
| # ensure_ascii=False, | |
| # indent=2 | |
| # ) | |
| def schema_query(db_name, table_name): | |
| """ | |
| Query the schema structure for a specific table in a database. | |
| This function retrieves detailed schema information for a specified table | |
| within a given database, including column definitions, data types, and constraints. | |
| Args: | |
| db_name (str): The name of the database containing the table. Must not be empty. | |
| table_name (str): The name of the table to query schema for. Must not be empty. | |
| Returns: | |
| dict: Dictionary containing query results with structure: | |
| - Success: {"status": "success", "headers": [...], "data": [...]} | |
| - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]} | |
| Example: | |
| >>> result = schema_query("sales_db", "customers") | |
| >>> print(result) | |
| # Returns dict with column definitions for customers table | |
| """ | |
| if not db_name or not table_name: | |
| return get_message("請選擇資料庫和資料表") | |
| # 取得 schema 資訊 | |
| schema_result = get_schema(db_name, table_name) | |
| if len(schema_result) == 0: | |
| return get_message("Not Found.") | |
| return { | |
| "status": "success", | |
| "headers": schema_result.columns.tolist(), | |
| "data": schema_result.values.tolist() | |
| } | |
| def tables_query(db_name): | |
| """ | |
| Get list of all tables available in a specific database. | |
| This function retrieves all table names that exist within the specified database. | |
| Args: | |
| db_name (str): The name of the database to query tables from. | |
| If empty or None, returns empty list. | |
| Returns: | |
| list: List of table names in the specified database. Returns empty list | |
| if database name is not provided or no tables found. | |
| Example: | |
| >>> tables = tables_query("sales_db") | |
| >>> print(tables) | |
| ['customers', 'orders', 'products', 'inventory'] | |
| """ | |
| if not db_name: | |
| return [] | |
| return get_tables(db_name) | |
| def meaning_query(db_name, table_name): | |
| """ | |
| Query the meaning and description of columns in a specific table. | |
| This function retrieves detailed explanations and meanings for each column | |
| in the specified table, helping users understand the purpose and content | |
| of each field. | |
| Args: | |
| db_name (str): The name of the database containing the table. Must not be empty. | |
| table_name (str): The name of the table to query column meanings for. Must not be empty. | |
| Returns: | |
| dict: Dictionary containing query results with structure: | |
| - Success: {"status": "success", "headers": [...], "data": [...]} | |
| - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]} | |
| Example: | |
| >>> result = meaning_query("sales_db", "customers") | |
| >>> print(result) | |
| # Returns dict with explanations for each column in customers table | |
| """ | |
| if not db_name or not table_name: | |
| return get_message("請選擇資料庫和資料表") | |
| result = get_meaning(db_name, table_name) | |
| if len(result) == 0: | |
| return get_message("Not Found.") | |
| return {"status": "success", "headers": result.columns.tolist(), "data": result.values.tolist()} | |
| def meaning_search(db_name, keyword): | |
| """ | |
| Search for column meanings using one or more keywords in a specific database. | |
| This function searches for columns whose names or meanings contain the specified | |
| keyword(s), helping users find relevant fields across all tables in the database. | |
| Multiple keywords can be separated by commas or spaces. | |
| Args: | |
| db_name (str): The name of the database to search in. Must not be empty. | |
| keyword (str): The search keyword(s) to match against column names or meanings. | |
| Multiple keywords can be separated by commas or spaces. Must not be empty. | |
| Returns: | |
| dict: Dictionary containing query results with structure: | |
| - Success: {"status": "success", "headers": [...], "data": [...]} | |
| - Error: {"status": "error", "headers": ['message'], "data": [['Error message.']]} | |
| Example: | |
| >>> result = meaning_search("sales_db", "customer") | |
| >>> print(result) | |
| # Returns dict with all columns containing "customer" in their name or meaning | |
| >>> result = meaning_search("sales_db", "customer, order, product") | |
| >>> print(result) | |
| # Returns dict with all columns containing "customer", "order", or "product" in their name or meaning | |
| """ | |
| if not db_name or not keyword: | |
| return get_message("Please input keyword.") | |
| # Split keywords by comma or space | |
| keywords = [k.strip() for k in keyword.replace(' ', ',').split(',') if k.strip()] | |
| # Get results for each keyword and combine them | |
| combined_results = [] | |
| for kw in keywords: | |
| keyword_result = search_meaning(db_name, kw) | |
| # Append non-empty results | |
| if not keyword_result.empty: | |
| combined_results.append(keyword_result) | |
| # Remove duplicates if any | |
| # Convert DataFrames to a single DataFrame and remove duplicate rows | |
| if combined_results: | |
| result = pd.concat(combined_results, ignore_index=True).drop_duplicates() | |
| else: | |
| result = pd.DataFrame() | |
| if len(result) == 0: | |
| return get_message("Not Found.") | |
| return {"status": "success", "headers": result.columns.tolist(), "data": result.values.tolist()} | |
| def execute_sqlite(db_name: str, query: str): | |
| if not db_name or not query: | |
| return pd.DataFrame({"message": ["請選擇資料庫和SQL"]}) | |
| result = execute_sqlite_query(db_name, query) | |
| # if len(result) == 0: | |
| # return pd.DataFrame({"message": ["沒有找到相關資料"]}) | |
| if result['success']: | |
| # return pd.DataFrame(result['data'], columns=result['columns']) | |
| return pd.DataFrame({"type":["OK"], "message": ["Success"]}) | |
| else: | |
| return pd.DataFrame({"type":["Error"], "message": [result['error']]}) | |
| # 建立 Gradio 界面 | |
| with gr.Blocks(title="資料庫查詢工具") as demo: | |
| gr.Markdown("# 資料庫查詢工具") | |
| gr.Markdown("這個工具可以幫助您查詢資料庫的知識庫、資料表結構和欄位意義。") | |
| # 獲取所有可用的資料庫 | |
| all_dbs = get_all_databases() | |
| with gr.Tab("知識庫查詢"): | |
| with gr.Row(): | |
| kb_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None) | |
| kb_keyword = gr.Textbox(label="知識關鍵字 (可選)") | |
| kb_search = gr.Button("查詢知識庫") | |
| kb_result = gr.DataFrame(label="查詢結果") | |
| kb_search.click(kb_query, inputs=[kb_db, kb_keyword], outputs=kb_result) | |
| gr.api(get_all_databases) | |
| with gr.Tab("資料表查詢"): | |
| with gr.Row(): | |
| kb_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None) | |
| kb_search = gr.Button("查詢資料表") | |
| kb_result = gr.DataFrame(label="查詢結果") | |
| kb_search.click(tables_query, inputs=[kb_db], outputs=kb_result) | |
| with gr.Tab("資料表結構查詢"): | |
| with gr.Row(): | |
| schema_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None) | |
| schema_table = gr.Text(label="選擇資料表") | |
| schema_search = gr.Button("查詢資料表結構") | |
| schema_result = gr.DataFrame(label="查詢結果") | |
| # 當資料庫選擇變更時,更新資料表下拉選單 | |
| # schema_db.change(update_tables, inputs=schema_db, outputs=schema_table) | |
| schema_search.click(schema_query, inputs=[schema_db, schema_table], outputs=schema_result) | |
| with gr.Tab("欄位意義查詢"): | |
| with gr.Row(): | |
| meaning_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None) | |
| meaning_keyword = gr.Text(label="Keyword") | |
| search_button = gr.Button("查詢欄位意義") | |
| meaning_result = gr.DataFrame(label="查詢結果") | |
| # 當資料庫選擇變更時,更新資料表下拉選單 | |
| # meaning_db.change(update_tables, inputs=meaning_db, outputs=meaning_table) | |
| search_button.click(meaning_search, inputs=[meaning_db, meaning_keyword], outputs=meaning_result) | |
| with gr.Tab("sqlite SQL 執行"): | |
| with gr.Row(): | |
| sql_db = gr.Dropdown(choices=all_dbs, label="選擇資料庫", value=all_dbs[0] if all_dbs else None) | |
| sql_query = gr.Textbox(label="sqlite SQL") | |
| sql_exec = gr.Button("執行 SQL") | |
| sql_result = gr.DataFrame(label="查詢結果") | |
| sql_exec.click(execute_sqlite, inputs=[sql_db, sql_query], outputs=sql_result) | |
| # 啟動 Gradio 應用程式 | |
| if __name__ == "__main__": | |
| demo.launch(mcp_server=True, server_name="0.0.0.0", allowed_paths=["/"], share=True) | |