AgentPDF / main_graph.py
rwayz's picture
Deploy
6b29104
"""
Grafo principal do AgentPDF usando LangGraph.
Este módulo define o grafo principal que orquestra todos os nós
para processar PDFs e responder perguntas usando LLM.
"""
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage
from agents.state import PDFState, ProcessingStatus
from nodes.pdf_loader import load_pdf_node
from nodes.text_processor import text_processing_node
from nodes.embeddings_creator import embeddings_creation_node
from nodes.context_retriever import context_retrieval_node
from nodes.llm_agent import llm_agent_node
from utils.logger import log_graph_execution, main_logger
from utils.config import Config
class AgentPDFGraph:
"""
Classe principal do grafo AgentPDF.
Gerencia o fluxo de processamento de PDFs e geração de respostas
usando a arquitetura de nós do LangGraph.
"""
def __init__(self):
"""Inicializa o grafo AgentPDF."""
self.graph = None
self._build_graph()
log_graph_execution("INIT", "Grafo AgentPDF inicializado")
def _build_graph(self):
"""Constrói o grafo com todos os nós e conexões."""
# Cria o StateGraph
graph_builder = StateGraph(PDFState)
# Adiciona todos os nós
self._add_nodes(graph_builder)
# Define as conexões entre nós
self._add_edges(graph_builder)
# Compila o grafo
self.graph = graph_builder.compile()
log_graph_execution("BUILD", "Grafo construído e compilado com sucesso")
def _add_nodes(self, builder: StateGraph):
"""
Adiciona todos os nós ao grafo.
Args:
builder: Builder do StateGraph
"""
# Nó de carregamento de PDF
builder.add_node("load_pdf", load_pdf_node)
# Nó de processamento de texto
builder.add_node("process_text", text_processing_node)
# Nó de criação de embeddings
builder.add_node("create_embeddings", embeddings_creation_node)
# Nó de recuperação de contexto
builder.add_node("retrieve_context", context_retrieval_node)
# Nó do agente LLM
builder.add_node("llm_agent", llm_agent_node)
log_graph_execution("NODES", "Todos os nós adicionados ao grafo")
def _add_edges(self, builder: StateGraph):
"""
Define as conexões entre os nós.
Args:
builder: Builder do StateGraph
"""
# Ponto de entrada condicional
builder.add_conditional_edges(
START,
self._route_start,
{
"process_pdf": "load_pdf",
"answer_question": "retrieve_context"
}
)
# Fluxo de processamento de PDF
builder.add_edge("load_pdf", "process_text")
builder.add_edge("process_text", "create_embeddings")
# Após criar embeddings, vai para o fim (PDF processado)
builder.add_edge("create_embeddings", END)
# Fluxo de resposta a perguntas
builder.add_edge("retrieve_context", "llm_agent")
builder.add_edge("llm_agent", END)
log_graph_execution("EDGES", "Todas as conexões definidas")
def _route_start(self, state: PDFState) -> Literal["process_pdf", "answer_question"]:
"""
Determina o ponto de entrada baseado no estado.
Args:
state: Estado atual do grafo
Returns:
str: Próximo nó a ser executado
"""
# Se há um PDF para processar e ainda não foi processado
if state.get("pdf_path") and not state.get("embeddings_created", False):
log_graph_execution("ROUTE", "Direcionando para processamento de PDF")
return "process_pdf"
# Se há uma pergunta e o PDF já foi processado
if state.get("messages") and state.get("embeddings_created", False):
log_graph_execution("ROUTE", "Direcionando para resposta de pergunta")
return "answer_question"
# Fallback: processar PDF
log_graph_execution("ROUTE", "Fallback: direcionando para processamento de PDF")
return "process_pdf"
def process_pdf(self, pdf_path: str) -> dict:
"""
Processa um arquivo PDF.
Args:
pdf_path: Caminho para o arquivo PDF
Returns:
dict: Resultado do processamento
"""
log_graph_execution("PROCESS_PDF", f"Iniciando processamento: {pdf_path}")
try:
# Estado inicial para processamento
initial_state = {
"pdf_path": pdf_path,
"messages": [],
"embeddings_created": False,
"processing_status": ProcessingStatus.LOADING_PDF
}
# Executa o grafo
result = self.graph.invoke(initial_state)
# Verifica se o processamento foi bem-sucedido
if result.get("processing_status") == ProcessingStatus.ERROR:
error_msg = result.get("error_message", "Erro desconhecido")
log_graph_execution("PROCESS_PDF", f"ERRO: {error_msg}")
return {
"success": False,
"error": error_msg,
"result": result
}
log_graph_execution("PROCESS_PDF", "PDF processado com sucesso")
return {
"success": True,
"message": "PDF processado e indexado com sucesso!",
"result": result
}
except Exception as e:
error_msg = f"Erro no processamento do PDF: {str(e)}"
log_graph_execution("PROCESS_PDF", f"ERRO: {error_msg}")
main_logger.exception("Erro detalhado no processamento:")
return {
"success": False,
"error": error_msg,
"result": None
}
def ask_question(self, question: str, current_state: dict = None) -> dict:
"""
Faz uma pergunta sobre o PDF processado.
Args:
question: Pergunta do usuário
current_state: Estado atual (opcional)
Returns:
dict: Resposta gerada
"""
log_graph_execution("ASK_QUESTION", f"Pergunta: {question[:100]}...")
try:
# Verifica se há estado atual ou cria um novo
if current_state is None:
log_graph_execution("ASK_QUESTION", "ERRO: Nenhum estado fornecido")
return {
"success": False,
"error": "PDF não foi processado. Faça upload de um PDF primeiro.",
"answer": None
}
# Verifica se o PDF foi processado
if not current_state.get("embeddings_created", False):
return {
"success": False,
"error": "PDF não foi processado completamente. Tente novamente.",
"answer": None
}
# Adiciona a pergunta às mensagens
human_message = HumanMessage(content=question)
messages = current_state.get("messages", [])
messages.append(human_message)
# Estado para a pergunta
question_state = {
**current_state,
"messages": messages,
"user_question": question,
"processing_status": ProcessingStatus.RETRIEVING_CONTEXT
}
# Executa o grafo
result = self.graph.invoke(question_state)
# Verifica se houve erro
if result.get("processing_status") == ProcessingStatus.ERROR:
error_msg = result.get("error_message", "Erro desconhecido")
log_graph_execution("ASK_QUESTION", f"ERRO: {error_msg}")
return {
"success": False,
"error": error_msg,
"answer": None
}
# Extrai a resposta
answer = result.get("final_answer", "Não foi possível gerar uma resposta.")
log_graph_execution("ASK_QUESTION", f"Resposta gerada: {len(answer)} caracteres")
return {
"success": True,
"answer": answer,
"result": result
}
except Exception as e:
error_msg = f"Erro ao processar pergunta: {str(e)}"
log_graph_execution("ASK_QUESTION", f"ERRO: {error_msg}")
main_logger.exception("Erro detalhado na pergunta:")
return {
"success": False,
"error": error_msg,
"answer": None
}
def get_graph_visualization(self) -> str:
"""
Retorna uma representação visual do grafo.
Returns:
str: Representação do grafo
"""
try:
# Tenta gerar visualização se disponível
if hasattr(self.graph, 'get_graph'):
return str(self.graph.get_graph())
else:
return "Visualização não disponível"
except Exception as e:
main_logger.warning(f"Erro ao gerar visualização: {e}")
return "Erro na visualização do grafo"
def get_status(self) -> dict:
"""
Retorna o status atual do grafo.
Returns:
dict: Status do grafo
"""
return {
"graph_compiled": self.graph is not None,
"config_valid": Config.validate_config(),
"nodes_count": 5, # Número de nós no grafo
"ready": self.graph is not None and Config.validate_config()
}
# Instância global do grafo
agent_pdf_graph = AgentPDFGraph()
def get_agent_graph() -> AgentPDFGraph:
"""
Retorna a instância global do grafo.
Returns:
AgentPDFGraph: Instância do grafo
"""
return agent_pdf_graph
def process_pdf_file(pdf_path: str) -> dict:
"""
Função de conveniência para processar um PDF.
Args:
pdf_path: Caminho para o arquivo PDF
Returns:
dict: Resultado do processamento
"""
return agent_pdf_graph.process_pdf(pdf_path)
def ask_pdf_question(question: str, state: dict = None) -> dict:
"""
Função de conveniência para fazer perguntas.
Args:
question: Pergunta do usuário
state: Estado atual do processamento
Returns:
dict: Resposta gerada
"""
return agent_pdf_graph.ask_question(question, state)