| """ |
| Nó de recuperação de contexto para o AgentPDF. |
| |
| Este nó é responsável por buscar documentos relevantes no vector store |
| baseado na pergunta do usuário para fornecer contexto ao LLM. |
| """ |
|
|
from typing import Any, Dict, List, Optional, Tuple

from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig

from agents.state import PDFState, ProcessingStatus
from utils.config import Config
from utils.logger import log_node_execution, main_logger
|
|
|
|
def context_retrieval_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
    """
    Retrieve context relevant to the user's question.

    Steps:
      1. Extract the user's question from the conversation messages.
      2. Search the vector store for relevant documents (falling back to a
         broader search when the focused one returns nothing).
      3. Select and optimize the retrieved context.
      4. Return the state updates carrying the retrieved context.

    Args:
        state: Current graph state.
        config: LangGraph runtime configuration.

    Returns:
        Dict[str, Any]: Updates to merge into the state.
    """
    log_node_execution("CONTEXT_RETRIEVER", "START", "Iniciando recuperação de contexto")

    def _failure(message: str) -> Dict[str, Any]:
        # Log the problem and produce the error-state update in one place.
        log_node_execution("CONTEXT_RETRIEVER", "ERROR", message)
        return {
            "processing_status": ProcessingStatus.ERROR,
            "error_message": message,
        }

    try:
        vector_store = state.get("vector_store")
        if not vector_store:
            return _failure(
                "Vector store não encontrado. Execute o processamento do PDF primeiro."
            )

        user_question = extract_user_question(state)
        if not user_question:
            return _failure("Nenhuma pergunta encontrada nas mensagens")

        log_node_execution(
            "CONTEXT_RETRIEVER",
            "PROCESSING",
            f"Buscando contexto para: '{user_question[:100]}...'"
        )

        relevant_docs = retrieve_relevant_documents(vector_store, user_question)

        if not relevant_docs:
            # Nothing passed the focused search: retry with a wider net.
            log_node_execution(
                "CONTEXT_RETRIEVER",
                "SUCCESS",
                "Nenhum contexto específico encontrado, usando busca ampla"
            )
            relevant_docs = retrieve_relevant_documents(
                vector_store,
                user_question,
                k=10,
                use_broad_search=True
            )

        context_text = process_retrieved_context(relevant_docs, user_question)

        log_node_execution(
            "CONTEXT_RETRIEVER",
            "SUCCESS",
            f"Contexto recuperado: {len(relevant_docs)} documentos, {len(context_text)} caracteres"
        )

        return {
            "retrieved_context": [doc.page_content for doc in relevant_docs],
            "user_question": user_question,
            "processing_status": ProcessingStatus.GENERATING_RESPONSE,
            "error_message": None,
        }

    except Exception as exc:
        failure_msg = f"Erro na recuperação de contexto: {str(exc)}"
        log_node_execution("CONTEXT_RETRIEVER", "ERROR", failure_msg)
        main_logger.exception("Erro detalhado na recuperação de contexto:")
        return {
            "processing_status": ProcessingStatus.ERROR,
            "error_message": failure_msg,
        }
|
|
|
|
def extract_user_question(state: PDFState) -> str:
    """
    Pull the user's question out of the state.

    The most recent ``HumanMessage`` in the history wins; if the history
    holds none, fall back to the ``user_question`` field already stored in
    the state.

    Args:
        state: Current state holding the conversation messages.

    Returns:
        str: The stripped question, or an empty string when none is found.
    """
    for candidate in reversed(state.get("messages", [])):
        if isinstance(candidate, HumanMessage):
            return candidate.content.strip()

    fallback = state.get("user_question")
    return fallback.strip() if fallback else ""
|
|
|
|
def retrieve_relevant_documents(
    vector_store: FAISS,
    query: str,
    k: Optional[int] = None,
    use_broad_search: bool = False
) -> List[Document]:
    """
    Search the vector store for documents relevant to the query.

    Args:
        vector_store: FAISS vector store to search.
        query: The user's question.
        k: Number of documents to return; defaults to the configured value.
        use_broad_search: When True, widen the search (more results and no
            score filtering).

    Returns:
        List[Document]: Relevant documents (empty when the search fails).
    """
    try:
        config = Config.get_retrieval_config()
        # Explicit None check (not `k or ...`) so an explicit k=0 is honored
        # instead of silently falling back to the configured default.
        search_k = k if k is not None else config["k"]

        if use_broad_search:
            # Widen the net, but cap the result count.
            search_k = min(search_k * 2, 15)

        docs_with_scores = vector_store.similarity_search_with_score(
            query,
            k=search_k
        )

        if use_broad_search:
            # Broad search keeps everything regardless of score.
            filtered_docs = [doc for doc, _score in docs_with_scores]
        else:
            # FAISS returns distances (lower = more similar), so keep only
            # documents at or below the configured threshold.
            threshold = config["score_threshold"]
            filtered_docs = [
                doc for doc, score in docs_with_scores
                if score <= threshold
            ]

        main_logger.debug(f"Busca retornou {len(docs_with_scores)} documentos")
        main_logger.debug(f"Após filtragem: {len(filtered_docs)} documentos")

        if docs_with_scores:
            best_score = docs_with_scores[0][1]
            main_logger.debug(f"Melhor score de similaridade: {best_score:.4f}")

        return filtered_docs

    except Exception as e:
        # Best-effort: a retrieval failure degrades to "no context" rather
        # than aborting the whole graph run.
        main_logger.error(f"Erro na busca de documentos: {e}")
        return []
