import json
import logging
import os
import re
import tempfile
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple
from urllib.parse import quote

import gradio as gr
| |
|
# Optional third-party dependencies are imported as one group so that a missing
# package sets DEPENDENCIES_AVAILABLE to False instead of failing at import time.
try:
    from langchain_community.agent_toolkits import create_sql_agent
    from langchain_community.utilities import SQLDatabase
    from langchain_google_genai import ChatGoogleGenerativeAI
    from langchain.agents.agent_types import AgentType
    import pymysql
    from dotenv import load_dotenv

    DEPENDENCIES_AVAILABLE = True
except ImportError:
    # Some of the names above may be unbound here; the setup functions below
    # check this flag before using them.
    DEPENDENCIES_AVAILABLE = False
| |
|
| | |
# Configure root logging once and create the module-level logger.
# (The original repeated these two statements twice; the second pass was a no-op.)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| |
|
def check_environment():
    """Report whether the runtime has everything the assistant needs.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is False when the optional packages
        failed to import or when any required environment variable is unset
        or empty; ``message`` explains which case applies.
    """
    if not DEPENDENCIES_AVAILABLE:
        return (
            False,
            "Missing required Python packages. Please install them with: pip install -r requirements.txt",
        )

    # Collect, in declaration order, every required variable that is unset or empty.
    missing_vars = []
    for name in ("DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"):
        if not os.getenv(name):
            missing_vars.append(name)

    if not missing_vars:
        return True, "Environment is properly configured"

    return False, f"Missing required environment variables: {', '.join(missing_vars)}"
| |
|
def setup_database_connection():
    """Connect to the MySQL database described by the environment.

    Reloads ``.env`` (overriding existing variables), reads ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST`` and ``DB_NAME``, opens and closes a short
    PyMySQL probe connection, and then builds a ``SQLDatabase`` from the
    equivalent SQLAlchemy URI.

    Returns:
        ``(db, "")`` on success, or ``(None, error_message)`` when the
        dependencies are missing, the configuration is incomplete, or any
        step raises.
    """
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"

    try:
        load_dotenv(override=True)

        db_user = os.getenv("DB_USER")
        db_password = os.getenv("DB_PASSWORD")
        db_host = os.getenv("DB_HOST")
        db_name = os.getenv("DB_NAME")

        if not all([db_user, db_password, db_host, db_name]):
            return None, "Missing database configuration"

        logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}")

        # Probe with a short timeout so an unreachable server fails fast.
        connection = pymysql.connect(
            host=db_host,
            user=db_user,
            password=db_password,
            database=db_name,
            connect_timeout=5,
            cursorclass=pymysql.cursors.DictCursor,
        )
        connection.close()

        # Percent-encode the credentials: characters such as '@', ':' or '/'
        # in a user name or password would otherwise corrupt the URI.
        safe_user = quote(db_user, safe="")
        safe_password = quote(db_password, safe="")
        db_uri = f"mysql+pymysql://{safe_user}:{safe_password}@{db_host}/{db_name}"
        logger.info("Database connection successful")
        return SQLDatabase.from_uri(db_uri), ""

    except Exception as e:
        # Boundary handler: every failure is reported to the caller as text.
        error_msg = f"Error connecting to database: {str(e)}"
        logger.error(error_msg)
        return None, error_msg
| |
|
def initialize_llm():
    """Create the Gemini chat model used by the SQL agent.

    Returns:
        ``(llm, "")`` on success, or ``(None, error_message)`` when the
        dependencies are missing, ``GOOGLE_API_KEY`` is unset, or the client
        constructor raises.
    """
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"

    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        return None, "GOOGLE_API_KEY not found in environment variables"

    model_settings = {
        "model": "gemini-2.0-flash",
        "temperature": 0,
        "google_api_key": api_key,
    }
    try:
        chat_model = ChatGoogleGenerativeAI(**model_settings)
        logger.info("Google Generative AI initialized successfully")
    except Exception as exc:
        failure = f"Error initializing Google Generative AI: {str(exc)}"
        logger.error(failure)
        return None, failure
    return chat_model, ""
