""" Nó de criação de embeddings e vector store para o AgentPDF. Este nó é responsável por gerar embeddings dos chunks de texto e criar um vector store FAISS para recuperação eficiente. """ from typing import Dict, Any, List from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import FAISS from langchain_core.runnables import RunnableConfig from langchain_core.documents import Document from agents.state import PDFState, ProcessingStatus from utils.config import Config, get_openai_api_key from utils.logger import log_node_execution, main_logger def embeddings_creation_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]: """ Nó responsável por criar embeddings e vector store. Este nó: 1. Recebe os chunks de texto processados 2. Gera embeddings usando OpenAI 3. Cria um vector store FAISS 4. Atualiza o estado com o vector store Args: state: Estado atual do grafo contendo os chunks config: Configuração do LangGraph Returns: Dict[str, Any]: Atualizações para o estado """ log_node_execution("EMBEDDINGS_CREATOR", "START", "Iniciando criação de embeddings") try: # Verifica se há chunks para processar pdf_chunks = state.get("pdf_chunks") if not pdf_chunks: error_msg = "Nenhum chunk encontrado para criar embeddings" log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg) return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg } # Verifica se a API key está configurada api_key = get_openai_api_key() if not api_key: error_msg = "Chave da API OpenAI não configurada" log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg) return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg } log_node_execution( "EMBEDDINGS_CREATOR", "PROCESSING", f"Criando embeddings para {len(pdf_chunks)} chunks" ) # Cria o modelo de embeddings embeddings_model = create_embeddings_model() # Converte chunks em documentos documents = create_documents_from_chunks(pdf_chunks) # Cria o vector store vector_store = create_vector_store(documents, embeddings_model) log_node_execution( "EMBEDDINGS_CREATOR", "SUCCESS", f"Vector store criado com {len(documents)} documentos" ) return { "vector_store": vector_store, "embeddings_created": True, "processing_status": ProcessingStatus.IDLE, # Pronto para perguntas "error_message": None } except Exception as e: error_msg = f"Erro ao criar embeddings: {str(e)}" log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg) main_logger.exception("Erro detalhado na criação de embeddings:") return { "processing_status": ProcessingStatus.ERROR, "error_message": error_msg, "embeddings_created": False } def create_embeddings_model() -> OpenAIEmbeddings: """ Cria e configura o modelo de embeddings OpenAI. Returns: OpenAIEmbeddings: Modelo de embeddings configurado """ try: embeddings = OpenAIEmbeddings( openai_api_key=get_openai_api_key(), model="text-embedding-3-small", # Modelo mais eficiente chunk_size=1000, # Tamanho do chunk para embeddings max_retries=3, timeout=30 ) main_logger.debug("Modelo de embeddings OpenAI criado com sucesso") return embeddings except Exception as e: main_logger.error(f"Erro ao criar modelo de embeddings: {e}") raise def create_documents_from_chunks(chunks: List[str]) -> List[Document]: """ Converte chunks de texto em objetos Document do LangChain. Args: chunks: Lista de chunks de texto Returns: List[Document]: Lista de documentos LangChain """ documents = [] for i, chunk in enumerate(chunks): # Cria metadados para cada documento metadata = { "chunk_id": i, "chunk_size": len(chunk), "source": "pdf_upload", "chunk_index": i } # Cria o documento doc = Document( page_content=chunk, metadata=metadata ) documents.append(doc) main_logger.debug(f"Criados {len(documents)} documentos a partir dos chunks") return documents def create_vector_store(documents: List[Document], embeddings_model: OpenAIEmbeddings) -> FAISS: """ Cria um vector store FAISS a partir dos documentos. Args: documents: Lista de documentos embeddings_model: Modelo de embeddings Returns: FAISS: Vector store criado """ try: main_logger.info("Criando vector store FAISS...") # Cria o vector store vector_store = FAISS.from_documents( documents=documents, embedding=embeddings_model ) main_logger.info(f"Vector store FAISS criado com {len(documents)} documentos") # Log estatísticas log_vector_store_stats(vector_store, documents) return vector_store except Exception as e: main_logger.error(f"Erro ao criar vector store FAISS: {e}") raise def log_vector_store_stats(vector_store: FAISS, documents: List[Document]): """ Registra estatísticas do vector store criado. Args: vector_store: Vector store FAISS documents: Lista de documentos """ try: # Estatísticas básicas total_docs = len(documents) total_chars = sum(len(doc.page_content) for doc in documents) avg_doc_size = total_chars / total_docs if total_docs > 0 else 0 main_logger.info(f"📊 Estatísticas do Vector Store:") main_logger.info(f" • Total de documentos: {total_docs}") main_logger.info(f" • Total de caracteres: {total_chars:,}") main_logger.info(f" • Tamanho médio por documento: {avg_doc_size:.0f} caracteres") # Testa uma busca simples para verificar funcionamento test_results = vector_store.similarity_search("teste", k=1) main_logger.debug(f"Teste de busca retornou {len(test_results)} resultado(s)") except Exception as e: main_logger.warning(f"Erro ao calcular estatísticas do vector store: {e}") def test_vector_store(vector_store: FAISS, test_query: str = "informação") -> bool: """ Testa o funcionamento do vector store. Args: vector_store: Vector store para testar test_query: Query de teste Returns: bool: True se o teste passou """ try: # Testa busca por similaridade results = vector_store.similarity_search(test_query, k=3) if not results: main_logger.warning("Vector store não retornou resultados para query de teste") return False # Testa busca com score results_with_score = vector_store.similarity_search_with_score(test_query, k=3) if not results_with_score: main_logger.warning("Vector store não retornou scores para query de teste") return False main_logger.debug(f"Teste do vector store passou: {len(results)} resultados encontrados") return True except Exception as e: main_logger.error(f"Erro no teste do vector store: {e}") return False def optimize_vector_store(vector_store: FAISS) -> FAISS: """ Otimiza o vector store para melhor performance. Args: vector_store: Vector store original Returns: FAISS: Vector store otimizado """ try: # Para FAISS, podemos otimizar o índice # Isso é especialmente útil para grandes volumes de dados main_logger.debug("Otimizando vector store FAISS...") # O FAISS já é otimizado por padrão para volumes pequenos/médios # Para volumes maiores, poderíamos usar índices mais sofisticados return vector_store except Exception as e: main_logger.warning(f"Erro na otimização do vector store: {e}") return vector_store # Retorna o original se a otimização falhar def get_vector_store_info(vector_store: FAISS) -> Dict[str, Any]: """ Obtém informações sobre o vector store. Args: vector_store: Vector store FAISS Returns: Dict[str, Any]: Informações do vector store """ try: # Informações básicas do FAISS index = vector_store.index return { "total_vectors": index.ntotal, "vector_dimension": index.d, "index_type": type(index).__name__, "is_trained": index.is_trained if hasattr(index, 'is_trained') else True } except Exception as e: main_logger.warning(f"Erro ao obter informações do vector store: {e}") return { "total_vectors": 0, "vector_dimension": 0, "index_type": "unknown", "is_trained": False }