AgentPDF / nodes /context_retriever.py
rwayz's picture
Deploy
6b29104
"""
Nó de recuperação de contexto para o AgentPDF.
Este nó é responsável por buscar documentos relevantes no vector store
baseado na pergunta do usuário para fornecer contexto ao LLM.
"""
from typing import Dict, Any, List, Tuple
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from agents.state import PDFState, ProcessingStatus
from utils.config import Config
from utils.logger import log_node_execution, main_logger
def context_retrieval_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
"""
Nó responsável por recuperar contexto relevante para a pergunta.
Este nó:
1. Extrai a pergunta do usuário das mensagens
2. Busca documentos relevantes no vector store
3. Seleciona e otimiza o contexto
4. Atualiza o estado com o contexto recuperado
Args:
state: Estado atual do grafo
config: Configuração do LangGraph
Returns:
Dict[str, Any]: Atualizações para o estado
"""
log_node_execution("CONTEXT_RETRIEVER", "START", "Iniciando recuperação de contexto")
try:
# Verifica se o vector store existe
vector_store = state.get("vector_store")
if not vector_store:
error_msg = "Vector store não encontrado. Execute o processamento do PDF primeiro."
log_node_execution("CONTEXT_RETRIEVER", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
# Extrai a pergunta do usuário
user_question = extract_user_question(state)
if not user_question:
error_msg = "Nenhuma pergunta encontrada nas mensagens"
log_node_execution("CONTEXT_RETRIEVER", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
log_node_execution(
"CONTEXT_RETRIEVER",
"PROCESSING",
f"Buscando contexto para: '{user_question[:100]}...'"
)
# Busca documentos relevantes
relevant_docs = retrieve_relevant_documents(vector_store, user_question)
if not relevant_docs:
log_node_execution(
"CONTEXT_RETRIEVER",
"SUCCESS",
"Nenhum contexto específico encontrado, usando busca ampla"
)
# Tenta uma busca mais ampla
relevant_docs = retrieve_relevant_documents(
vector_store,
user_question,
k=10,
use_broad_search=True
)
# Processa e otimiza o contexto
context_text = process_retrieved_context(relevant_docs, user_question)
log_node_execution(
"CONTEXT_RETRIEVER",
"SUCCESS",
f"Contexto recuperado: {len(relevant_docs)} documentos, {len(context_text)} caracteres"
)
return {
"retrieved_context": [doc.page_content for doc in relevant_docs],
"user_question": user_question,
"processing_status": ProcessingStatus.GENERATING_RESPONSE,
"error_message": None
}
except Exception as e:
error_msg = f"Erro na recuperação de contexto: {str(e)}"
log_node_execution("CONTEXT_RETRIEVER", "ERROR", error_msg)
main_logger.exception("Erro detalhado na recuperação de contexto:")
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
def extract_user_question(state: PDFState) -> str:
"""
Extrai a pergunta do usuário das mensagens.
Args:
state: Estado atual contendo as mensagens
Returns:
str: Pergunta do usuário
"""
messages = state.get("messages", [])
# Procura pela última mensagem humana
for message in reversed(messages):
if isinstance(message, HumanMessage):
return message.content.strip()
# Fallback: verifica se há pergunta direta no estado
user_question = state.get("user_question")
if user_question:
return user_question.strip()
return ""
def retrieve_relevant_documents(
vector_store: FAISS,
query: str,
k: int = None,
use_broad_search: bool = False
) -> List[Document]:
"""
Busca documentos relevantes no vector store.
Args:
vector_store: Vector store FAISS
query: Pergunta do usuário
k: Número de documentos para retornar
use_broad_search: Se deve usar busca mais ampla
Returns:
List[Document]: Lista de documentos relevantes
"""
try:
# Configurações de busca
config = Config.get_retrieval_config()
search_k = k or config["k"]
if use_broad_search:
search_k = min(search_k * 2, 15) # Busca mais ampla
# Busca com scores de similaridade
docs_with_scores = vector_store.similarity_search_with_score(
query,
k=search_k
)
# Filtra por threshold de similaridade se não for busca ampla
if not use_broad_search:
threshold = config["score_threshold"]
filtered_docs = [
doc for doc, score in docs_with_scores
if score <= threshold # FAISS usa distância (menor = mais similar)
]
else:
# Na busca ampla, aceita mais documentos
filtered_docs = [doc for doc, score in docs_with_scores]
# Log da busca
main_logger.debug(f"Busca retornou {len(docs_with_scores)} documentos")
main_logger.debug(f"Após filtragem: {len(filtered_docs)} documentos")
if docs_with_scores:
best_score = docs_with_scores[0][1]
main_logger.debug(f"Melhor score de similaridade: {best_score:.4f}")
return filtered_docs
except Exception as e:
main_logger.error(f"Erro na busca de documentos: {e}")
return []
def process_retrieved_context(documents: List[Document], query: str) -> str:
"""
Processa e otimiza o contexto recuperado.
Args:
documents: Lista de documentos recuperados
query: Pergunta original do usuário
Returns:
str: Contexto processado e otimizado
"""
if not documents:
return ""
# Ordena documentos por relevância (se tiver scores)
sorted_docs = rank_documents_by_relevance(documents, query)
# Combina o contexto
context_parts = []
total_length = 0
max_context_length = 4000 # Limite para não sobrecarregar o LLM
for i, doc in enumerate(sorted_docs):
content = doc.page_content.strip()
# Verifica se ainda cabe no limite
if total_length + len(content) > max_context_length:
# Tenta adicionar uma versão truncada
remaining_space = max_context_length - total_length
if remaining_space > 200: # Só adiciona se sobrar espaço significativo
truncated_content = content[:remaining_space-50] + "..."
context_parts.append(f"[Documento {i+1}]\n{truncated_content}")
break
context_parts.append(f"[Documento {i+1}]\n{content}")
total_length += len(content)
# Junta o contexto
final_context = "\n\n".join(context_parts)
main_logger.debug(f"Contexto final: {len(final_context)} caracteres de {len(documents)} documentos")
return final_context
def rank_documents_by_relevance(documents: List[Document], query: str) -> List[Document]:
"""
Ordena documentos por relevância à pergunta.
Args:
documents: Lista de documentos
query: Pergunta do usuário
Returns:
List[Document]: Documentos ordenados por relevância
"""
# Para uma implementação simples, vamos usar a ordem original
# Em uma versão mais avançada, poderíamos implementar re-ranking
# Calcula scores simples baseados em palavras-chave
query_words = set(query.lower().split())
def calculate_relevance_score(doc: Document) -> float:
content_words = set(doc.page_content.lower().split())
# Conta palavras em comum
common_words = query_words.intersection(content_words)
# Score baseado na proporção de palavras em comum
if len(query_words) == 0:
return 0.0
return len(common_words) / len(query_words)
# Ordena por score de relevância (decrescente)
scored_docs = [(doc, calculate_relevance_score(doc)) for doc in documents]
scored_docs.sort(key=lambda x: x[1], reverse=True)
# Log dos scores para debug
for i, (doc, score) in enumerate(scored_docs[:3]):
main_logger.debug(f"Doc {i+1} relevance score: {score:.3f}")
return [doc for doc, score in scored_docs]
def enhance_query_for_retrieval(query: str) -> str:
"""
Melhora a query para melhor recuperação.
Args:
query: Query original
Returns:
str: Query melhorada
"""
# Remove palavras muito comuns que podem atrapalhar a busca
stop_words = {
'o', 'a', 'os', 'as', 'um', 'uma', 'uns', 'umas',
'de', 'do', 'da', 'dos', 'das', 'em', 'no', 'na',
'nos', 'nas', 'por', 'para', 'com', 'sem', 'sobre',
'que', 'qual', 'quais', 'como', 'quando', 'onde',
'é', 'são', 'foi', 'foram', 'ser', 'estar'
}
# Mantém apenas palavras significativas
words = query.lower().split()
meaningful_words = [word for word in words if word not in stop_words and len(word) > 2]
enhanced_query = ' '.join(meaningful_words)
if enhanced_query != query.lower():
main_logger.debug(f"Query melhorada: '{query}' -> '{enhanced_query}'")
return enhanced_query if enhanced_query else query
def get_retrieval_statistics(documents: List[Document]) -> Dict[str, Any]:
"""
Calcula estatísticas da recuperação.
Args:
documents: Documentos recuperados
Returns:
Dict[str, Any]: Estatísticas da recuperação
"""
if not documents:
return {
"total_documents": 0,
"total_characters": 0,
"average_length": 0
}
lengths = [len(doc.page_content) for doc in documents]
return {
"total_documents": len(documents),
"total_characters": sum(lengths),
"average_length": sum(lengths) / len(lengths),
"min_length": min(lengths),
"max_length": max(lengths)
}