| |
|
def create_agent():
    """Build the LangChain SQL agent from the database and the language model.

    Returns:
        ``(agent, "")`` on success, or ``(None, error_message)`` when the
        dependencies are missing, either prerequisite could not be created,
        or ``create_sql_agent`` raises.
    """
    if not DEPENDENCIES_AVAILABLE:
        return None, "Dependencies not available"

    db, db_error = setup_database_connection()
    llm, llm_error = initialize_llm()

    if not (db and llm):
        # Report whichever prerequisite errors are non-empty, joined with " | ".
        reasons = [message for message in (db_error, llm_error) if message]
        error_msg = " | ".join(reasons)
        return None, f"Cannot create agent: {error_msg}"

    # NOTE(review): OPENAI_FUNCTIONS is paired with a Gemini model here;
    # confirm this agent type is the intended one for that LLM.
    agent_options = {
        "llm": llm,
        "db": db,
        "agent_type": AgentType.OPENAI_FUNCTIONS,
        "verbose": True,
    }
    try:
        logger.info("Creating SQL agent...")
        sql_agent = create_sql_agent(**agent_options)
        logger.info("SQL agent created successfully")
        return sql_agent, ""
    except Exception as exc:
        error_msg = f"Error creating SQL agent: {str(exc)}"
        logger.error(error_msg)
        return None, error_msg
| |
|
| | |
# Build the agent once at import time. `agent` is None when creation failed,
# in which case `agent_error` holds the reason shown in the UI.
agent, agent_error = create_agent()
# True only when the agent (and therefore the database connection) was created.
db_connected = agent is not None
| |
|
# First fenced code block, with an optional (case-insensitive) "sql" language tag.
_SQL_FENCE_RE = re.compile(r'```(?:sql)?\s*(.*?)```', re.IGNORECASE | re.DOTALL)

# A statement that starts with a whole SQL keyword and runs to the next semicolon.
_SQL_STATEMENT_RE = re.compile(
    r'\b(?:SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)\b.*?;',
    re.IGNORECASE | re.DOTALL,
)


def extract_sql_query(text):
    """Extract a SQL query from free text.

    The first non-empty fenced code block wins. Otherwise the first span that
    starts with a SQL keyword (matched as a whole word, so "dropdown" or
    "selection" do not count) and ends at the next semicolon is returned.

    Args:
        text: Text to search; may be ``None`` or empty.

    Returns:
        The stripped query string, or ``None`` when nothing was found.
    """
    if not text:
        return None

    fenced = _SQL_FENCE_RE.search(text)
    if fenced:
        candidate = fenced.group(1).strip()
        if candidate:
            return candidate

    statement = _SQL_STATEMENT_RE.search(text)
    if statement:
        return statement.group(0).strip()

    return None
| |
|
def execute_sql_query(query, db_connection):
    """Run a SQL query and return the result formatted as text.

    Args:
        query: Raw SQL text to execute.
        db_connection: Object exposing a SQLAlchemy engine as ``_engine``
            (the ``SQLDatabase`` returned by ``setup_database_connection``).

    Returns:
        The single value as a string for a 1x1 result, a Markdown table when
        pandas (and its Markdown backend) is importable, one row per line
        otherwise, or a Spanish error/empty-result message.
    """
    if not db_connection:
        return "Error: No hay conexión a la base de datos"

    # SECURITY NOTE(review): `query` is text extracted from a model answer and
    # is executed verbatim, including any INSERT/UPDATE/DELETE/DROP statement.
    try:
        with db_connection._engine.connect() as connection:
            # SQLAlchemy 2.x no longer accepts a plain string in execute();
            # exec_driver_sql() hands the raw SQL to the driver. no_parameters
            # keeps literal '%' (e.g. LIKE '%x%') from being read as a
            # format placeholder by the DBAPI.
            raw_connection = connection.execution_options(no_parameters=True)
            result = raw_connection.exec_driver_sql(query)
            columns = list(result.keys())
            rows = result.fetchall()

        if not rows:
            return "La consulta no devolvió resultados"

        # A lone scalar (e.g. COUNT(*)) is returned without table formatting.
        if len(rows) == 1 and len(rows[0]) == 1:
            return str(rows[0][0])

        try:
            import pandas as pd

            # Pass the column names explicitly so the table keeps its headers.
            df = pd.DataFrame([tuple(row) for row in rows], columns=columns)
            return df.to_markdown(index=False)
        except ImportError:
            # pandas or its Markdown backend is missing: fall back to plain rows.
            return "\n".join(str(row) for row in rows)

    except Exception as e:
        return f"Error ejecutando la consulta: {str(e)}"
