""" Nó de processamento de texto para o AgentPDF. Este nó é responsável por dividir o texto extraído do PDF em chunks menores usando RecursiveCharacterTextSplitter para otimizar a recuperação. """ from typing import Dict, Any, List from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.runnables import RunnableConfig from agents.state import PDFState, ProcessingStatus from utils.config import Config from utils.logger import log_node_execution, main_logger def text_processing_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]: """ Nó responsável por processar e dividir o texto em chunks. Este nó: 1. Recebe o texto extraído do PDF 2. Divide o texto em chunks usando RecursiveCharacterTextSplitter 3. Otimiza os chunks para melhor recuperação 4. Atualiza o estado com os chunks processados Args: state: Estado atual do grafo contendo o texto do PDF config: Configuração do LangGraph Returns: Dict[str, Any]: Atualizações para o estado """ log_node_execution("TEXT_PROCESSOR", "START", "Iniciando processamento de texto") try: # Verifica se há texto para processar pdf_text = state.get("pdf_text") if not pdf_text: error_msg = "Nenhum texto encontrado para processar" log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg } log_node_execution( "TEXT_PROCESSOR", "PROCESSING", f"Processando texto de {len(pdf_text)} caracteres" ) # Configura o text splitter text_splitter = create_text_splitter() # Divide o texto em chunks chunks = text_splitter.split_text(pdf_text) if not chunks: error_msg = "Nenhum chunk foi gerado do texto" log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg } # Processa e otimiza os chunks processed_chunks = process_chunks(chunks) log_node_execution( "TEXT_PROCESSOR", "SUCCESS", f"Texto dividido em {len(processed_chunks)} chunks" ) return { "pdf_chunks": processed_chunks, "processing_status": ProcessingStatus.CREATING_EMBEDDINGS, "error_message": None } except Exception as e: error_msg = f"Erro ao processar texto: {str(e)}" log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) main_logger.exception("Erro detalhado no processamento de texto:") return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg } def create_text_splitter() -> RecursiveCharacterTextSplitter: """ Cria e configura o RecursiveCharacterTextSplitter. Returns: RecursiveCharacterTextSplitter: Splitter configurado """ # Obtém configurações config = Config.get_text_splitter_config() # Separadores hierárquicos para melhor divisão separators = [ "\n\n", # Parágrafos "\n", # Quebras de linha ". ", # Frases "! ", # Exclamações "? ", # Perguntas "; ", # Ponto e vírgula ", ", # Vírgulas " ", # Espaços "" # Caracteres individuais ] text_splitter = RecursiveCharacterTextSplitter( chunk_size=config["chunk_size"], chunk_overlap=config["chunk_overlap"], separators=separators, length_function=len, is_separator_regex=False, ) main_logger.debug(f"Text splitter configurado: chunk_size={config['chunk_size']}, overlap={config['chunk_overlap']}") return text_splitter def process_chunks(chunks: List[str]) -> List[str]: """ Processa e otimiza os chunks de texto. Args: chunks: Lista de chunks brutos Returns: List[str]: Lista de chunks processados e otimizados """ processed_chunks = [] for i, chunk in enumerate(chunks): # Limpa o chunk cleaned_chunk = clean_chunk(chunk) # Só adiciona chunks com conteúdo significativo if is_meaningful_chunk(cleaned_chunk): processed_chunks.append(cleaned_chunk) main_logger.debug(f"Chunk {i+1} processado: {len(cleaned_chunk)} caracteres") else: main_logger.debug(f"Chunk {i+1} descartado por falta de conteúdo significativo") # Log estatísticas main_logger.info(f"Chunks processados: {len(processed_chunks)} de {len(chunks)} originais") if processed_chunks: avg_length = sum(len(chunk) for chunk in processed_chunks) / len(processed_chunks) main_logger.info(f"Tamanho médio dos chunks: {avg_length:.0f} caracteres") return processed_chunks def clean_chunk(chunk: str) -> str: """ Limpa e normaliza um chunk de texto. Args: chunk: Chunk bruto Returns: str: Chunk limpo """ if not chunk: return "" # Remove espaços extras no início e fim chunk = chunk.strip() # Normaliza quebras de linha chunk = chunk.replace('\r\n', '\n').replace('\r', '\n') # Remove quebras de linha excessivas while '\n\n\n' in chunk: chunk = chunk.replace('\n\n\n', '\n\n') # Remove espaços extras entre palavras lines = [] for line in chunk.split('\n'): cleaned_line = ' '.join(line.split()) if cleaned_line: lines.append(cleaned_line) return '\n'.join(lines) def is_meaningful_chunk(chunk: str) -> bool: """ Verifica se um chunk contém conteúdo significativo. Args: chunk: Chunk para verificar Returns: bool: True se o chunk é significativo """ if not chunk or len(chunk.strip()) < 50: # Muito pequeno return False # Conta palavras words = chunk.split() if len(words) < 10: # Muito poucas palavras return False # Verifica se não é só números ou caracteres especiais alpha_chars = sum(1 for c in chunk if c.isalpha()) if alpha_chars < len(chunk) * 0.5: # Menos de 50% são letras return False return True def get_chunk_statistics(chunks: List[str]) -> Dict[str, Any]: """ Calcula estatísticas dos chunks processados. Args: chunks: Lista de chunks Returns: Dict[str, Any]: Estatísticas dos chunks """ if not chunks: return { "total_chunks": 0, "total_characters": 0, "average_length": 0, "min_length": 0, "max_length": 0 } lengths = [len(chunk) for chunk in chunks] return { "total_chunks": len(chunks), "total_characters": sum(lengths), "average_length": sum(lengths) / len(lengths), "min_length": min(lengths), "max_length": max(lengths) } def optimize_chunks_for_retrieval(chunks: List[str]) -> List[str]: """ Otimiza chunks para melhor performance na recuperação. Args: chunks: Lista de chunks originais Returns: List[str]: Lista de chunks otimizados """ optimized = [] for chunk in chunks: # Adiciona contexto se necessário if len(chunk) < 200: # Chunks muito pequenos # Tenta combinar com o próximo chunk se possível continue # Garante que chunks importantes sejam preservados if contains_important_content(chunk): optimized.append(chunk) return optimized if optimized else chunks # Fallback para chunks originais def contains_important_content(chunk: str) -> bool: """ Verifica se um chunk contém conteúdo importante. Args: chunk: Chunk para verificar Returns: bool: True se contém conteúdo importante """ # Palavras-chave que indicam conteúdo importante important_keywords = [ 'definição', 'conceito', 'importante', 'fundamental', 'princípio', 'regra', 'lei', 'teoria', 'método', 'processo', 'procedimento', 'resultado', 'conclusão' ] chunk_lower = chunk.lower() # Verifica presença de palavras-chave importantes for keyword in important_keywords: if keyword in chunk_lower: return True # Verifica se contém listas ou enumerações if any(marker in chunk for marker in ['1.', '2.', '•', '-', 'a)', 'b)']): return True return True # Por padrão, considera importante