Spaces:
Build error
Build error
| import os | |
| import time | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| from sqlalchemy import create_engine | |
| from langchain_openai import ChatOpenAI | |
| from langchain_community.agent_toolkits import create_sql_agent | |
| from langchain_community.utilities import SQLDatabase | |
| from huggingface_hub import InferenceClient | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| import logging | |
| load_dotenv() | |
| JSON_FILE_PATH = "model_training_part1.json" | |
| SQL_DB_PATH = "data.db" | |
| HF_API_KEY = os.getenv("HF_API_KEY") | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| LLAMA_MODEL = "meta-llama/Llama-3.3-70B-Instruct" | |
| LLAMA_MODEL_ALTERNATIVO = "meta-llama/Llama-3.1-8B-Instruct" | |
| hf_client = InferenceClient(provider="together", api_key=HF_API_KEY) | |
| os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY | |
| query_cache = {} | |
| history_log = [] | |
| recent_history = [] | |
| show_history_flag = False | |
| advanced_mode_enabled = False | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| def create_or_load_sql_database(json_path, sql_db_path): | |
| if os.path.exists(sql_db_path): | |
| print("Banco de dados SQL já existe. Carregando...") | |
| return create_engine(f"sqlite:///{sql_db_path}") | |
| else: | |
| print("Banco de dados SQL não encontrado. Criando...") | |
| engine = create_engine(f"sqlite:///{sql_db_path}") | |
| with open(json_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| df = pd.DataFrame(data) | |
| df.to_sql("perguntas_respostas", engine, index=False, if_exists="replace") | |
| print("Banco de dados SQL criado com sucesso!") | |
| return engine | |
| engine = create_or_load_sql_database(JSON_FILE_PATH, SQL_DB_PATH) | |
| db = SQLDatabase(engine=engine) | |
| llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2) | |
| sql_agent = create_sql_agent(llm, db=db, agent_type="openai-tools", verbose=True, max_iterations=50, return_intermediate_steps=True) | |
| def generate_initial_context(db_sample): | |
| return ( | |
| f"Você é um assistente que gera queries SQL objetivas e eficientes. Sempre inclua `LIMIT 20` nas queries. Aqui está o banco de dados:\n\n" | |
| f"Exemplos de valores:\n{db_sample.head(3).to_string(index=False)}\n\n" | |
| "**📌 Estrutura do Banco de Dados:**\n" | |
| "A base de dados contém informações sobre perguntas e respostas relacionadas à reforma tributária, bem como artigos e leis tributárias.\n\n" | |
| "**📍 Colunas e seus Significados:**\n" | |
| "- `instruction` → Pergunta do usuário\n" | |
| "- `response` → Resposta correspondente à pergunta\n" | |
| "- `text` → Explicação detalhada da resposta\n" | |
| "- `title` → Título da lei\n" | |
| "- `chapter` → Capítulo da lei\n" | |
| "- `section` → Seção da lei\n" | |
| "- `content` → Artigo e texto completo da lei\n\n" | |
| "**📌 Importante:**\n" | |
| "- Nem todas as colunas estarão preenchidas em todos os registros. Por exemplo, leis não têm `instruction` ou `response`, enquanto perguntas não têm `title`, `chapter`, `section` ou `content`.\n" | |
| "- Ao buscar perguntas e respostas, utilize as colunas `instruction` e `response`.\n" | |
| "- Ao buscar informações sobre leis, use `title`, `chapter`, `section` e `content`.\n\n" | |
| "⚠️ **ATENÇÃO**: As palavras devem ser escritas com acento, como aparecem na base de dados. Exemplo correto: `reforma tributária`, `isenção`, `crédito`. Nunca use formas sem acento como `tributaria`, `isencao` etc.\n\n" | |
| "**📝 Exemplo de Queries SQL Bem Formadas:**\n" | |
| "1️⃣ **Buscar por uma informação relacionada à pergunta do usuário, esteja ela em uma resposta direta ou em um artigo de lei:**\n" | |
| "```sql\n" | |
| "SELECT * FROM perguntas_respostas \n" | |
| "WHERE instruction LIKE '%isenção%' \n" | |
| " OR response LIKE '%isenção%' \n" | |
| " OR content LIKE '%isenção%' \n" | |
| "LIMIT 20;\n" | |
| "```\n\n" | |
| "2️⃣ **Buscar especificamente por um artigo ou trecho de lei que mencione um termo legal:**\n" | |
| "```sql\n" | |
| "SELECT content FROM perguntas_respostas \n" | |
| "WHERE content LIKE '%Art. 146%' \n" | |
| "LIMIT 20;\n" | |
| "```\n\n" | |
| "**🔎 Instruções para a Geração da Query SQL:**\n" | |
| "- Sempre que possível, busque em todas as colunas relevantes: `instruction`, `response` e `content`, para maximizar a chance de encontrar uma resposta útil.\n" | |
| "- Use `LIKE` para buscas aproximadas com palavras-chave do usuário.\n\n" | |
| "**🔄 Formato de Saída Esperado:**\n" | |
| "Pergunta: <pergunta do usuário>\n" | |
| "Opção de Query SQL:\n<query SQL>\n" | |
| "Idioma: Português" | |
| ) | |
| def is_greeting(user_query): | |
| greetings = ["olá", "oi", "bom dia", "boa tarde", "boa noite", "oi, tudo bem?"] | |
| return user_query.lower().strip() in greetings | |
| def query_with_llama(user_query, db_sample): | |
| initial_context = generate_initial_context(db_sample) | |
| formatted_history = "\n".join([ | |
| f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history[-2:] | |
| ]) | |
| full_prompt = f"{initial_context}\n\nHistórico recente:\n{formatted_history}\n\nPergunta do usuário:\n{user_query}" | |
| logging.info(f"[DEBUG] Contexto enviado ao Llama:\n{full_prompt}\n") | |
| start_time = time.time() | |
| try: | |
| response = hf_client.chat.completions.create( | |
| model=LLAMA_MODEL, | |
| messages=[{"role": "system", "content": full_prompt}], | |
| max_tokens=900, | |
| stream=False | |
| ) | |
| llama_response = response["choices"][0]["message"]["content"] | |
| end_time = time.time() | |
| logging.info(f"[DEBUG] Resposta do Llama para o Agent SQL:\n{llama_response.strip()}\n[Tempo de execução: {end_time - start_time:.2f}s]\n") | |
| return llama_response.strip() | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao interagir com o Llama: {e}") | |
| return None | |
| def refine_response_with_llm(user_question, sql_response): | |
| prompt = ( | |
| f"Pergunta do usuário:\n{user_question}\n\n" | |
| f"Resposta gerada pelo agente SQL:\n{sql_response}\n\n" | |
| "Sua tarefa é refinar, complementar e melhorar a resposta." | |
| ) | |
| logging.info(f"[DEBUG] Prompt enviado ao modelo de refinamento:\n{prompt}\n") | |
| try: | |
| response = hf_client.chat.completions.create( | |
| model=LLAMA_MODEL, | |
| messages=[{"role": "system", "content": prompt}], | |
| max_tokens=1200, | |
| stream=False | |
| ) | |
| improved_response = response["choices"][0]["message"]["content"] | |
| logging.info(f"[DEBUG] Resposta do modelo de refinamento:\n{improved_response}\n") | |
| return improved_response | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao refinar resposta com LLM: {e}") | |
| return sql_response | |
| def query_sql_agent(user_query): | |
| try: | |
| if user_query in query_cache: | |
| print(f"[CACHE] Retornando resposta do cache para a consulta: {user_query}") | |
| return query_cache[user_query] | |
| if is_greeting(user_query): | |
| greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!" | |
| query_cache[user_query] = greeting_response | |
| return greeting_response | |
| column_data = pd.read_sql_query("SELECT * FROM perguntas_respostas LIMIT 10", engine) | |
| llama_instruction = query_with_llama(user_query, column_data) | |
| if not llama_instruction: | |
| return "Erro: O modelo Llama não conseguiu gerar uma instrução válida." | |
| print("------- Agent SQL: Executando query -------") | |
| response = sql_agent.invoke({"input": llama_instruction}) | |
| sql_response = response.get("output", "Erro ao obter a resposta do agente.") | |
| if advanced_mode_enabled: | |
| sql_response = refine_response_with_llm(user_query, sql_response) | |
| query_cache[user_query] = sql_response | |
| return sql_response | |
| except Exception as e: | |
| return f"Erro ao consultar o agente SQL: {e}" | |
| def chatbot_response(user_input): | |
| start_time = time.time() | |
| response = query_sql_agent(user_input) | |
| end_time = time.time() | |
| history_log.append({"Pergunta": user_input, "Resposta": response, "Tempo de Resposta (s)": round(end_time - start_time, 2)}) | |
| recent_history.append({"role": "user", "content": user_input}) | |
| recent_history.append({"role": "assistant", "content": response}) | |
| if len(recent_history) > 4: | |
| recent_history.pop(0) | |
| recent_history.pop(0) | |
| return response | |
| def toggle_history(): | |
| global show_history_flag | |
| show_history_flag = not show_history_flag | |
| return history_log if show_history_flag else {} | |
| def toggle_advanced_mode(state): | |
| global advanced_mode_enabled | |
| advanced_mode_enabled = state | |
| logging.info(f"[MODO AVANÇADO] {'Ativado' if state else 'Desativado'}") | |
| return "Modo avançado ativado." if state else "Modo avançado desativado." | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# Tributario Agent") | |
| chatbot = gr.Chatbot(height=600) | |
| msg = gr.Textbox(placeholder="Digite sua pergunta aqui...", label=" ", lines=1) | |
| def respond(message, chat_history): | |
| response = chatbot_response(message) | |
| chat_history.append((message, response)) | |
| return "", chat_history | |
| with gr.Row(): | |
| btn = gr.Button("Enviar", variant="primary") | |
| history_btn = gr.Button("Histórico", variant="secondary") | |
| advanced_toggle = gr.Checkbox(label="Refinar Resposta", value=False) | |
| msg.submit(respond, [msg, chatbot], [msg, chatbot]) | |
| btn.click(respond, [msg, chatbot], [msg, chatbot]) | |
| advanced_toggle.change(toggle_advanced_mode, inputs=[advanced_toggle], outputs=[]) | |
| history_output = gr.JSON() | |
| history_btn.click(toggle_history, outputs=history_output) | |
| if __name__ == "__main__": | |
| demo.launch(share=False) |