| |
|
def generate_plot(data, x_col, y_col, title, x_label, y_label):
    """Draw a bar chart of ``data[y_col]`` against ``data[x_col]`` and save it as a PNG.

    Args:
        data: Indexable container; ``data[x_col]`` and ``data[y_col]`` supply
            the bar positions and heights.
        x_col: Key of the x-axis values in ``data``.
        y_col: Key of the y-axis values in ``data``.
        title: Chart title.
        x_label: X-axis label.
        y_label: Y-axis label.

    Returns:
        Path of ``plot.png`` inside a newly created temporary directory. The
        directory is not removed by this function.
    """
    # NOTE(review): `plt` (presumably matplotlib.pyplot) is not imported in the
    # visible part of this file - confirm it is bound before calling this.
    plt.figure(figsize=(10, 6))
    plt.bar(data[x_col], data[y_col])
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.xticks(rotation=45)  # slanted tick labels
    plt.tight_layout()

    output_path = os.path.join(tempfile.mkdtemp(), "plot.png")
    plt.savefig(output_path)
    plt.close()

    return output_path
| |
|
def _agent_output_text(response):
    """Return the agent's final answer from an ``ainvoke`` result, or ``None``."""
    # Agent executors normally return a mapping with an "output" key; attribute
    # access is kept as a fallback for object-style responses.
    if isinstance(response, dict):
        output = response.get("output")
    else:
        output = getattr(response, "output", None)
    if output is None or isinstance(output, str):
        return output
    return str(output)


