AgentPDF / nodes /text_processor.py
rwayz's picture
Deploy
6b29104
"""
Nó de processamento de texto para o AgentPDF.
Este nó é responsável por dividir o texto extraído do PDF em chunks
menores usando RecursiveCharacterTextSplitter para otimizar a recuperação.
"""
from typing import Dict, Any, List
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.runnables import RunnableConfig
from agents.state import PDFState, ProcessingStatus
from utils.config import Config
from utils.logger import log_node_execution, main_logger
def text_processing_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
"""
Nó responsável por processar e dividir o texto em chunks.
Este nó:
1. Recebe o texto extraído do PDF
2. Divide o texto em chunks usando RecursiveCharacterTextSplitter
3. Otimiza os chunks para melhor recuperação
4. Atualiza o estado com os chunks processados
Args:
state: Estado atual do grafo contendo o texto do PDF
config: Configuração do LangGraph
Returns:
Dict[str, Any]: Atualizações para o estado
"""
log_node_execution("TEXT_PROCESSOR", "START", "Iniciando processamento de texto")
try:
# Verifica se há texto para processar
pdf_text = state.get("pdf_text")
if not pdf_text:
error_msg = "Nenhum texto encontrado para processar"
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
log_node_execution(
"TEXT_PROCESSOR",
"PROCESSING",
f"Processando texto de {len(pdf_text)} caracteres"
)
# Configura o text splitter
text_splitter = create_text_splitter()
# Divide o texto em chunks
chunks = text_splitter.split_text(pdf_text)
if not chunks:
error_msg = "Nenhum chunk foi gerado do texto"
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
# Processa e otimiza os chunks
processed_chunks = process_chunks(chunks)
log_node_execution(
"TEXT_PROCESSOR",
"SUCCESS",
f"Texto dividido em {len(processed_chunks)} chunks"
)
return {
"pdf_chunks": processed_chunks,
"processing_status": ProcessingStatus.CREATING_EMBEDDINGS,
"error_message": None
}
except Exception as e:
error_msg = f"Erro ao processar texto: {str(e)}"
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg)
main_logger.exception("Erro detalhado no processamento de texto:")
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
def create_text_splitter() -> RecursiveCharacterTextSplitter:
"""
Cria e configura o RecursiveCharacterTextSplitter.
Returns:
RecursiveCharacterTextSplitter: Splitter configurado
"""
# Obtém configurações
config = Config.get_text_splitter_config()
# Separadores hierárquicos para melhor divisão
separators = [
"\n\n", # Parágrafos
"\n", # Quebras de linha
". ", # Frases
"! ", # Exclamações
"? ", # Perguntas
"; ", # Ponto e vírgula
", ", # Vírgulas
" ", # Espaços
"" # Caracteres individuais
]
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=config["chunk_size"],
chunk_overlap=config["chunk_overlap"],
separators=separators,
length_function=len,
is_separator_regex=False,
)
main_logger.debug(f"Text splitter configurado: chunk_size={config['chunk_size']}, overlap={config['chunk_overlap']}")
return text_splitter
def process_chunks(chunks: List[str]) -> List[str]:
"""
Processa e otimiza os chunks de texto.
Args:
chunks: Lista de chunks brutos
Returns:
List[str]: Lista de chunks processados e otimizados
"""
processed_chunks = []
for i, chunk in enumerate(chunks):
# Limpa o chunk
cleaned_chunk = clean_chunk(chunk)
# Só adiciona chunks com conteúdo significativo
if is_meaningful_chunk(cleaned_chunk):
processed_chunks.append(cleaned_chunk)
main_logger.debug(f"Chunk {i+1} processado: {len(cleaned_chunk)} caracteres")
else:
main_logger.debug(f"Chunk {i+1} descartado por falta de conteúdo significativo")
# Log estatísticas
main_logger.info(f"Chunks processados: {len(processed_chunks)} de {len(chunks)} originais")
if processed_chunks:
avg_length = sum(len(chunk) for chunk in processed_chunks) / len(processed_chunks)
main_logger.info(f"Tamanho médio dos chunks: {avg_length:.0f} caracteres")
return processed_chunks
def clean_chunk(chunk: str) -> str:
"""
Limpa e normaliza um chunk de texto.
Args:
chunk: Chunk bruto
Returns:
str: Chunk limpo
"""
if not chunk:
return ""
# Remove espaços extras no início e fim
chunk = chunk.strip()
# Normaliza quebras de linha
chunk = chunk.replace('\r\n', '\n').replace('\r', '\n')
# Remove quebras de linha excessivas
while '\n\n\n' in chunk:
chunk = chunk.replace('\n\n\n', '\n\n')
# Remove espaços extras entre palavras
lines = []
for line in chunk.split('\n'):
cleaned_line = ' '.join(line.split())
if cleaned_line:
lines.append(cleaned_line)
return '\n'.join(lines)
def is_meaningful_chunk(chunk: str) -> bool:
"""
Verifica se um chunk contém conteúdo significativo.
Args:
chunk: Chunk para verificar
Returns:
bool: True se o chunk é significativo
"""
if not chunk or len(chunk.strip()) < 50: # Muito pequeno
return False
# Conta palavras
words = chunk.split()
if len(words) < 10: # Muito poucas palavras
return False
# Verifica se não é só números ou caracteres especiais
alpha_chars = sum(1 for c in chunk if c.isalpha())
if alpha_chars < len(chunk) * 0.5: # Menos de 50% são letras
return False
return True
def get_chunk_statistics(chunks: List[str]) -> Dict[str, Any]:
"""
Calcula estatísticas dos chunks processados.
Args:
chunks: Lista de chunks
Returns:
Dict[str, Any]: Estatísticas dos chunks
"""
if not chunks:
return {
"total_chunks": 0,
"total_characters": 0,
"average_length": 0,
"min_length": 0,
"max_length": 0
}
lengths = [len(chunk) for chunk in chunks]
return {
"total_chunks": len(chunks),
"total_characters": sum(lengths),
"average_length": sum(lengths) / len(lengths),
"min_length": min(lengths),
"max_length": max(lengths)
}
def optimize_chunks_for_retrieval(chunks: List[str]) -> List[str]:
"""
Otimiza chunks para melhor performance na recuperação.
Args:
chunks: Lista de chunks originais
Returns:
List[str]: Lista de chunks otimizados
"""
optimized = []
for chunk in chunks:
# Adiciona contexto se necessário
if len(chunk) < 200: # Chunks muito pequenos
# Tenta combinar com o próximo chunk se possível
continue
# Garante que chunks importantes sejam preservados
if contains_important_content(chunk):
optimized.append(chunk)
return optimized if optimized else chunks # Fallback para chunks originais
def contains_important_content(chunk: str) -> bool:
"""
Verifica se um chunk contém conteúdo importante.
Args:
chunk: Chunk para verificar
Returns:
bool: True se contém conteúdo importante
"""
# Palavras-chave que indicam conteúdo importante
important_keywords = [
'definição', 'conceito', 'importante', 'fundamental',
'princípio', 'regra', 'lei', 'teoria', 'método',
'processo', 'procedimento', 'resultado', 'conclusão'
]
chunk_lower = chunk.lower()
# Verifica presença de palavras-chave importantes
for keyword in important_keywords:
if keyword in chunk_lower:
return True
# Verifica se contém listas ou enumerações
if any(marker in chunk for marker in ['1.', '2.', '•', '-', 'a)', 'b)']):
return True
return True # Por padrão, considera importante