import gradio as gr import pandas as pd from sqlalchemy import create_engine, text import pymysql # 数据库连接配置 DB_CONFIG = { 'host': 'rm-j6c5yhe0l739e7752vo.mysql.cnhk.rds.aliyuncs.com', 'user': 'report_user', 'password': 'report_user_123', 'database': 'easy_financial_report' } def get_database_url(): """构造数据库连接URL""" return f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}/{DB_CONFIG['database']}" def connect_to_database(): """创建数据库连接引擎""" try: engine = create_engine(get_database_url()) return engine except Exception as e: print(f"数据库连接失败: {e}") return None def execute_query(query): """执行SQL查询并返回结果""" if not query.strip(): return "请输入SQL查询语句" try: # 创建数据库连接引擎 engine = create_engine(get_database_url()) # 执行查询并返回DataFrame df = pd.read_sql_query(query, engine) engine.dispose() return df except Exception as e: return f"查询执行失败: {str(e)}" def get_table_names(): """获取数据库中的所有表名""" try: # 创建数据库连接引擎 engine = create_engine(get_database_url()) # 查询所有表名 query = "SHOW TABLES" df = pd.read_sql_query(query, engine) engine.dispose() # 返回表名列表 return df.iloc[:, 0].tolist() if not df.empty else [] except Exception as e: return [f"获取表名失败: {str(e)}"] def preview_table(table_name): """预览表的前几行数据""" if not table_name or "失败" in table_name: return "请选择有效的表名" query = f"SELECT * FROM {table_name} LIMIT 10" return execute_query(query) # 新增功能函数 def insert_record(title): """向report_file_link表插入新记录""" if not title.strip(): return "请输入标题" try: engine = create_engine(get_database_url()) # 插入新记录 query = "INSERT INTO report_file_link (title) VALUES (:title)" with engine.connect() as conn: conn.execute(text(query), {"title": title}) conn.commit() engine.dispose() return f"成功插入记录: {title}" except Exception as e: return f"插入记录失败: {str(e)}" def update_record(record_id, new_title): """更新report_file_link表中的记录""" if not record_id or not new_title.strip(): return "请输入记录ID和新标题" try: engine = create_engine(get_database_url()) # 更新记录 query = "UPDATE report_file_link SET title = :title WHERE id = :id" with engine.connect() as conn: result = conn.execute(text(query), {"title": new_title, "id": record_id}) conn.commit() engine.dispose() if result.rowcount > 0: return f"成功更新记录ID {record_id} 的标题为: {new_title}" else: return f"未找到ID为 {record_id} 的记录" except Exception as e: return f"更新记录失败: {str(e)}" def delete_record(record_id): """从report_file_link表中删除记录""" if not record_id: return "请输入记录ID" try: engine = create_engine(get_database_url()) # 删除记录 query = "DELETE FROM report_file_link WHERE id = :id" with engine.connect() as conn: result = conn.execute(text(query), {"id": record_id}) conn.commit() engine.dispose() if result.rowcount > 0: return f"成功删除ID为 {record_id} 的记录" else: return f"未找到ID为 {record_id} 的记录" except Exception as e: return f"删除记录失败: {str(e)}" def refresh_report_file_link(): """刷新report_file_link表的数据""" return execute_query("SELECT * FROM report_file_link") # 创建Gradio界面 with gr.Blocks(title="MySQL数据库查询工具") as demo: gr.Markdown("# MySQL数据库查询工具") gr.Markdown("连接到阿里云RDS MySQL数据库") with gr.Tab("SQL查询"): with gr.Row(): with gr.Column(): query_input = gr.Textbox( label="SQL查询语句", placeholder="请输入SQL查询语句,例如:SELECT * FROM table_name LIMIT 10", lines=5 ) query_btn = gr.Button("执行查询") with gr.Column(): query_output = gr.Dataframe(label="查询结果") query_btn.click( fn=execute_query, inputs=query_input, outputs=query_output ) with gr.Tab("表浏览"): with gr.Row(): with gr.Column(): table_dropdown = gr.Dropdown( choices=get_table_names(), label="选择表" ) refresh_btn = gr.Button("刷新表列表") preview_btn = gr.Button("预览表数据") with gr.Column(): table_output = gr.Dataframe(label="表数据预览") refresh_btn.click( fn=get_table_names, inputs=[], outputs=table_dropdown ) preview_btn.click( fn=preview_table, inputs=table_dropdown, outputs=table_output ) with gr.Tab("report_file_link管理"): gr.Markdown("## report_file_link表管理") with gr.Row(): with gr.Column(): # 插入功能 gr.Markdown("### 插入新记录") insert_title = gr.Textbox(label="标题") insert_btn = gr.Button("插入记录") insert_result = gr.Textbox(label="插入结果") # 更新功能 gr.Markdown("### 更新记录") update_id = gr.Textbox(label="记录ID") update_title = gr.Textbox(label="新标题") update_btn = gr.Button("更新记录") update_result = gr.Textbox(label="更新结果") # 删除功能 gr.Markdown("### 删除记录") delete_id = gr.Textbox(label="记录ID") delete_btn = gr.Button("删除记录") delete_result = gr.Textbox(label="删除结果") with gr.Column(): # 显示当前数据 gr.Markdown("### 当前数据") refresh_data_btn = gr.Button("刷新数据") data_output = gr.Dataframe(label="report_file_link表数据") # 设置按钮点击事件 insert_btn.click( fn=insert_record, inputs=insert_title, outputs=insert_result ) update_btn.click( fn=update_record, inputs=[update_id, update_title], outputs=update_result ) delete_btn.click( fn=delete_record, inputs=delete_id, outputs=delete_result ) refresh_data_btn.click( fn=refresh_report_file_link, inputs=[], outputs=data_output ) # 页面加载时自动显示数据 demo.load( fn=refresh_report_file_link, inputs=[], outputs=data_output ) if __name__ == "__main__": # demo.launch(server_name="0.0.0.0", server_port=7861) demo.launch(ssr_mode=False, share=True)