Spaces:
Sleeping
Sleeping
| import os | |
| from typing import Optional, Tuple | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
| import torch | |
| import tempfile | |
| import time | |
| # Configurações | |
| LLM_MODEL = "google/flan-t5-large" | |
| DOCS_DIR = "documents" | |
| class DocumentQA: | |
| def __init__(self): | |
| # Carrega o modelo e o tokenizador | |
| self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL) | |
| self.model = AutoModelForSeq2SeqLM.from_pretrained( | |
| LLM_MODEL, | |
| device_map="auto", | |
| torch_dtype=torch.float32 | |
| ) | |
| # Carrega a base de conhecimento | |
| self.base_texts = self.load_base_knowledge() | |
| def load_base_knowledge(self) -> Optional[list]: | |
| try: | |
| if not os.path.exists(DOCS_DIR): | |
| os.makedirs(DOCS_DIR) | |
| return None | |
| # Carrega documentos da pasta | |
| loader = DirectoryLoader( | |
| DOCS_DIR, | |
| glob="**/*.pdf", | |
| loader_cls=PyPDFLoader | |
| ) | |
| documents = loader.load() | |
| if not documents: | |
| return None | |
| # Divide os documentos em trechos menores | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=500, | |
| chunk_overlap=100, | |
| length_function=len, | |
| separators=["\n\n", "\n", ".", " ", ""] | |
| ) | |
| texts = text_splitter.split_documents(documents) | |
| # Extrai o texto dos trechos | |
| return [doc.page_content for doc in texts] | |
| except Exception as e: | |
| print(f"Erro ao carregar base de conhecimento: {str(e)}") | |
| return None | |
| def process_pdf(self, file_content: bytes) -> Optional[list]: | |
| try: | |
| # Salva o PDF temporariamente | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
| tmp_file.write(file_content) | |
| tmp_path = tmp_file.name | |
| # Carrega o PDF | |
| loader = PyPDFLoader(tmp_path) | |
| documents = loader.load() | |
| os.unlink(tmp_path) | |
| if not documents: | |
| return None | |
| # Divide o PDF em trechos menores | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=500, | |
| chunk_overlap=100, | |
| length_function=len, | |
| separators=["\n\n", "\n", ".", " ", ""] | |
| ) | |
| texts = text_splitter.split_documents(documents) | |
| # Extrai o texto dos trechos | |
| return [doc.page_content for doc in texts] | |
| except Exception as e: | |
| print(f"Erro ao processar PDF: {str(e)}") | |
| return None | |
| def find_relevant_texts(self, query: str, texts: list) -> list: | |
| """Encontra trechos relevantes com base em palavras-chave da pergunta.""" | |
| relevant_texts = [] | |
| query_keywords = set(query.lower().split()) | |
| for text in texts: | |
| text_keywords = set(text.lower().split()) | |
| if query_keywords.intersection(text_keywords): | |
| relevant_texts.append(text) | |
| return relevant_texts | |
| def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]: | |
| """Retorna (resposta, status, tempo_decorrido)""" | |
| if not query.strip(): | |
| return "Por favor, insira uma pergunta.", "⚠️ Aguardando pergunta", "0s" | |
| start_time = time.time() | |
| try: | |
| progress(0.2, desc="Processando documentos...") | |
| # Determina a fonte dos documentos | |
| has_pdf = file_obj is not None | |
| has_base = self.base_texts is not None | |
| source_type = "both" if has_pdf and has_base else "pdf" if has_pdf else "base" if has_base else None | |
| if not source_type: | |
| return "Nenhuma fonte de documentos disponível.", "❌ Sem documentos", "0s" | |
| # Processa documento | |
| if has_pdf: | |
| pdf_texts = self.process_pdf(file_obj) | |
| if pdf_texts is None: | |
| return "Não foi possível processar o PDF.", "❌ Erro no processamento", "0s" | |
| else: | |
| pdf_texts = [] | |
| # Combina os textos | |
| all_texts = pdf_texts + (self.base_texts if self.base_texts else []) | |
| progress(0.4, desc="Buscando informações relevantes...") | |
| # Encontra trechos relevantes | |
| relevant_texts = self.find_relevant_texts(query, all_texts) | |
| # Verifica se há trechos relevantes | |
| if not relevant_texts: | |
| return "🔍 Não foram encontradas informações suficientes nos documentos para responder esta pergunta.", "⚠️ Contexto insuficiente", f"{time.time() - start_time:.1f}s" | |
| # Prepara o contexto para o prompt | |
| context = "\n\n".join(relevant_texts) | |
| progress(0.6, desc="Gerando resposta...") | |
| # Cria o prompt | |
| prompt = f"""Instruções: | |
| 1. Analise cuidadosamente o contexto fornecido. | |
| 2. Responda à seguinte pergunta em português de forma clara e direta: {query} | |
| 3. Use apenas informações encontradas no contexto. | |
| 4. Se não houver informações suficientes, indique explicitamente. | |
| 5. Mantenha a resposta objetiva e baseada em fatos. | |
| 6. Cite exemplos específicos do contexto quando relevante. | |
| Contexto: | |
| {context} | |
| Pergunta: {query}""" | |
| # Gera a resposta usando o modelo diretamente | |
| inputs = self.tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True) | |
| outputs = self.model.generate( | |
| inputs["input_ids"], | |
| max_length=512, | |
| temperature=0.3, | |
| top_p=0.9, | |
| repetition_penalty=1.2 | |
| ) | |
| response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| elapsed_time = f"{time.time() - start_time:.1f}s" | |
| progress(1.0, desc="Concluído!") | |
| return response, "✅ Sucesso", elapsed_time | |
| except Exception as e: | |
| elapsed_time = f"{time.time() - start_time:.1f}s" | |
| return f"Erro ao gerar resposta: {str(e)}", "❌ Erro", elapsed_time | |
| def create_demo(): | |
| qa_system = DocumentQA() | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| with gr.Column(elem_id="container"): | |
| # Cabeçalho | |
| gr.Markdown( | |
| """ | |
| # 🤖 Assistente de Documentos Inteligente | |
| Sistema de consulta avançada que responde perguntas sobre seus documentos. | |
| """ | |
| ) | |
| # Área principal | |
| with gr.Row(): | |
| # Coluna de entrada | |
| with gr.Column(): | |
| with gr.Group(): | |
| gr.Markdown("### 📄 Documentos") | |
| file_input = gr.File( | |
| label="Upload de PDF (opcional)", | |
| type="binary", | |
| file_types=[".pdf"], | |
| height=100, | |
| ) | |
| info = gr.Markdown( | |
| f""" | |
| ℹ️ O sistema consulta: | |
| - PDFs enviados por você | |
| - Documentos na pasta `{DOCS_DIR}` | |
| """ | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### ❓ Sua Pergunta") | |
| query_input = gr.Textbox( | |
| placeholder="Digite sua pergunta sobre os documentos...", | |
| lines=3, | |
| max_lines=6, | |
| show_label=False, | |
| ) | |
| with gr.Row(): | |
| clear_btn = gr.Button("🗑️ Limpar", variant="secondary") | |
| submit_btn = gr.Button("🔍 Enviar Pergunta", variant="primary") | |
| # Coluna de saída | |
| with gr.Column(): | |
| with gr.Group(): | |
| gr.Markdown("### 📝 Resposta") | |
| with gr.Row(): | |
| status_output = gr.Textbox( | |
| label="Status", | |
| value="⏳ Aguardando...", | |
| interactive=False, | |
| show_label=False, | |
| ) | |
| time_output = gr.Textbox( | |
| label="Tempo", | |
| value="0s", | |
| interactive=False, | |
| show_label=False, | |
| ) | |
| response_output = gr.Textbox( | |
| label="Resposta", | |
| placeholder="A resposta aparecerá aqui...", | |
| interactive=False, | |
| lines=12, | |
| show_label=False, | |
| ) | |
| # Exemplos | |
| with gr.Accordion("📚 Exemplos de Perguntas", open=False): | |
| gr.Examples( | |
| examples=[ | |
| [None, "Quais são os principais tópicos abordados neste documento?"], | |
| [None, "Resuma as conclusões mais importantes."], | |
| [None, "O que o documento diz sobre [tema específico]?"], | |
| [None, "Quais são as recomendações apresentadas?"], | |
| ], | |
| inputs=[file_input, query_input], | |
| ) | |
| # Rodapé | |
| gr.Markdown( | |
| """ | |
| --- | |
| ### 🔧 Informações do Sistema | |
| * Respostas geradas usando tecnologia de processamento de linguagem natural | |
| * Processamento inteligente de documentos PDF | |
| * Respostas baseadas exclusivamente no conteúdo dos documentos | |
| * Suporte a múltiplos documentos e contextos | |
| """ | |
| ) | |
| # Eventos | |
| submit_btn.click( | |
| fn=qa_system.generate_response, | |
| inputs=[file_input, query_input], | |
| outputs=[response_output, status_output, time_output], | |
| ) | |
| clear_btn.click( | |
| lambda: (None, "", "⏳ Aguardando...", "0s"), | |
| outputs=[file_input, query_input, status_output, time_output], | |
| ) | |
| # Limpa a resposta quando a pergunta muda | |
| query_input.change( | |
| lambda: ("", "⏳ Aguardando...", "0s"), | |
| outputs=[response_output, status_output, time_output], | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_demo() | |
| demo.launch() |