|
|
""" |
|
|
Nó de processamento de texto para o AgentPDF. |
|
|
|
|
|
Este nó é responsável por dividir o texto extraído do PDF em chunks |
|
|
menores usando RecursiveCharacterTextSplitter para otimizar a recuperação. |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_core.runnables import RunnableConfig |
|
|
|
|
|
from agents.state import PDFState, ProcessingStatus |
|
|
from utils.config import Config |
|
|
from utils.logger import log_node_execution, main_logger |
|
|
|
|
|
|
|
|
def text_processing_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó responsável por processar e dividir o texto em chunks. |
|
|
|
|
|
Este nó: |
|
|
1. Recebe o texto extraído do PDF |
|
|
2. Divide o texto em chunks usando RecursiveCharacterTextSplitter |
|
|
3. Otimiza os chunks para melhor recuperação |
|
|
4. Atualiza o estado com os chunks processados |
|
|
|
|
|
Args: |
|
|
state: Estado atual do grafo contendo o texto do PDF |
|
|
config: Configuração do LangGraph |
|
|
|
|
|
Returns: |
|
|
Dict[str, Any]: Atualizações para o estado |
|
|
""" |
|
|
log_node_execution("TEXT_PROCESSOR", "START", "Iniciando processamento de texto") |
|
|
|
|
|
try: |
|
|
|
|
|
pdf_text = state.get("pdf_text") |
|
|
if not pdf_text: |
|
|
error_msg = "Nenhum texto encontrado para processar" |
|
|
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) |
|
|
return { |
|
|
"processing_status": ProcessingStatus.ERROR, |
|
|
"error_message": error_msg |
|
|
} |
|
|
|
|
|
log_node_execution( |
|
|
"TEXT_PROCESSOR", |
|
|
"PROCESSING", |
|
|
f"Processando texto de {len(pdf_text)} caracteres" |
|
|
) |
|
|
|
|
|
|
|
|
text_splitter = create_text_splitter() |
|
|
|
|
|
|
|
|
chunks = text_splitter.split_text(pdf_text) |
|
|
|
|
|
if not chunks: |
|
|
error_msg = "Nenhum chunk foi gerado do texto" |
|
|
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) |
|
|
return { |
|
|
"processing_status": ProcessingStatus.ERROR, |
|
|
"error_message": error_msg |
|
|
} |
|
|
|
|
|
|
|
|
processed_chunks = process_chunks(chunks) |
|
|
|
|
|
log_node_execution( |
|
|
"TEXT_PROCESSOR", |
|
|
"SUCCESS", |
|
|
f"Texto dividido em {len(processed_chunks)} chunks" |
|
|
) |
|
|
|
|
|
return { |
|
|
"pdf_chunks": processed_chunks, |
|
|
"processing_status": ProcessingStatus.CREATING_EMBEDDINGS, |
|
|
"error_message": None |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao processar texto: {str(e)}" |
|
|
log_node_execution("TEXT_PROCESSOR", "ERROR", error_msg) |
|
|
main_logger.exception("Erro detalhado no processamento de texto:") |
|
|
|
|
|
return { |
|
|
"processing_status": ProcessingStatus.ERROR, |
|
|
"error_message": error_msg |
|
|
} |
|
|
|
|
|
|
|
|
def create_text_splitter() -> RecursiveCharacterTextSplitter: |
|
|
""" |
|
|
Cria e configura o RecursiveCharacterTextSplitter. |
|
|
|
|
|
Returns: |
|
|
RecursiveCharacterTextSplitter: Splitter configurado |
|
|
""" |
|
|
|
|
|
config = Config.get_text_splitter_config() |
|
|
|
|
|
|
|
|
separators = [ |
|
|
"\n\n", |
|
|
"\n", |
|
|
". ", |
|
|
"! ", |
|
|
"? ", |
|
|
"; ", |
|
|
", ", |
|
|
" ", |
|
|
"" |
|
|
] |
|
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter( |
|
|
chunk_size=config["chunk_size"], |
|
|
chunk_overlap=config["chunk_overlap"], |
|
|
separators=separators, |
|
|
length_function=len, |
|
|
is_separator_regex=False, |
|
|
) |
|
|
|
|
|
main_logger.debug(f"Text splitter configurado: chunk_size={config['chunk_size']}, overlap={config['chunk_overlap']}") |
|
|
|
|
|
return text_splitter |
|
|
|
|
|
|
|
|
def process_chunks(chunks: List[str]) -> List[str]: |
|
|
""" |
|
|
Processa e otimiza os chunks de texto. |
|
|
|
|
|
Args: |
|
|
chunks: Lista de chunks brutos |
|
|
|
|
|
Returns: |
|
|
List[str]: Lista de chunks processados e otimizados |
|
|
""" |
|
|
processed_chunks = [] |
|
|
|
|
|
for i, chunk in enumerate(chunks): |
|
|
|
|
|
cleaned_chunk = clean_chunk(chunk) |
|
|
|
|
|
|
|
|
if is_meaningful_chunk(cleaned_chunk): |
|
|
processed_chunks.append(cleaned_chunk) |
|
|
main_logger.debug(f"Chunk {i+1} processado: {len(cleaned_chunk)} caracteres") |
|
|
else: |
|
|
main_logger.debug(f"Chunk {i+1} descartado por falta de conteúdo significativo") |
|
|
|
|
|
|
|
|
main_logger.info(f"Chunks processados: {len(processed_chunks)} de {len(chunks)} originais") |
|
|
|
|
|
if processed_chunks: |
|
|
avg_length = sum(len(chunk) for chunk in processed_chunks) / len(processed_chunks) |
|
|
main_logger.info(f"Tamanho médio dos chunks: {avg_length:.0f} caracteres") |
|
|
|
|
|
return processed_chunks |
|
|
|
|
|
|
|
|
def clean_chunk(chunk: str) -> str: |
|
|
""" |
|
|
Limpa e normaliza um chunk de texto. |
|
|
|
|
|
Args: |
|
|
chunk: Chunk bruto |
|
|
|
|
|
Returns: |
|
|
str: Chunk limpo |
|
|
""" |
|
|
if not chunk: |
|
|
return "" |
|
|
|
|
|
|
|
|
chunk = chunk.strip() |
|
|
|
|
|
|
|
|
chunk = chunk.replace('\r\n', '\n').replace('\r', '\n') |
|
|
|
|
|
|
|
|
while '\n\n\n' in chunk: |
|
|
chunk = chunk.replace('\n\n\n', '\n\n') |
|
|
|
|
|
|
|
|
lines = [] |
|
|
for line in chunk.split('\n'): |
|
|
cleaned_line = ' '.join(line.split()) |
|
|
if cleaned_line: |
|
|
lines.append(cleaned_line) |
|
|
|
|
|
return '\n'.join(lines) |
|
|
|
|
|
|
|
|
def is_meaningful_chunk(chunk: str) -> bool: |
|
|
""" |
|
|
Verifica se um chunk contém conteúdo significativo. |
|
|
|
|
|
Args: |
|
|
chunk: Chunk para verificar |
|
|
|
|
|
Returns: |
|
|
bool: True se o chunk é significativo |
|
|
""" |
|
|
if not chunk or len(chunk.strip()) < 50: |
|
|
return False |
|
|
|
|
|
|
|
|
words = chunk.split() |
|
|
if len(words) < 10: |
|
|
return False |
|
|
|
|
|
|
|
|
alpha_chars = sum(1 for c in chunk if c.isalpha()) |
|
|
if alpha_chars < len(chunk) * 0.5: |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
def get_chunk_statistics(chunks: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Calcula estatísticas dos chunks processados. |
|
|
|
|
|
Args: |
|
|
chunks: Lista de chunks |
|
|
|
|
|
Returns: |
|
|
Dict[str, Any]: Estatísticas dos chunks |
|
|
""" |
|
|
if not chunks: |
|
|
return { |
|
|
"total_chunks": 0, |
|
|
"total_characters": 0, |
|
|
"average_length": 0, |
|
|
"min_length": 0, |
|
|
"max_length": 0 |
|
|
} |
|
|
|
|
|
lengths = [len(chunk) for chunk in chunks] |
|
|
|
|
|
return { |
|
|
"total_chunks": len(chunks), |
|
|
"total_characters": sum(lengths), |
|
|
"average_length": sum(lengths) / len(lengths), |
|
|
"min_length": min(lengths), |
|
|
"max_length": max(lengths) |
|
|
} |
|
|
|
|
|
|
|
|
def optimize_chunks_for_retrieval(chunks: List[str]) -> List[str]: |
|
|
""" |
|
|
Otimiza chunks para melhor performance na recuperação. |
|
|
|
|
|
Args: |
|
|
chunks: Lista de chunks originais |
|
|
|
|
|
Returns: |
|
|
List[str]: Lista de chunks otimizados |
|
|
""" |
|
|
optimized = [] |
|
|
|
|
|
for chunk in chunks: |
|
|
|
|
|
if len(chunk) < 200: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
if contains_important_content(chunk): |
|
|
optimized.append(chunk) |
|
|
|
|
|
return optimized if optimized else chunks |
|
|
|
|
|
|
|
|
def contains_important_content(chunk: str) -> bool: |
|
|
""" |
|
|
Verifica se um chunk contém conteúdo importante. |
|
|
|
|
|
Args: |
|
|
chunk: Chunk para verificar |
|
|
|
|
|
Returns: |
|
|
bool: True se contém conteúdo importante |
|
|
""" |
|
|
|
|
|
important_keywords = [ |
|
|
'definição', 'conceito', 'importante', 'fundamental', |
|
|
'princípio', 'regra', 'lei', 'teoria', 'método', |
|
|
'processo', 'procedimento', 'resultado', 'conclusão' |
|
|
] |
|
|
|
|
|
chunk_lower = chunk.lower() |
|
|
|
|
|
|
|
|
for keyword in important_keywords: |
|
|
if keyword in chunk_lower: |
|
|
return True |
|
|
|
|
|
|
|
|
if any(marker in chunk for marker in ['1.', '2.', '•', '-', 'a)', 'b)']): |
|
|
return True |
|
|
|
|
|
return True |
|
|
|