|
|
import os |
|
|
import sqlite3 |
|
|
import pandas as pd |
|
|
from dotenv import load_dotenv |
|
|
from bs4 import BeautifulSoup |
|
|
import requests |
|
|
import gradio as gr |
|
|
import traceback |
|
|
import tempfile |
|
|
|
|
|
|
|
|
from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_community.vectorstores import FAISS |
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain.chains import ConversationalRetrievalChain |
|
|
from langchain.memory import ConversationBufferMemory |
|
|
|
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY") |
|
|
if not api_key: |
|
|
print("⚠️ Atenção: Nenhuma chave de API encontrada nas variáveis de ambiente (OPENROUTER_API_KEY ou OPENAI_API_KEY).") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
openai_api_key = os.getenv("OPENROUTER_API_KEY") |
|
|
openai_api_base = os.getenv("OPENROUTER_API_BASE") |
|
|
|
|
|
|
|
|
print("Carregando modelo de embeddings (pode levar um tempo)...") |
|
|
embeddings_model_name = "all-MiniLM-L6-v2" |
|
|
try: |
|
|
embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name) |
|
|
print(f"Modelo de embeddings '{embeddings_model_name}' carregado.") |
|
|
except Exception as e: |
|
|
print(f"❌ Erro ao carregar embeddings: {e}") |
|
|
print("Verifique sua conexão com a internet ou se o modelo está disponível.") |
|
|
embeddings = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
llm_model_name = "deepseek/deepseek-r1:free" |
|
|
try: |
|
|
llm = ChatOpenAI( |
|
|
model=llm_model_name, |
|
|
temperature=0.5, |
|
|
openai_api_key=openai_api_key, |
|
|
base_url=openai_api_base |
|
|
) |
|
|
print(f"LLM '{llm_model_name}' configurado.") |
|
|
except Exception as e: |
|
|
print(f"❌ Erro ao configurar LLM: {e}") |
|
|
llm = None |
|
|
|
|
|
|
|
|
memoria = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key="answer") |
|
|
|
|
|
|
|
|
DB_FILE = "historico_conversas_multidoc.db" |
|
|
|
|
|
def inicializar_db(): |
|
|
conn = sqlite3.connect(DB_FILE) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(''' |
|
|
CREATE TABLE IF NOT EXISTS conversas ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
aluno TEXT, |
|
|
documento TEXT, -- Nova coluna |
|
|
pergunta TEXT, |
|
|
resposta TEXT, |
|
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP |
|
|
) |
|
|
''') |
|
|
conn.commit() |
|
|
conn.close() |
|
|
print(f"Banco de dados '{DB_FILE}' inicializado/verificado.") |
|
|
|
|
|
inicializar_db() |
|
|
|
|
|
def salvar_conversa(nome, documento, pergunta, resposta): |
|
|
if not documento: |
|
|
documento = "Nenhum Documento Carregado" |
|
|
try: |
|
|
conn = sqlite3.connect(DB_FILE) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute("INSERT INTO conversas (aluno, documento, pergunta, resposta) VALUES (?, ?, ?, ?)", |
|
|
(nome or "Anônimo", documento, pergunta, resposta)) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
except Exception as e: |
|
|
print(f"❌ Erro ao salvar conversa no DB: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processar_documento(arquivo_pdf, url, progress=gr.Progress(track_tqdm=True)): |
|
|
print("Arquivo recebido:", arquivo_pdf) |
|
|
if arquivo_pdf is not None: |
|
|
print("Nome do arquivo temporário:", getattr(arquivo_pdf, 'name', None)) |
|
|
|
|
|
def processar_documento(arquivo_pdf, url, progress=gr.Progress(track_tqdm=True)): |
|
|
"""Carrega, divide e cria o vector store para um PDF ou URL.""" |
|
|
if not embeddings or not llm: |
|
|
return None, None, "❌ Erro: Embeddings ou LLM não foram carregados corretamente. Verifique o console.", "" |
|
|
|
|
|
docs = [] |
|
|
documento_nome = None |
|
|
temp_dir = None |
|
|
|
|
|
progress(0, desc="Iniciando...") |
|
|
try: |
|
|
if arquivo_pdf is not None: |
|
|
documento_nome = os.path.basename(arquivo_pdf.name) |
|
|
progress(0.1, desc=f"Carregando PDF: {documento_nome}") |
|
|
|
|
|
|
|
|
loader = PyPDFLoader(arquivo_pdf.name) |
|
|
docs = loader.load() |
|
|
print(f"PDF '{documento_nome}' carregado, {len(docs)} páginas.") |
|
|
|
|
|
elif url and url.strip(): |
|
|
documento_nome = url.strip() |
|
|
progress(0.1, desc=f"Carregando URL: {documento_nome}") |
|
|
loader = WebBaseLoader(documento_nome) |
|
|
docs = loader.load() |
|
|
print(f"URL '{documento_nome}' carregada, {len(docs)} documentos (partes).") |
|
|
|
|
|
else: |
|
|
return None, None, "⚠️ Por favor, forneça um arquivo PDF ou uma URL.", "" |
|
|
|
|
|
if not docs: |
|
|
return None, None, f"❌ Erro: Não foi possível extrair conteúdo de '{documento_nome}'.", documento_nome |
|
|
|
|
|
progress(0.4, desc="Dividindo documento...") |
|
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100) |
|
|
documents = text_splitter.split_documents(docs) |
|
|
print(f"Documento dividido em {len(documents)} chunks.") |
|
|
|
|
|
if not documents: |
|
|
return None, None, "❌ Erro: Documento vazio após divisão.", documento_nome |
|
|
|
|
|
progress(0.6, desc="Criando embeddings e vector store (pode levar tempo)...") |
|
|
vectordb = FAISS.from_documents(documents, embeddings) |
|
|
retriever = vectordb.as_retriever() |
|
|
print("Vector store FAISS criado.") |
|
|
|
|
|
progress(0.9, desc="Limpando memória da conversa anterior...") |
|
|
memoria.clear() |
|
|
print("Memória da conversa resetada.") |
|
|
|
|
|
progress(1, desc="Documento processado!") |
|
|
status = f"✅ Documento '{documento_nome}' carregado e pronto para consulta." |
|
|
return retriever, documento_nome, status, "" |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Erro detalhado no processamento: {traceback.format_exc()}") |
|
|
return None, None, f"❌ Erro ao processar o documento: {e}", "" |
|
|
finally: |
|
|
|
|
|
|
|
|
if arquivo_pdf is not None and hasattr(arquivo_pdf, 'name') and os.path.exists(arquivo_pdf.name): |
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
def responder(pergunta, nome_aluno, state_retriever, state_doc_nome): |
|
|
"""Responde a pergunta usando o RAG com o documento carregado.""" |
|
|
if not state_retriever: |
|
|
return "⚠️ Por favor, carregue um documento (PDF ou URL) primeiro usando o botão 'Carregar Documento'." |
|
|
if not pergunta or not pergunta.strip(): |
|
|
return "⚠️ Por favor, digite sua pergunta." |
|
|
if not llm: |
|
|
return "❌ Erro: LLM não está configurado corretamente." |
|
|
|
|
|
print(f"\nRecebida pergunta sobre '{state_doc_nome}': {pergunta}") |
|
|
|
|
|
try: |
|
|
|
|
|
qa_chain = ConversationalRetrievalChain.from_llm( |
|
|
llm=llm, |
|
|
retriever=state_retriever, |
|
|
memory=memoria, |
|
|
return_source_documents=True, |
|
|
output_key="answer" |
|
|
) |
|
|
|
|
|
|
|
|
resultado = qa_chain.invoke({"question": pergunta}) |
|
|
resposta_bruta = resultado.get("answer", "Desculpe, não consegui gerar uma resposta.") |
|
|
fontes = resultado.get("source_documents", []) |
|
|
|
|
|
|
|
|
resposta = resposta_bruta.content if hasattr(resposta_bruta, "content") else str(resposta_bruta) |
|
|
|
|
|
print(f"Resposta gerada: {resposta}") |
|
|
if fontes: |
|
|
print(f"Fontes encontradas: {len(fontes)} chunks.") |
|
|
|
|
|
|
|
|
|
|
|
salvar_conversa(nome_aluno, state_doc_nome, pergunta, resposta) |
|
|
|
|
|
return resposta |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Erro detalhado ao responder: {traceback.format_exc()}") |
|
|
|
|
|
return f"❌ **Erro ao gerar resposta:**\n```\n{traceback.format_exc()}\n```" |
|
|
|
|
|
|
|
|
def resetar_memoria_app(): |
|
|
"""Reseta a memória da conversa.""" |
|
|
memoria.clear() |
|
|
print("Memória resetada manualmente.") |
|
|
return "✅ Memória da conversa atual resetada!" |
|
|
|
|
|
def exportar_conversas(): |
|
|
"""Exporta o histórico de conversas para CSV e Excel.""" |
|
|
try: |
|
|
conn = sqlite3.connect(DB_FILE) |
|
|
|
|
|
df = pd.read_sql_query("SELECT id, timestamp, aluno, documento, pergunta, resposta FROM conversas ORDER BY timestamp DESC", conn) |
|
|
csv_file = "conversas_exportadas.csv" |
|
|
excel_file = "conversas_exportadas.xlsx" |
|
|
df.to_csv(csv_file, index=False, encoding='utf-8') |
|
|
df.to_excel(excel_file, index=False, engine="openpyxl") |
|
|
conn.close() |
|
|
print(f"Histórico exportado para '{csv_file}' e '{excel_file}'.") |
|
|
return f"✅ Histórico exportado para '{csv_file}' e '{excel_file}'!" |
|
|
except Exception as e: |
|
|
print(f"❌ Erro ao exportar histórico: {e}") |
|
|
return f"❌ Erro ao exportar histórico: {e}" |
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as app: |
|
|
gr.Markdown("# 🧠 Tutor Multidisciplinar / Analista de Documentos Genérico 📄") |
|
|
gr.Markdown("Faça upload de um PDF ou insira uma URL para começar a conversar sobre o conteúdo.") |
|
|
|
|
|
|
|
|
state_retriever = gr.State(None) |
|
|
state_doc_nome = gr.State(None) |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
pdf_upload = gr.File(label="Upload de PDF", file_types=[".pdf"]) |
|
|
url_input = gr.Textbox(label="Ou Insira a URL do Documento") |
|
|
btn_carregar = gr.Button("🚀 Carregar Documento", variant="primary") |
|
|
status_carregamento = gr.Markdown("") |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
chatbot_display = gr.Textbox(label="Resposta do Assistente", lines=15, interactive=False) |
|
|
nome_aluno = gr.Textbox(label="Seu nome (opcional)", placeholder="Ex: Maria") |
|
|
pergunta_input = gr.Textbox(label="Sua Pergunta sobre o Documento Carregado", placeholder="Faça sua pergunta aqui...") |
|
|
with gr.Row(): |
|
|
btn_enviar = gr.Button("✉️ Enviar Pergunta", variant="primary") |
|
|
btn_resetar = gr.Button("🔁 Resetar Memória") |
|
|
btn_exportar = gr.Button("📤 Exportar Histórico") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
btn_carregar.click( |
|
|
fn=processar_documento, |
|
|
inputs=[pdf_upload, url_input], |
|
|
outputs=[state_retriever, state_doc_nome, status_carregamento, pergunta_input] |
|
|
) |
|
|
|
|
|
|
|
|
btn_enviar.click( |
|
|
fn=responder, |
|
|
inputs=[pergunta_input, nome_aluno, state_retriever, state_doc_nome], |
|
|
outputs=chatbot_display |
|
|
).then(lambda: "", outputs=pergunta_input) |
|
|
|
|
|
|
|
|
btn_resetar.click( |
|
|
fn=resetar_memoria_app, |
|
|
outputs=chatbot_display |
|
|
) |
|
|
|
|
|
|
|
|
btn_exportar.click( |
|
|
fn=exportar_conversas, |
|
|
outputs=chatbot_display |
|
|
) |
|
|
|
|
|
|
|
|
def limpar_outro_input(input_data): |
|
|
|
|
|
if input_data is not None: |
|
|
return None |
|
|
return gr.update() |
|
|
|
|
|
pdf_upload.change(fn=limpar_outro_input, inputs=pdf_upload, outputs=url_input) |
|
|
url_input.change(fn=limpar_outro_input, inputs=url_input, outputs=pdf_upload) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
if embeddings and llm: |
|
|
print("Iniciando interface Gradio...") |
|
|
app.launch(share=True, debug=True) |
|
|
else: |
|
|
print("❌ Aplicação não iniciada devido a falha no carregamento de Embeddings ou LLM.") |