|
|
|
|
def process_retrieved_context(documents: List[Document], query: str) -> str:
    """
    Assemble the retrieved documents into a single context string.

    Documents are ranked by lexical relevance to the query, then
    concatenated (each under a "[Documento N]" header) until a character
    budget is reached; the last document may be truncated to fit.

    Args:
        documents: Retrieved documents.
        query: The user's original question.

    Returns:
        str: The processed, size-bounded context ("" for no documents).
    """
    if not documents:
        return ""

    ranked = rank_documents_by_relevance(documents, query)

    budget = 4000  # maximum characters of document content in the context
    used = 0
    sections = []

    for position, document in enumerate(ranked, start=1):
        body = document.page_content.strip()

        if used + len(body) > budget:
            # Over budget: squeeze in a truncated tail only when a
            # meaningful amount of room remains, then stop.
            room = budget - used
            if room > 200:
                sections.append(f"[Documento {position}]\n{body[:room - 50]}...")
            break

        sections.append(f"[Documento {position}]\n{body}")
        used += len(body)

    combined = "\n\n".join(sections)

    main_logger.debug(f"Contexto final: {len(combined)} caracteres de {len(documents)} documentos")

    return combined
|
|
|
|
def rank_documents_by_relevance(documents: List[Document], query: str) -> List[Document]:
    """
    Order documents by simple lexical relevance to the query.

    The score is the fraction of the query's lower-cased, whitespace-split
    words that also occur in the document; ties keep their original order
    (stable sort).

    Args:
        documents: Documents to rank.
        query: The user's question.

    Returns:
        List[Document]: Documents sorted from most to least relevant.
    """
    terms = set(query.lower().split())

    def _score(document: Document) -> float:
        # An empty query gives no signal: score everything zero.
        if not terms:
            return 0.0
        overlap = terms & set(document.page_content.lower().split())
        return len(overlap) / len(terms)

    ranked = sorted(documents, key=_score, reverse=True)

    # Log the strongest matches to help tune retrieval.
    for position, document in enumerate(ranked[:3], start=1):
        main_logger.debug(f"Doc {position} relevance score: {_score(document):.3f}")

    return ranked
|
|
|
|
def enhance_query_for_retrieval(query: str) -> str:
    """
    Strip Portuguese stop words and very short tokens from a query.

    Args:
        query: The original query.

    Returns:
        str: The reduced, lower-cased query — or the original query when
        nothing survives the filtering.
    """
    # Portuguese stop words that carry no retrieval signal.
    stop_words = {
        'o', 'a', 'os', 'as', 'um', 'uma', 'uns', 'umas',
        'de', 'do', 'da', 'dos', 'das', 'em', 'no', 'na',
        'nos', 'nas', 'por', 'para', 'com', 'sem', 'sobre',
        'que', 'qual', 'quais', 'como', 'quando', 'onde',
        'é', 'são', 'foi', 'foram', 'ser', 'estar'
    }

    kept = [
        token
        for token in query.lower().split()
        if token not in stop_words and len(token) > 2
    ]
    enhanced_query = ' '.join(kept)

    if enhanced_query != query.lower():
        main_logger.debug(f"Query melhorada: '{query}' -> '{enhanced_query}'")

    return enhanced_query if enhanced_query else query
|
|
|
|
def get_retrieval_statistics(documents: List[Document]) -> Dict[str, Any]:
    """
    Compute summary statistics for a set of retrieved documents.

    Args:
        documents: Retrieved documents.

    Returns:
        Dict[str, Any]: Stats with keys ``total_documents``,
        ``total_characters``, ``average_length``, ``min_length`` and
        ``max_length`` (all zero for an empty input).
    """
    if not documents:
        # Keep the schema identical to the non-empty branch so callers can
        # rely on every key being present.
        return {
            "total_documents": 0,
            "total_characters": 0,
            "average_length": 0,
            "min_length": 0,
            "max_length": 0
        }

    lengths = [len(doc.page_content) for doc in documents]

    return {
        "total_documents": len(documents),
        "total_characters": sum(lengths),
        "average_length": sum(lengths) / len(lengths),
        "min_length": min(lengths),
        "max_length": max(lengths)
    }
|
|