async def stream_agent_response(question: str, chat_history: List) -> AsyncIterator[Tuple[List, Dict]]:
    """Ask the SQL agent a question and stream chat-history updates.

    Args:
        question: The user's question.
        chat_history: Previous ``[user, bot]`` message pairs; ``None`` is
            treated as an empty history. The list passed in is not mutated.

    Yields:
        ``(chat_history, update)`` tuples: first the history with the pending
        question, then the history with the agent's answer (or an error
        message). ``update`` always hides the auxiliary output component.
    """
    history = list(chat_history or [])

    if not agent:
        error_msg = (
            "## ⚠️ Error: Agente no inicializado\n\n"
            "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n"
            "1. Todas las variables de entorno estén configuradas correctamente\n"
            "2. La base de datos esté accesible\n"
            "3. El modelo de lenguaje esté disponible\n\n"
            f"Error: {agent_error}"
        )
        yield history + [[question, error_msg]], gr.update(visible=False)
        return

    # Show the question immediately with an empty answer slot.
    history.append([question, None])
    yield history, gr.update(visible=False)

    try:
        response = await agent.ainvoke({"input": question, "chat_history": history[:-1]})

        response_text = _agent_output_text(response)
        if response_text:
            sql_query = extract_sql_query(response_text)
            if sql_query:
                # SECURITY NOTE(review): this re-runs whatever SQL appears in
                # the model's answer, on a fresh connection, without filtering.
                db_connection, _ = setup_database_connection()
                query_result = execute_sql_query(sql_query, db_connection)
                response_text += f"\n\n### 🔍 Resultado de la consulta:\n```sql\n{sql_query}\n```\n\n{query_result}"
        else:
            response_text = "Error: No se recibió respuesta del agente."

    except Exception as e:
        # Boundary handler: surface the failure in the chat instead of raising.
        response_text = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```"

    history[-1][1] = response_text
    yield history, gr.update(visible=False)
| |
|
| | |
# Stylesheet passed to gr.Blocks(css=...) in create_ui(). The #chatbot,
# #question-input and #send-button selectors match the elem_id values set there.
# NOTE(review): .user-message, .bot-message and .status-message are not
# referenced elsewhere in the visible file - confirm they are still needed.
custom_css = """
.gradio-container {
    max-width: 1200px !important;
    margin: 0 auto !important;
    font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
}

#chatbot {
    min-height: 500px;
    border: 1px solid #e0e0e0;
    border-radius: 8px;
    margin-bottom: 20px;
    padding: 20px;
    background-color: #f9f9f9;
}

.user-message, .bot-message {
    padding: 12px 16px;
    border-radius: 18px;
    margin: 8px 0;
    max-width: 80%;
    line-height: 1.5;
}

.user-message {
    background-color: #007bff;
    color: white;
    margin-left: auto;
    border-bottom-right-radius: 4px;
}

.bot-message {
    background-color: #f1f1f1;
    color: #333;
    margin-right: auto;
    border-bottom-left-radius: 4px;
}

#question-input textarea {
    min-height: 50px !important;
    border-radius: 8px !important;
    padding: 12px !important;
    font-size: 16px !important;
}

#send-button {
    height: 100%;
    background-color: #007bff !important;
    color: white !important;
    border: none !important;
    border-radius: 8px !important;
    font-weight: 500 !important;
    transition: background-color 0.2s !important;
}

#send-button:hover {
    background-color: #0056b3 !important;
}

.status-message {
    text-align: center;
    color: #666;
    font-style: italic;
    margin: 10px 0;
}
"""
| |
|
def create_ui():
    """Build the Gradio Blocks layout for the SQL assistant.

    The layout has a title, a system-status accordion (opened automatically
    when the environment check fails), the chatbot, the question row, and a
    debug accordion. No event handlers are attached here.

    Returns:
        ``(demo, chatbot, question_input, submit_button,
        streaming_output_display)``.
    """
    env_ok, env_message = check_environment()

    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate"
    )

    with gr.Blocks(
        css=custom_css,
        title="Asistente de Base de Datos SQL",
        theme=theme
    ) as demo:

        gr.Markdown("""
        # 🤖 Asistente de Base de Datos SQL

        Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL.
        """)

        if not env_ok:
            # gr.Warning only shows a toast when raised from inside an event
            # handler; at build time it never reaches the page, so the message
            # is rendered as a regular component instead.
            gr.Markdown("⚠️ " + env_message)

        with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok):
            if not DEPENDENCIES_AVAILABLE:
                gr.Markdown("""
                ## ❌ Dependencias faltantes

                Para ejecutar esta aplicación localmente, necesitas instalar las dependencias:

                ```bash
                pip install -r requirements.txt
                ```
                """)
            else:
                if not agent:
                    gr.Markdown(f"""
                    ## ⚠️ Configuración incompleta

                    No se pudo inicializar el agente de base de datos. Por favor, verifica que:

                    1. Todas las variables de entorno estén configuradas correctamente
                    2. La base de datos esté accesible
                    3. La API de Google Gemini esté configurada

                    **Error:** {agent_error if agent_error else 'No se pudo determinar el error'}

                    ### Configuración local

                    Crea un archivo `.env` en la raíz del proyecto con las siguientes variables:

                    ```
                    DB_USER=tu_usuario
                    DB_PASSWORD=tu_contraseña
                    DB_HOST=tu_servidor
                    DB_NAME=tu_base_de_datos
                    GOOGLE_API_KEY=tu_api_key_de_google
                    ```
                    """)
                else:
                    gr.Markdown("""
                    ## ✅ Sistema listo

                    El asistente está listo para responder tus preguntas sobre la base de datos.
                    """)

        chatbot = gr.Chatbot(
            elem_id="chatbot",
            label="Chat",
            height=500,
            avatar_images=(
                "https://i.imgur.com/8O1mCJx.png",
                "https://i.imgur.com/7I12Ybh.png"
            ),
            render_markdown=True,
            show_copy_button=True,
            show_share_button=True
        )

        with gr.Row():
            question_input = gr.Textbox(
                label="",
                placeholder="Escribe tu pregunta sobre la base de datos...",
                elem_id="question-input",
                container=False,
                scale=5,
                min_width=300,
                max_lines=3,
                autofocus=True
            )
            submit_button = gr.Button(
                "Enviar",
                elem_id="send-button",
                min_width=100,
                scale=1,
                variant="primary"
            )

        with gr.Accordion("🔍 Información de depuración", open=False):
            gr.Markdown("""
            ### Estado del sistema
            - **Base de datos**: {}
            - **Modelo**: {}
            - **Modo**: {}
            """.format(
                f"Conectado a {os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}" if db_connected else "No conectado",
                "gemini-2.0-flash" if agent else "No disponible",
                "Completo" if agent else "Demo (sin conexión a base de datos)"
            ))

            # Opt-in dump of DB_*/GOOGLE_* variables; any name containing
            # "PASS" or "KEY" is masked.
            if os.getenv("SHOW_ENV_DEBUG", "false").lower() == "true":
                env_vars = {k: "***" if "PASS" in k or "KEY" in k else v
                            for k, v in os.environ.items()
                            if k.startswith(('DB_', 'GOOGLE_'))}
                gr.Code(
                    json.dumps(env_vars, indent=2, ensure_ascii=False),
                    language="json",
                    label="Variables de entorno"
                )

        # Hidden component used as the second output of the chat handlers.
        streaming_output_display = gr.Textbox(visible=False)

    return demo, chatbot, question_input, submit_button, streaming_output_display
| |
|
| | |
# Build the interface once at import time and keep module-level handles to the
# components that the event handlers below read from and write to.
demo, chatbot, question_input, submit_button, streaming_output_display = create_ui()
| |
|
def user_message(user_input: str, chat_history: List) -> Tuple[str, List]:
    """Append the user's message to the chat history and clear the input box.

    Args:
        user_input: Text typed by the user.
        chat_history: Current list of ``[user, bot]`` message pairs.

    Returns:
        ``("", history)``. For blank input the history is returned unchanged;
        otherwise it is a new list ending in ``[user_input, None]``.
    """
    has_text = bool(user_input.strip())
    if has_text:
        logger.info(f"User message: {user_input}")
        pending_turn = [user_input, None]
        return "", chat_history + [pending_turn]
    return "", chat_history
| |
|
async def bot_response(chat_history: List) -> AsyncIterator[Tuple[List, Dict]]:
    """Answer the pending question at the end of the chat history.

    This is an async generator so that Gradio iterates the updates itself;
    the original plain function returned an un-iterated async-generator object.

    Args:
        chat_history: List of ``[user, bot]`` pairs. The last pair is expected
            to hold the new question with ``None`` as its answer.

    Yields:
        ``(chat_history, update)`` tuples forwarded from
        ``stream_agent_response``. When there is no pending question, the
        history is yielded once, unchanged.
    """
    # No pending question: empty history, empty question, or a last turn that
    # already has an answer (this happens after a blank submit, where
    # user_message leaves the history unchanged).
    if (
        not chat_history
        or not chat_history[-1][0]
        or chat_history[-1][1] is not None
    ):
        yield chat_history, gr.update(visible=False)
        return

    question = chat_history[-1][0]
    logger.info(f"Processing question: {question}")
    # stream_agent_response re-appends the question, so pass the earlier turns only.
    async for update in stream_agent_response(question, chat_history[:-1]):
        yield update
| |
|
| | |
# Event listeners must be registered inside a Blocks context, so re-enter
# `demo` here; the original attached them after the context had closed.
with demo:
    # Button click: record the message, then run the agent.
    submit_click = submit_button.click(
        fn=user_message,
        inputs=[question_input, chatbot],
        outputs=[question_input, chatbot],
        queue=True
    ).then(
        fn=bot_response,
        inputs=[chatbot],
        outputs=[chatbot, streaming_output_display],
        api_name="ask"
    )

    # Pressing Enter in the textbox runs the same two-step chain.
    question_input.submit(
        fn=user_message,
        inputs=[question_input, chatbot],
        outputs=[question_input, chatbot],
        queue=True
    ).then(
        fn=bot_response,
        inputs=[chatbot],
        outputs=[chatbot, streaming_output_display]
    )
| |
|
| | |
def get_app():
    """Return the Gradio app instance, labelled as a demo on Hugging Face Spaces.

    When the ``SPACE_ID`` environment variable is set and non-empty, the title
    and description attributes of ``demo`` are overwritten with demo wording
    before it is returned.
    """
    running_on_spaces = bool(os.getenv('SPACE_ID'))
    if not running_on_spaces:
        return demo

    demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
    demo.description = """
        Este es un demo del asistente de base de datos SQL.
        Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno.
        """
    return demo
| |
|
| | |
if __name__ == "__main__":
    # Local entry point: enable the queue, then serve on all interfaces at port 7860.
    # NOTE(review): `concurrency_count` and `show_tips` are only accepted by
    # some Gradio releases - confirm against the pinned version.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        debug=True,
        share=False,
        show_api=True,
        favicon_path=None,
        show_error=True,
        show_tips=True,
    )
    queued_app = demo.queue(concurrency_count=5)
    queued_app.launch(**launch_options)
| |
|